在使用spark处理数据的时候,大多数都是提交一个job执行,然后job内部会根据具体的任务,生成task任务,运行在多个进程中,比如读取的HDFS文件的数据,spark会加载所有的数据,然后根据block个数生成task数目,多个task运行中不同的进程中,是并行的,如果在同一个进程中一个JVM里面有多个task,那么多个task也可以并行,这是常见的使用方式。
考虑下面一种场景,在HDFS上某个目录下面有10个文件,我想要同时并行的去统计每个文件的数量,应该怎么做? 其实spark是支持在一个spark context中可以通过多线程同时提交多个任务运行,然后spark context接到这所有的任务之后,通过中央调度,在来分配执行各个task,最终任务完成程序退出。
下面就来看下如何使用多线程提交任务,可以直接使用new Thread来创建线程提交,但是不建议这么做,推荐的做法是通过Executors线程池来异步管理线程,尤其是在提交的任务比较多的时候用这个会更加方便。
核心代码如下:
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf()
//实例化spark context
val sc=new SparkContext(sparkConf)
sparkConf.setAppName("multi task submit ")
//保存任务返回值
val list=new util.ArrayList[Future[String]]()
//并行任务读取的path
val task_paths=new util.ArrayList[String]()
task_paths.add("/tmp/data/path1/")
task_paths.add("/tmp/data/path2/")
task_paths.add("/tmp/data/path3/")
//线程数等于path的数量
val nums_threads=task_paths.size()
//构建线程池
val executors=Executors.newFixedThreadPool(nums_threads)
for(i<-0 until nums_threads){
val task= executors.submit(new Callable[String] {
override def call(): String ={
val count=sc.textFile(task_paths.get(i)).count()//获取统计文件数量
return task_paths.get(i)+" 文件数量: "+count
}
})
list.add(task)//添加集合里面
}
//遍历获取结果
list.asScala.foreach(result=>{
log.info(result.get())
})
//停止spark
sc.stop()
}
可以看到使用scala写的代码比较精简,这样就完成了一个并行task提交的spark任务,最后我们打包完毕后,上传到linux上进行提交,命令如下:
/opt/bigdata/spark/bin/spark-submit
--class MultiTaskSubmit
--master yarn
--deploy-mode cluster
--executor-cores 3
--driver-memory 1g
--executor-memory 1g
--num-executors 10
--jars $jars task.jar
最后需要注意一点,在线程里面调用的方法如果包含一些全局加载的属性,最好放在线程的成员变量里面进行初始化,否则多个线程去更改全局属性,有可能会造成一些未知的问题。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
windows中使用yarn-cluster模式提交spark任务,百度找不着的啦,看我这里。另外spark的版本要使用正确哦 更简单的方式参考: https://blog.csdn.net/u013314600/article/details/96313579
java提交spark任务到yarn平台的配置讲解共9页.pdf.zip
spark-2.0.1集群安装及编写例子提交任务,包括集群安装包及例子代码加上安装文档, spark-2.0.1集群安装及编写例子提交任务,包括集群安装包及例子代码加上安装文档
包括如何在多种机器上安装Spark,如何配置一个Spark集群,如何在交互模式下运行第一个Spark作业,如何在Spark集群上构建一个生产级的脱机/独立作业,如何与Spark集群建立连接和使用SparkContext,如何创建和保存RDD...
包括如何在多种机器上安装Spark,如何配置一个Spark集群,如何在交互模式下运行第一个Spark作业,如何在Spark集群上构建一个生产级的脱机/独立作业,如何与Spark集群建立连接和使用SparkContext,如何创建和保存RDD...
hdp spark
Spark重新启动的脚本,Spark作业提交的脚本,Spark作业提交中SparkContext,Spark中SparkContext,执行器进程启动的流程和结合简单的WordCount程序对于RDD执行流程进行剖析以及进行阶段划分分析和任务提交,最后也...
本书系统讲解了Spark的应用方法,包括如下内容:第1章介绍如何在多种机器上安装Spark,以及如何配置一个Spark集群。第2章介绍如何在交互模式下运行第一个Spark作业。第3章介绍如何在Spark集群上构建一个生产级的脱机...
包括如何在多种机器上安装Spark,如何配置一个Spark集群,如何在交互模式下运行第一个Spark作业,如何在Spark集群上构建一个生产级的脱机\独立作业,如何与Spark集群建立连接和使用SparkContext,如何创建和保存RDD...
对开发者来说,本地IDEA调试Flink、Spark任务不涉及对象的序列化及反序列化,任务在本地调试通过后,执行在分布式环境下也可能会出错。 而将任务提交到集群进行调试还要走那些繁琐的流程太影响效率了。 因此,为方便...
web管理spark任务。scala代码编写.可视化。web管理spark任务。scala代码编写.可视化。
这个是Spark的App运行图,它通过一个Driver来和集群通信,集群负责作业的分配。今天我要讲的是如何创建这个Driver Program的过程。我们先看一下用SparkSubmit提交的方法吧,下面是从官方上面摘抄的内容。这个是提交...
Transformations:转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录...
2、SparkEnv SparkEnv可以说是Context中非常重要的类,它维护着Spark的执行环境,包含有:serializer、RpcEnv、bloc
hue中对spark任务的支持,是靠oozie支撑的。文档中提供了如何在hue中提交spark作业的操作步骤。
Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询...
一、实验目的 1. 理解Spark编程思想;...(8)可以调用SparkContext的parallelize方法,在Driver中一个已经存在的数组上创建RDD。 (9)可以调用SparkContext的parallelize方法,在Drive (二)spark运行wordcount程序
例子中定义了多个List数据集合,包括用户信息,订单信息,用户订单信息,将List对象生成DataFrame,使用SparkSQL查询将多个DataFrame合成一个DataFrame,使用Scala语言编写。