日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
深入理解ForkJoinPool:入門、使用、原理

?大家好,我是樹哥。

本文將從一個(gè)簡單的例子出發(fā),與大家解釋為啥要有 ForkJoinPool 的存在。接著向大家介紹 ForkJoinPool 的基本信息及使用,最后講解 ForkJoinPool 的基本原理。

誕生原因

對于線程池來說,我們經(jīng)常使用的是 ThreadPoolExecutor,可以用來提升任務(wù)處理效率。一般情況下,我們使用 ThreadPoolExecutor 的時(shí)候,各個(gè)任務(wù)之間都是沒有聯(lián)系的。但在某些特殊情況下,我們處理的任務(wù)之間是有聯(lián)系的,例如經(jīng)典的 Fibonacci 算法就是其中一種情況。

對于 Fibonacci 數(shù)列來說,我們知道 F (N) = F (N-1) + F (N-2)。當(dāng)前數(shù)值的結(jié)果,都依賴后面幾個(gè)數(shù)值的結(jié)果。這時(shí)候如果用 ThreadPoolExecutor 貌似就無法解決問題了。雖然我們可以單線程的遞歸算法,則其計(jì)算速度較慢,并且無法進(jìn)行并行計(jì)算,無法發(fā)揮 CPU 多核的優(yōu)勢。

ForkJoinPool 就是設(shè)計(jì)用來解決父子任務(wù)有依賴的并行計(jì)算問題的。 類似于快速排序、二分查找、集合運(yùn)算等有父子依賴的并行計(jì)算問題,都可以用 ForkJoinPool 來解決。對于 Fibonacci 數(shù)列問題,如果用 ForkJoinPool 來實(shí)現(xiàn),其實(shí)現(xiàn)代碼為:

@Slf4j
public class ForkJoinDemo {
// 1. 運(yùn)行入口
public static void main(String[] args) {
int n = 20;

// 為了追蹤子線程名稱,需要重寫 ForkJoinWorkerThreadFactory 的方法
final ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
worker.setName("my-thread" + worker.getPoolIndex());
return worker;
};

//創(chuàng)建分治任務(wù)線程池,可以追蹤到線程名稱
ForkJoinPool forkJoinPool = new ForkJoinPool(4, factory, null, false);

// 快速創(chuàng)建 ForkJoinPool 方法
// ForkJoinPool forkJoinPool = new ForkJoinPool(4);

//創(chuàng)建分治任務(wù)
Fibonacci fibonacci = new Fibonacci(n);

//調(diào)用 invoke 方法啟動(dòng)分治任務(wù)
Integer result = forkJoinPool.invoke(fibonacci);
log.info("Fibonacci {} 的結(jié)果是 {}", n, result);
}
}

// 2. 定義拆分任務(wù),寫好拆分邏輯
@Slf4j
class Fibonacci extends RecursiveTask {
final int n;
Fibonacci(int n) {
this.n = n;
}

@Override
public Integer compute() {
//和遞歸類似,定義可計(jì)算的最小單元
if (n <= 1) {
return n;
}
// 想查看子線程名稱輸出的可以打開下面注釋
//log.info(Thread.currentThread().getName());

Fibonacci f1 = new Fibonacci(n - 1);
// 拆分成子任務(wù)
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// f1.join 等待子任務(wù)執(zhí)行結(jié)果
return f2.compute() + f1.join();
}
}

如上面代碼所示,我們定義了一個(gè) Fibonacci 類,繼承了 RecursiveTask 抽象類。在 Fibonacci 類中,我們定義了拆分邏輯,并調(diào)用了 join () 等待子線程執(zhí)行結(jié)果。運(yùn)行程序,我們會(huì)得到如下的結(jié)果:

17:29:10.336 [main] INFO tech.shuyi.javacodechip.forkjoinpool.ForkJoinDemo - Fibonacci 20 的結(jié)果是 6765

上面代碼中提到的 fork () 和 join () 是 ForkJoinPool 提供的 API 接口,主要用于執(zhí)行任務(wù)以及等待子線程結(jié)果。關(guān)于其詳細(xì)用法,我們稍后會(huì)講到。

