新聞中心
?大家好,我是樹哥。

本文將從一個(gè)簡單的例子出發(fā),與大家解釋為啥要有 ForkJoinPool 的存在。接著向大家介紹 ForkJoinPool 的基本信息及使用,最后講解 ForkJoinPool 的基本原理。
誕生原因
對于線程池來說,我們經(jīng)常使用的是 ThreadPoolExecutor,可以用來提升任務(wù)處理效率。一般情況下,我們使用 ThreadPoolExecutor 的時(shí)候,各個(gè)任務(wù)之間都是沒有聯(lián)系的。但在某些特殊情況下,我們處理的任務(wù)之間是有聯(lián)系的,例如經(jīng)典的 Fibonacci 算法就是其中一種情況。
對于 Fibonacci 數(shù)列來說,我們知道 F (N) = F (N-1) + F (N-2)。當(dāng)前數(shù)值的結(jié)果,都依賴后面幾個(gè)數(shù)值的結(jié)果。這時(shí)候如果用 ThreadPoolExecutor 貌似就無法解決問題了。雖然我們可以單線程的遞歸算法,則其計(jì)算速度較慢,并且無法進(jìn)行并行計(jì)算,無法發(fā)揮 CPU 多核的優(yōu)勢。
ForkJoinPool 就是設(shè)計(jì)用來解決父子任務(wù)有依賴的并行計(jì)算問題的。 類似于快速排序、二分查找、集合運(yùn)算等有父子依賴的并行計(jì)算問題,都可以用 ForkJoinPool 來解決。對于 Fibonacci 數(shù)列問題,如果用 ForkJoinPool 來實(shí)現(xiàn),其實(shí)現(xiàn)代碼為:
@Slf4j
public class ForkJoinDemo {
// 1. 運(yùn)行入口
public static void main(String[] args) {
int n = 20;
// 為了追蹤子線程名稱,需要重寫 ForkJoinWorkerThreadFactory 的方法
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};
//創(chuàng)建分治任務(wù)線程池,可以追蹤到線程名稱
ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);
// 快速創(chuàng)建 ForkJoinPool 方法
// ForkJoinPool forkJoinPool = new ForkJoinPool(4);
//創(chuàng)建分治任務(wù)
Fibonacci fibonacci = new Fibonacci(n);
//調(diào)用 invoke 方法啟動(dòng)分治任務(wù)
Integer result = forkJoinPool.invoke(fibonacci);
log.info("Fibonacci {} 的結(jié)果是 {}", n, result);
}
}
// 2. 定義拆分任務(wù),寫好拆分邏輯
@Slf4j
class Fibonacci extends RecursiveTask{
final int n;
Fibonacci(int n) {
this.n = n;
}
@Override
public Integer compute() {
//和遞歸類似,定義可計(jì)算的最小單元
if (n <= 1) {
return n;
}
// 想查看子線程名稱輸出的可以打開下面注釋
//log.info(Thread.currentThread().getName());
Fibonacci f1 = new Fibonacci(n - 1);
// 拆分成子任務(wù)
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// f1.join 等待子任務(wù)執(zhí)行結(jié)果
return f2.compute() + f1.join();
}
}
如上面代碼所示,我們定義了一個(gè) Fibonacci 類,繼承了 RecursiveTask 抽象類。在 Fibonacci 類中,我們定義了拆分邏輯,并調(diào)用了 join () 等待子線程執(zhí)行結(jié)果。運(yùn)行程序,我們會(huì)得到如下的結(jié)果:
17:29:10.336 [main] INFO tech.shuyi.javacodechip.forkjoinpool.ForkJoinDemo - Fibonacci 20 的結(jié)果是 6765
上面代碼中提到的 fork () 和 join () 是 ForkJoinPool 提供的 API 接口,主要用于執(zhí)行任務(wù)以及等待子線程結(jié)果。關(guān)于其詳細(xì)用法,我們稍后會(huì)講到。
除了用于處理父子任務(wù)有依賴的情形,其實(shí) ForkJoinPool 也可以用于處理需要獲取子任務(wù)執(zhí)行結(jié)果的場景。 例如:我們要計(jì)算 1 到 1 億的和,為了加快計(jì)算的速度,我們自然想到算法中的分治原理,將 1 億個(gè)數(shù)字分成 1 萬個(gè)任務(wù),每個(gè)任務(wù)計(jì)算 1 萬個(gè)數(shù)值的綜合,利用 CPU 的并發(fā)計(jì)算性能縮短計(jì)算時(shí)間。
因?yàn)?ThreadPoolExecutor 也可以通過 Future 獲取執(zhí)行結(jié)果,因此 ThreadPoolExecutor 也是可行的。這時(shí)候我們有兩種實(shí)現(xiàn)方式,一種是用 ThreadPoolExecutor 實(shí)現(xiàn),一種是用 ForkJoinPool 實(shí)現(xiàn)。下面我們將這兩種方式都實(shí)現(xiàn)一下,看看這兩種實(shí)現(xiàn)方式有什么不同。
無論哪種實(shí)現(xiàn)方式,其大致思路都是:
- 按照線程池里線程個(gè)數(shù) N,將 1 億個(gè)數(shù)劃分成 N 等份,隨后丟入線程池進(jìn)行計(jì)算。
- 每個(gè)計(jì)算任務(wù)使用 Future 接口獲取計(jì)算結(jié)果,最后積加即可。
我們先使用 ThreadPoolExecutor 實(shí)現(xiàn)。
首先,定義一個(gè) Calculator 接口,表示計(jì)算數(shù)字總和這個(gè)動(dòng)作,如下所示。
public interface Calculator {
/**
* 把傳進(jìn)來的所有numbers 做求和處理
*
* @param numbers
* @return 總和
*/
long sumUp(long[] numbers);
}接著,我們定義一個(gè)使用 ThreadPoolExecutor 線程池實(shí)現(xiàn)的類,如下所示。
package tech.shuyi.javacodechip.forkjoinpool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceCalculator implements Calculator {
private int parallism;
private ExecutorService pool;
public ExecutorServiceCalculator() {
// CPU的核心數(shù) 默認(rèn)就用cpu核心數(shù)了
parallism = Runtime.getRuntime().availableProcessors();
pool = Executors.newFixedThreadPool(parallism);
}
// 1. 處理計(jì)算任務(wù)的線程
private static class SumTask implements Callable{
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
public Long call() {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
}
}
// 2. 核心業(yè)務(wù)邏輯實(shí)現(xiàn)
@Override
public long sumUp(long[] numbers) {
List> results = new ArrayList<>();
// 2.1 數(shù)字拆分
// 把任務(wù)分解為 n 份,交給 n 個(gè)線程處理 4核心 就等分成4份唄
// 然后把每一份都扔個(gè)一個(gè)SumTask線程 進(jìn)行處理
int part = numbers.length / parallism;
for (int i = 0; i < parallism; i++) {
int from = i * part; //開始位置
int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1; //結(jié)束位置
//扔給線程池計(jì)算
results.add(pool.submit(new SumTask(numbers, from, to)));
}
// 2.2 阻塞等待結(jié)果
// 把每個(gè)線程的結(jié)果相加,得到最終結(jié)果 get()方法 是阻塞的
// 優(yōu)化方案:可以采用CompletableFuture來優(yōu)化 JDK1.8的新特性
long total = 0L;
for (Futuref : results) {
try {
total += f.get();
} catch (Exception ignore) {
}
}
return total;
}
}
如上面代碼所示,我們實(shí)現(xiàn)了一個(gè)計(jì)算單個(gè)任務(wù)的類 SumTask,在該類中對數(shù)值進(jìn)行累加。其次,我們在 sumUp () 方法中,對 1 億的數(shù)字進(jìn)行拆分,接著扔給線程池計(jì)算,最后阻塞等待計(jì)算結(jié)果,最終累加起來。
我們運(yùn)行上面的代碼,可以得到順利得到最終結(jié)果,如下所示。
耗時(shí):10ms
結(jié)果為:50000005000000
接著我們使用 ForkJoinPool 來實(shí)現(xiàn)。
我們首先實(shí)現(xiàn) SumTask 繼承 RecursiveTask 抽象類,并在 compute () 方法中定義拆分邏輯及計(jì)算。最后在愛 sumUp () 方法中調(diào)用 pool 方法進(jìn)行計(jì)算,代碼如下所示。
public class ForkJoinCalculator implements Calculator {
private ForkJoinPool pool;
// 1. 定義計(jì)算邏輯
private static class SumTask extends RecursiveTask {
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
//此方法為ForkJoin的核心方法:對任務(wù)進(jìn)行拆分 拆分的好壞決定了效率的高低
@Override
protected Long compute() {
// 當(dāng)需要計(jì)算的數(shù)字個(gè)數(shù)小于6時(shí),直接采用for loop方式計(jì)算結(jié)果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else {
// 否則,把任務(wù)一分為二,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據(jù)具體情況而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
public ForkJoinCalculator() {
// 也可以使用公用的線程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}
@Override
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
} 運(yùn)行上面的代碼,結(jié)果為:
耗時(shí):860ms
結(jié)果為:50000005000000
對比 ThreadPoolExecutor 和 ForkJoinPool 這兩者的實(shí)現(xiàn),可以發(fā)現(xiàn)它們都有任務(wù)拆分的邏輯,以及最終合并數(shù)值的邏輯。但 ForkJoinPool 相比 ThreadPoolExecutor 來說,做了一些實(shí)現(xiàn)上的封裝,例如:
- 不用手動(dòng)去獲取子任務(wù)的結(jié)果,而是使用 join () 方法直接獲取結(jié)果。
- 將任務(wù)拆分的邏輯,封裝到 RecursiveTask 實(shí)現(xiàn)類中,而不是裸露在外。
因此對于沒有父子任務(wù)依賴,但是希望獲取到子任務(wù)執(zhí)行結(jié)果的并行計(jì)算任務(wù),也可以使用 ForkJoinPool 來實(shí)現(xiàn)。在這種情況下,使用 ForkJoinPool 實(shí)現(xiàn)更多是代碼實(shí)現(xiàn)方便,封裝做得更加好。
使用指南
使用 ForkJoinPool 來進(jìn)行并行計(jì)算,主要分為兩步:
- 定義 RecursiveTask 或 RecursiveAction 的任務(wù)子類。
- 初始化線程池及計(jì)算任務(wù),丟入線程池處理,取得處理結(jié)果。
首先,我們需要定義一個(gè) RecursiveTask 或 RecursiveAction 的子類,然后再該類的 compute () 方法中定義拆分邏輯和計(jì)算邏輯。 這兩個(gè)抽象類的區(qū)別在于:前者有返回值,后者沒有返回值。例如前面講到的 1 到 1 億的疊加問題,其定義的 RecursiveTask 實(shí)現(xiàn)類 SumTask 的代碼如下:
private static class SumTask extends RecursiveTask{
private long[] numbers;
private int from;
private int to;
public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}
@Override
protected Long compute() {
// 1. 定義拆分退出邏輯
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else {
// 2. 定義計(jì)算邏輯
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}
對于 compute () 方法的實(shí)現(xiàn),核心是想清楚:怎么拆分成子任務(wù)?什么時(shí)候結(jié)束拆分?
接著,初始化 ForkJoinPool 線程池,初始化計(jì)算任務(wù),最后將任務(wù)丟入線程池中。
// 初始化線程池
public ForkJoinCalculator() {
pool = new ForkJoinPool();
}
// 初始化計(jì)算任務(wù),將任務(wù)丟入線程池
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
通過上面兩步操作,我們就完成了一個(gè) ForkJoinPool 任務(wù)代碼的編寫。
原理解析
ForkJoinPool 的設(shè)計(jì)思想是分治算法,即將任務(wù)不斷拆分(fork)成更小的任務(wù),最終再合并(join)各個(gè)任務(wù)的計(jì)算結(jié)果。通過這種方式,可以充分利用 CPU 資源,再結(jié)合工作竊取算法(worksteal)整體提高執(zhí)行效率。其簡單的流程如下圖:
圖片來源于思否用戶「日拱一兵」
從圖中可以看出 ForkJoinPool 要先執(zhí)行完子任務(wù)才能執(zhí)行上一層任務(wù)。因此 ForkJoinPool 最適合有父子任務(wù)依賴的場景,其次就是需要獲取子任務(wù)執(zhí)行結(jié)果的場景。比如:Fibonacci 數(shù)列、快速排序、二分查找等。
源碼實(shí)現(xiàn)
ForkJoinPool 的主要實(shí)現(xiàn)類為:ForkJoinPool 和 ForkJoinTask 抽象類。
ForkJoinTask 實(shí)現(xiàn)了 Future 接口,可以用于獲取處理結(jié)果。ForkJoinTask 有兩個(gè)抽象子類:RecursiveAction 和 RecursiveTask 抽象類,其區(qū)別在于前者沒有返回值,后者有返回值,其類圖如下所示。
圖片來源于思否用戶「日拱一兵」
ForkJoinPool 則是具體的邏輯實(shí)現(xiàn),由于暫時(shí)沒有應(yīng)用場景,就不了解這么深了,這里就不深入解析了。
感興趣的朋友可以參考這篇文章:ForkJoinPool 大型圖文現(xiàn)場(一閱到底 vs 直接收藏) - SegmentFault 思否。
竊取算法
我們知道 ForkJoinPool 的父子任務(wù)之間是有依賴關(guān)系的,那么 ForkJoinPool 是如何實(shí)現(xiàn)的呢?答案是:利用不同任務(wù)隊(duì)列執(zhí)行。 在 ForkJoinPool 中有一個(gè)數(shù)組形式的成員變量 workQueue[],其對應(yīng)一個(gè)隊(duì)列數(shù)組,每個(gè)隊(duì)列對應(yīng)一個(gè)消費(fèi)線程。丟入線程池的任務(wù),根據(jù)特定規(guī)則進(jìn)行轉(zhuǎn)發(fā)。
圖片來源于思否用戶「日拱一兵」
這樣就有一個(gè)問題:有些隊(duì)列可能任務(wù)比較多,有些隊(duì)列任務(wù)比較少,這樣就會(huì)導(dǎo)致不同線程負(fù)載不一樣,整體不夠高效,怎么辦呢?
答案是:利用竊取算法,空閑的線程從尾部去消費(fèi)其他隊(duì)列的任務(wù)。
一般情況下,線程獲取自己隊(duì)列中的任務(wù)是 LIFO(Last Input First Output 后進(jìn)先出)的方式,即類似于棧的操作方式。如下圖所示,首先放入隊(duì)列的時(shí)候先將任務(wù) Push 進(jìn)隊(duì)列的頭部(top),之后消費(fèi)的時(shí)候在 pop 出隊(duì)列頭部(top)。
圖片來源于思否用戶「日拱一兵」
而當(dāng)某個(gè)線程對應(yīng)的隊(duì)列空閑時(shí),該線程則去隊(duì)列的底部(base)竊?。╬oll)任務(wù)到自己的隊(duì)列,然后進(jìn)行消費(fèi)。那問題來了:為什么不從頭部(top)獲取任務(wù),而要從底部(base)獲取任務(wù)呢? 那是為了避免沖突!如果兩個(gè)線程同時(shí)從頂部獲取任務(wù),那就會(huì)有多線程的沖突問題,就需要加鎖操作,從而降低了執(zhí)行效率。
參考資料
- 線程池 ForkJoinPool 簡介 - 老 K 的 Java 博客
- ForkJoinPool 大型圖文現(xiàn)場(一閱到底 vs 直接收藏) - SegmentFault 思否
- (2 條消息) 介紹 ForkJoinPool 的適用場景,實(shí)現(xiàn)原理_想跑步丶小胖子的博客 - CSDN 博客_forkjoinpool
- ForkJoinPool 使用場景?ARLOOR?
網(wǎng)站標(biāo)題:深入理解ForkJoinPool:入門、使用、原理
網(wǎng)頁路徑:http://m.5511xx.com/article/dhpcsoi.html


咨詢
建站咨詢
