跳过正文

Spark系列 - 数据转换(II)

·2008 字·
大数据 Spark DataFrame SparkSQL withColumn Transformations
EZ
作者
EZ
Take it EZ!
目录
Spark - 这篇文章属于一个选集。
§ 7: 本文

1. withColumn 简介
#

在PySpark中,withColumn 是一个用于在DataFrame中添加替换修改列的方法。 它返回一个新的DataFrame,并不会修改原始的DataFrame。

  • 用法

    • withColumn 方法有两个参数,DataFrame.withColumn(colName, col)
      • colName:要添加或修改的列名。
      • col:要添加的列的表达式,可以是一个现有列的操作或一个新的列的表达式。
  • 示例

    • 下面是一些使用 withColumn 方法的示例:
    1. 添加新列

    假设我们有一个包含员工数据的DataFrame,我们想要添加一个新的列 bonus,其值为 salary 的10%。

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    
    # 创建 SparkSession
    spark = SparkSession.builder.appName("example").getOrCreate()
    
    # 示例 DataFrame
    data = [("Alice", 3000), ("Bob", 4000), ("Cathy", 5000)]
    columns = ["name", "salary"]
    
    df = spark.createDataFrame(data, columns)
    
    # 添加新列
    df_with_bonus = df.withColumn("bonus", col("salary") * 0.1)
    df_with_bonus.show()
    
    • 输出:
    +-----+------+-----+
    | name|salary|bonus|
    +-----+------+-----+
    |Alice|  3000| 300.0|
    |  Bob|  4000| 400.0|
    |Cathy|  5000| 500.0|
    +-----+------+-----+
    
    1. 修改现有列

    我们可以修改现有的列,例如将 salary 列的值增加1000。

    df_with_incremented_salary = df.withColumn("salary", col("salary") + 1000)
    df_with_incremented_salary.show()
    
    • 输出:
    +-----+------+
    | name|salary|
    +-----+------+
    |Alice|  4000|
    |  Bob|  5000|
    |Cathy|  6000|
    +-----+------+
    
    1. 使用复杂表达式

    我们可以使用更复杂的表达式。例如,根据条件添加新列。

    from pyspark.sql.functions import when
    
    df_with_bonus_flag = df.withColumn("bonus_flag", when(col("salary") > 4000, "High").otherwise("Low"))
    df_with_bonus_flag.show()
    
    • 输出:
    +-----+------+----------+
    | name|salary|bonus_flag|
    +-----+------+----------+
    |Alice|  3000|       Low|
    |  Bob|  4000|       Low|
    |Cathy|  5000|      High|
    +-----+------+----------+
    
  • 使用内置函数

    PySpark提供了许多内置函数,可以与 withColumn 一起使用。下面是一些常用的内置函数示例:

    1. 使用 lit 添加常量列(literal value)
    from pyspark.sql.functions import lit
    
    df_with_constant = df.withColumn("constant", lit(100))
    df_with_constant.show()
    
    • 输出:
    +-----+------+--------+
    | name|salary|constant|
    +-----+------+--------+
    |Alice|  3000|     100|
    |  Bob|  4000|     100|
    |Cathy|  5000|     100|
    +-----+------+--------+
    
    1. 使用 concat 连接字符串列
    from pyspark.sql.functions import concat
    
    df_with_fullname = df.withColumn("full_name", concat(col("name"), lit(" Doe")))
    df_with_fullname.show()
    
    • 输出:
    +-----+------+---------+
    | name|salary|full_name|
    +-----+------+---------+
    |Alice|  3000|Alice Doe|
    |  Bob|  4000|  Bob Doe|
    |Cathy|  5000|Cathy Doe|
    +-----+------+---------+
    

2. 案例:日期处理
#

  • 构建 DataFrame,并手动创建记录(Rows)
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col
from pyspark.sql.types import (Row, StructType, 
                               StructField, StringType)

# 创建DF Schema
my_schema = StructType([
    StructField("ID", StringType()),
    StructField("EventDate", StringType())])

# 构造 Demo 数据
my_rows = [Row("123", "04/05/2020"),
           Row("124", "4/5/2020"),
           Row("125", "04/5/2020"),
           Row("126", "4/05/2020")]

# 创建 Spark Session
spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkDemo") \
        .getOrCreate()

# 创建 DataFrame
my_rdd = spark.sparkContext.parallelize(my_rows, 2)
my_df = spark.createDataFrame(my_rdd, my_schema)

# 展示数据
my_df.printSchema()
my_df.show()
  • 输出
root
 |-- ID: string (nullable = true)
 |-- EventDate: string (nullable = true)

