Fork/Join框架是ExecutorService接口的一個實現,通過它我們可以實現多進程。Fork/Join可以用來將一個大任務遞歸的拆分為多個小任務,目標是充分利用所有的資源盡可能增強應用的性能。
和任何ExecutorService接口的實現一樣,Fork/Join也會使用線程池來分布式的管理工作線程。Fork/Join框架的獨特之處在于它使用了work-stealing(工作竊取)算法。通過這個算法,工作線程在無事可做時可以竊取其它正在繁忙的線程的任務來執行。
Fork/Join框架的核心是ForkJoinPool類,一個AbstractExecutorService類的子類。ForkJoinPool實現了核心的work-stealing算法并可以執行ForkJoinTask處理。
基礎用法
使用Fork/Join框架的第一步是編寫執行碎片任務的代碼。要編寫的代碼類似如下偽代碼:
1
2
3
4
5
|
if 任務足夠小: 直接執行任務 else: 將任務切成兩個小任務 執行兩個小任務并等待結果 |
使用ForkJoinTask子類來封裝如上的代碼,通常會使用一些JDK提供的類,使用的有RecursiveTask(這個類會返回一個結果)和RecursiveAction兩個類。
在準備好ForkJoinTask子類后,創建一個代表所有任務的對象,并將之傳遞給一個ForkJoinPool實例的invoke()方法。
由模糊到清晰
為了輔助理解Fork/Join框架是如何工作的,我們使用一個案例來進行說明:比如對一張圖片進行模糊處理。我們用一個整型數組表示圖片,其中的每個數值代表一個像素的顏色。被模糊的圖片也用一個同等長度的數組來表示。
執行模糊是通過對代表圖片的每個像素進行處理實現的。計算每個像素與其周圍像素的均值(紅黃藍三原色的均值),計算生成的結果數組就是模糊后的圖片。由于代表圖像的通常都是一個大數組,整個處理過程需要通常會需要很多時間。可以使用Fork/Join框架利用多處理器系統上的并發處理優勢來進行提速。下面是一個可能的實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package com.zhyea.robin; import java.util.concurrent.RecursiveAction; public class ForkBlur extends RecursiveAction { private int [] mSource; private int mStart; private int mLength; private int [] mDestination; // 處理窗口大小; 需要是一個奇數. private int mBlurWidth = 15 ; public ForkBlur( int [] src, int start, int length, int [] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1 ) / 2 ; for ( int index = mStart; index < mStart + mLength; index++) { // 計算平均值. float rt = 0 , gt = 0 , bt = 0 ; for ( int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0 ), mSource.length - 1 ); int pixel = mSource[mindex]; rt += ( float ) ((pixel & 0x00ff0000 ) >> 16 ) / mBlurWidth; gt += ( float ) ((pixel & 0x0000ff00 ) >> 8 ) / mBlurWidth; bt += ( float ) ((pixel & 0x000000ff ) >> 0 ) / mBlurWidth; } // 重組目標像素. int dpixel = ( 0xff000000 ) | ((( int ) rt) << 16 ) | ((( int ) gt) << 8 ) | ((( int ) bt) << 0 ); mDestination[index] = dpixel; } } .... } |
現在實現抽象方法compute(),在這個方法中既實現了模糊操作,也實現了將一個任務拆分成兩個小任務。這里僅是簡單依據數組長度來決定是直接執行任務還是將之拆分成兩個小任務:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
protected static int sThreshold = 100000 ; protected void compute() { if (mLength < sThreshold) { computeDirectly(); return ; } int split = mLength / 2 ; invokeAll( new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); } |
因為上面這些方法的實現是定義在RecursiveAction的一個子類中,可以直接在一個ForkJoinPool中創建并運行任務。具體步驟如下:
1. 創建一個代表要執行的任務的對象:
1
2
3
|
// src 表示源圖片像素的數組 // dst 表示生成的圖片的像素 ForkBlur fb = new ForkBlur(src, 0 , src.length, dst); |
2. 創建一個運行任務的ForkJoinPool實例:
ForkJoinPool pool = new ForkJoinPool();
3. 運行任務:
pool.invoke(fb);
在源代碼中還包含了一些創建目標圖片的代碼。具體參考ForkBlur示例。
標準實現
要使用Fork/Join框架按自定義的算法在多核系統上執行并發任務當然需要實現自定義的類了(比如之前我們實現的ForkBlur類)。除此之外,在JavaSE中已經在廣泛使用Fork/Join框架的一些特性了。比如Java8中的java.util.Arrays類的parallelSort()方法就使用了Fork/Join框架。具體可以參考Java API文檔。
Fork/Join框架的另一個實現在java.util.streams包下,這也是java8的Lambda特性的一部分。