博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ForkJoin 学习使用笔记
阅读量:5981 次
发布时间:2019-06-20

本文共 5561 字,大约阅读时间需要 18 分钟。

hot3.png

ForkJoin 学习使用笔记

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架

背景

在日常的业务需求中,经常出现的批量查询,批量写入等接口的提供,一般来说,最简单最low的方式就是写一个for循环来一次执行,但是当业务方对接口的性能要求较高时,就比较尴尬了

通常可以想到的方式是采用并发操作,首先想到可以实现的方式就是利用线程池来做

通常实现方式如下

// 1. 创建线程池ExecutorService executorService = new ThreadPoolExecutor(3, 5, 60,      TimeUnit.SECONDS,      new LinkedBlockingDeque
(10), new DefaultThreadFactory("biz-exec"), new ThreadPoolExecutor.CallerRunsPolicy());// 2. 创建执行任务List
> futureList = new ArrayList<>();for(Object arg : list) { futureList.add(executorService.submit(new Callable() { @Override public Object call() throws Exception { // xxx } }));}// 3. 结果获取for(Future f: futureList) { Object obj = f.get();}

用上面的这种方式并没有什么问题,我们接下来考虑的是如何使用ForkJoin框架来实现类似的功能

ForkJoin 基本知识

Fork: 将大任务拆分成若干个可以并发执行的小任务

Join: 合并所有小任务的执行结果

forkjoin

任务分割

ForkJoinTask : 基本任务,使用forkjoin框架必须创建的对象,提供fork,join操作,常用的两个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务

说明:

  1. fork : 让task异步执行
  2. join : 让task同步执行,可以获取返回值
  3. ForkJoinTask 在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

结果合并

ForkJoinPool 执行 ForkJoinTask

  • 任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。
  • 当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务

三中提交方式:

  1. execute 异步,无返回结果
  2. submit 异步,有返回结果 (返回Future<T>
  3. invoke 同步,有返回结果 (会阻塞)

使用说明

结合两个场景,给出使用姿势

1. 累加

实现从 start - end 的累加求和

首先是定义一个CountTask 来实现求和

首先是确定任务分割的阀值,当 end-start 的差值大于阀值时,将任务一分为二

public class CountTask extends RecursiveTask
{ private int start; private int end; private static final int THRED_HOLD = 30; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRED_HOLD; if (canCompute) { // 不需要拆分 for (int i = start; i <= end; i++) { sum += i; } System.out.println("thread: " + Thread.currentThread() + " start: " + start + " end: " + end); } else { int mid = (end + start) / 2; CountTask left = new CountTask(start, mid); CountTask right = new CountTask(mid + 1, end); left.fork(); right.fork(); sum = left.join() + right.join(); } return sum; }}

调用case

@Testpublic void testFork() throws ExecutionException, InterruptedException {    int start = 0;    int end = 200;    CountTask task = new CountTask(start, end);    ForkJoinPool pool = ForkJoinPool.commonPool();    Future
ans = pool.submit(task); int sum = ans.get(); System.out.println(sum);}

输出结果:

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 51 end: 75thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 101 end: 125thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 0 end: 25thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 126 end: 150thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] start: 76 end: 100thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 151 end: 175thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] start: 26 end: 50thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] start: 176 end: 20020100

2. 排序

int 数组进行排序

同样先定义一个SortTask, 主要是为了演示ForkJoin的使用姿势,具体的排序和合并的逻辑比较简陋的实现了一下(这块不是重点)

public class SortTask extends RecursiveTask
> { private List
list; private final static int THRESHOLD = 5; public SortTask(List
list) { this.list = list; } @Override protected List
compute() { if (list.size() < THRESHOLD) { Collections.sort(list); System.out.println("thread: " + Thread.currentThread() + " sort: " + list); return list; } int mid = list.size() >> 1; SortTask l = new SortTask(list.subList(0, mid)); SortTask r = new SortTask(list.subList(mid, list.size())); l.fork(); r.fork(); List
left = l.join(); List
right = r.join(); return merge(left, right); } private List
merge(List
left, List
right) { List
result = new ArrayList<>(left.size() + right.size()); int rightIndex = 0; for (int i = 0; i < left.size(); i++) { if (rightIndex >= right.size() || left.get(i) <= right.get(rightIndex)) { result.add(left.get(i)); } else { result.add(right.get(rightIndex++)); i -= 1; } } if (rightIndex < right.size()) { result.addAll(right.subList(rightIndex, right.size())); } return result; }}

测试case和上面基本一样,我们改用 invoke 替换上面的 submit

@Testpublic void testMerge() throws ExecutionException, InterruptedException {    List
list = Arrays.asList(100, 200, 150, 123, 4512, 3414, 3123, 34, 5412, 34, 1234, 893, 213, 455, 6, 123, 23); SortTask sortTask = new SortTask(list); ForkJoinPool pool = ForkJoinPool.commonPool(); List
ans = pool.invoke(sortTask); System.out.println(ans);}

输出结果

thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [34, 3123, 3414, 4512]thread: Thread[ForkJoinPool.commonPool-worker-1,5,main] sort: [100, 123, 150, 200]thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [34, 893, 1234, 5412]thread: Thread[ForkJoinPool.commonPool-worker-0,5,main] sort: [213, 455]thread: Thread[ForkJoinPool.commonPool-worker-3,5,main] sort: [6, 23, 123][6, 23, 34, 34, 100, 123, 123, 150, 200, 213, 455, 893, 1234, 3123, 3414, 4512, 5412]

参考

其他

个人博客:

公众号获取更多:

个人信息

转载于:https://my.oschina.net/u/566591/blog/1532135

你可能感兴趣的文章
数值积分中的辛普森方法及其误差估计
查看>>
Web service (一) 原理和项目开发实战
查看>>
跑带宽度多少合适_跑步机选购跑带要多宽,你的身体早就告诉你了
查看>>
Javascript异步数据的同步处理方法
查看>>
iis6 zencart1.39 伪静态规则
查看>>
SQL Server代理(3/12):代理警报和操作员
查看>>
Linux备份ifcfg-eth0文件导致的网络故障问题
查看>>
2018年尾总结——稳中成长
查看>>
通过jsp请求Servlet来操作HBASE
查看>>
Shell编程基础
查看>>
Shell之Sed常用用法
查看>>
Centos下基于Hadoop安装Spark(分布式)
查看>>
mysql开启binlog
查看>>
设置Eclipse编码方式
查看>>
分布式系统唯一ID生成方案汇总【转】
查看>>
并查集hdu1232
查看>>
Mysql 监视工具
查看>>
Linux Namespace系列(09):利用Namespace创建一个简单可用的容器
查看>>
博客搬家了
查看>>
Python中使用ElementTree解析xml
查看>>