1. withColumn
简介
#
在PySpark中,withColumn 是一个用于在DataFrame中添加、替换或修改列的方法。 它返回一个新的DataFrame,并不会修改原始的DataFrame。
-
用法
- withColumn 方法有两个参数,
DataFrame.withColumn(colName, col)
:- colName:要添加或修改的列名。
- col:要添加的列的表达式,可以是一个现有列的操作或一个新的列的表达式。
- withColumn 方法有两个参数,
-
示例
- 下面是一些使用 withColumn 方法的示例:
- 添加新列
假设我们有一个包含员工数据的DataFrame,我们想要添加一个新的列 bonus,其值为 salary 的10%。
from pyspark.sql import SparkSession from pyspark.sql.functions import col # 创建 SparkSession spark = SparkSession.builder.appName("example").getOrCreate() # 示例 DataFrame data = [("Alice", 3000), ("Bob", 4000), ("Cathy", 5000)] columns = ["name", "salary"] df = spark.createDataFrame(data, columns) # 添加新列 df_with_bonus = df.withColumn("bonus", col("salary") * 0.1) df_with_bonus.show()
- 输出:
+-----+------+-----+ | name|salary|bonus| +-----+------+-----+ |Alice| 3000| 300.0| | Bob| 4000| 400.0| |Cathy| 5000| 500.0| +-----+------+-----+
- 修改现有列
我们可以修改现有的列,例如将 salary 列的值增加1000。
df_with_incremented_salary = df.withColumn("salary", col("salary") + 1000) df_with_incremented_salary.show()
- 输出:
+-----+------+ | name|salary| +-----+------+ |Alice| 4000| | Bob| 5000| |Cathy| 6000| +-----+------+
- 使用复杂表达式
我们可以使用更复杂的表达式。例如,根据条件添加新列。
from pyspark.sql.functions import when df_with_bonus_flag = df.withColumn("bonus_flag", when(col("salary") > 4000, "High").otherwise("Low")) df_with_bonus_flag.show()
- 输出:
+-----+------+----------+ | name|salary|bonus_flag| +-----+------+----------+ |Alice| 3000| Low| | Bob| 4000| Low| |Cathy| 5000| High| +-----+------+----------+
-
使用内置函数
PySpark提供了许多内置函数,可以与 withColumn 一起使用。下面是一些常用的内置函数示例:
- 使用 lit 添加常量列(literal value)
from pyspark.sql.functions import lit df_with_constant = df.withColumn("constant", lit(100)) df_with_constant.show()
- 输出:
+-----+------+--------+ | name|salary|constant| +-----+------+--------+ |Alice| 3000| 100| | Bob| 4000| 100| |Cathy| 5000| 100| +-----+------+--------+
- 使用 concat 连接字符串列
from pyspark.sql.functions import concat df_with_fullname = df.withColumn("full_name", concat(col("name"), lit(" Doe"))) df_with_fullname.show()
- 输出:
+-----+------+---------+ | name|salary|full_name| +-----+------+---------+ |Alice| 3000|Alice Doe| | Bob| 4000| Bob Doe| |Cathy| 5000|Cathy Doe| +-----+------+---------+
2. 案例:日期处理 #
- 构建 DataFrame,并手动创建记录(Rows)
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import (Row, StructType,
StructField, StringType)
# 创建DF Schema
my_schema = StructType([
StructField("ID", StringType()),
StructField("EventDate", StringType())])
# 构造 Demo 数据
my_rows = [Row("123", "04/05/2020"),
Row("124", "4/5/2020"),
Row("125", "04/5/2020"),
Row("126", "4/05/2020")]
# 创建 Spark Session
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkDemo") \
.getOrCreate()
# 创建 DataFrame
my_rdd = spark.sparkContext.parallelize(my_rows, 2)
my_df = spark.createDataFrame(my_rdd, my_schema)
# 展示数据
my_df.printSchema()
my_df.show()
- 输出
root
|-- ID: string (nullable = true)
|-- EventDate: string (nullable = true)
+---+----------+
| ID| EventDate|
+---+----------+
|123|04/05/2020|
|124| 4/5/2020|
|125| 04/5/2020|
|126| 4/05/2020|
+---+----------+
- 自定义简单的数据转换
# 自定义数据类别转换
def to_date_df(df, fmt, fld):
return df.withColumn(fld, to_date(col(fld), fmt))
# 应用数据转换,并展示
new_df = to_date_df(my_df, "M/d/y", "EventDate")
new_df.printSchema()
new_df.show()
to_date_df(df, fmt, fld)
是我们自定义的转换程序。- 将输入的 DataFrame
df
的fld
列转换成以fmt
格式的date
类别,并返回一个新的 DataFrame。
- 将输入的 DataFrame
- 下面试运行后的输出:
root
|-- ID: string (nullable = true)
|-- EventDate: date (nullable = true)
+---+----------+
| ID| EventDate|
+---+----------+
|123|2020-04-05|
|124|2020-04-05|
|125|2020-04-05|
|126|2020-04-05|
+---+----------+
- 可以看到
EventDate
由之前的string
成功地转换成了date
类别
3. 列操作 #
- 首先,读取文件后,我们可以看到有许多不同的方法获取列(columns)的数据。
- 然后,可以看到使用两种不同的方法将多个列组合成了一个新的列,并赋予一个新的列名。
from pyspark.sql.functions import *
# 读取数据
airlinesDF = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema","true") \
.option("samplingRatio", "0.0001") \
.load("/databricks-datasets/airlines/part-00000")
# 比较通过不同的方法选择 column,结果是一样的
airlinesDF.select("Origin", "Dest", "Distance" ).show(10)
airlinesDF.select(column("Origin"), col("Dest"), airlinesDF.Distance).show(10)
# 将 Year、Month、DayofMonth 组合成 Date
airlinesDF.select("Origin", "Dest", "Distance", "Year","Month","DayofMonth").show(10)
# 方法1 使用 SQL Experssions
airlinesDF.selectExpr("Origin", "Dest", "Distance", expr("to_date(concat(Year,Month,DayofMonth),'yyyyMMdd') as FlightDate")).show(10)
# 方法2 使用 Column Object Experssions
airlinesDF.select("Origin", "Dest", "Distance", to_date(concat("Year","Month","DayofMonth"),"yyyyMMdd").alias("FlightDate")).show(10)
4. 常用的数据转换 #
当 DataFrame 有schema 时,可以使用下面的方法做转换
agg(*exprs)
cov(col1, col2)
crosstab(col1, col2)
cube(*cols)
filter(condition)
groupBy(*cols)
join(other, on=None, how=None)
orderBy(*cols, **kwargs)
replace(to_replace, value, subset)
rollup(*cols)
select(*cols)
sort(*cols, **kwargs)
where(condition)
withColumn(colName, col)
avg(*cols)
max(*cols)
mean(*cols)
min(*cols)
sum(*cols)
5. 非结构化数据 #
- 了解到如何对非结构化的数据(如 log 文件)进行处理
from pyspark.sql import *
from pyspark.sql.functions import regexp_extract, substring_index
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("LogFileDemo") \
.getOrCreate()
# 读取非结构化 log 文件
file_df = spark.read.text("data/apache_logs.txt")
file_df.printSchema() # 查看 schema,只有一个叫 value 的column,对应的数据类型是 string
# 使用正则表达式抽取:IP、client、user、datetime、cmd、request、protocol、status、bytes、referrer、userAgent 信息
log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)'
logs_df = file_df.select(regexp_extract('value', log_reg, 1).alias('ip'),
regexp_extract('value', log_reg, 4).alias('date'),
regexp_extract('value', log_reg, 6).alias('request'),
regexp_extract('value', log_reg, 10).alias('referrer'))
logs_df.printSchema() # 新的df 有四个 field,分别是:ip,date, request,和 referrer。对应的数据类型都是 string
logs_df \
.where("trim(referrer) != '-' ") \
.withColumn("referrer", substring_index("referrer", "/", 3)) \
.groupBy("referrer") \
.count() \
.show(100, truncate=False)
- 在
select
中使用了regexp_extract
并结合正则表达式,可以帮助我们从非结构化的数据中提取我们想要的信息。 - 提取完后就可以对其进行数据转换或分析了。
6. 自定义转换函数 #
- 我们想要把数据里的性别数据改成标准化的格式,标准化的格式为 Female、Male、和 Unknown。
- 结合
withColumn
和自定义函数实现数据的转换。 - 代码中使用两种方式实现,第一种方法无法使用 SQL Expression 以及没有注册在 Catalog 里。
import re
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
from lib.logger import Log4J
# 自定义数据转换函数,将性别归一化至Female、Male、和 Unknown
def parse_gender(gender):
female_pattern = r"^f$|f.m|w.m"
male_pattern = r"^m$|ma|m.l"
if re.search(female_pattern, gender.lower()):
return "Female"
elif re.search(male_pattern, gender.lower()):
return "Male"
else:
return "Unknown"
# 定义 Spark Session
spark = SparkSession \
.builder \
.appName("UDF Demo") \
.master("local[2]") \
.getOrCreate()
logger = Log4J(spark)
# 读取文件
survey_df = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("data/survey.csv")
survey_df.show(10)
# 方法一
# 需要将自定义的函数注册到 driver 中,使其成为一个 UDF
parse_gender_udf = udf(parse_gender, returnType=StringType())
# 注意这种方法不会将函数注册到 catalog 里,下面不会打印出自定义的函数名字
logger.info("Catalog Entry:")
[logger.info(r) for r in spark.catalog.listFunctions() if "parse_gender" in r.name]
# withColumn 方法只会修改提供列名的数据,保持其他数据不变
survey_df2 = survey_df.withColumn("Gender", parse_gender_udf("Gender"))
survey_df2.show(10)
# 方法2
# 将自定义函数注册为 SQL 函数,同时还会注册在在 catalog 里
spark.udf.register("parse_gender_udf", parse_gender, StringType())
# 下面会从 catalog 里找到我们自定义的函数,并打印出来
logger.info("Catalog Entry:")
[logger.info(r) for r in spark.catalog.listFunctions() if "parse_gender" in r.name]
# 使用 string Expression
survey_df3 = survey_df.withColumn("Gender", expr("parse_gender_udf(Gender)"))
survey_df3.show(10)
7. 案例:融合用法 #
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, when, expr
from pyspark.sql.types import *
from lib.logger import Log4j
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Misc Demo") \
.master("local[2]") \
.getOrCreate()
logger = Log4j(spark)
data_list = [("Ravi", "28", "1", "2002"),
("Abdul", "23", "5", "81"), # 1981
("John", "12", "12", "6"), # 2006
("Rosy", "7", "8", "63"), # 1963
("Abdul", "23", "5", "81")] # 1981
# 创建 DataFrame
raw_df = spark.createDataFrame(data_list) \
.toDF("name", "day", "month", "year") \
.repartition(3)
# 查看 schema
raw_df.printSchema()
final_df = raw_df.withColumn("id", monotonically_increasing_id()) \
.withColumn("day", col("day").cast(IntegerType())) \
.withColumn("month", col("month").cast(IntegerType())) \
.withColumn("year", col("year").cast(IntegerType())) \
.withColumn("year", when (col("year") < 20, col("year") + 2000)
.when(col("year") < 100, col("year") + 1900)
.otherwise(col("year"))) \
.withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')")) \
.drop("day", "month", "year") \
.dropDuplicates(["name", "dob"]) \
.sort(col("dob").desc())
final_df.show()