- 浏览: 2149784 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,如果我们想查看系统运行的log,是没法直接看的,就算能看也只是一部分。
这里的log分:
(1)spark本身运行的log
(2)代码里面业务产生的log
spark on yarn模式,如果你的hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登录上每台机器上,一个个查看,如果通过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到所有的log,那么问题来了?
能不能将这个job运行所有的log统一收集到某一个目录里面呢? 如果收集到一起的话排查log就非常方便了。
答案是很遗憾,在sparkstreaming里面没法做到,因为sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把所有的log收集到一个目录里面,所以其他的非sparkstreaming程序,比如MR,Spark 运行完后,如果开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话我们查看log就非常方便了。
现在的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是可以的,我们使用log4j收集日志然后异步发送至kafka里面,最后再通过logstash收集kafka里面的日志进入es即可,这样一条龙服务打通之后,出现任何异常都可以非常快和方便的在es中排查问题,效率大大提升。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。
下面会介绍下如何使用:
streaming项目中的log4j使用的是apache log4j
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
sparkstreaming项目可以单独提交某个job的log4j文件,这样就能定制每个job的log输出格式,如果提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。
看下我们log4j文件的内容:
log4j.rootLogger=WARN,console,kafka #log4j.logger.com.demo.kafka=DEBUG,kafka # appender kafka log4j.appender.kafka=kafka.producer.KafkaLog4jAppender log4j.appender.kafka.topic=kp_diag_log # multiple brokers are separated by comma ",". log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092 log4j.appender.kafka.compressionType=none log4j.appender.kafka.syncSend=false log4j.appender.kafka.layout=org.apache.log4j.PatternLayout #log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n # appender console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n #log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
最后看下提交脚本:
jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'` echo $jars #nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster --executor-cores 3 --driver-memory 4g --executor-memory 4g --num-executors 10 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml" --jars $jars kpdiag-stream-1.0.0-SNAPSHOT.jar &> streaming.log & nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster \ --files "/home/spark/x_spark_job/log4j.properties" \ --executor-cores 3 --driver-memory 3g --executor-memory 3g --num-executors 12 --jars $jars \ --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \ --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &
注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 可以在提交机器上放一份即可,不需要每台机器上都存放。
提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出:
执行命令:
kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log
收集到的log内容如下:
[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0 [2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 题目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在
至此,我们的log就统一收集成功了,后续我们可以把log从kafka导入到es中,就可以任意分析和查询了。
这里需要注意一点,sparkstreaming运行时候,系统本身也有大量的log,如果把这个系统log也收集到kafka里面本身的量是非常大的,而且好多信息不重要,其实
我们只需要关注业务重点log即可,主要是WARN+ERROR级别的,调试的时候可以把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR即可
这样排查问题时候也容易而且了避免了大量log的产生从应用本身性能的影响。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
发表评论
-
Scala里面的排序函数的使用
2018-01-09 20:20 2611排序方法在实际的应用场景中非常常见,Scala里面有三种排序 ... -
在Scala里面如何使用元组
2018-01-08 22:05 868元组在Scala语言中是一 ... -
Spark如何读取一些大数据集到本地机器上
2018-01-04 21:07 1636最近在使用spark处理分 ... -
使用Spark SQL的临时表解决一个小问题
2017-12-28 18:27 2410最近在使用spark处理一个业务场景时,遇到一个小问题,我在 ... -
Spark任务两个小问题笔记
2017-12-26 19:52 1614今天在用spark处理数据 ... -
Spark中foreachPartition和mapPartitions的区别
2017-12-25 21:19 3284spark的运算操作有两种类型:分别是Transformat ... -
Spark Streaming优雅的关闭策略优化
2017-12-07 19:26 4103前面文章介绍了不少有关Spark Streaming的off ... -
kafka版本不一致导致的一个小问题(二)
2017-12-04 21:37 8508背景介绍: 我们公司的实时流项目现在用的spark stre ... -
谈谈如何优雅的关闭正在运行中的Spark Streaming的流程序
2017-11-30 19:20 2245前面的文章,已经简 ... -
如何管理Spark Streaming消费Kafka的偏移量(三)
2017-11-28 23:41 5165前面的文章已经介绍了在spark streaming集成kaf ... -
理解Spark的运行机制
2017-11-23 21:52 1215Spark生态系统目前已经非常成熟了,有很多类型的任务都可以使 ... -
如何管理Spark Streaming消费Kafka的偏移量(二)
2017-11-16 19:30 4706上篇文章,讨论了在spar ... -
如何管理Spark Streaming消费Kafka的偏移量(一)
2017-11-14 20:42 4034最近工作有点忙,所以更新文章频率低了点,在这里给大家说声抱 ... -
在scala中使用spark sql解决特定需求(2)
2017-07-21 16:00 2239接着上篇文章,本篇来 ... -
在scala中使用spark sql解决特定需求
2017-07-20 19:53 997spark sql一个强大之处就 ... -
Spark如何在一个SparkContext中提交多个任务
2017-07-04 19:09 6690在使用spark处理数据的时候,大多数都是提交一个job执行, ... -
如何使用scala+spark读写hbase?
2017-06-12 19:48 3411最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好 ... -
使用ES-Hadoop插件结合spark向es插入数据
2017-05-05 17:19 5010上篇文章简单介绍了E ... -
spark sql 快速体验调试
2017-04-13 16:40 1022spark sql提供了更快的查询性能,如何能够更快的体验,开 ... -
spark on yarn 如何集成elasticsearch
2017-04-11 16:16 1513随着spark越来越流行,我们的很多组件都有可能和spark集 ...
相关推荐
大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK.......
这是一本立足于企业真实的商用项目来讲解如何高效从事大数据实践的著作。技术层面,从全栈的角度系统梳理和详尽讲解了大数据的核心技术,包括Spark、Druid、Flume、Kafka等,让我们在纷繁复杂的技术中少走弯路......
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
kafka-sparkstreaming-cassandra, 用于 Kafka Spark流的Docker 容器 用于 Kafka Spark流的Docker 容器这里Dockerfile为实验 Kafka 。Spark流( PySpark ) 和Cassandra设置了完整的流环境。 安装Kafka 0.10.2.1用于 ...
spark3.0.0版本对接kafka数据源需要的jar包,最新的版本导致maven的阿里云仓库不能直接下载下来,所以需要手动导入jar包进行操作,有需要的朋友可以免费下载
本科毕业设计项目,基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统 本科毕业设计项目,基于spark streaming+flume+kafka+hbase的...
spakr streaming的kafka依赖
基于Flink+ClickHouse构建的分析平台,涉及 Flink1.9.0 、ClickHouse、Hadoop、Hbase、Kafka、Hive、Jmeter、Docker 、HDFS、MapReduce 、Zookeeper 等技术
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
hadoop,spark,hbase,zookeeper,kafka配置文件。 例如: <?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Licensed under the ...
* Spark:使用spark stream功能,实时分析消息系统中的数据,完成计算分析工作。 * Hbase:做为后端存储,存储spark计算结构,供其他系统进行调用 ## 环境部署 ### 软件版本 * hadoop 版本 : Hadoop相关...
扩展logback将日志输出到Kafka实例源码,详情请参见博文:http://blog.csdn.net/l1028386804/article/details/79135948
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip 基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip ...
使用slf4j配置kafkaAppender写入日志到kafka列队 支持日志解析+过滤等扩展接口
kafka与streaming配置与开发文档001. kafka版本为kafka_2.10-0.8.2.0 spark版本为1.3.0
Flink实时读取Kafka数据批量聚合(定时/按数量)写入Mysql源码+kafka安装包+zookeeper安装包
基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip基于spark streaming+flume+kafka+hbase的实时日志处理分析系统源码(分控制台版本和Web UI可视化版本).zip...
1、资源内容:实时风控系统,基于spark-streaming、drools、kafka、redis+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过测试运行...
Spark Streaming实时解析flume和kafka传来的josn数据写入mysql 注意,以下文件不提供 配置c3p0-config.xml链接,链接数据库 配置log4j.properties、my.properties 另,还需将您的spark和hadoop安装文件下的core-site...
基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的日志统计分析系统.zip基于kafka和spark streaming和hbase的...