跳过正文

Spark系列 - 数据聚合

·1170 字·
大数据 Spark DataFrame 聚合
EZ
作者
EZ
Take it EZ!
目录
Spark - 这篇文章属于一个选集。
§ 8: 本文

在大数据处理中,聚合操作是不可或缺的。本篇文章将介绍Spark中常用的聚合函数及其应用,并探讨窗口函数的使用。

1. 简单聚合
#

Spark提供了丰富的聚合函数,如:avg(), count(), max(), min(), sum()等。

  • 示例:
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as f
    
    from lib.logger import Log4J
    
    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Agg Demo") \
            .master("local[2]") \
            .getOrCreate()
    
        logger = Log4J(spark)
    
        invoice_df = spark.read \
            .format("csv") \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .load("data/invoices.csv")
    
        invoice_df.select(f.count("*").alias("Count *"),
                          f.sum("Quantity").alias("TotalQuantity"),
                          f.avg("UnitPrice").alias("AvgPrice"),
                          f.countDistinct("InvoiceNo").alias("CountDistinct")
                          ).show()
    
  • 输出:
    +-------+-------------+-----------------+-------------+
    |Count *|TotalQuantity|         AvgPrice|CountDistinct|
    +-------+-------------+-----------------+-------------+
    | 541909|      5176450|4.611113626088498|        25900|
    +-------+-------------+-----------------+-------------+
    
  • 也可以使用SQL表达式进行聚合
    invoice_df.selectExpr(
            "count(1) as `count 1`",
            "count(StockCode) as `count field`",
            "sum(Quantity) as TotalQuantity",
            "avg(UnitPrice) as AvgPrice"
        ).show()
    
  • 输出:
    +-------+-----------+-------------+----------------+
    |count 1|count field|TotalQuantity|        AvgPrice|
    +-------+-----------+-------------+----------------+
    | 541909|     541908|      5176450|4.61111362608295|
    +-------+-----------+-------------+----------------+
    

2. 分组聚合
#

  • 使用 groupBy() 方法可以对数据进行分组,并应用聚合函数。
  • agg()是专门设计用来获取聚合函数列表的。
  • 示例:
    summary_df = invoice_df \
        .groupBy("Country", "InvoiceNo") \
        .agg(f.sum("Quantity").alias("TotalQuantity"),
             f.round(f.sum(f.expr("Quantity * UnitPrice")), 2).alias("InvoiceValue"),
             f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValueExpr")
             )
    
    summary_df.show()
    
  • 输出:
