ThreadPoolExecutor是JDK中的線程池實現,這個類實現了一個線程池需要的各個方法,它提供了任務提交、線程管理、監控等方法。
下面是ThreadPoolExecutor類的構造方法源碼,其他創建線程池的方法最終都會導向這個構造方法,共有7個參數:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; } |
這些參數都通過volatile修飾:
1
2
3
4
5
6
7
8
9
10
|
public class ThreadPoolExecutor extends AbstractExecutorService { private final BlockingQueue<Runnable> workQueue; private volatile ThreadFactory threadFactory; private volatile RejectedExecutionHandler handler; private volatile long keepAliveTime; // 是否允許核心線程被回收 private volatile boolean allowCoreThreadTimeOut; private volatile int corePoolSize; private volatile int maximumPoolSize; } |
corePoolSize:核心線程數
線程池維護的最小線程數量,核心線程創建后不會被回收(注意:設置allowCoreThreadTimeout=true后,空閑的核心線程超過存活時間也會被回收)。
大于核心線程數的線程,在空閑時間超過keepAliveTime后會被回收。
線程池剛創建時,里面沒有一個線程,當調用 execute() 方法添加一個任務時,如果正在運行的線程數量小于corePoolSize,則馬上創建新線程并運行這個任務。
maximumPoolSize:最大線程數
線程池允許創建的最大線程數量。
當添加一個任務時,核心線程數已滿,線程池還沒達到最大線程數,并且沒有空閑線程,工作隊列已滿的情況下,創建一個新線程,然后從工作隊列的頭部取出一個任務交由新線程來處理,而將剛提交的任務放入工作隊列尾部。
keepAliveTime:空閑線程存活時間
當一個可被回收的線程的空閑時間大于keepAliveTime,就會被回收。
可被回收的線程:
設置allowCoreThreadTimeout=true的核心線程。大于核心線程數的線程(非核心線程)。
unit:時間單位
keepAliveTime的時間單位:
1
2
3
4
5
6
7
|
TimeUnit.NANOSECONDS TimeUnit.MICROSECONDS TimeUnit.MILLISECONDS // 毫秒 TimeUnit.SECONDS TimeUnit.MINUTES TimeUnit.HOURS TimeUnit.DAYS |
workQueue:工作隊列
新任務被提交后,會先添加到工作隊列,任務調度時再從隊列中取出任務。工作隊列實現了BlockingQueue接口。
JDK默認的工作隊列有五種:
1.ArrayBlockingQueue 數組型阻塞隊列:數組結構,初始化時傳入大小,有界,FIFO,使用一個重入鎖,默認使用非公平鎖,入隊和出隊共用一個鎖,互斥。
2。LinkedBlockingQueue 鏈表型阻塞隊列:鏈表結構,默認初始化大小為Integer.MAX_VALUE,有界(近似無解),FIFO,使用兩個重入鎖分別控制元素的入隊和出隊,用Condition進行線程間的喚醒和等待。
3.SynchronousQueue 同步隊列:容量為0,添加任務必須等待取出任務,這個隊列相當于通道,不存儲元素。
4.PriorityBlockingQueue 優先阻塞隊列:無界,默認采用元素自然順序升序排列。
5.DelayQueue 延時隊列:無界,元素有過期時間,過期的元素才能被取出。
threadFactory:線程工廠
創建線程的工廠,可以設定線程名、線程編號等。
默認線程工廠:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger( 1 ); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger( 1 ); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon( false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } |
handler:拒絕策略
當線程池線程數已滿,并且工作隊列達到限制,新提交的任務使用拒絕策略處理。可以自定義拒絕策略,拒絕策略需要實現RejectedExecutionHandler接口。
JDK默認的拒絕策略有四種:
1.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。
2.DiscardPolicy:丟棄任務,但是不拋出異常。可能導致無法發現系統的異常狀態。
3.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新提交被拒絕的任務。
4.CallerRunsPolicy:由調用線程處理該任務。
默認拒絕策略:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
/** * The default rejected execution handler */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException( "Task " + r.toString() + " rejected from " + e.toString()); } } |
自定義線程池工具
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 線程池工廠工具 * * @author 向振華 * @date 2021/04/11 10:24 */ public class ThreadPoolFactory { /** * 生成固定大小的線程池 * * @param threadName 線程名稱 * @return 線程池 */ public static ExecutorService createFixedThreadPool(String threadName) { AtomicInteger threadNumber = new AtomicInteger( 0 ); return new ThreadPoolExecutor( // 核心線程數 desiredThreadNum(), // 最大線程數 desiredThreadNum() * 2 , // 空閑線程存活時間 60L, // 空閑線程存活時間單位 TimeUnit.SECONDS, // 工作隊列 new ArrayBlockingQueue<>( 1024 ), // 線程工廠 new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, threadName + "-" + threadNumber.getAndIncrement()); } }, // 拒絕策略 new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()) { try { //嘗試阻塞式加入任務隊列 executor.getQueue().put(r); } catch (Exception e) { //保持線程的中斷狀態 Thread.currentThread().interrupt(); } } } }); } /** * 理想的線程數,使用 2倍cpu核心數 */ public static int desiredThreadNum() { return Runtime.getRuntime().availableProcessors() * 2 ; } } |
到此這篇關于Java多線程之線程池七個參數詳解的文章就介紹到這了,更多相關java線程池詳解內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://blog.csdn.net/Anenan/article/details/115603481