+---+----------+
| ID| EventDate|
+---+----------+
|123|04/05/2020|
|124|  4/5/2020|
|125| 04/5/2020|
|126| 4/05/2020|
+---+----------+
  • 自定义简单的数据转换
# 自定义数据类别转换
def to_date_df(df, fmt, fld):
    return df.withColumn(fld, to_date(col(fld), fmt))

# 应用数据转换,并展示
new_df = to_date_df(my_df,  "M/d/y", "EventDate")
new_df.printSchema()
new_df.show()
  • to_date_df(df, fmt, fld) 是我们自定义的转换程序。
    • 将输入的 DataFrame dffld 列转换成以 fmt 格式的 date 类别,并返回一个新的 DataFrame。
  • 下面试运行后的输出:
root
 |-- ID: string (nullable = true)
 |-- EventDate: date (nullable = true)

+---+----------+
| ID| EventDate|
+---+----------+
|123|2020-04-05|
|124|2020-04-05|
|125|2020-04-05|
|126|2020-04-05|
+---+----------+
  • 可以看到 EventDate 由之前的 string 成功地转换成了 date 类别

3. 列操作
#

  • 首先,读取文件后,我们可以看到有许多不同的方法获取列(columns)的数据。
  • 然后,可以看到使用两种不同的方法将多个列组合成了一个新的列,并赋予一个新的列名。
from pyspark.sql.functions import *

# 读取数据
airlinesDF = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema","true") \
    .option("samplingRatio", "0.0001") \
    .load("/databricks-datasets/airlines/part-00000")

# 比较通过不同的方法选择 column,结果是一样的
airlinesDF.select("Origin", "Dest", "Distance" ).show(10)
airlinesDF.select(column("Origin"), col("Dest"), airlinesDF.Distance).show(10)

# 将 Year、Month、DayofMonth 组合成 Date
airlinesDF.select("Origin", "Dest", "Distance", "Year","Month","DayofMonth").show(10)
# 方法1 使用 SQL Experssions 
airlinesDF.selectExpr("Origin", "Dest", "Distance", expr("to_date(concat(Year,Month,DayofMonth),'yyyyMMdd') as FlightDate")).show(10)
# 方法2 使用 Column Object Experssions
airlinesDF.select("Origin", "Dest", "Distance", to_date(concat("Year","Month","DayofMonth"),"yyyyMMdd").alias("FlightDate")).show(10)

4. 常用的数据转换
#

当 DataFrame 有schema 时,可以使用下面的方法做转换

agg(*exprs)
cov(col1, col2)
crosstab(col1, col2)
cube(*cols)
filter(condition)
groupBy(*cols)
join(other, on=None, how=None)
orderBy(*cols, **kwargs)
replace(to_replace, value, subset)
rollup(*cols)

select(*cols)
sort(*cols, **kwargs)
where(condition)
withColumn(colName, col)
avg(*cols)
max(*cols)
mean(*cols)
min(*cols)
sum(*cols)

5. 非结构化数据
#

  • 了解到如何对非结构化的数据(如 log 文件)进行处理
from pyspark.sql import *
from pyspark.sql.functions import regexp_extract, substring_index

spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("LogFileDemo") \
    .getOrCreate()

# 读取非结构化 log 文件
file_df = spark.read.text("data/apache_logs.txt")
file_df.printSchema()   # 查看 schema,只有一个叫 value 的column,对应的数据类型是 string

# 使用正则表达式抽取:IP、client、user、datetime、cmd、request、protocol、status、bytes、referrer、userAgent 信息
log_reg = r'^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\S+) "(\S+)" "([^"]*)'

logs_df = file_df.select(regexp_extract('value', log_reg, 1).alias('ip'),
                         regexp_extract('value', log_reg, 4).alias('date'),
                         regexp_extract('value', log_reg, 6).alias('request'),
                         regexp_extract('value', log_reg, 10).alias('referrer'))
logs_df.printSchema()   # 新的df 有四个 field,分别是:ip,date, request,和 referrer。对应的数据类型都是 string

logs_df \
    .where("trim(referrer) != '-' ") \
    .withColumn("referrer", substring_index("referrer", "/", 3)) \
    .groupBy("referrer") \
    .count() \
    .show(100, truncate=False)
  • select 中使用了 regexp_extract 并结合正则表达式,可以帮助我们从非结构化的数据中提取我们想要的信息。
  • 提取完后就可以对其进行数据转换或分析了。