+--------------+---------+-------------+------------+----------------+
|       Country|InvoiceNo|TotalQuantity|InvoiceValue|InvoiceValueExpr|
+--------------+---------+-------------+------------+----------------+
|United Kingdom|   536446|          329|      440.89|          440.89|
|United Kingdom|   536508|          216|      155.52|          155.52|
|United Kingdom|   537018|           -3|         0.0|             0.0|
|United Kingdom|   537401|          -24|         0.0|             0.0|
|United Kingdom|   537811|           74|      268.86|          268.86|
|United Kingdom|  C537824|           -2|       -14.9|           -14.9|
|United Kingdom|   538895|          370|      247.38|          247.38|
|United Kingdom|   540453|          341|      302.45|          302.45|
|United Kingdom|   541291|          217|      305.81|          305.81|
|United Kingdom|   542551|           -1|         0.0|             0.0|
|United Kingdom|   542576|           -1|         0.0|             0.0|
|United Kingdom|   542628|            9|      132.35|          132.35|
|United Kingdom|   542886|          199|      320.51|          320.51|
|United Kingdom|   542907|           75|      313.85|          313.85|
|United Kingdom|   543131|          134|       164.1|           164.1|
|United Kingdom|   543189|          102|      153.94|          153.94|
|United Kingdom|   543265|           -4|         0.0|             0.0|
|        Cyprus|   544574|          173|      320.69|          320.69|
|United Kingdom|   545077|           24|       10.08|           10.08|
|United Kingdom|   545300|          116|      323.16|          323.16|
+--------------+---------+-------------+------------+----------------+
only showing top 20 rows
  • 也可以使用SQL表达式:
    invoice_df.createOrReplaceTempView("sales")
    summary_sql = spark.sql("""
          SELECT Country, InvoiceNo,
                sum(Quantity) as TotalQuantity,
                round(sum(Quantity*UnitPrice),2) as InvoiceValue
          FROM sales
          GROUP BY Country, InvoiceNo""")
    
    summary_sql.show()
    
  • 输出:
    +--------------+---------+-------------+------------+
    |       Country|InvoiceNo|TotalQuantity|InvoiceValue|
    +--------------+---------+-------------+------------+
    |United Kingdom|   536446|          329|      440.89|
    |United Kingdom|   536508|          216|      155.52|
    |United Kingdom|   537018|           -3|         0.0|
    |United Kingdom|   537401|          -24|         0.0|
    |United Kingdom|   537811|           74|      268.86|
    |United Kingdom|  C537824|           -2|       -14.9|
    |United Kingdom|   538895|          370|      247.38|
    |United Kingdom|   540453|          341|      302.45|
    |United Kingdom|   541291|          217|      305.81|
    |United Kingdom|   542551|           -1|         0.0|
    |United Kingdom|   542576|           -1|         0.0|
    |United Kingdom|   542628|            9|      132.35|
    |United Kingdom|   542886|          199|      320.51|
    |United Kingdom|   542907|           75|      313.85|
    |United Kingdom|   543131|          134|       164.1|
    |United Kingdom|   543189|          102|      153.94|
    |United Kingdom|   543265|           -4|         0.0|
    |        Cyprus|   544574|          173|      320.69|
    |United Kingdom|   545077|           24|       10.08|
    |United Kingdom|   545300|          116|      323.16|
    +--------------+---------+-------------+------------+
    only showing top 20 rows
    
  • 另一个分组聚合的示例:
