在数据处理完成后,可以将数据存储回内部或外部系统中。但不建议直接将数据写入外部系统中,因为这可能会带来性能和可靠性的问题。Spark 允许你使用 DataFrameWriter
API 写入数据。
1. 文件形式存储 #
df.write\
.format("parquet") \
.mode(saveMode) \
.option("path", "/data/flights") \
.save()
-
saveMode
有四种:append
:创建新的文件,不会覆盖或删除现有的文件overwrite
:删掉现有的文件并创建新的文件errorIfExists
:如果已于该文件,程序会引发错误,从而终止程序继续运行ignore
:如果已有该文件,则不会执行任何操作
-
存储说明:
- 在存储数据时,还需要考虑文件的数量、文件的大小、分区、分桶和排序等问题。
- 可以使用
DataFrame.repartition(n)
进行简单的重新分区,但这种分区可能不是理想的。 - 更好的方法是使用
DataFrameWriter.partitionBy(col1, col2)
,根据列的值进行分区, - 还可以使用
DataFrameWriter.bucketBy(n, col1, col2)
进行分桶,并结合sortBy()
创建排序的存储桶。 - 此外,可以使用
maxRecordsPerFile
选项限制每个文件的记录数,这对于控制文件大小非常有用。
1.1. 示例 #
from pyspark.sql import SparkSession
from lib.logger import Log4J
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("DataSinkDemo") \
.getOrCreate()
logger = Log4J(spark)
flightTimeParquetDF = spark.read. \
format("parquet") \
.load("data/flight*.parquet")
flightTimeParquetDF.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataSink/json/") \
.partitionBy("OP_CARRIER", "ORIGIN") \
.save()

通常情况下,每个文件的大小建议控制在500MB到几GB之间。可以通过以下代码限制每个文件保存最多为 10000 条记录:
flightTimeParquetDF.write \
.format("json") \
.mode("overwrite") \
.option("path", "dataSink/json/") \
.partitionBy("OP_CARRIER", "ORIGIN") \
.option("maxRecordsPerFile", 10000)\
.save()
- 只需要加上
.option("maxRecordsPerFile", 10000)
就可以限制每个文件保存最多为10000条记录。
2. 数据库形式存储 #
Spark 不仅仅是一组 API 和一个处理引擎,它本身就是一个数据库,因此可以在 Spark 中创建数据库并存储表和视图。 表可以存储为文件,并且格式可以自定义。表的元数据存储在 catalog 中。 Spark Tables 有两种类型:
- Managed Tables
- Spark 管理表里的数据以及元数据。
- 数据固定存储在
spark.sql.warehouse
目录里。 - 使用
drop
时,会同时删掉表里的数据和元数据。 - 比 Unmanaged Tables 多一些功能,比如 bucketing 和 sorting。
- Unmanaged Tables (External Tables)
- 表里的数据存储在任意地方,Spark 只管理元数据。
- 创建表时必须指定存储位置。
- 使用
drop
时,Spark 不会删除数据文件,只会删除元数据。
2.1. 示例 #
-
首先我们需要通过
.enableHiveSupport()
打开 Hive 支持,因为要用到 Hive Metastore。spark = SparkSession \ .builder \ .master("local[3]") \ .appName("SparkSQLTableDemo") \ .enableHiveSupport() \ .getOrCreate()
-
Apache Spark 自带了一个默认数据库,这个数据库的名字叫“default”。
-
我们可以在 Spark 中创建自定义的数据库:
spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
-
写入数据库时,我们可以在表的名字前面加上数据库名字,从而存储到指定的数据库中。
flightTimeParquetDF.write \ .mode("overwrite") \ .saveAsTable("AIRLINE_DB.flight_data_tbl")
-
或者设置当前的数据库
spark.catalog.setCurrentDatabase("AIRLINE_DB") flightTimeParquetDF.write \ .mode("overwrite") \ .saveAsTable("flight_data_tbl")
-
分区存储
bucketBy()
允许你限制分区的数量(有时,存储桶可以显著改进join
操作)。sortBy()
和bucketBy()
一起使用创建排序的存储桶。- 如果想在本地查看存储的文件内容,可以在
.write
后面加上.format("csv")
,以 CSV 格式存储,可以方便本地查看。这里没加,所以会默认存储 Parquet 二进制文件格式。flightTimeParquetDF.write \ .mode("overwrite") \ .bucketBy(5, "OP_carrier", "ORIGIN") \ .sortBy("OP_carrier", "ORIGIN") \ .saveAsTable("AIRLINE_DB.flight_data_tbl")
-
在本地运行时,会在项目目录下创建
metastore_db
和spark-warehouse
目录分别存储元数据和表数据。 -
在云环境下,这两个目录都由集群管理员来配置,存在所有Spark应用程序的公共位置。