跳过正文

Spark系列 - 数据转换(I)

·2750 字·
大数据 Spark DataFrame SparkSQL Transformations
EZ
作者
EZ
Take it EZ!
目录
Spark - 这篇文章属于一个选集。
§ 6: 本文

1. 定义 DataFrame 转换
#

Spark DataFrame 是一种不可变的数据结构,允许通过数据转换(Transformation)将一个 DataFrame 转换为另一个 DataFrame,而不会修改原始的 DataFrame。

  • DataFrame 转换类型包括:

    • 组合多个 DataFrames:如 Join 和 Union 操作。
    • 聚合和汇总:包括 grouping、windowing 和 rollups 操作。
    • 应用函数和内置转换:包括 filtering、sorting、splitting、sampling 和 finding unique。
    • 使用和实现内置函数、列级函数和用户自定义函数。
  • 具体的常用转换操作:

    • 过滤:使用 where() 方法根据条件筛选数据。
    • 选择:使用 select() 方法选择要保留的列。
    • 排序:使用 orderBy() 方法对数据进行排序。
    • 分组:使用 groupBy() 方法对数据进行分组。
    • 聚合:使用聚合函数(例如 count()、sum()、avg())对分组后的数据进行聚合。
  • 例子:

    filtered_df = df.where("Age < 40") \
                    .select("Age", "Gender", "Country", "state")
    grouped_df = filtered_df.groupBy("Country")
    count_df = grouped_df.count()
    count_df.show()
    
    • 每一次转换都会生成一个DataFrame。

2. 窄依赖转换
#

  • 窄依赖转换(Narrow Dependency Transformations)是指每个分区的数据只需要其本身就可以完成操作,不需要和其他分区的数据进行交互。
  • Spark 执行器上可以在它们的分区上执行,不依赖于其他的执行器和分区,最终组合这些执行器的结果得到总体的结果。
  • 窄依赖转换的执行效率高,因为不需要额外的 shuffle 操作。
  • 比如 whereselect 是窄依赖转换。

3. 宽依赖转换
#

  • 宽依赖转换(Wide Dependency Transformations)是指每个分区的数据需要和其他分区的数据进行交互才能完成操作。
  • 需要其它分区的数据才能产生正确的结果,通过简单的组合不能得到总体的结果。
  • 比如 groupBy 是一个宽依赖转换(orderByjoindistinct等都是宽依赖转换)。
  • 解决方法:
    • Shuffle & Sort Operation: 合并所有的分区,然后创建一些新的分区确保同一组的数据在同一个分区内。
    • 再使用聚合函数就可以合并多个分区的结果了(窄依赖)。
    • 我们不需要做任何事,Spark 在内部会帮我们处理好。
    • 宽依赖转换的执行效率通常低于窄依赖转换,因为需要额外的 shuffle 操作。

4. 惰性求值
#

  • 惰性求值(Lazy Evaluations)是一种函数式编程技术。
  • Spark 采用惰性求值策略,这意味着只有在 行动操作(Action Operation)执行时,Spark 才会真正对数据进行计算。行动操作包括:readwritecollectshow等。
    • collect 操作将DataFrame 作为 Python 列表返回。
  • 在执行行动操作之前,Spark 不会对数据进行任何计算,而只是将要执行的操作记录在 执行计划(Execution Plan)中。这样可以提高 Spark 的性能,因为只有在需要的时候才会对数据进行计算。
  • 在需要立即得到结果的情况下,可以使用 collect() 方法将 DataFrame 的内容收集到本地。
  • 为了提高性能,可以尽量将多个窄依赖转换组合在一起,减少 shuffle 操作的次数。
  • 可以使用 Spark UI 来查看执行计划,了解 Spark 如何执行查询。

5. 窥视 Spark 如何转换 Jobs、Stages 和 Tasks
#

  • 首先我们需要简单了解一下如何对数据分区:

    • 通过 partitioned_df = df.repartition(2) 对 DataFrame 强制性划分2个分区。
    • 有些分区不是我们主动操作的,例如 DataFrame 经过宽依赖转换后会进行 shuffle/sort,这种情况下我们并不能确定 Spark 会把 DataFrame 划分了多少个分区。
      • 可以在 spark.conf 里进行设置,添加下面的语句:spark.sql.shuffle.partitions = 2
      • spark.conf 文件的详细设置可以参考上一篇文章。
  • Spark UI 只能在程序运行时查看

    • 可以在代码末尾加上input("Press Enter to continue...")防止程序结束。
    • 运行后可以用浏览器打开localhost:4040查看。
  • 关于 jobs、stages 和 tasks 的一些特性:

    • 工作流程:Spark 程序 -> jobs -> stages -> tasks
    • 每一个 Action 都会转化为一个 job
    • 每一个 job 至少含有一个 stage 和一个 task
    • 每个 stage 至少有一个 task
    • 每个 task 是分配给 executors 的一个工作单元
  • 以下面代码为例,Spark 会把这部分转换成1个 job,然后再把这个 job 划分成3个 stages

    • 注意这部分代码中的 repartitiongroupBy 会引起 shuffle
      partitioned_df = df.repartition(2)
      count_df = partitioned_df.where("Age < 40") \
              .select("Age", "Gender", "Country", "state") \
              .groupBy("Country") \
              .count()
      logger.info(count_df.collect())
      
      • stage1:repartition -> 1 task
        • 这个 stage 里有一个任务,就是将 DataFrame 划分了两个区
      • stage2:whereselectgroupBy -> 2 tasks
        • 由于上一步有两个分区,所以这个 stage 有两个 tasks ,每个 task 都需要做 whereselectgroupBy
      • stage3:countcollect -> 2 tasks
        • 由于上一步的 groupBy 会进行 shuffle 操作,设置的 shuffle 分区数量为2,所以这个 stage 依旧会使用两个分区,因此是2个tasks

