跳过正文

Spark系列 - 数据读取

·2470 字·
大数据 Spark DataFrame SparkSQL 分布式数据库
EZ
作者
EZ
Take it EZ!
目录
Spark - 这篇文章属于一个选集。
§ 4: 本文

1. 分布式介绍
#

在实际情况中,文件数据通常存储在分布式系统中,如 HDFS、Amazon S3 等。这些文件会被分区,并将文件的分区存储在不同的分布式节点上。 例如,一个文件可以被划分为 100 个分区,然后存储在 10 个节点上。一些存储系统允许你控制文件的分区数,而另一些存储系统可能不允许你控制分区数。

  • Driver 与存储系统交互:在 Driver 中使用spark.read.csv()方法时,Driver 会与集群管理器(例如 YARN)和存储管理器(例如 HDFS)交互,以获取有关数据文件分区的详细信息。
  • DataFrame 的分区:可以将 DataFrame 视为一组较小的 DataFrame 的组合,每个小的 DataFrame 代表一个分区(partition)。Spark 会自动将这些分区分配给集群中的不同节点进行并行处理。
  • 资源分配:可以通过 Spark 配置文件或程序代码指定为 YARN 中的执行器(executors)分配多少内存和 CPU 资源。每个执行器都是一个独立的 JVM 进程,负责处理分配给它的任务。
  • 分区的加载与处理:每个执行器从存储系统中加载它们各自负责的分区到 JVM 内存中进行处理。这样可以充分利用集群的计算资源,实现高效的并行计算。

2. 数据来源
#

Spark 支持从多种外部和内部数据源读取数据。下面是几种常见的数据来源:

2.1. 外部数据来源
#

  • 方法一:使用数据集成工具:通过合适的数据集成工具,将数据存储到数据湖中(适用于批处理)。
  • 方法二:Spark 数据源 API:直接连接外部数据系统(适用于流处理)。
    • JDBC 数据源:如 Oracle、SQL Server、PostgreSQL。
    • NoSQL 数据系统:如 Cassandra、MongoDB。
    • 云数据仓库:如 Snowflake、Redshift。
    • 流数据集成器:如 Kafka、Kinesis。

2.2. 内部数据来源
#

  • HDFS
  • Amazon S3
  • Azure Blob
  • Google Cloud

3. 读取文件数据
#

3.1. 标准方式
#

下面的代码展示了一个标准方式如何从文件中读取数据。

spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("path", "/data/mycsvfiles/") \
    .option("mode", "FAILFAST") \
    .schema(mySchema) \
    .load()
  • .read 是Spark的属性,它提供了一个 DataframeReader 对象。
  • format 支持多种内置的数据格式:CSV、JSON、Parquet、ORC、JDBC等 。
    • 同时,通过社区的贡献,它还支持数百个外部的数据源,如:Cassandra、MongoDB、AVRO、XML、HBase、Redshift等。
  • option 用于设置读取文件的配置参数。每个 option 里都有一组对应的 key 和 value,用于确定DataFrameReader 读取数据的方式。
    • "header":是否包含标题行。
    • "mode" 在读取数据文件时,指定遇到格式错误的记录时的处理方式。
      • "PERMISSIVE":默认选项,遇到损坏的记录时将所有字段设置为空,并将损坏的记录放在 _corrupt_record 字段列表中。
      • "DROPMALFORMED":删除格式错误的记录。
      • "FAILFAST":遇到格式错误的记录时,引发异常并立即终止程序。
  • schema:指定数据 schema(可选)。

3.2. 介绍不同的读取方式
#

  • 方法1
    df = spark.read \
            .format('csv') \
            .option('header', 'true') \
            .option('inferScheme', 'true') \
            .load('./data/sample.csv')
    
    • 这个写法与上面的标准写法一致,也是最推荐的写法。
    • 使用 format() 方法指定文件格式。
    • inferSchema 设置为true,表示 Spark 会自动推断每列的数据类型(如整数、浮点数、日期等),但是不能保证其准确性,后面将介绍具体的解决办法。
    • 最后调用 load 方法,从给定的位置加载文件。
  • 方法2
    df = spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .csv('./data/sample.csv')
    
    • .csv 方法表示要读取 CSV 文件,并指定文件路径。
  • 方法3
    df = spark.read \
            .csv('./data/sample.csv', 
             header='true', 
             inferSchema='true')
    
    • 这段代码直接将之前的 option 的相关参数写在了 csv 配置里 以上三个方法得到相同的结果,但是推荐使用方法1。因为在读取数据时,会遇到不同格式的文件格式,第一种方法最通用的。下面将介绍一些不同文件格式的读取。

3.3. 展示数据
#

  • 显示所有数据
    df.show()
    
  • 显示10行数据
    df.show(10)
    
  • 在 Databricks 平台上友好地展示数据
    display(df)
    
    • display 不是 Apache Spark 里的 function, 它是 Databricks 平台的一个附加工具,用于格式化 DataFrame 的输出

3.4. 读取 CSV 文件
#

from pyspark.sql import SparkSession
from lib.logger import Log4J
spark = SparkSession \
    .builder \
    .master("local[3]") \
    .appName("SparkSchemaDemo") \
    .getOrCreate()

logger = Log4J(spark)

flightTimeCsvDF = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("data/flight*.csv")

flightTimeCsvDF.show(5)
logger.info("CSV Schema:" + flightTimeCsvDF.schema.simpleString())

与上面介绍的读取 CSV 文件一样。最后一行通过 df.schema.simpleString() 查看读取出来的 schema 信息

3.5. 读取 JSON 文件
#

flightTimeJsonDF = spark.read \
    .format("json") \
    .load("data/flight*.json")

