`
qindongliang1922
  • 浏览: 2146581 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:116302
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:124568
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:58441
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:70335
社区版块
存档分类
最新评论

Spark如何读取一些大数据集到本地机器上

阅读更多



最近在使用spark处理分析一些公司的埋点数据,埋点数据是json格式,现在要解析json取特定字段的数据,做一些统计分析,所以有时候需要把数据从集群上拉到driver节点做处理,这里面经常出现的一个问题就是,拉取结果集过大,而驱动节点内存不足,经常导致OOM,也就是我们常见的异常:
````
 java.lang.OutOfMemoryError: Java heap space 
````


这种写法的代码一般如下:

````
//加载HDFS数据
 val rdd=sc.textFile("/data/logs/*")
 
 //在驱动程序获取结果集
 val datas=ArrayBuffer[String]()
 
 //把所有数据,拉倒驱动端操作
 rdd.collect.foreach(line=>{
 
   datas += line.split('#')(1)  //得到某个字段
 
 })
 
 sc.stop()
````


上面的这种写法,基本原理就是一次性把所有分区的数据,全部读取到driver节点上,然后开始做处理,所以数据量大的时候,经常会出现内存溢出情况。


(问题一)如何避免这种情况?

分而治之,每次只拉取一个分区的数据到驱动节点上,处理完之后,再处理下一个分数据的数据。

(问题二)如果单个分区的数据已经大到内存装不下怎么办?


给数据集增加更多的分区,让大分区变成多个小分区。


(问题三)如果结果集数据大于内存的大小怎么办?

要么增加驱动节点的内存,要么给每个分区的数据都持久化本地文件上,不再内存中维护



下面来看下关键问题,如何修改spark的rdd分区数量?


我们知道在spark里面RDD是数据源的抽象模型,RDD里面实际上是把一份大数据源切分成了多个分区数据,然后来并行处理这份大数据集。


默认情况下如果Spark从HDFS上加载数据,默认分区个数是按照HDFS的block size来切分的,当然我们在加载的时候可以指定的分区个数。
````
textFile(path,partitionNums)//第二个参数可以指定分区个数
````


如果在加载时不指定分区个数,spark里面还提供了两个函数来进行重分区:

````
(1)def coalesce(numPartitions: Int, shuffle: Boolean = false):RDD[T]
(2)def repartition(numPartitions: Int):RDD[T]
````


接着我们来看下coalesce函数和repartition函数的区别:


通过查看源码得知repartition函数内部实际上是调用了coalesce函数第二个参数等于true时的封装。所以我们重点来关注下coalesce函数即可:


coalesce的第一个参数是修改后的分区个数

coalesce的第二个参数是控制是否需要shuffle


举一个例子:

当前我们RDD的分区个数是100:

(1)如果要变成10,应该使用
````
rdd.coalesce(10,false)
````
(2)如果要变成300,应该使用
````
rdd.coalesce(300,true)
````
(3)如果要变成1,应该使用
````
rdd.coalesce(1,true)
````


这里解释一下:

分区数从多变少,一般是不需要开启shuffle的,这样性能最高,因为不需要跨网络混洗数据,当然你也可以开启shuffle在特定场景下,如分区数据极其不均衡。但建议一般不要使用。


分区数从少变多,必须开启shuffle,如果不开启那么分区数据是不会改变的,由少变多必须得重新混洗数据才能变多,这里需要注意一点,如果数据量特别少,那么会有一些分区的数据是空。


最后的例子是一种极端场景,如果从多变成1,不开启shuffle,那么可能就个别节点计算压力特别大,集群资源不能充分利用,所以有必要开启shuffle,加速合并计算的流程。



明白了如何改变rdd的分区个数之后,我们就可以文章开头遇到的问题结合起来,拉取大量数据到驱动节点上,如果整体数据集太大,我们就可以增加分区个数,循环拉取,但这里面需要根据具体的场景来设置分区个数,因为分区个数越多,在spark里面生成的task数目就越多,task数目太多也会影响实际的拉取效率,在本案例中,从hdfs上读取的数据默认是144个分区,大约1G多点数据,没有修改分区个数的情况下处理时间大约10分钟,在调整分区个数为10的情况下,拉取时间大约在1-2分钟之间,所以要根据实际情况进行调整。


文章开始前的代码优化后的如下:
````
  def pt_convert( idx:Int,ds:Iterator[String] ,seq:Int):Iterator[String]={
    if(seq==idx) ds else Iterator()
  }
------------------------------
//加载HDFS数据
 val rdd=sc.textFile("/data/logs/*")
 
 //在驱动程序获取结果集
 val datas=ArrayBuffer[String]()
 //重分区并合理优化分区个数
 val new_rdd=rdd.coalesce(10)
 //得到所有的分区信息
 val parts= new_rdd.partitions
 //循环处理每个分区的数据,避免导致OOM
    for(p<-parts){
      //获取分区号
      val idx=p.index
    //第二个参数为true,避免重新shuffle,保留原始分区数据    
    val parRdd=new_rdd.mapPartitionsWithIndex[String](pt_convert(_,_,idx),true)
    //读取结果数据
    val data=parRdd.collect()
    //循环处理数据
    for(line<-data){
    datas += line.split('#')(1)  //得到某个字段
    }
    
      
      }
 

````




最后在看下,spark任务的提交命令:
````
spark-submit  --class  SparkHdfsDataAnalysis 
--conf spark.driver.maxResultSize=1g  
--master yarn  
--executor-cores 5   
--driver-memory 2g  
--executor-memory 3g 
--num-executors 10    
--jars  $jars     spark-analysis.jar  $1 $2
````


这里面主要关注参数:
````
spark.driver.maxResultSize=1g  
driver-memory 2g 
````


单次拉取数据结果集的最大字节数,以及驱动节点的内存,如果在进行大结果集下拉时,需要特别注意下这两个参数的设置。





参考文档:

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD
https://spark.apache.org/docs/latest/configuration.html

https://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
0
0
分享到:
评论

相关推荐

    毕业设计:基于spark的外卖大数据平台分析系统.zip

    项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...

    基于spark的地铁大数据客流分析系统.zip

    毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...

    基于spark的外卖大数据平台分析系统.zip

    毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...

    Hadoop Spark大数据巨量分析与机器学习整合开发实战 ,林大贵

    Hadoop Spark大数据巨量分析与机器学习整合开发实战 ,林大贵 扫描版

    基于spark的大数据过滤引擎推荐系统.zip

    项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...

    基于spark的外卖大数据平台分析系统源码.zip

    基于spark的外卖大数据平台分析系统源码.zip基于spark的外卖大数据平台分析系统源码.zip基于spark的外卖大数据平台分析系统源码.zip基于spark的外卖大数据平台分析系统源码.zip基于spark的外卖大数据平台分析系统...

    基于Spark框架的大数据局部频繁项集挖掘算法设计.pdf

    基于Spark框架的大数据局部频繁项集挖掘算法设计.pdf

    基于spark的大数据论文资料

    本资料是集合20篇知网被引最高的基于spark的大数据论文,包括大数据Spark技术研究_刘峰波、大数据下基于Spark的电商实时推荐系统的设计与实现_岑凯伦、基于Spark的Apriori算法的改进_牛海玲、基于Spark的大数据混合...

    基于Spark新闻网大数据实时分析设计与实现

    最后终稿的毕业论文形式,而这一版查重率为3.8% 其中引用率还占2.01%,复写率只有1.79。里面有项目运行指令图片、架构设计图、数据库图、数据库设计表等内容,让你直接下载参考即毕业。

    大数据相关面试题Spark,Kakfa等

    大数据Spark面试题 大数据Kakfa面试题 大数据Spark面试题 大数据Kakfa面试题 大数据Spark面试题 大数据Kakfa面试题 大数据Spark面试题 大数据Kakfa面试题 大数据Spark面试题 大数据Kakfa面试题 大数据Spark面试题 ...

    基于Spark框架的new网大数据实时分析可视化系统项目.zip

    毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...

    基于Spark2.2的new网大数据实时分析系统设计与实现.zip

    毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...

    基于spark+drools+kafka+redis的大数据实时风控系统.zip

    毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...

    基于Flask+Spark+ALS+MovieLens数据集的电影智能推荐系统.zip

    项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...

    基于Django Restframework的异常检测系统,分析服务为Spark SQL和Spark Mllib.zip

    项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...

    PySpark 机器学习、自然语言处理与推荐系统配套代码+数据集.zip

    项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...

    基于Spark机器学习的电商推荐系统设计与实现.zip

    毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 源码 毕业设计 课程设计 项目开发 系统开发 Spark 机器学习 大数据 算法 ...

    基于spark的电商用户行为分析系统.zip

    项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...

    基于Spark+hive的交通智能研判系统.zip

    项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 项目开发 系统设计 Spark 机器学习 大数据 算法 源码 ...

Global site tag (gtag.js) - Google Analytics