6. 自定义转换函数
#

  • 我们想要把数据里的性别数据改成标准化的格式,标准化的格式为 Female、Male、和 Unknown。
  • 结合 withColumn 和自定义函数实现数据的转换。
  • 代码中使用两种方式实现,第一种方法无法使用 SQL Expression 以及没有注册在 Catalog 里。
import re
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

from lib.logger import Log4J


# 自定义数据转换函数,将性别归一化至Female、Male、和 Unknown
def parse_gender(gender):
    female_pattern = r"^f$|f.m|w.m"
    male_pattern = r"^m$|ma|m.l"
    if re.search(female_pattern, gender.lower()):
        return "Female"
    elif re.search(male_pattern, gender.lower()):
        return "Male"
    else:
        return "Unknown"


# 定义 Spark Session
spark = SparkSession \
    .builder \
    .appName("UDF Demo") \
    .master("local[2]") \
    .getOrCreate()

logger = Log4J(spark)

# 读取文件
survey_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("data/survey.csv")

survey_df.show(10)

# 方法一
# 需要将自定义的函数注册到 driver 中,使其成为一个 UDF
parse_gender_udf = udf(parse_gender, returnType=StringType())
# 注意这种方法不会将函数注册到 catalog 里,下面不会打印出自定义的函数名字
logger.info("Catalog Entry:")
[logger.info(r) for r in spark.catalog.listFunctions() if "parse_gender" in r.name]
# withColumn 方法只会修改提供列名的数据,保持其他数据不变
survey_df2 = survey_df.withColumn("Gender", parse_gender_udf("Gender"))
survey_df2.show(10)

# 方法2
# 将自定义函数注册为 SQL 函数,同时还会注册在在 catalog 里
spark.udf.register("parse_gender_udf", parse_gender, StringType())
# 下面会从 catalog 里找到我们自定义的函数,并打印出来
logger.info("Catalog Entry:")
[logger.info(r) for r in spark.catalog.listFunctions() if "parse_gender" in r.name]
# 使用 string Expression
survey_df3 = survey_df.withColumn("Gender", expr("parse_gender_udf(Gender)"))
survey_df3.show(10)

7. 案例:融合用法
#

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id, when, expr
from pyspark.sql.types import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Misc Demo") \
        .master("local[2]") \
        .getOrCreate()

    logger = Log4j(spark)

    data_list = [("Ravi", "28", "1", "2002"),
                 ("Abdul", "23", "5", "81"),  # 1981
                 ("John", "12", "12", "6"),  # 2006
                 ("Rosy", "7", "8", "63"),  # 1963
                 ("Abdul", "23", "5", "81")]  # 1981

    # 创建 DataFrame
    raw_df = spark.createDataFrame(data_list) \
              .toDF("name", "day", "month", "year") \
              .repartition(3)
    # 查看 schema
    raw_df.printSchema()

    final_df = raw_df.withColumn("id", monotonically_increasing_id()) \
        .withColumn("day", col("day").cast(IntegerType())) \
        .withColumn("month", col("month").cast(IntegerType())) \
        .withColumn("year", col("year").cast(IntegerType())) \
        .withColumn("year", when (col("year") < 20, col("year") + 2000)
                    .when(col("year") < 100, col("year") + 1900)
                    .otherwise(col("year"))) \
        .withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')")) \
        .drop("day", "month", "year") \
        .dropDuplicates(["name", "dob"]) \
        .sort(col("dob").desc())

    final_df.show()
Spark - 这篇文章属于一个选集。
§ 7: 本文

相关文章

Spark系列 - 数据转换(I)
·2750 字
大数据 Spark DataFrame SparkSQL Transformations
本章主要讨论 Spark 的数据转换。
Spark系列 - 数据存储
·1316 字
大数据 Spark DataFrame SparkSQL 分布式数据库
本章主要讨论 pySpark 的数据存储。
Spark系列 - 数据读取
·2470 字
大数据 Spark DataFrame SparkSQL 分布式数据库
本章主要讨论 pySpark 的数据读取。
Spark系列 - 配置Spark
·1559 字
大数据 Spark 日志 Log4J 硬编码 软编码
本文将详细介绍如何在 Spark 项目中配置 Log4J 日志模块,以及配置 Spark Session。
Spark系列 - 本地环境的搭建
·544 字
大数据 Spark 环境安装
本篇文章将介绍如何在本地 Mac 环境下搭建 Spark,包括安装 JDK、配置环境变量、安装和配置 Spark 以及安装 PySpark。
Spark系列 - 初识大数据
·2952 字
大数据 Spark Hadoop 数据库
这篇文章初步介绍了大数据、Hadoop 和 Spark 这三个关键方面。本文提供了一个简要的概述,为读者进一步了解大数据处理提供了基础。