1. 分布式介绍 #
在实际情况中,文件数据通常存储在分布式系统中,如 HDFS、Amazon S3 等。这些文件会被分区,并将文件的分区存储在不同的分布式节点上。 例如,一个文件可以被划分为 100 个分区,然后存储在 10 个节点上。一些存储系统允许你控制文件的分区数,而另一些存储系统可能不允许你控制分区数。
- Driver 与存储系统交互:在 Driver 中使用
spark.read.csv()
方法时,Driver 会与集群管理器(例如 YARN)和存储管理器(例如 HDFS)交互,以获取有关数据文件分区的详细信息。 - DataFrame 的分区:可以将 DataFrame 视为一组较小的 DataFrame 的组合,每个小的 DataFrame 代表一个分区(partition)。Spark 会自动将这些分区分配给集群中的不同节点进行并行处理。
- 资源分配:可以通过 Spark 配置文件或程序代码指定为 YARN 中的执行器(executors)分配多少内存和 CPU 资源。每个执行器都是一个独立的 JVM 进程,负责处理分配给它的任务。
- 分区的加载与处理:每个执行器从存储系统中加载它们各自负责的分区到 JVM 内存中进行处理。这样可以充分利用集群的计算资源,实现高效的并行计算。
2. 数据来源 #
Spark 支持从多种外部和内部数据源读取数据。下面是几种常见的数据来源:
2.1. 外部数据来源 #
- 方法一:使用数据集成工具:通过合适的数据集成工具,将数据存储到数据湖中(适用于批处理)。
- 方法二:Spark 数据源 API:直接连接外部数据系统(适用于流处理)。
- JDBC 数据源:如 Oracle、SQL Server、PostgreSQL。
- NoSQL 数据系统:如 Cassandra、MongoDB。
- 云数据仓库:如 Snowflake、Redshift。
- 流数据集成器:如 Kafka、Kinesis。
2.2. 内部数据来源 #
- HDFS
- Amazon S3
- Azure Blob
- Google Cloud
3. 读取文件数据 #
3.1. 标准方式 #
下面的代码展示了一个标准方式如何从文件中读取数据。
spark.read \
.format("csv") \
.option("header", "true") \
.option("path", "/data/mycsvfiles/") \
.option("mode", "FAILFAST") \
.schema(mySchema) \
.load()
.read
是Spark的属性,它提供了一个 DataframeReader 对象。format
支持多种内置的数据格式:CSV、JSON、Parquet、ORC、JDBC等 。- 同时,通过社区的贡献,它还支持数百个外部的数据源,如:Cassandra、MongoDB、AVRO、XML、HBase、Redshift等。
option
用于设置读取文件的配置参数。每个 option 里都有一组对应的 key 和 value,用于确定DataFrameReader 读取数据的方式。"header"
:是否包含标题行。"mode"
在读取数据文件时,指定遇到格式错误的记录时的处理方式。"PERMISSIVE"
:默认选项,遇到损坏的记录时将所有字段设置为空,并将损坏的记录放在_corrupt_record
字段列表中。"DROPMALFORMED"
:删除格式错误的记录。"FAILFAST"
:遇到格式错误的记录时,引发异常并立即终止程序。
schema
:指定数据 schema(可选)。
3.2. 介绍不同的读取方式 #
- 方法1
df = spark.read \ .format('csv') \ .option('header', 'true') \ .option('inferScheme', 'true') \ .load('./data/sample.csv')
- 这个写法与上面的标准写法一致,也是最推荐的写法。
- 使用
format()
方法指定文件格式。 - inferSchema 设置为true,表示 Spark 会自动推断每列的数据类型(如整数、浮点数、日期等),但是不能保证其准确性,后面将介绍具体的解决办法。
- 最后调用 load 方法,从给定的位置加载文件。
- 方法2
df = spark.read \ .option("header", "true") \ .option("inferSchema", "true") \ .csv('./data/sample.csv')
.csv
方法表示要读取 CSV 文件,并指定文件路径。
- 方法3
df = spark.read \ .csv('./data/sample.csv', header='true', inferSchema='true')
- 这段代码直接将之前的
option
的相关参数写在了csv
配置里 以上三个方法得到相同的结果,但是推荐使用方法1。因为在读取数据时,会遇到不同格式的文件格式,第一种方法最通用的。下面将介绍一些不同文件格式的读取。
- 这段代码直接将之前的
3.3. 展示数据 #
- 显示所有数据
df.show()
- 显示10行数据
df.show(10)
- 在 Databricks 平台上友好地展示数据
display(df)
- display 不是 Apache Spark 里的 function, 它是 Databricks 平台的一个附加工具,用于格式化 DataFrame 的输出
3.4. 读取 CSV 文件 #
from pyspark.sql import SparkSession
from lib.logger import Log4J
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("SparkSchemaDemo") \
.getOrCreate()
logger = Log4J(spark)
flightTimeCsvDF = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("data/flight*.csv")
flightTimeCsvDF.show(5)
logger.info("CSV Schema:" + flightTimeCsvDF.schema.simpleString())
与上面介绍的读取 CSV 文件一样。最后一行通过 df.schema.simpleString()
查看读取出来的 schema 信息
3.5. 读取 JSON 文件 #
flightTimeJsonDF = spark.read \
.format("json") \
.load("data/flight*.json")
flightTimeJsonDF.show(5)
logger.info("JSON Schema:" + flightTimeJsonDF.schema.simpleString())
读取 JSON 文件时,会自动推断 schema,所以不需要特别指定 inferSchema 为 true。
3.6. 读取 Parquet 文件 #
flightTimeParquetDF = spark.read \
.format("parquet") \
.load("data/flight*.parquet")
flightTimeParquetDF.show(5)
logger.info("Parquet Schema:" + flightTimeParquetDF.schema.simpleString())
Parquet 是一个二进制的文件格式,它是Apache Spark 推荐的默认文件格式。
读取 Parquet 文件时,由于 Parquet 文件会保存数据 schema 信息,所以只要文件里的 schema 信息正确,读取出来的 schema 就是正确的。
需要注意的是 CSV 和 JSON 文件读出来的日期字段的类型都是 string 类型,只有 Parquet 正确的表明日期字段是 date 类型。 尽量使用Parquet 格式的文件,因为它会保存schema信息。Spark 从 CSV 和 JSON 格式的数据文件中推断schema 不一定准确。
4. 定义 DataFrame 架构 #
4.1. Spark Data Types #
在定义 DataFrame 架构(Schema)之前,我们需要了解 Spark 的数据类型。 下面是一些常见的 Spark 数据类型和与之对应的 Python 数据类型:
Spark Types | Python Types |
---|---|
IntegerType | int |
LongType | long |
FloatType | float |
DoubleType | float |
StringType | string |
DateType | datetime.date |
TimestampType | datetime.datetime |
ArrayType | list, tuple, or array |
MapType | dict |
4.2. 编程式定义架构 #
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
from lib.logger import Log4J
flightSchemaStruct = StructType([
StructField("FL_DATE", DateType()),
StructField("OP_CARRIER", StringType()),
StructField("OP_CARRIER_FL_NUM", IntegerType()),
StructField("ORIGIN", StringType()),
StructField("ORIGIN_CITY_NAME", StringType()),
StructField("DEST", StringType()),
StructField("DEST_CITY_NAME", StringType()),
StructField("CRS_DEP_TIME", IntegerType()),
StructField("DEP_TIME", IntegerType()),
StructField("WHEELS_ON", IntegerType()),
StructField("TAXI_IN", IntegerType()),
StructField("CRS_ARR_TIME", IntegerType()),
StructField("ARR_TIME", IntegerType()),
StructField("CANCELLED", IntegerType()),
StructField("DISTANCE", IntegerType())
])
flightTimeCsvDF = spark.read \
.format("csv") \
.option("header", "true") \
.schema(flightSchemaStruct) \
.option("mode", "FAILFAST") \
.option("dateFormat", "M/d/y") \
.load("data/flight*.csv")
- 定义架构:首先,通过
StructType
定义 DataFrame 的架构。架构由多个StructField
组成, 每个StructField
包含字段名称和对应的数据类型。例如,"FL_DATE"
字段的数据类型是DateType()
。 - 应用架构:在读取数据时,通过
.schema(flightSchemaStruct)
将定义好的架构应用到 DataFrame 中,以确保每个字段都具有预期的数据类型。 - 设置读取选项:
- 在读取数据文件时通过
.schema(flightSchemaStruct)
将定义好的数据类型应用到 DataFrame 里。 - 使用
.option("mode", "FAILFAST")
设置在遇到格式错误时,程序会立即停止并提示错误。这有助于在数据读取过程中及时发现和解决问题。 - 使用
.option("dateFormat", "M/d/y")
设置日期格式,以确保正确解析日期字段。如果不指定日期格式,可能会导致日期字段解析错误。
- 在读取数据文件时通过
4.3. DDL字符串定义架构 #
- 通过 DDL String 来设置数据类型比较简单。
- 将列名和对应的数据类型用单空格隔开,每组再用逗号加空格隔开。
- 其余和上面的方法一样,具体写法如下:
flightSchemaDDL = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT, ORIGIN STRING,
ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING, CRS_DEP_TIME INT, DEP_TIME INT,
WHEELS_ON INT, TAXI_IN INT, CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""
flightTimeCsvDF = spark.read \
.format("csv") \
.option("header", "true") \
.schema(flightSchemaDDL) \
.option("mode", "FAILFAST") \
.option("dateFormat", "M/d/y") \
.load("data/flight*.csv")