`
qindongliang1922
  • 浏览: 2148440 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:116348
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:124617
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:58495
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:70385
社区版块
存档分类
最新评论

Java7之线程池ForkJoinPool

阅读更多
许多情况下,在一个程序中使用多线程是有益处的,可以大大提高程序的效率,多线程主要有以下3个优点1,资源利用率更好2,程序设计在某些情况下更简单3,程序响应更快。当然凡事有利就有弊,多线程也会使程序的开发,维护及调试更加复杂,当然如果我们能够扬长避短,在正确的场合下使用多线程,那么它将成为我们程序中开发的利器。


Java一直以来,对多线程的开发支持比较良好,特别在JDK5,6后引入的java.util.concurrent包,使用多线程的开发变的更加容易,这个包里面大部分的API都是更进一步的封装,作为开发者,我们只需熟悉它怎么使用,就能够很轻松的上手,当然如果你想完全搞懂它,那么就需要读读那些优秀的源码了。

ForkJoinPool这个类是JDK7后新增的线程池,很适合在单机多核的PC上部署多线程程序,ForkJoinPool使用的分而治之的思想,这一点与当前很火的大数据处理框架Hadoop的map/reduce思想非常类似,但是他们的使用场合却不一样,ForkJoinPool适合在一台PC多核CPU上运行,而hadoop则适合在分布式环境中进行大规模集群部署。

Fork/Join 模式有自己的适用范围。如果一个应用能被分解成多个子任务,并且组合多个子任务的结果就能够获得最终的答案,那么这个应用就适合用 Fork/Join 模式来解决.

ForkJoinPool使用的工作窃取的方式能够在最大方式上充分利用CPU的资源,一般流程是fork分解,join结合。本质是将一个任务分解成多个子任务,每个子任务用单独的线程去处理,主要几个常用方法有
fork( ForkJoinTask)异步执行一个线程
join( ForkJoinTask)等待任务完成并返回执行结果
execute( ForkJoinTask)执行不带返回值的任务
submit( ForkJoinTask)执行带返回值的任务
invoke( ForkJoinTask)执行指定的任务,等待完成,返回结果。
invokeAll(ForkJoinTask)执行指定的任务,等待完成,返回结果。
shutdown()执行此方法之后,ForkJoinPool 不再接受新的任务,但是已经提交的任务可以继续执行。如果希望立刻停止所有的任务,可以尝试 shutdownNow() 方法。
awaitTermination(int, TimeUnit.SECONDS)阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束。
compute()执行任务的具体方法
Runtime.getRuntime().availableProcessors()获取CPU个数的方法

关于上表中的ForkJoinTask是一个抽象类,代表一个可以fork与join的任务. ,它还有2个抽象子类:RecursiveAcion和RecursiveTask.其中RecursiveTask代表有泛型返回值的任务.而RecursiveAction代表没有返回值.


下面给出测试代码

package com.demo;

import java.util.concurrent.RecursiveAction;

/**
 * 
 * 继承RecursiveAction来实现可分解的任务
 * 注意无返回值
 * 
 * **/

public class PrintTask extends RecursiveAction {
 
	//每个小任务,最多只打印50个数
	private static final int threshold=50;
	//打印任务的开始
	private int start;
	//打印任务的结束
	private int end;
	
	public PrintTask() {
		// TODO Auto-generated constructor stub
	}
	
	
	
	//打印从start到end之间的任务
	public PrintTask(int start, int end) {
		super();
		this.start = start;
		this.end = end;
	}




	@Override
	protected void compute() {
		
		if(end-start<threshold){
			for(int i=start;i<end;i++){
				
				System.out.println(Thread.currentThread().getName()+"i的值:"+i);
			}
		}else{
			//当end与start之间的差大于threshold,及打印的数超过50个时,
			//将大任务分解成2个小任务
			int middle=(start+end)/2;
			PrintTask left=new PrintTask(start, middle);
			PrintTask  right=new PrintTask(middle, end);
			//并行执行两个小任务
			left.fork();
			right.fork();
			
		}
		
		
	}
	
	

}

package com.demo;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
/**
 * 测试打印
 * 
 * */
public class Test {

	
	public static void main(String[] args)throws Exception {
		ForkJoinPool pool=new ForkJoinPool();
		//提交可分解的任务
		pool.submit(new PrintTask(0,101));
		
		//阻塞等待所有任务完成 
		pool.awaitTermination(2, TimeUnit.SECONDS);
		pool.shutdown();//关闭线程池
		
	}
}

有返回值的demo
package com.demo;

import java.util.concurrent.RecursiveTask;
/***
 * 有返回值
 * 
 * */
public class CalTask  extends RecursiveTask<Integer>{

	
	
	//将每个小任务,最多只能累加20个数
	private static final int threshold=20;
	private int arr[];
	private int start;//开始
	private int end;//
	//累加从start到end之间的数
	
	public CalTask() {
		// TODO Auto-generated constructor stub
	}
	
	
	//累加从start到end的数组元素
	public CalTask(int[] arr, int start, int end) {
		super();
		this.arr = arr;
		this.start = start;
		this.end = end;
	}



	@Override
	protected Integer compute() {
		int sum=0;
		//当end与start之间的差小于threshold,开始进行累加
		if(end-start<threshold){
			for(int i=start;i<end;i++){
				sum+=arr[i];
			}
			return sum;
			
		}else{
			
			//当end与start之间的差大于threshold,要计算的数超过20个时,
			//将大任务分解成两个小任务
			
			int middle=(start+end)/2;
			CalTask left=new CalTask(arr, start, middle);
			CalTask right=new CalTask(arr, middle, end);
			//并行执行2个小任务
			left.fork();
			right.fork();
			//把2个小任务,累加的结果合并起来
			return left.join()+right.join();
		}
		 
		 
	}
	

}


package com.demo;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class Sum {
	
	
	public static void main(String[] args)throws Exception {
		
		int []arr=new int[1100];
		Random rand=new Random();
		int total=0;
		//初始化100个数字
		for(int i=0;i<arr.length;i++){
			int tmp=rand.nextInt(20);
			//对元素赋值,并将数组元素的值添加到total总和中
			total+=(arr[i]=tmp);
			
			
		}
		System.out.println("正确的total:"+total);
		ForkJoinPool pool=new ForkJoinPool();
		//提交可分解的CalTask任务
		Future<Integer> future=pool.submit(new CalTask(arr, 0, arr.length));
		System.out.println(future.get());
		//关闭线程池
		
		pool.shutdown();
	}

}



分享到:
评论
1 楼 ksisn 2014-11-22  
楼主您好,很荣幸能拜读到您的博客,我是刚毕业的程序猿,用您这篇文章的例子进行一些测试,发现比单线程方式在时间上慢了很多倍,那么像这种ForkJoinPool 适用的场景又是什么呢,一直在拜读您的例子中,这个没弄懂,特地请教您。

相关推荐

    13、线程池ForkJoinPool工作原理分析

    线程池ForkJoinPool工作原理分析

    13、线程池ForkJoinPool实战及其工作原理分析(1).pdf

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    Java线程池ForkJoinPool实例解析

    主要介绍了Java线程池ForkJoinPool实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

    parallel-stream-fork-join-pool

    所有并行流执行都使用相同的(单例)线程池:ForkJoinPool.commonPool()。 这就是为什么在并行流中执行 IO(更常见的是阻塞调用)非常糟糕的原因:被阻塞的线程无法被 JVM 中的所有并行流使用。 为此,您必须改用 ...

    11-线程池ThreadPoolExecutor底层原理源码分析(上)-周瑜.pdf

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    12-线程池ThreadPoolExecutor底层原理源码分析(下)-周瑜.pdf

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    16、常用并发设计模式精讲(1).pdf

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    14、深入理解并发可见性、有序性、原子性与JMM内存模型(1).pdf

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    15、CPU缓存架构详解&高性能内存队列Disruptor实战(1).pdf

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    designpattern.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    forkjoin.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    disruptor.zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    jmm(1).zip

    13、线程池 ForkJoinPool实战及其工作原理分析 (1).pdf 14、深入理解井发可见性、有序性、原子性与JMM内存模型 (1).pdf 15、CPU缓存架构详解&高性能内存队列Disruptor 实战 (1).pdf 16、常用并发设计模式精讲 (1)....

    java并发工具包 java.util.concurrent中文版用户指南pdf

    19. 使用 ForkJoinPool 进行分叉和合并 20. 锁 Lock 21. 读写锁 ReadWriteLock 22. 原子性布尔 AtomicBoolean 23. 原子性整型 AtomicInteger 24. 原子性长整型 AtomicLong 25. 原子性引用型 AtomicReference

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    使用 ForkJoinPool 进行分叉和合并 20. 锁 Lock 21. 读写锁 ReadWriteLock 22. 原子性布尔 AtomicBoolean 23. 原子性整型 AtomicInteger 24. 原子性长整型 AtomicLong 25. 原子性引用型 AtomicReferenc

    java并发工具包详解

    19. 使用 ForkJoinPool 进行分叉和合并 20. 锁 Lock 21. 读写锁 ReadWriteLock 22. 原子性布尔 AtomicBoolean 23. 原子性整型 AtomicInteger 24. 原子性长整型 AtomicLong 25. 原子性引用型 AtomicReference

    一个小的java Demo , 非常适合Java初学者学习阅读.rar

    使用 ForkJoinPool 进行分叉和合并,锁 Lock,读写锁 ReadWriteLock 原子性长整型 AtomicLong,原子性引用型 AtomicReference 修改数据: 一 服务端修改数据: 一 文章知识点与官方知识档案匹配,可进一步学习相关...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版

    19. 使用 ForkJoinPool 进行分叉和合并 20. 锁 Lock 21. 读写锁 ReadWriteLock 22. 原子性布尔 AtomicBoolean 23. 原子性整型 AtomicInteger 24. 原子性长整型 AtomicLong 25. 原子性引用型 AtomicReference

    java并发包资源

    19. 使用 ForkJoinPool 进行分叉和合并 20. 锁 Lock 21. 读写锁 ReadWriteLock 22. 原子性布尔 AtomicBoolean 23. 原子性整型 AtomicInteger 24. 原子性长整型 AtomicLong 25. 原子性引用型 AtomicReference

    尚硅谷Java视频_JUC 视频教程

    ·00. 尚硅谷_JUC线程高级_源码、课件 ·1. 尚硅谷_JUC线程高级_volatile 关键字与内存可见性 ·2. 尚硅谷_JUC线程高级_原子变量与 CAS 算法 ·3. 尚硅谷_JUC线程高级_模拟 CAS... ForkJoinPool 分支合并框架-工作窃取

Global site tag (gtag.js) - Google Analytics