NumInvoices = f.countDistinct("InvoiceNo").alias("NumInvoices")
    TotalQuantity = f.sum("Quantity").alias("TotalQuantity")
    InvoiceValue = f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValue")

    exSummary_df = invoice_df \
        .withColumn("InvoiceDate", f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm")) \
        .where("year(InvoiceDate) == 2010") \
        .withColumn("WeekNumber", f.weekofyear(f.col("InvoiceDate"))) \
        .groupBy("Country", "WeekNumber") \
        .agg(NumInvoices, TotalQuantity, InvoiceValue)

    exSummary_df.sort("Country", "WeekNumber").show()
  • 输出:
+---------------+----------+-----------+-------------+------------+
|        Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|
+---------------+----------+-----------+-------------+------------+
|      Australia|        48|          1|          107|      358.25|
|      Australia|        49|          1|          214|       258.9|
|      Australia|        50|          2|          133|      387.95|
|        Austria|        50|          2|            3|      257.04|
|        Bahrain|        51|          1|           54|      205.74|
|        Belgium|        48|          1|          528|       346.1|
|        Belgium|        50|          2|          285|      625.16|
|        Belgium|        51|          2|          942|      838.65|
|Channel Islands|        49|          1|           80|      363.53|
|         Cyprus|        50|          1|          917|     1590.82|
|        Denmark|        49|          1|          454|      1281.5|
|           EIRE|        48|          7|         2822|     3147.23|
|           EIRE|        49|          5|         1280|      3284.1|
|           EIRE|        50|          5|         1184|     2321.78|
|           EIRE|        51|          5|           95|      276.84|
|        Finland|        50|          1|         1254|       892.8|
|         France|        48|          4|         1299|     2808.16|
|         France|        49|          9|         2303|     4527.01|
|         France|        50|          6|          529|      537.32|
|         France|        51|          5|          847|     1702.87|
+---------------+----------+-----------+-------------+------------+
only showing top 20 rows
  • 保存 DataFrame,用于下面的窗口聚合
    exSummary_df.coalesce(1) \
            .write \
            .format("parquet") \
            .mode("overwrite") \
            .save("output")
    

3. 窗口聚合
#

  • 窗口函数允许我们在数据的一个分区内进行聚合操作,例如累积和、排名等。
  • 常见的窗口函数:lead(), lag(), rank(), dense_rank(), cume_dist()等。
  • 示例:
    • 读取上面的文件
    • 确定需要partition 的列
    • 确定排序条件
    • 确定窗口的起点和终点
      • Window.unboundedPreceding:从开头取所有的行;改成-2,就是前面两行再加当前行。
    • 最后添加列时,用 sum()over() 来做分组后的累计求和。
      • sum() 也可以用其他函数来代替,例如:dense_rank()lead()、和lag等等。
from pyspark.sql import Window

summary_df = spark.read.parquet("data/*.parquet")

running_total_window = Window.partitionBy("Country") \
  .orderBy("WeekNumber") \
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

summary_df.withColumn("RunningTotal",
                    f.sum("InvoiceValue").over(running_total_window)) \
  .show()
  • 输出:
+---------------+----------+-----------+-------------+------------+------------------+
|        Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|      RunningTotal|
+---------------+----------+-----------+-------------+------------+------------------+
|      Australia|        48|          1|          107|      358.25|            358.25|
|      Australia|        49|          1|          214|       258.9|            617.15|
|      Australia|        50|          2|          133|      387.95|1005.0999999999999|
|        Austria|        50|          2|            3|      257.04|            257.04|
|        Bahrain|        51|          1|           54|      205.74|            205.74|
|        Belgium|        48|          1|          528|       346.1|             346.1|
|        Belgium|        50|          2|          285|      625.16|            971.26|
|        Belgium|        51|          2|          942|      838.65|1809.9099999999999|
|Channel Islands|        49|          1|           80|      363.53|            363.53|
|         Cyprus|        50|          1|          917|     1590.82|           1590.82|
|        Denmark|        49|          1|          454|      1281.5|            1281.5|
|           EIRE|        48|          7|         2822|     3147.23|           3147.23|
|           EIRE|        49|          5|         1280|      3284.1|           6431.33|
|           EIRE|        50|          5|         1184|     2321.78|           8753.11|
|           EIRE|        51|          5|           95|      276.84|           9029.95|
|        Finland|        50|          1|         1254|       892.8|             892.8|
|         France|        48|          4|         1299|     2808.16|           2808.16|
|         France|        49|          9|         2303|     4527.01|           7335.17|
|         France|        50|          6|          529|      537.32|           7872.49|
|         France|        51|          5|          847|     1702.87|           9575.36|
+---------------+----------+-----------+-------------+------------+------------------+
only showing top 20 rows
Spark - 这篇文章属于一个选集。
§ 8: 本文

相关文章

Spark系列 - 数据转换(II)
·2008 字
大数据 Spark DataFrame SparkSQL withColumn Transformations
本章主要讨论 Spark 的数据转换。
Spark系列 - 数据转换(I)
·2750 字
大数据 Spark DataFrame SparkSQL Transformations
本章主要讨论 Spark 的数据转换。
Spark系列 - 数据存储
·1316 字
大数据 Spark DataFrame SparkSQL 分布式数据库
本章主要讨论 pySpark 的数据存储。
Spark系列 - 数据读取
·2470 字
大数据 Spark DataFrame SparkSQL 分布式数据库
本章主要讨论 pySpark 的数据读取。
Spark系列 - 配置Spark
·1559 字
大数据 Spark 日志 Log4J 硬编码 软编码
本文将详细介绍如何在 Spark 项目中配置 Log4J 日志模块,以及配置 Spark Session。
Spark系列 - 本地环境的搭建
·544 字
大数据 Spark 环境安装
本篇文章将介绍如何在本地 Mac 环境下搭建 Spark,包括安装 JDK、配置环境变量、安装和配置 Spark 以及安装 PySpark。