如何收集SparkSteaming运行日志实时进入kafka中

缺乏、安全感 2022-04-11 12:30 163阅读 0赞

用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。

这里的log分:

(1)spark本身运行的log

(2)代码里面业务产生的log

spark on yarn模式,如果你的hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?

能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。

答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。

现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率大大提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。

下面会介绍下如何使用:

streaming项目中的log4j使用的是apache log4j

  1. <dependency>
  2. <groupId>log4j</groupId>
  3. <artifactId>log4j</artifactId>
  4. <version>1.2.17</version>
  5. </dependency>

sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。
看下我们log4j文件的内容:

  1. log4j.rootLogger=WARN,console,kafka
  2. #log4j.logger.com.demo.kafka=DEBUG,kafka
  3. # appender kafka
  4. log4j.appender.kafka=kafka.producer.KafkaLog4jAppender
  5. log4j.appender.kafka.topic=kp_diag_log
  6. # multiple brokers are separated by comma ",".
  7. log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092
  8. log4j.appender.kafka.compressionType=none
  9. log4j.appender.kafka.syncSend=false
  10. log4j.appender.kafka.layout=org.apache.log4j.PatternLayout
  11. #log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
  12. log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n
  13. # appender console
  14. log4j.appender.console=org.apache.log4j.ConsoleAppender
  15. log4j.appender.console.target=System.out
  16. log4j.appender.console.layout=org.apache.log4j.PatternLayout
  17. log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n
  18. #log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n

最后看下提交脚本:

  1. jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'`
  2. echo $jars
  3. #nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster --executor-cores 3 --driver-memory 4g --executor-memory 4g --num-executors 10 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml" --jars $jars kpdiag-stream-1.0.0-SNAPSHOT.jar &> streaming.log &
  4. nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster \
  5. --files "/home/spark/x_spark_job/log4j.properties" \
  6. --executor-cores 3 --driver-memory 3g --executor-memory 3g --num-executors 12 --jars $jars \
  7. --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  8. --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \
  9. --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \
  10. --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \
  11. --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \
  12. kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &

注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面—jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。

提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出:
执行命令:

  1. kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log

收集到的log内容如下:

  1. [2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0
  2. [2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  3. [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  4. [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  5. [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  6. [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  7. [2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  8. [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  9. [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0
  10. [2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 题目idb07e88feff464659ab5a351bf1e68ee0redis不存在

至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。

这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实
我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可
这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。

[b][color=green][size=large]
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
[/size][/color][/b]
[img]http://dl2.iteye.com/upload/attachment/0104/9948/3214000f-5633-3c17-a3d7-83ebda9aebff.jpg\[/img\]

发表评论

表情:
评论列表 (有 0 条评论,163人围观)

还没有评论,来说两句吧...

相关阅读