6. Spark API 介绍
#

  • RDD (Resilient Distributed Dataset): 弹性分布式数据集,是 Spark 最基本的数据抽象,提供了对大数据集的容错、高效处理。
  • Catalyst Optimizer: 在 RDD 和高级别 API 之间存在一个 Catalyst 优化器。它负责查询优化和生成高效的执行计划。
  • 高级别 API: 在 RDD 之上,Spark 引入了更高级别的 API,主要包括 Spark SQL、DataFrame API 和 Dataset API。这些 API 提供了更简洁、更高效的数据操作方式。
    • Spark SQL: 允许通过 SQL 查询操作结构化数据,并集成了 SQL 语法与 Spark 的计算引擎。
    • DataFrame API: 提供了类似于关系型数据库表的数据结构,使数据操作更加直观、简便。 (前面已经介绍过很多例子)
    • Dataset API: 提供了强类型的 API,与 RDD 类似,但具有更高的优化和更好的编译时类型安全性。要使用 Dataset API,必须使用基于 JVM 的语言,例如 Scala 或 Java。

6.1. RDD 介绍
#

  • RDD API 是 Spark 中最基础的 API,尽管大部分情况下不常用,但它提供了极大的灵活性。

  • 数据结构: RDD 是原生的数据集,不具备行列结构和 schema,而是被分解为多个分区,形成一个分布式的集合。

  • 容错性: RDD 具有高度的容错性。每个 RDD 分区都包含如何创建和处理该分区的信息。如果某个执行器 (executor) 出现故障,相关的 RDD 分区将被重新分配到其他执行器上。

  • 示例代码: 以下示例代码展示了如何创建和处理 RDD。

    • 创建 RDD:
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession
    
    
    conf = SparkConf() \
      .setMaster("local[3]") \
      .setAppName("HelloRDD")
    
    spark = SparkSession \
      .builder \
      .config(conf=conf) \
      .getOrCreate()
    
    sc = spark.sparkContext
    
    linesRDD = sc.textFile("./data/sample.csv")  # 假设这个文件不包含标题行
    
    • 处理 RDD:
      • RDD 只提供了基本的转换,如map(), reduce(), filter(), foreach()等。
      • 但是 RDD 接受 lambda 函数,可以应用自定义代码。
    from collections import namedtuple
    
    partitionedRDD = linesRDD.repartition(2)
    closRDD = partitionedRDD.map(lambda line: line.replace('"', '').split(","))
    
    SurveyRecord = namedtuple("SurveyRecord", ["Age", "Gender", "Country", "State"])
    selectRDD = colsRDD.map(lambda cols: SurveyRecord(int(cols[1]), cols[2], cols[3], cols[4]))
    
    filteredRDD = selectRDD.filter(lambda r: r.Age < 40)
    kvRDD = filteredRDD.map(lambda r: (r.Country, 1))
    countRDD = kvRDD.reduceByKey(lambda v1, v2: v1 + v2)
    
    colsList = countRDD.collect()
    for x in colsList:
      logger.info(x)
    
  • 虽然 RDD API 提供了很好的灵活性和容错性,但在实际开发中,使用 Spark 的高级别 API 更为常见,因为它们提供了更高的抽象级别和更简洁的代码。

6.2 Spark SQL
#

Spark SQL 提供了一种将 DataFrame 转换为临时视图,并使用 SQL 查询数据的强大功能。通过这种方式,开发人员可以利用 SQL 的简洁和高效来进行复杂的数据操作和分析。以下是一个示例,展示如何使用 Spark SQL 查询 DataFrame 数据。

# 将 DataFrame 创建为临时视图
df.createOrReplaceTempView("survey_tbl")

# 使用 SQL 查询临时视图中的数据
countDF = spark.sql("""
    SELECT Country, COUNT(1) AS count 
    FROM survey_tbl 
    WHERE Age < 40 
    GROUP BY Country
""")

# 显示查询结果
countDF.show()

通过 Spark SQL,用户可以轻松地将 DataFrame 与 SQL 查询结合使用,这不仅简化了代码编写过程,还利用了 SQL 优化器(Catalyst)的优势,提升查询性能。 同时,这种方法也使得熟悉 SQL 的用户可以无缝地使用 Spark 进行大数据处理。

Spark - 这篇文章属于一个选集。
§ 6: 本文

相关文章

Spark系列 - 数据存储
·1316 字
大数据 Spark DataFrame SparkSQL 分布式数据库
本章主要讨论 pySpark 的数据存储。
Spark系列 - 数据读取
·2470 字
大数据 Spark DataFrame SparkSQL 分布式数据库
本章主要讨论 pySpark 的数据读取。
Spark系列 - 配置Spark
·1559 字
大数据 Spark 日志 Log4J 硬编码 软编码
本文将详细介绍如何在 Spark 项目中配置 Log4J 日志模块,以及配置 Spark Session。
Spark系列 - 本地环境的搭建
·544 字
大数据 Spark 环境安装
本篇文章将介绍如何在本地 Mac 环境下搭建 Spark,包括安装 JDK、配置环境变量、安装和配置 Spark 以及安装 PySpark。
Spark系列 - 初识大数据
·2952 字
大数据 Spark Hadoop 数据库
这篇文章初步介绍了大数据、Hadoop 和 Spark 这三个关键方面。本文提供了一个简要的概述,为读者进一步了解大数据处理提供了基础。
Python系列 - 动态规划
·2966 字
Python 动态规划 Python算法
1. 动态规划介绍 # 动态规划(Dynamic Programmin