Aggregation is an indispensable part of big data processing. This article introduces the aggregate functions commonly used in Spark and their applications, and then explores window functions.
1. Simple Aggregations #
Spark provides a rich set of aggregate functions, such as avg(), count(), max(), min(), and sum().
- Example:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from lib.logger import Log4J

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Agg Demo") \
        .master("local[2]") \
        .getOrCreate()

    logger = Log4J(spark)

    invoice_df = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("data/invoices.csv")

    invoice_df.select(f.count("*").alias("Count *"),
                      f.sum("Quantity").alias("TotalQuantity"),
                      f.avg("UnitPrice").alias("AvgPrice"),
                      f.countDistinct("InvoiceNo").alias("CountDistinct")
                      ).show()
- Output:
+-------+-------------+-----------------+-------------+
|Count *|TotalQuantity|         AvgPrice|CountDistinct|
+-------+-------------+-----------------+-------------+
| 541909|      5176450|4.611113626088498|        25900|
+-------+-------------+-----------------+-------------+
- Aggregations can also be written with SQL expressions:
invoice_df.selectExpr(
    "count(1) as `count 1`",
    "count(StockCode) as `count field`",
    "sum(Quantity) as TotalQuantity",
    "avg(UnitPrice) as AvgPrice"
).show()
- Output:
+-------+-----------+-------------+----------------+
|count 1|count field|TotalQuantity|        AvgPrice|
+-------+-----------+-------------+----------------+
| 541909|     541908|      5176450|4.61111362608295|
+-------+-----------+-------------+----------------+
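- The function list at the start of this section also mentions max() and min(), which the examples above do not use. A minimal sketch, reusing invoice_df from the first example (the alias names are purely illustrative):
# Illustrative use of min()/max(); aliases are hypothetical names, not from the original code
invoice_df.select(f.min("UnitPrice").alias("MinPrice"),
                  f.max("UnitPrice").alias("MaxPrice"),
                  f.max("Quantity").alias("MaxQuantity")
                  ).show()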
2. Grouped Aggregations #
- Use the groupBy() method to group the data and then apply aggregate functions to each group; agg() is designed specifically to take a list of aggregate functions.
- Example:
summary_df = invoice_df \
    .groupBy("Country", "InvoiceNo") \
    .agg(f.sum("Quantity").alias("TotalQuantity"),
         f.round(f.sum(f.expr("Quantity * UnitPrice")), 2).alias("InvoiceValue"),
         f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValueExpr")
         )
summary_df.show()
- Output:
+--------------+---------+-------------+------------+----------------+
| Country|InvoiceNo|TotalQuantity|InvoiceValue|InvoiceValueExpr|
+--------------+---------+-------------+------------+----------------+
|United Kingdom| 536446| 329| 440.89| 440.89|
|United Kingdom| 536508| 216| 155.52| 155.52|
|United Kingdom| 537018| -3| 0.0| 0.0|
|United Kingdom| 537401| -24| 0.0| 0.0|
|United Kingdom| 537811| 74| 268.86| 268.86|
|United Kingdom| C537824| -2| -14.9| -14.9|
|United Kingdom| 538895| 370| 247.38| 247.38|
|United Kingdom| 540453| 341| 302.45| 302.45|
|United Kingdom| 541291| 217| 305.81| 305.81|
|United Kingdom| 542551| -1| 0.0| 0.0|
|United Kingdom| 542576| -1| 0.0| 0.0|
|United Kingdom| 542628| 9| 132.35| 132.35|
|United Kingdom| 542886| 199| 320.51| 320.51|
|United Kingdom| 542907| 75| 313.85| 313.85|
|United Kingdom| 543131| 134| 164.1| 164.1|
|United Kingdom| 543189| 102| 153.94| 153.94|
|United Kingdom| 543265| -4| 0.0| 0.0|
| Cyprus| 544574| 173| 320.69| 320.69|
|United Kingdom| 545077| 24| 10.08| 10.08|
|United Kingdom| 545300| 116| 323.16| 323.16|
+--------------+---------+-------------+------------+----------------+
only showing top 20 rows
- The same aggregation can also be written with a SQL expression:
invoice_df.createOrReplaceTempView("sales")
summary_sql = spark.sql("""
    SELECT Country, InvoiceNo,
           sum(Quantity) as TotalQuantity,
           round(sum(Quantity*UnitPrice),2) as InvoiceValue
    FROM sales
    GROUP BY Country, InvoiceNo""")
summary_sql.show()
- Output:
+--------------+---------+-------------+------------+
|       Country|InvoiceNo|TotalQuantity|InvoiceValue|
+--------------+---------+-------------+------------+
|United Kingdom|   536446|          329|      440.89|
|United Kingdom|   536508|          216|      155.52|
|United Kingdom|   537018|           -3|         0.0|
|United Kingdom|   537401|          -24|         0.0|
|United Kingdom|   537811|           74|      268.86|
|United Kingdom|  C537824|           -2|       -14.9|
|United Kingdom|   538895|          370|      247.38|
|United Kingdom|   540453|          341|      302.45|
|United Kingdom|   541291|          217|      305.81|
|United Kingdom|   542551|           -1|         0.0|
|United Kingdom|   542576|           -1|         0.0|
|United Kingdom|   542628|            9|      132.35|
|United Kingdom|   542886|          199|      320.51|
|United Kingdom|   542907|           75|      313.85|
|United Kingdom|   543131|          134|       164.1|
|United Kingdom|   543189|          102|      153.94|
|United Kingdom|   543265|           -4|         0.0|
|        Cyprus|   544574|          173|      320.69|
|United Kingdom|   545077|           24|       10.08|
|United Kingdom|   545300|          116|      323.16|
+--------------+---------+-------------+------------+
only showing top 20 rows
- Another grouped-aggregation example:
NumInvoices = f.countDistinct("InvoiceNo").alias("NumInvoices")
TotalQuantity = f.sum("Quantity").alias("TotalQuantity")
InvoiceValue = f.expr("round(sum(Quantity * UnitPrice),2) as InvoiceValue")
exSummary_df = invoice_df \
.withColumn("InvoiceDate", f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm")) \
.where("year(InvoiceDate) == 2010") \
.withColumn("WeekNumber", f.weekofyear(f.col("InvoiceDate"))) \
.groupBy("Country", "WeekNumber") \
.agg(NumInvoices, TotalQuantity, InvoiceValue)
exSummary_df.sort("Country", "WeekNumber").show()
- Output:
+---------------+----------+-----------+-------------+------------+
| Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|
+---------------+----------+-----------+-------------+------------+
| Australia| 48| 1| 107| 358.25|
| Australia| 49| 1| 214| 258.9|
| Australia| 50| 2| 133| 387.95|
| Austria| 50| 2| 3| 257.04|
| Bahrain| 51| 1| 54| 205.74|
| Belgium| 48| 1| 528| 346.1|
| Belgium| 50| 2| 285| 625.16|
| Belgium| 51| 2| 942| 838.65|
|Channel Islands| 49| 1| 80| 363.53|
| Cyprus| 50| 1| 917| 1590.82|
| Denmark| 49| 1| 454| 1281.5|
| EIRE| 48| 7| 2822| 3147.23|
| EIRE| 49| 5| 1280| 3284.1|
| EIRE| 50| 5| 1184| 2321.78|
| EIRE| 51| 5| 95| 276.84|
| Finland| 50| 1| 1254| 892.8|
| France| 48| 4| 1299| 2808.16|
| France| 49| 9| 2303| 4527.01|
| France| 50| 6| 529| 537.32|
| France| 51| 5| 847| 1702.87|
+---------------+----------+-----------+-------------+------------+
only showing top 20 rows
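- The example above parses InvoiceDate with the pattern "dd-MM-yyyy H.mm" before extracting the week number. A minimal sketch of just that step, using a hypothetical sample value in the same layout (reusing spark and f from the first example):
# Hypothetical sample string in the "dd-MM-yyyy H.mm" layout assumed by the example above
sample_df = spark.createDataFrame([("01-12-2010 8.26",)], ["InvoiceDate"])
sample_df.select(
    f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm").alias("InvoiceDate"),
    f.weekofyear(f.to_date(f.col("InvoiceDate"), "dd-MM-yyyy H.mm")).alias("WeekNumber")
).show()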
- Save the DataFrame so it can be reused for the window aggregation below:
exSummary_df.coalesce(1) \
    .write \
    .format("parquet") \
    .mode("overwrite") \
    .save("output")
3. Window Aggregations #
- Window functions let us aggregate over a window of rows within each partition of the data, for example running totals and rankings.
- Common window functions include lead(), lag(), rank(), dense_rank(), cume_dist(), and so on.
- Example:
    - Read the file saved above.
    - Decide which column(s) to partition by.
    - Decide how to order the rows within each partition.
    - Decide where the window starts and ends. Window.unboundedPreceding takes every row from the beginning of the partition; change it to -2 and the window becomes the two preceding rows plus the current row.
    - Finally, add the new column with sum() and over() to compute the running total within each group. sum() can be replaced by other window functions such as dense_rank(), lead(), and lag() (see the sketch after the output below).
from pyspark.sql import Window
summary_df = spark.read.parquet("data/*.parquet")
running_total_window = Window.partitionBy("Country") \
.orderBy("WeekNumber") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
summary_df.withColumn("RunningTotal",
f.sum("InvoiceValue").over(running_total_window)) \
.show()
- Output:
+---------------+----------+-----------+-------------+------------+------------------+
| Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue| RunningTotal|
+---------------+----------+-----------+-------------+------------+------------------+
| Australia| 48| 1| 107| 358.25| 358.25|
| Australia| 49| 1| 214| 258.9| 617.15|
| Australia| 50| 2| 133| 387.95|1005.0999999999999|
| Austria| 50| 2| 3| 257.04| 257.04|
| Bahrain| 51| 1| 54| 205.74| 205.74|
| Belgium| 48| 1| 528| 346.1| 346.1|
| Belgium| 50| 2| 285| 625.16| 971.26|
| Belgium| 51| 2| 942| 838.65|1809.9099999999999|
|Channel Islands| 49| 1| 80| 363.53| 363.53|
| Cyprus| 50| 1| 917| 1590.82| 1590.82|
| Denmark| 49| 1| 454| 1281.5| 1281.5|
| EIRE| 48| 7| 2822| 3147.23| 3147.23|
| EIRE| 49| 5| 1280| 3284.1| 6431.33|
| EIRE| 50| 5| 1184| 2321.78| 8753.11|
| EIRE| 51| 5| 95| 276.84| 9029.95|
| Finland| 50| 1| 1254| 892.8| 892.8|
| France| 48| 4| 1299| 2808.16| 2808.16|
| France| 49| 9| 2303| 4527.01| 7335.17|
| France| 50| 6| 529| 537.32| 7872.49|
| France| 51| 5| 847| 1702.87| 9575.36|
+---------------+----------+-----------+-------------+------------+------------------+
only showing top 20 rows
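- As noted in the step list above, both the window frame and the aggregate function can be swapped out. A minimal sketch, reusing summary_df, f, and Window from the example above; the sliding frame and the extra columns are illustrative additions, not part of the original code:
# The two preceding rows plus the current row (the -2 case mentioned above):
# a 3-week moving sum per country.
sliding_window = Window.partitionBy("Country") \
    .orderBy("WeekNumber") \
    .rowsBetween(-2, Window.currentRow)

# Ranking and offset functions use the same over() mechanism; they only need an ordered window.
ordered_window = Window.partitionBy("Country").orderBy("WeekNumber")

summary_df \
    .withColumn("ThreeWeekTotal", f.sum("InvoiceValue").over(sliding_window)) \
    .withColumn("WeekRank", f.dense_rank().over(ordered_window)) \
    .withColumn("PrevWeekValue", f.lag("InvoiceValue").over(ordered_window)) \
    .show()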