flightTimeJsonDF.show(5)
logger.info("JSON Schema:" + flightTimeJsonDF.schema.simpleString())

读取 JSON 文件时,会自动推断 schema,所以不需要特别指定 inferSchema 为 true。

3.6. 读取 Parquet 文件
#

flightTimeParquetDF = spark.read \
    .format("parquet") \
    .load("data/flight*.parquet")

flightTimeParquetDF.show(5)
logger.info("Parquet Schema:" + flightTimeParquetDF.schema.simpleString())

Parquet 是一个二进制的文件格式,它是Apache Spark 推荐的默认文件格式。

读取 Parquet 文件时,由于 Parquet 文件会保存数据 schema 信息,所以只要文件里的 schema 信息正确,读取出来的 schema 就是正确的。

需要注意的是 CSV 和 JSON 文件读出来的日期字段的类型都是 string 类型,只有 Parquet 正确的表明日期字段是 date 类型。 尽量使用Parquet 格式的文件,因为它会保存schema信息。Spark 从 CSV 和 JSON 格式的数据文件中推断schema 不一定准确。

4. 定义 DataFrame 架构
#

4.1. Spark Data Types
#

在定义 DataFrame 架构(Schema)之前,我们需要了解 Spark 的数据类型。 下面是一些常见的 Spark 数据类型和与之对应的 Python 数据类型:

Spark Types Python Types
IntegerType int
LongType long
FloatType float
DoubleType float
StringType string
DateType datetime.date
TimestampType datetime.datetime
ArrayType list, tuple, or array
MapType dict

4.2. 编程式定义架构
#

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

from lib.logger import Log4J

flightSchemaStruct = StructType([
    StructField("FL_DATE", DateType()),
    StructField("OP_CARRIER", StringType()),
    StructField("OP_CARRIER_FL_NUM", IntegerType()),
    StructField("ORIGIN", StringType()),
    StructField("ORIGIN_CITY_NAME", StringType()),
    StructField("DEST", StringType()),
    StructField("DEST_CITY_NAME", StringType()),
    StructField("CRS_DEP_TIME", IntegerType()),
    StructField("DEP_TIME", IntegerType()),
    StructField("WHEELS_ON", IntegerType()),
    StructField("TAXI_IN", IntegerType()),
    StructField("CRS_ARR_TIME", IntegerType()),
    StructField("ARR_TIME", IntegerType()),
    StructField("CANCELLED", IntegerType()),
    StructField("DISTANCE", IntegerType())
])

flightTimeCsvDF = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(flightSchemaStruct) \
    .option("mode", "FAILFAST") \
    .option("dateFormat", "M/d/y") \
    .load("data/flight*.csv")
  • 定义架构:首先,通过 StructType 定义 DataFrame 的架构。架构由多个 StructField 组成, 每个 StructField 包含字段名称和对应的数据类型。例如,"FL_DATE" 字段的数据类型是 DateType()
  • 应用架构:在读取数据时,通过 .schema(flightSchemaStruct) 将定义好的架构应用到 DataFrame 中,以确保每个字段都具有预期的数据类型。
  • 设置读取选项:
    • 在读取数据文件时通过.schema(flightSchemaStruct)将定义好的数据类型应用到 DataFrame 里。
    • 使用 .option("mode", "FAILFAST") 设置在遇到格式错误时,程序会立即停止并提示错误。这有助于在数据读取过程中及时发现和解决问题。
    • 使用.option("dateFormat", "M/d/y") 设置日期格式,以确保正确解析日期字段。如果不指定日期格式,可能会导致日期字段解析错误。

4.3. DDL字符串定义架构
#

  • 通过 DDL String 来设置数据类型比较简单。
  • 将列名和对应的数据类型用单空格隔开,每组再用逗号加空格隔开。
  • 其余和上面的方法一样,具体写法如下:
 flightSchemaDDL = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT, ORIGIN STRING, 
       ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT, 
       WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

 flightTimeCsvDF = spark.read \
     .format("csv") \
     .option("header", "true") \
     .schema(flightSchemaDDL) \
     .option("mode", "FAILFAST") \
     .option("dateFormat", "M/d/y") \
     .load("data/flight*.csv")
Spark - 这篇文章属于一个选集。
§ 4: 本文

相关文章

Spark系列 - 配置Spark
·1559 字
大数据 Spark 日志 Log4J 硬编码 软编码
本文将详细介绍如何在 Spark 项目中配置 Log4J 日志模块,以及配置 Spark Session。
Spark系列 - 本地环境的搭建
·544 字
大数据 Spark 环境安装
本篇文章将介绍如何在本地 Mac 环境下搭建 Spark,包括安装 JDK、配置环境变量、安装和配置 Spark 以及安装 PySpark。
Spark系列 - 初识大数据
·2952 字
大数据 Spark Hadoop 数据库
这篇文章初步介绍了大数据、Hadoop 和 Spark 这三个关键方面。本文提供了一个简要的概述,为读者进一步了解大数据处理提供了基础。
AB测试系列 - AB测试里的统计学 PART 2
·2670 字
AB测试 统计 数据分析
本文主要讨论在AB测试中遇到的统计学知识点,主要包括:最小样本量计算、实验时间计算、以及一些其他相关的统计知识点。
AB测试系列 - AB测试里的统计学 PART 1
·3793 字
AB测试 统计 数据分析
本文主要讨论在AB测试中遇到的统计学知识点,主要包括:效应大小、MDE、置信区间、假设检验等等。
AB测试系列 - 如何提升测试的可靠性
·4400 字
AB测试 统计 数据分析
本文主要讨论如何去确保AB测试的有效性,简单地开启AB测试无法保证其实验结果是否有效,需要结合具体的业务场景及一些科学的方法进行验证。