除了用于處理父子任務(wù)有依賴的情形,其實(shí) ForkJoinPool 也可以用于處理需要獲取子任務(wù)執(zhí)行結(jié)果的場景。 例如:我們要計(jì)算 1 到 1 億的和,為了加快計(jì)算的速度,我們自然想到算法中的分治原理,將 1 億個(gè)數(shù)字分成 1 萬個(gè)任務(wù),每個(gè)任務(wù)計(jì)算 1 萬個(gè)數(shù)值的綜合,利用 CPU 的并發(fā)計(jì)算性能縮短計(jì)算時(shí)間。

因?yàn)?ThreadPoolExecutor 也可以通過 Future 獲取執(zhí)行結(jié)果,因此 ThreadPoolExecutor 也是可行的。這時(shí)候我們有兩種實(shí)現(xiàn)方式,一種是用 ThreadPoolExecutor 實(shí)現(xiàn),一種是用 ForkJoinPool 實(shí)現(xiàn)。下面我們將這兩種方式都實(shí)現(xiàn)一下,看看這兩種實(shí)現(xiàn)方式有什么不同。

無論哪種實(shí)現(xiàn)方式,其大致思路都是:

  • 按照線程池里線程個(gè)數(shù) N,將 1 億個(gè)數(shù)劃分成 N 等份,隨后丟入線程池進(jìn)行計(jì)算。
  • 每個(gè)計(jì)算任務(wù)使用 Future 接口獲取計(jì)算結(jié)果,最后積加即可。

我們先使用 ThreadPoolExecutor 實(shí)現(xiàn)。

首先,定義一個(gè) Calculator 接口,表示計(jì)算數(shù)字總和這個(gè)動(dòng)作,如下所示。

public interface Calculator {
/**
* 把傳進(jìn)來的所有numbers 做求和處理
*
* @param numbers
* @return 總和
*/
long sumUp(long[] numbers);
}

接著,我們定義一個(gè)使用 ThreadPoolExecutor 線程池實(shí)現(xiàn)的類,如下所示。

package tech.shuyi.javacodechip.forkjoinpool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceCalculator implements Calculator {

private int parallism;
private ExecutorService pool;

public ExecutorServiceCalculator() {
// CPU的核心數(shù) 默認(rèn)就用cpu核心數(shù)了
parallism = Runtime.getRuntime().availableProcessors();
pool = Executors.newFixedThreadPool(parallism);
}

// 1. 處理計(jì)算任務(wù)的線程
private static class SumTask implements Callable {
private long[] numbers;
private int from;
private int to;

public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}

@Override
public Long call() {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
}
}

// 2. 核心業(yè)務(wù)邏輯實(shí)現(xiàn)
@Override
public long sumUp(long[] numbers) {
List> results = new ArrayList<>();

// 2.1 數(shù)字拆分
// 把任務(wù)分解為 n 份,交給 n 個(gè)線程處理 4核心 就等分成4份唄
// 然后把每一份都扔個(gè)一個(gè)SumTask線程 進(jìn)行處理
int part = numbers.length / parallism;
for (int i = 0; i < parallism; i++) {
int from = i * part; //開始位置
int to = (i == parallism - 1) ? numbers.length - 1 : (i + 1) * part - 1; //結(jié)束位置

//扔給線程池計(jì)算
results.add(pool.submit(new SumTask(numbers, from, to)));
}

// 2.2 阻塞等待結(jié)果
// 把每個(gè)線程的結(jié)果相加,得到最終結(jié)果 get()方法 是阻塞的
// 優(yōu)化方案:可以采用CompletableFuture來優(yōu)化 JDK1.8的新特性
long total = 0L;
for (Future f : results) {
try {
total += f.get();
} catch (Exception ignore) {
}
}

return total;
}
}

如上面代碼所示,我們實(shí)現(xiàn)了一個(gè)計(jì)算單個(gè)任務(wù)的類 SumTask,在該類中對數(shù)值進(jìn)行累加。其次,我們在 sumUp () 方法中,對 1 億的數(shù)字進(jìn)行拆分,接著扔給線程池計(jì)算,最后阻塞等待計(jì)算結(jié)果,最終累加起來。

我們運(yùn)行上面的代碼,可以得到順利得到最終結(jié)果,如下所示。

耗時(shí):10ms
結(jié)果為:50000005000000

接著我們使用 ForkJoinPool 來實(shí)現(xiàn)。

我們首先實(shí)現(xiàn) SumTask 繼承 RecursiveTask 抽象類,并在 compute () 方法中定義拆分邏輯及計(jì)算。最后在愛 sumUp () 方法中調(diào)用 pool 方法進(jìn)行計(jì)算,代碼如下所示。

