- 浏览: 2146730 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
前面文章介绍了不少有关Spark Streaming的offset的管理以及如何优雅的关闭Spark Streaming的流程序。
到目前为止还有几个问题:
(1)有关spark streaming集成kafka时,如果kafka新增分区, 那么spark streaming程序能不能动态识别到而不用重启?
(2)如果需要重启,那么在自己管理offset时,如何才能识别到新增的分区?
(3)spark streaming优雅关闭的策略还有那些?
首先第一个问题,如果kafka要新增分区,对于正在运行的实时流程序能否动态识别到?
经过测试,是不能识别的,我推测使用createDirectStream创建流对象一旦创建就是不可变的,也就是说创建实例那一刻的分区数量,会一直使用直到流程序结束,就算中间kafka的分区数量扩展了,流程序也是不能识别到的。所以在扩展kafka分区前,一定要先把流程序给停掉,然后扩展完成后需要再次重启流程序。
然后看第二个问题,如果是我们自己管理offset时,一定要考虑到kafka扩展分区的情况,每次启动程序前都得检测下目前保存的偏移量里面的kafka的分区个数是否小于kafka实际元数据里面实际的分区个数,正常没扩展分区的情况下两个值应该是相等的,如果值不一致,就说明是kafka分区得到扩展了,所以我们的程序需要能够兼容这种情况。
核心代码如下:
//这个topic在zk里面最新的分区数量 val lastest_partitions= ZkUtils.getPartitionsForTopics(zkClient,Seq(topic)).get(topic).get var offsets = offsetsRangesStr.split(",")//按逗号split成数组 .map(s => s.split(":"))//按冒号拆分每个分区和偏移量 .map { case Array(partitionStr, offsetStr) => (TopicAndPartition(topic, partitionStr.toInt) -> offsetStr.toLong) }//加工成最终的格式 .toMap//返回一个Map //说明有分区扩展了 if(offsets.size<lastest_partitions.size){ //得到旧的所有分区序号 val old_partitions=offsets.keys.map(p=>p.partition).toArray //通过做差集得出来多的分区数量数组 val add_partitions=lastest_partitions.diff(old_partitions) if(add_partitions.size>0){ log.warn("发现kafka新增分区:"+add_partitions.mkString(",")) add_partitions.foreach(partitionId=>{ offsets += (TopicAndPartition(topic,partitionId)->0) log.warn("新增分区id:"+partitionId+"添加完毕....") }) } }else{ log.warn("没有发现新增的kafka分区:"+lastest_partitions.mkString(",")) }
上面的代码在每次启动程序时,都会检查当前我们自己管理的offset的分区数量与zk元数据里面实际的分区数量,如果不一致就会把新增的分区id给加到TopicAndPartition里面并放入到Map对象里面,这样在启动前就会传入到createDirectStream对象中,就能兼容新增的分区了。
最后一个问题,前面的文章谈到过我们可以有两种方式来更加优雅的停止流程序,分别是通过http暴露服务,和通过HDFS做消息中转来定时扫描mark文件是否存在来触发关闭服务。
下面我们先来看下通过http暴露服务的核心代码:
/**** * 负责启动守护的jetty服务 * @param port 对外暴露的端口号 * @param ssc Stream上下文 */ def daemonHttpServer(port:Int,ssc: StreamingContext)={ val server=new Server(port) val context = new ContextHandler(); context.setContextPath( "/close" ); context.setHandler( new CloseStreamHandler(ssc) ) server.setHandler(context) server.start() } /*** 负责接受http请求来优雅的关闭流 * @param ssc Stream上下文 */ class CloseStreamHandler(ssc:StreamingContext) extends AbstractHandler { override def handle(s: String, baseRequest: Request, req: HttpServletRequest, response: HttpServletResponse): Unit ={ log.warn("开始关闭......") ssc.stop(true,true)//优雅的关闭 response.setContentType("text/html; charset=utf-8"); response.setStatus(HttpServletResponse.SC_OK); val out = response.getWriter(); out.println("close success"); baseRequest.setHandled(true); log.warn("关闭成功.....") } }然后在来看下另一种方式扫描HDFS文件的方式:
/*** * 通过一个消息文件来定时触发是否需要关闭流程序 * @param ssc StreamingContext */ def stopByMarkFile(ssc:StreamingContext):Unit= { val intervalMills = 10 * 1000 // 每隔10秒扫描一次消息是否存在 var isStop = false val hdfs_file_path = "/spark/streaming/stop" //判断消息文件是否存在,如果存在就 while (!isStop) { isStop = ssc.awaitTerminationOrTimeout(intervalMills) if (!isStop && isExistsMarkFile(hdfs_file_path)) { log.warn("2秒后开始关闭sparstreaming程序.....") Thread.sleep(2000) ssc.stop(true, true) } } } /*** * 判断是否存在mark file * @param hdfs_file_path mark文件的路径 * @return */ def isExistsMarkFile(hdfs_file_path:String):Boolean={ val conf = new Configuration() val path=new Path(hdfs_file_path) val fs =path.getFileSystem(conf); fs.exists(path) }
上面是两种方式的核心代码,最后提下触发停止流程序:
第一种需要在启动服务的机器上,执行下面封装的脚本:
## tx.log是提交spark任务后的输出log重定向的log ## &> tx.log & #!/bin/bash driver=`cat tx.log | grep ApplicationMaster | grep -Po '\d+.\d+.\d+.\d+'` echo $driver curl http://$driver:port/close/ echo "stop finish"
第二种方式,找到一个拥有HDFS客户端机器,向HDFS上写入指定的文件:
#生成文件后,10秒后程序就会自动停止 hadoop fs -touch /spark/streaming/stop #下次启动前,需要清空这个文件,否则程序启动后就会停止 hadoop fs -rm -r /spark/streaming/stop
所有代码,已经同步更新到我的github上,有兴趣的朋友可以参考这个链接:
https://github.com/qindongliang/streaming-offset-to-zk
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
发表评论
-
Scala里面的排序函数的使用
2018-01-09 20:20 2606排序方法在实际的应用场景中非常常见,Scala里面有三种排序 ... -
在Scala里面如何使用元组
2018-01-08 22:05 863元组在Scala语言中是一 ... -
Spark如何读取一些大数据集到本地机器上
2018-01-04 21:07 1630最近在使用spark处理分 ... -
使用Spark SQL的临时表解决一个小问题
2017-12-28 18:27 2405最近在使用spark处理一个业务场景时,遇到一个小问题,我在 ... -
Spark任务两个小问题笔记
2017-12-26 19:52 1610今天在用spark处理数据 ... -
Spark中foreachPartition和mapPartitions的区别
2017-12-25 21:19 3279spark的运算操作有两种类型:分别是Transformat ... -
kafka版本不一致导致的一个小问题(二)
2017-12-04 21:37 8500背景介绍: 我们公司的实时流项目现在用的spark stre ... -
谈谈如何优雅的关闭正在运行中的Spark Streaming的流程序
2017-11-30 19:20 2242前面的文章,已经简 ... -
如何管理Spark Streaming消费Kafka的偏移量(三)
2017-11-28 23:41 5160前面的文章已经介绍了在spark streaming集成kaf ... -
理解Spark的运行机制
2017-11-23 21:52 1208Spark生态系统目前已经非常成熟了,有很多类型的任务都可以使 ... -
如何管理Spark Streaming消费Kafka的偏移量(二)
2017-11-16 19:30 4704上篇文章,讨论了在spar ... -
如何管理Spark Streaming消费Kafka的偏移量(一)
2017-11-14 20:42 4028最近工作有点忙,所以更新文章频率低了点,在这里给大家说声抱 ... -
在scala中使用spark sql解决特定需求(2)
2017-07-21 16:00 2235接着上篇文章,本篇来 ... -
在scala中使用spark sql解决特定需求
2017-07-20 19:53 992spark sql一个强大之处就 ... -
Spark如何在一个SparkContext中提交多个任务
2017-07-04 19:09 6685在使用spark处理数据的时候,大多数都是提交一个job执行, ... -
如何使用scala+spark读写hbase?
2017-06-12 19:48 3406最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好 ... -
使用ES-Hadoop插件结合spark向es插入数据
2017-05-05 17:19 5003上篇文章简单介绍了E ... -
spark sql 快速体验调试
2017-04-13 16:40 1018spark sql提供了更快的查询性能,如何能够更快的体验,开 ... -
spark on yarn 如何集成elasticsearch
2017-04-11 16:16 1507随着spark越来越流行,我们的很多组件都有可能和spark集 ... -
如何使用Spark的local模式远程读取Hadoop集群数据
2017-03-31 11:49 2963我们在windows开发机上使用spark的local模式读取 ...
相关推荐
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...
java的sparkstreaming连接kafka的例子,kafka生产者生产消息,消费者读取消息,sparkstreaming读取kafka小区并进行存储iotdb数据库。
sparkStreaming消费数据不丢失,sparkStreaming消费数据不丢失
spark Streaming和structed streaming分析,理解整个 Spark Streaming 的模块划分和代码逻辑。
1.理解Spark Streaming的工作流程。 2.理解Spark Streaming的工作原理。 3.学会使用Spark Streaming处理流式数据。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)...
(1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn...
1.Spark Streaming整合Flume需要的安装包. 2. Spark Streaming拉取Flume数据的flume配置文件.conf 3. Flume向Spark Streaming推数据的flume配置文件.conf
基于 Flume+ Kafka+ Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码。 基于 Flume+Kafka+Spark Streaming 实现实时监控输出日志的报警系统的 Spark Streaming 程序代码,博客链接: ...
spark streaming streaming
spark Streaming的原理介绍和与storm的对比
写的非常好,早了好久才找到。SparkStreaming预研报告
Scala代码积累之spark streaming kafka 数据存入到hive源码实例,Scala代码积累之spark streaming kafka 数据存入到hive源码实例。
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
包含kafka消息中间件的使用和Spark Streaming的示例。
spark之sparkStreaming 理解,总结了自己的理解,欢迎大家下载观看!
flume+Logstash+Kafka+Spark Streaming进行实时日志处理分析【大数据】
讲述Storm与sparkStreaming分别用法与区别,在操作流程等。
Spark核心概念简介: Spark使用maven进行打包(减少jar包大小): Spark中的(弹性分布式数据集)简称RDD: ...SparkStreaming中的正常操作(每读2秒就计算一次): Spark中的local[2]: Spark中的处理流程图像:
06Spark Streaming原理和实践
Spark Streaming Programming Guide 翻译+个人学习笔记整理