1. Inner Join #
我们首先创建两个 DataFrame,分别包含订单和产品信息,用于演示 Join 操作。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Spark Join Demo") \
.master("local[3]") \
.getOrCreate()
orders_list = [("01", "02", 350, 1),
("01", "04", 580, 1),
("01", "07", 320, 2),
("02", "03", 450, 1),
("02", "06", 220, 1),
("03", "01", 195, 1),
("04", "09", 270, 3),
("04", "08", 410, 2),
("05", "02", 350, 1)]
order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")
product_list = [("01", "Scroll Mouse", 250, 20),
("02", "Optical Mouse", 350, 20),
("03", "Wireless Mouse", 450, 50),
("04", "Wireless Keyboard", 580, 50),
("05", "Standard Keyboard", 360, 10),
("06", "16 GB Flash Storage", 240, 100),
("07", "32 GB Flash Storage", 320, 50),
("08", "64 GB Flash Storage", 430, 25)]
product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")
product_df.show()
order_df.show()
- 输出
+-------+-------------------+----------+---+
|prod_id| prod_name|list_price|qty|
+-------+-------------------+----------+---+
| 01| Scroll Mouse| 250| 20|
| 02| Optical Mouse| 350| 20|
| 03| Wireless Mouse| 450| 50|
| 04| Wireless Keyboard| 580| 50|
| 05| Standard Keyboard| 360| 10|
| 06|16 GB Flash Storage| 240|100|
| 07|32 GB Flash Storage| 320| 50|
| 08|64 GB Flash Storage| 430| 25|
+-------+-------------------+----------+---+
+--------+-------+----------+---+
|order_id|prod_id|unit_price|qty|
+--------+-------+----------+---+
| 01| 02| 350| 1|
| 01| 04| 580| 1|
| 01| 07| 320| 2|
| 02| 03| 450| 1|
| 02| 06| 220| 1|
| 03| 01| 195| 1|
| 04| 09| 270| 3|
| 04| 08| 410| 2|
| 05| 02| 350| 1|
+--------+-------+----------+---+
接下来,我们执行 Inner Join 操作。由于两个 DataFrame 都有 qty 列,我们可以在 Join 时更改列名以避免冲突。
join_expr = order_df.prod_id == product_df.prod_id
product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")
order_df.join(product_renamed_df, join_expr, "inner") \
.drop(product_renamed_df.prod_id) \
.select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
.show()
- 输出
+--------+-------+-------------------+----------+----------+---+
|order_id|prod_id| prod_name|unit_price|list_price|qty|
+--------+-------+-------------------+----------+----------+---+
| 03| 01| Scroll Mouse| 195| 250| 1|
| 01| 02| Optical Mouse| 350| 350| 1|
| 05| 02| Optical Mouse| 350| 350| 1|
| 02| 03| Wireless Mouse| 450| 450| 1|
| 01| 04| Wireless Keyboard| 580| 580| 1|
| 02| 06|16 GB Flash Storage| 220| 240| 1|
| 01| 07|32 GB Flash Storage| 320| 320| 2|
| 04| 08|64 GB Flash Storage| 410| 430| 2|
+--------+-------+-------------------+----------+----------+---+
2. Outer Join #
- Outer Join 包含 Left Join、Right Join 和 Full Outer Join。
- 我们可以通过 coalesce 函数将某些列的空值替换为其他列的值。
- 例如通过
expr("coalesce(prod_name, prod_id)")
可以将prod_name
里的空值用prod_id
代替.
- 例如通过
from pyspark.sql.functions import expr
join_expr = order_df.prod_id == product_df.prod_id
product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")
print("Outer Join:")
order_df.join(product_renamed_df, join_expr, "outer") \
.drop(product_renamed_df.prod_id) \
.select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
.withColumn("prod_name", expr("coalesce(prod_name, prod_id)")) \
.withColumn("list_price", expr("coalesce(list_price, unit_price)")) \
.sort("order_id") \
.show()
print("Left Join:")
order_df.join(product_renamed_df, join_expr, "left") \
.drop(product_renamed_df.prod_id) \
.select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
.withColumn("prod_name", expr("coalesce(prod_name, prod_id)")) \
.withColumn("list_price", expr("coalesce(list_price, unit_price)")) \
.sort("order_id") \
.show()
print("Right Join:")
order_df.join(product_renamed_df, join_expr, "right") \
.drop(product_renamed_df.prod_id) \
.select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
.withColumn("prod_name", expr("coalesce(prod_name, prod_id)")) \
.withColumn("list_price", expr("coalesce(list_price, unit_price)")) \
.sort("order_id") \
.show()
- 输出:
Outer Join:
+--------+-------+-------------------+----------+----------+----+
|order_id|prod_id| prod_name|unit_price|list_price| qty|
+--------+-------+-------------------+----------+----------+----+
| NULL| NULL| Standard Keyboard| NULL| 360|NULL|
| 01| 02| Optical Mouse| 350| 350| 1|
| 01| 04| Wireless Keyboard| 580| 580| 1|
| 01| 07|32 GB Flash Storage| 320| 320| 2|
| 02| 03| Wireless Mouse| 450| 450| 1|
| 02| 06|16 GB Flash Storage| 220| 240| 1|
| 03| 01| Scroll Mouse| 195| 250| 1|
| 04| 08|64 GB Flash Storage| 410| 430| 2|
| 04| 09| 09| 270| 270| 3|
| 05| 02| Optical Mouse| 350| 350| 1|
+--------+-------+-------------------+----------+----------+----+
Left Join:
+--------+-------+-------------------+----------+----------+---+
|order_id|prod_id| prod_name|unit_price|list_price|qty|
+--------+-------+-------------------+----------+----------+---+
| 01| 07|32 GB Flash Storage| 320| 320| 2|
| 01| 02| Optical Mouse| 350| 350| 1|
| 01| 04| Wireless Keyboard| 580| 580| 1|
| 02| 03| Wireless Mouse| 450| 450| 1|
| 02| 06|16 GB Flash Storage| 220| 240| 1|
| 03| 01| Scroll Mouse| 195| 250| 1|
| 04| 09| 09| 270| 270| 3|
| 04| 08|64 GB Flash Storage| 410| 430| 2|
| 05| 02| Optical Mouse| 350| 350| 1|
+--------+-------+-------------------+----------+----------+---+
Right Join:
+--------+-------+-------------------+----------+----------+----+
|order_id|prod_id| prod_name|unit_price|list_price| qty|
+--------+-------+-------------------+----------+----------+----+
| NULL| NULL| Standard Keyboard| NULL| 360|NULL|
| 01| 02| Optical Mouse| 350| 350| 1|
| 01| 04| Wireless Keyboard| 580| 580| 1|
| 01| 07|32 GB Flash Storage| 320| 320| 2|
| 02| 03| Wireless Mouse| 450| 450| 1|
| 02| 06|16 GB Flash Storage| 220| 240| 1|
| 03| 01| Scroll Mouse| 195| 250| 1|
| 04| 08|64 GB Flash Storage| 410| 430| 2|
| 05| 02| Optical Mouse| 350| 350| 1|
+--------+-------+-------------------+----------+----------+----+
3. Spark Join 原理 #
在 Spark 中,Join 操作是分布式数据处理中最常见且计算量最大的一种操作。 了解 Spark 中不同 Join 操作的原理和优化方法对于提升性能至关重要。 这里介绍两种常见的 Join 策略:Shuffle Join 和 Broadcast Join。
3.1. Shuffle Join #
- 介绍: Shuffle Join 是 Spark 默认的 Join 策略之一,适用于两个大数据集的 Join 操作。它的原理是将两个数据集按 Join 键重新分区(Shuffle),然后在每个分区内进行 Join 操作。
- 优化:
- 在执行 Join 前过滤掉不必要的数据: 通过提前过滤掉不需要的数据,可以减少参与 Shuffle 的数据量,从而减少网络传输和计算量。
- 优化分区: 确保数据均匀分布在分区中,避免数据倾斜(Data Skew)。
- 增加并行度: 调整并行度参数来提升性能,例如
spark.sql.shuffle.partitions
。
3.2. Broadcast Join #
Broadcast Join 适用于一个数据集较大而另一个数据集较小的情况。其原理是将较小的数据集广播到每个执行节点,然后在本地进行 Join 操作。
-
示例:
from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Broadcast Join Example") \ .getOrCreate() df1 = spark.read.json("data/large_data.json") df2 = spark.read.json("data/small_data.json") # 设置广播阈值,确保 df2 可以被广播 spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760) # 10 MB join_expr = df1.id == df2.id join_df = df1.join(df2.hint("broadcast"), join_expr, "inner") join_df.show(10)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10485760)
- 这行代码设置了自动广播 Join 的阈值为 10 MB(10485760 字节)。这意味着任何小于 10 MB 的数据集都可以被广播。这是为了确保
df2
可以被广播。
- 这行代码设置了自动广播 Join 的阈值为 10 MB(10485760 字节)。这意味着任何小于 10 MB 的数据集都可以被广播。这是为了确保
- 使用
hint("broadcast")
方法提示 Spark 对 df2 进行广播,从而强制使用 Broadcast Join。Join 类型是 inner,即内连接。
-
Broadcast Join 的优势
- 减少网络传输:由于小数据集被广播到每个节点,避免了大数据集的 Shuffle 操作,从而减少了网络传输。
- 提高性能:Broadcast Join 通常比 Shuffle Join 快,因为它消除了数据重分布的开销。
-
使用 Broadcast Join 的注意事项
- 小数据集大小:确保被广播的数据集足够小,否则会导致内存不足和性能下降。
- 合理设置广播阈值:根据集群的内存配置,合理设置
spark.sql.autoBroadcastJoinThreshold
参数。
通过合理使用 Broadcast Join,可以显著提升大数据处理的效率,特别是在处理一个大数据集和一个小数据集的场景中。
4. Bucket Join #
- Bucket Join 是一种优化大数据集 Join 操作的方法,通过预先对数据进行分桶(Bucketing),减少 Join 时的数据 Shuffle 操作。
- 下面通过一个案例演示 Bucket Join 的具体实现过程。
- 读取并展示数据
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Bucket Join Demo") \ .master("local[3]") \ .enableHiveSupport() \ .getOrCreate() df1 = spark.read.json("data/d1/") df2 = spark.read.json("data/d2/") df1.show(5) df2.show(5)
- 输出:
+----+--------------+---------+----------+-----------------+------+----------------+----------+
|DEST|DEST_CITY_NAME| FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME| id|
+----+--------------+---------+----------+-----------------+------+----------------+----------+
| BOS| Boston, MA|1/12/2000| UA| 911| IAD| Washington, DC|8589934592|
| BOS| Boston, MA|1/12/2000| UA| 1210| IAD| Washington, DC|8589934593|
| BOS| Boston, MA|1/12/2000| UA| 1244| IAD| Washington, DC|8589934594|
| BOS| Boston, MA|1/12/2000| UA| 1250| IAD| Washington, DC|8589934595|
| BOS| Boston, MA|1/12/2000| UA| 1278| IAD| Washington, DC|8589934596|
+----+--------------+---------+----------+-----------------+------+----------------+----------+
only showing top 5 rows
+--------+---------+------------+------------+--------+--------+-------+---------+----------+
|ARR_TIME|CANCELLED|CRS_ARR_TIME|CRS_DEP_TIME|DEP_TIME|DISTANCE|TAXI_IN|WHEELS_ON| id|
+--------+---------+------------+------------+--------+--------+-------+---------+----------+
| 2317| 0| 2254| 2130| 2151| 413| 6| 2311|8589934592|
| 1357| 0| 1356| 1230| 1231| 413| 6| 1351|8589934593|
| 1557| 0| 1556| 1430| 1427| 413| 4| 1553|8589934594|
| 1709| 0| 1708| 1530| 1524| 413| 3| 1706|8589934595|
| 1820| 0| 1806| 1630| 1629| 413| 11| 1809|8589934596|
+--------+---------+------------+------------+--------+--------+-------+---------+----------+
only showing top 5 rows
- 保存数据表
- 将数据按 id 列进行分桶并保存为 Hive 表。
spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")
spark.sql("USE MY_DB")
df1.coalesce(1).write \
.bucketBy(3, "id") \
.mode("overwrite") \
.saveAsTable("MY_DB.flight_data1")
df2.coalesce(1).write \
.bucketBy(3, "id") \
.mode("overwrite") \
.saveAsTable("MY_DB.flight_data2")
- Bucketing 和 Join 操作
- 读取已分桶的表并进行 Join 操作。通过关闭自动广播功能,确保使用分桶表进行 Join。
df3 = spark.read.table("MY_DB.flight_data1")
df4 = spark.read.table("MY_DB.flight_data2")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
join_expr = df3.id == df4.id
join_df = df3.join(df4, join_expr, "inner")
join_df.show(10)
- 输出
+----+--------------+--------+----------+-----------------+------+----------------+---+--------+---------+------------+------------+--------+--------+-------+---------+---+
|DEST|DEST_CITY_NAME| FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME| id|ARR_TIME|CANCELLED|CRS_ARR_TIME|CRS_DEP_TIME|DEP_TIME|DISTANCE|TAXI_IN|WHEELS_ON| id|
+----+--------------+--------+----------+-----------------+------+----------------+---+--------+---------+------------+------------+--------+--------+-------+---------+---+
| ATL| Atlanta, GA|1/1/2000| DL| 1997| BOS| Boston, MA| 3| 2005| 0| 2013| 1715| 1720| 946| 10| 1955| 3|
| ATL| Atlanta, GA|1/1/2000| US| 2621| BOS| Boston, MA| 6| 1717| 0| 1738| 1440| 1446| 946| 4| 1713| 6|
| ATL| Atlanta, GA|1/1/2000| DL| 346| BTR| Baton Rouge, LA| 7| 2006| 0| 2008| 1740| 1744| 449| 9| 1957| 7|
| ATL| Atlanta, GA|1/1/2000| DL| 677| BUF| Buffalo, NY| 11| 947| 0| 925| 710| 710| 712| 7| 940| 11|
| ATL| Atlanta, GA|1/1/2000| DL| 2063| BWI| Baltimore, MD| 16| NULL| 1| 1449| 1250| NULL| 576| NULL| NULL| 16|
| ATL| Atlanta, GA|1/1/2000| DL| 1519| CAE| Columbia, SC| 24| 821| 0| 818| 720| 720| 191| 10| 811| 24|
| ATL| Atlanta, GA|1/1/2000| DL| 423| CHS| Charleston, SC| 28| 711| 0| 707| 600| 556| 259| 17| 654| 28|
| ATL| Atlanta, GA|1/1/2000| DL| 771| CHS| Charleston, SC| 30| 2224| 0| 2242| 2135| 2125| 259| 5| 2219| 30|
| ATL| Atlanta, GA|1/1/2000| DL| 1424| CHS| Charleston, SC| 31| 1642| 0| 1640| 1520| 1524| 259| 15| 1627| 31|
| ATL| Atlanta, GA|1/1/2000| DL| 1919| CLE| Cleveland, OH| 37| 854| 0| 918| 720| 716| 554| 9| 845| 37|
+----+--------------+--------+----------+-----------------+------+----------------+---+--------+---------+------------+------------+--------+--------+-------+---------+---+
only showing top 10 rows
- 详细说明:
- 读取和展示数据: 首先读取 JSON 格式的数据,并展示前五行,确保数据读取正确。
- 保存数据表:
- 使用
bucketBy
方法按id
列进行分桶,并将数据保存为 Hive 表。 coalesce(1)
将数据缩减到一个分区,确保每个桶的数据在一个文件中。
- 使用
- Bucketing 和 Join 操作:
- 读取分桶后的表数据。
- 关闭自动广播功能,确保 Join 操作使用分桶表,而不是广播 Join。
- 定义 Join 表达式,并执行 Join 操作,最终展示前十行结果。
通过上述步骤,使用 Bucket Join 可以显著减少大数据集 Join 时的 Shuffle 操作,从而提高性能。