public class ForkJoinCalculator implements Calculator {

private ForkJoinPool pool;

// 1. 定義計(jì)算邏輯
private static class SumTask extends RecursiveTask {
private long[] numbers;
private int from;
private int to;

public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}

//此方法為ForkJoin的核心方法:對任務(wù)進(jìn)行拆分 拆分的好壞決定了效率的高低
@Override
protected Long compute() {

// 當(dāng)需要計(jì)算的數(shù)字個(gè)數(shù)小于6時(shí),直接采用for loop方式計(jì)算結(jié)果
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else {
// 否則,把任務(wù)一分為二,遞歸拆分(注意此處有遞歸)到底拆分成多少分 需要根據(jù)具體情況而定
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}

public ForkJoinCalculator() {
// 也可以使用公用的線程池 ForkJoinPool.commonPool():
// pool = ForkJoinPool.commonPool()
pool = new ForkJoinPool();
}

@Override
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}
}

運(yùn)行上面的代碼,結(jié)果為:

耗時(shí):860ms
結(jié)果為:50000005000000

對比 ThreadPoolExecutor 和 ForkJoinPool 這兩者的實(shí)現(xiàn),可以發(fā)現(xiàn)它們都有任務(wù)拆分的邏輯,以及最終合并數(shù)值的邏輯。但 ForkJoinPool 相比 ThreadPoolExecutor 來說,做了一些實(shí)現(xiàn)上的封裝,例如:

  • 不用手動(dòng)去獲取子任務(wù)的結(jié)果,而是使用 join () 方法直接獲取結(jié)果。
  • 將任務(wù)拆分的邏輯,封裝到 RecursiveTask 實(shí)現(xiàn)類中,而不是裸露在外。

因此對于沒有父子任務(wù)依賴,但是希望獲取到子任務(wù)執(zhí)行結(jié)果的并行計(jì)算任務(wù),也可以使用 ForkJoinPool 來實(shí)現(xiàn)。在這種情況下,使用 ForkJoinPool 實(shí)現(xiàn)更多是代碼實(shí)現(xiàn)方便,封裝做得更加好。

使用指南

使用 ForkJoinPool 來進(jìn)行并行計(jì)算,主要分為兩步:

  • 定義 RecursiveTask 或 RecursiveAction 的任務(wù)子類。
  • 初始化線程池及計(jì)算任務(wù),丟入線程池處理,取得處理結(jié)果。

首先,我們需要定義一個(gè) RecursiveTask 或 RecursiveAction 的子類,然后再該類的 compute () 方法中定義拆分邏輯和計(jì)算邏輯。 這兩個(gè)抽象類的區(qū)別在于:前者有返回值,后者沒有返回值。例如前面講到的 1 到 1 億的疊加問題,其定義的 RecursiveTask 實(shí)現(xiàn)類 SumTask 的代碼如下:

private static class SumTask extends RecursiveTask {
private long[] numbers;
private int from;
private int to;

public SumTask(long[] numbers, int from, int to) {
this.numbers = numbers;
this.from = from;
this.to = to;
}

@Override
protected Long compute() {
// 1. 定義拆分退出邏輯
if (to - from < 6) {
long total = 0;
for (int i = from; i <= to; i++) {
total += numbers[i];
}
return total;
} else {
// 2. 定義計(jì)算邏輯
int middle = (from + to) / 2;
SumTask taskLeft = new SumTask(numbers, from, middle);
SumTask taskRight = new SumTask(numbers, middle + 1, to);
taskLeft.fork();
taskRight.fork();
return taskLeft.join() + taskRight.join();
}
}
}

對于 compute () 方法的實(shí)現(xiàn),核心是想清楚:怎么拆分成子任務(wù)?什么時(shí)候結(jié)束拆分?

接著,初始化 ForkJoinPool 線程池,初始化計(jì)算任務(wù),最后將任務(wù)丟入線程池中。

// 初始化線程池
public ForkJoinCalculator() {
pool = new ForkJoinPool();
}
// 初始化計(jì)算任務(wù),將任務(wù)丟入線程池
public long sumUp(long[] numbers) {
Long result = pool.invoke(new SumTask(numbers, 0, numbers.length - 1));
pool.shutdown();
return result;
}

通過上面兩步操作,我們就完成了一個(gè) ForkJoinPool 任務(wù)代碼的編寫。

原理解析

