1. 配置 Spark 日志模块 #
在 Spark 项目中,使用适当的日志记录工具对调试和监控应用程序的运行非常重要。下面将详细介绍如何在 Spark 项目中配置 Log4J 日志记录,并在 Python 中使用自定义的 logger 包。
1.1. 创建 Log4J 配置文件 #
在 Spark 项目的主目录中,创建一个名为 log4j.properties 的文件,并将以下内容复制粘贴进去:
# Set everything to be logged to the console
log4j.rootCategory=WARN, console
# define console appender
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# application log
log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
log4j.additivity.guru.learningjournal.spark.examples=false
# define rolling file appender
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
# log4j.appender.file.File=app-logs/hello-spark.log
# define following in Java System
# -Dlog4j.configuration=file:log4j.properties
# -Dlogfile.name=hello-spark
# -Dspark.yarn.app.container.log.dir=app-logs
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Append=false
log4j.appender.file.MaxFileSize=500MB
log4j.appender.file.MaxBackupIndex=2
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Recommendations from Spark template
log4j.logger.org.apache.spark.repl.Main=WARN
log4j.logger.org.spark_project.jetty=WARN
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
- 其中
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
将在Spark里配置
1.2. 配置 Log4J 变量 #
在 Spark 的配置文件中设置 Log4J 的变量值。
- 找到
SPARK_HOME/conf
目录。SPARK_HOME
是 Spark 安装的根目录。 - 如果
conf
目录下没有spark-defaults.conf
,可以复制一份spark-defaults.conf.template
并将其文件名改为spark-defaults.conf
。 - 在 spark-defaults.conf 文件的最下面添加以下内容:
spark.driver.extraJavaOptions -Dlog4j.configuration=file:log4j.properties -Dspark.yarn.app.container.log.dir=app-logs -Dlogfile.name=hello-spark spark.jars.packages org.apache.spark:spark-avro_2.12:3.5.1
- 参考:
log4j.configuration=file:log4j.properties
是项目目录下的日志属性文件spark.yarn.app.container.log.dir=app-logs
和logfile.name=hello-spark
是给上面属性文件里的变量赋值。- 最后一行是配置 avro,用于支持 PySpark 读写 avro 格式的文件(此处和 Log4J 无关)。2.12和3.5.1分别是系统里Scala和Spark的版本号,请按实际情况填写。
1.3. 创建 Python Logger #
- 在项目主目录下,创建一个名为
lib
的包(目录) - 在
lib
目录下创建logger.py
文件,并复制以下代码:class Log4J: def __init__(self, spark): log4j = spark._jvm.org.apache.log4j root_class = "ez.takeezlearn.spark.examples" conf = spark.sparkContext.getConf() app_name = conf.get('spark.app.name') self.logger = log4j.LogManager.getLogger(root_class + "." + app_name) def warn(self, msg): self.logger.warning(msg) def info(self, msg): self.logger.info(msg) def error(self, msg): self.logger.error(msg) def debug(self, msg): self.logger.debug(msg)
1.4. 测试 logger #
在项目主目录下,创建一个主程序文件(例如 main.py
),并复制下面代码,查看运行结果:
from pyspark.sql import *
from lib.logger import Log4J
if __name__ == '__main__':
spark = SparkSession.builder\
.appName("Hello Spark")\
.master("local[3]")\
.getOrCreate()
logger = Log4J(spark)
logger.info("Starting Hello Spark")
# Your processing code
pass
logger.info("Finished Hello Spark")
# spark.stop()
运行后,可以看到日志的输出,同时在项目的目录下会生成一个 app-logs
文件夹,里面保存了日志文件。
通过以上步骤,我们已经成功地在 Spark 项目中配置了 Log4J 日志记录,并实现了 Python 中的自定义 logger。这样可以更好地管理和调试 Spark 应用程序的运行情况。
2. 配置 Spark Session #
在 Spark 中,配置 Spark Session 的方法有四种,优先级由低至高排序如下:
环境变量 -> Spark 配置文件 -> 命令行选项 -> SparkConf 对象
2.1. 环境变量 #
- 配置环境变量,如 SPARK_HOME 等。详细配置方法见 本地环境搭建。
2.2. Spark 默认配置文件 #
- 可以在
SPARK_HOME/conf/spark-defaults.conf
文件中设置一些默认配置,适用于机器上所有运行的 Spark 应用程序。详细配置方法见上文对于 Log4J 的配置。
2.3. 命令行选项 #
- 使用
spark-submit
命令行工具可以在提交 Spark 应用程序时传递配置参数。以下是一些常见的spark-submit
参数示例:
spark-submit --class org.apache.spark.examples.SparkPi \
--master local[4] \
--conf spark.executor.memory=2g \
--conf spark.eventLog.enabled=true \
/path/to/examples.jar
--class
:指定要运行的主类。--master
:指定集群管理器(如 local、yarn、mesos 等)。--conf
:设置其他配置参数,如 spark.executor.memory 和 spark.eventLog.enabled。 更多spark-submit
命令行选项的详细信息,可以参考 官方文档。
2.4. SparkConf 对象 #
-
在程序代码中通过
SparkConf
对象进行配置。这种方法在优先级上最高,覆盖其他配置方式。 -
以下是硬编码的示例:
from pyspark import SparkConf from pyspark.sql import * conf = SparkConf() conf.set("spark.app.name", "Hello Spark") conf.set("spark.master", "local[3]") spark = SparkSession.builder \ .config(conf=conf) \ .getOrCreate()
- 参考官方文档: Applications Properties
-
以下是软编码的示例:
- 假设在一个叫
PySparkLearning
的项目中配置Spark,我们需要在项目主目录下创建一个spark.conf
文件,以及在lib
里创建utils.py
文件。 - 项目结构如下:
./PySparkLearning ├── app-logs │ └── hello-spark.log ├── lib │ ├── __init__.py │ ├── logger.py │ └── utils.py ├── log4j.properties ├── main.py └── spark.conf
- 假设在一个叫
-
spark.conf
文件里编写具体的配置信息,例如:[SPARK_APP_CONFIGS] spark.app.name = Hello Spark spark.master = local[3] app.author = EZ
-
在
utils.py
文件里可以添加读取配置信息的功能,例如:import configparser from pyspark import SparkConf def get_spark_app_config(): spark_conf = SparkConf() config = configparser.ConfigParser() config.read("spark.conf") for (key, value) in config.items("SPARK_APP_CONFIGS"): spark_conf.set(key, value) return spark_conf
-
在主程序文件
main.py
中调用配置信息,参考下面的代码:from pyspark.sql import * from lib.logger import Log4J from lib.utils import get_spark_app_config if __name__ == '__main__': conf = get_spark_app_config() spark = SparkSession.builder \ .config(conf=conf) \ .getOrCreate() logger = Log4J(spark) logger.info("Starting Hello Spark") # Your processing code conf_out = spark.sparkContext.getConf() logger.info(conf_out.toDebugString()) logger.info("Finished Hello Spark") spark.stop()
通过上述方法,可以灵活地配置 Spark Session。一般情况下,环境变量和Spark 配置文件由管理人员操作,命令行选项和SparkConf 对象由开发人员进行配置。