跳过正文

Spark系列 - 数据存储

·1316 字·
大数据 Spark DataFrame SparkSQL 分布式数据库
EZ
作者
EZ
Take it EZ!
目录
Spark - 这篇文章属于一个选集。
§ 5: 本文

在数据处理完成后,可以将数据存储回内部或外部系统中。但不建议直接将数据写入外部系统中,因为这可能会带来性能和可靠性的问题。Spark 允许你使用 DataFrameWriter API 写入数据。

1. 文件形式存储
#

df.write\
    .format("parquet") \
    .mode(saveMode) \
    .option("path", "/data/flights") \
    .save()
  • saveMode 有四种:

    • append:创建新的文件,不会覆盖或删除现有的文件
    • overwrite:删掉现有的文件并创建新的文件
    • errorIfExists:如果已于该文件,程序会引发错误,从而终止程序继续运行
    • ignore:如果已有该文件,则不会执行任何操作
  • 存储说明:

    • 在存储数据时,还需要考虑文件的数量、文件的大小、分区、分桶和排序等问题。
    • 可以使用 DataFrame.repartition(n) 进行简单的重新分区,但这种分区可能不是理想的。
    • 更好的方法是使用 DataFrameWriter.partitionBy(col1, col2),根据列的值进行分区,
    • 还可以使用 DataFrameWriter.bucketBy(n, col1, col2) 进行分桶,并结合 sortBy() 创建排序的存储桶。
    • 此外,可以使用 maxRecordsPerFile 选项限制每个文件的记录数,这对于控制文件大小非常有用。

1.1. 示例
#

from pyspark.sql import SparkSession

from lib.logger import Log4J

spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("DataSinkDemo") \
    .getOrCreate()

logger = Log4J(spark)

flightTimeParquetDF = spark.read. \
    format("parquet") \
    .load("data/flight*.parquet")

flightTimeParquetDF.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "dataSink/json/") \
    .partitionBy("OP_CARRIER", "ORIGIN") \
    .save()
- 数据集里有十个不同的 CARRIER,每个目录里还有不同的 ORIGIN 目录(这里就不全部展示了)。ORIGIN 目录里有存储相关的数据。 - 如果我们读这些数据的话,并只针对一个 CARRIER 进行筛选,那Spark SQL引擎可以通过这个存储格式来优化读取和过滤的操作,忽略其他的目录,只读取相关的目录。 - OP_CARRIER 和 ORIGIN 的信息不会存储在JSON文件里,因为两层目录的名字已经包含了相关的信息,避免造成数据冗余。

通常情况下,每个文件的大小建议控制在500MB到几GB之间。可以通过以下代码限制每个文件保存最多为 10000 条记录:

flightTimeParquetDF.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "dataSink/json/") \
    .partitionBy("OP_CARRIER", "ORIGIN") \
    .option("maxRecordsPerFile", 10000)\
    .save()
  • 只需要加上 .option("maxRecordsPerFile", 10000) 就可以限制每个文件保存最多为10000条记录。

2. 数据库形式存储
#

Spark 不仅仅是一组 API 和一个处理引擎,它本身就是一个数据库,因此可以在 Spark 中创建数据库并存储表和视图。 表可以存储为文件,并且格式可以自定义。表的元数据存储在 catalog 中。 Spark Tables 有两种类型:

  1. Managed Tables
    • Spark 管理表里的数据以及元数据。
    • 数据固定存储在 spark.sql.warehouse 目录里。
    • 使用 drop 时,会同时删掉表里的数据和元数据。
    • 比 Unmanaged Tables 多一些功能,比如 bucketing 和 sorting。
  2. Unmanaged Tables (External Tables)
    • 表里的数据存储在任意地方,Spark 只管理元数据。
    • 创建表时必须指定存储位置。
    • 使用 drop 时,Spark 不会删除数据文件,只会删除元数据。

2.1. 示例
#

  • 首先我们需要通过 .enableHiveSupport() 打开 Hive 支持,因为要用到 Hive Metastore。

    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()
    
  • Apache Spark 自带了一个默认数据库,这个数据库的名字叫“default”。

  • 我们可以在 Spark 中创建自定义的数据库:

    spark.sql("CREATE DATABASE IF NOT EXISTS AIRLINE_DB")
    
  • 写入数据库时,我们可以在表的名字前面加上数据库名字,从而存储到指定的数据库中。

    flightTimeParquetDF.write \
        .mode("overwrite") \
        .saveAsTable("AIRLINE_DB.flight_data_tbl")
    
  • 或者设置当前的数据库

    spark.catalog.setCurrentDatabase("AIRLINE_DB")
    flightTimeParquetDF.write \
        .mode("overwrite") \
        .saveAsTable("flight_data_tbl")
    
  • 分区存储

    • bucketBy() 允许你限制分区的数量(有时,存储桶可以显著改进 join 操作)。
    • sortBy()bucketBy() 一起使用创建排序的存储桶。
    • 如果想在本地查看存储的文件内容,可以在 .write 后面加上 .format("csv"),以 CSV 格式存储,可以方便本地查看。这里没加,所以会默认存储 Parquet 二进制文件格式。
      flightTimeParquetDF.write \
          .mode("overwrite") \
          .bucketBy(5, "OP_carrier", "ORIGIN") \
          .sortBy("OP_carrier", "ORIGIN") \
          .saveAsTable("AIRLINE_DB.flight_data_tbl")
      
  • 在本地运行时,会在项目目录下创建 metastore_dbspark-warehouse 目录分别存储元数据和表数据。

  • 在云环境下,这两个目录都由集群管理员来配置,存在所有Spark应用程序的公共位置。

Spark - 这篇文章属于一个选集。
§ 5: 本文

相关文章

Spark系列 - 数据读取
·2470 字
大数据 Spark DataFrame SparkSQL 分布式数据库
本章主要讨论 pySpark 的数据读取。
Spark系列 - 配置Spark
·1559 字
大数据 Spark 日志 Log4J 硬编码 软编码
本文将详细介绍如何在 Spark 项目中配置 Log4J 日志模块,以及配置 Spark Session。
Spark系列 - 本地环境的搭建
·544 字
大数据 Spark 环境安装
本篇文章将介绍如何在本地 Mac 环境下搭建 Spark,包括安装 JDK、配置环境变量、安装和配置 Spark 以及安装 PySpark。
Spark系列 - 初识大数据
·2952 字
大数据 Spark Hadoop 数据库
这篇文章初步介绍了大数据、Hadoop 和 Spark 这三个关键方面。本文提供了一个简要的概述,为读者进一步了解大数据处理提供了基础。
AB测试系列 - AB测试里的统计学 PART 2
·2670 字
AB测试 统计 数据分析
本文主要讨论在AB测试中遇到的统计学知识点,主要包括:最小样本量计算、实验时间计算、以及一些其他相关的统计知识点。
AB测试系列 - AB测试里的统计学 PART 1
·3793 字
AB测试 统计 数据分析
本文主要讨论在AB测试中遇到的统计学知识点,主要包括:效应大小、MDE、置信区间、假设检验等等。