ForkJoinPool 的設(shè)計(jì)思想是分治算法,即將任務(wù)不斷拆分(fork)成更小的任務(wù),最終再合并(join)各個(gè)任務(wù)的計(jì)算結(jié)果。通過這種方式,可以充分利用 CPU 資源,再結(jié)合工作竊取算法(worksteal)整體提高執(zhí)行效率。其簡單的流程如下圖:

圖片來源于思否用戶「日拱一兵」

從圖中可以看出 ForkJoinPool 要先執(zhí)行完子任務(wù)才能執(zhí)行上一層任務(wù)。因此 ForkJoinPool 最適合有父子任務(wù)依賴的場景,其次就是需要獲取子任務(wù)執(zhí)行結(jié)果的場景。比如:Fibonacci 數(shù)列、快速排序、二分查找等。

源碼實(shí)現(xiàn)

ForkJoinPool 的主要實(shí)現(xiàn)類為:ForkJoinPool 和 ForkJoinTask 抽象類。

ForkJoinTask 實(shí)現(xiàn)了 Future 接口,可以用于獲取處理結(jié)果。ForkJoinTask 有兩個(gè)抽象子類:RecursiveAction 和 RecursiveTask 抽象類,其區(qū)別在于前者沒有返回值,后者有返回值,其類圖如下所示。

圖片來源于思否用戶「日拱一兵」

ForkJoinPool 則是具體的邏輯實(shí)現(xiàn),由于暫時(shí)沒有應(yīng)用場景,就不了解這么深了,這里就不深入解析了。

感興趣的朋友可以參考這篇文章:ForkJoinPool 大型圖文現(xiàn)場(一閱到底 vs 直接收藏) - SegmentFault 思否。

竊取算法

我們知道 ForkJoinPool 的父子任務(wù)之間是有依賴關(guān)系的,那么 ForkJoinPool 是如何實(shí)現(xiàn)的呢?答案是:利用不同任務(wù)隊(duì)列執(zhí)行。 在 ForkJoinPool 中有一個(gè)數(shù)組形式的成員變量 workQueue[],其對應(yīng)一個(gè)隊(duì)列數(shù)組,每個(gè)隊(duì)列對應(yīng)一個(gè)消費(fèi)線程。丟入線程池的任務(wù),根據(jù)特定規(guī)則進(jìn)行轉(zhuǎn)發(fā)。

圖片來源于思否用戶「日拱一兵」

這樣就有一個(gè)問題:有些隊(duì)列可能任務(wù)比較多,有些隊(duì)列任務(wù)比較少,這樣就會(huì)導(dǎo)致不同線程負(fù)載不一樣,整體不夠高效,怎么辦呢?

答案是:利用竊取算法,空閑的線程從尾部去消費(fèi)其他隊(duì)列的任務(wù)。

一般情況下,線程獲取自己隊(duì)列中的任務(wù)是 LIFO(Last Input First Output 后進(jìn)先出)的方式,即類似于棧的操作方式。如下圖所示,首先放入隊(duì)列的時(shí)候先將任務(wù) Push 進(jìn)隊(duì)列的頭部(top),之后消費(fèi)的時(shí)候在 pop 出隊(duì)列頭部(top)。

圖片來源于思否用戶「日拱一兵」

而當(dāng)某個(gè)線程對應(yīng)的隊(duì)列空閑時(shí),該線程則去隊(duì)列的底部(base)竊?。╬oll)任務(wù)到自己的隊(duì)列,然后進(jìn)行消費(fèi)。那問題來了:為什么不從頭部(top)獲取任務(wù),而要從底部(base)獲取任務(wù)呢? 那是為了避免沖突!如果兩個(gè)線程同時(shí)從頂部獲取任務(wù),那就會(huì)有多線程的沖突問題,就需要加鎖操作,從而降低了執(zhí)行效率。

參考資料

  • 線程池 ForkJoinPool 簡介 - 老 K 的 Java 博客
  • ForkJoinPool 大型圖文現(xiàn)場(一閱到底 vs 直接收藏) - SegmentFault 思否
  • (2 條消息) 介紹 ForkJoinPool 的適用場景,實(shí)現(xiàn)原理_想跑步丶小胖子的博客 - CSDN 博客_forkjoinpool
  • ForkJoinPool 使用場景?ARLOOR?

網(wǎng)站標(biāo)題:深入理解ForkJoinPool:入門、使用、原理
網(wǎng)頁路徑:http://m.5511xx.com/article/dhpcsoi.html