線程池
無限制的創建線程
若采用"為每個任務分配一個線程"的方式會存在一些缺陷,尤其是當需要創建大量線程時:
- 線程生命周期的開銷非常高
- 資源消耗
- 穩定性
引入線程池
任務是一組邏輯工作單元,線程則是使任務異步執行的機制。當存在大量并發任務時,創建、銷毀線程需要很大的開銷,運用線程池可以大大減小開銷。
executor框架
說明:
- executor 執行器接口,該接口定義執行runnable任務的方式。
- executorservice 該接口定義提供對executor的服務。
- scheduledexecutorservice 定時調度接口。
- abstractexecutorservice 執行框架抽象類。
- threadpoolexecutor jdk中線程池的具體實現。
- executors 線程池工廠類。
threadpoolexecutor線程池類
線程池是一個復雜的任務調度工具,它涉及到任務、線程池等的生命周期問題。要配置一個線程池是比較復雜的,尤其是對于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的。
jdk中的線程池均由threadpoolexecutor類實現。其構造方法如下:
參數說明:
- corepoolsize:核心線程數。
- maximumpoolsize:最大線程數。
- keepalivetime:線程存活時間。當線程數大于core數,那么超過該時間的線程將會被終結。
- unit:keepalivetime的單位。java.util.concurrent.timeunit類存在靜態靜態屬性:nanoseconds、microseconds、milliseconds、seconds
- workqueue:runnable的阻塞隊列。若線程池已經被占滿,則該隊列用于存放無法再放入線程池中的runnable。
另一個構造方法:
該方法在下面的擴展部分有更深入的講解。其中handler表示線程池對拒絕任務的處理策略。
threadpoolexecutor的使用需要注意以下概念:
- 若線程池中的線程數量小于corepoolsize,即使線程池中的線程都處于空閑狀態,也要創建新的線程來處理被添加的任務。
- 若線程池中的線程數量等于 corepoolsize且緩沖隊列 workqueue未滿,則任務被放入緩沖隊列。
- 若線程池中線程的數量大于corepoolsize且緩沖隊列workqueue滿,且線程池中的數量小于maximumpoolsize,則建新的線程來處理被添加的任務。
- 若線程池中線程的數量大于corepoolsize且緩沖隊列workqueue滿,且線程池中的數量等于maximumpoolsize,那么通過 handler所指定的策略來處理此任務。
- 當線程池中的線程數量大于corepoolsize時,如果某線程空閑時間超過keepalivetime,線程將被終止。
executors 工廠方法
jdk內部提供了五種最常見的線程池。由executors類的五個靜態工廠方法創建。
- newfixedthreadpool(...)
- newsinglethreadexecutor(...)
- newcachedthreadpool(...)
- newscheduledthreadpool(...)
- newsinglethreadscheduledexecutor()
單線程的線程池newsinglethreadexecutor
這個線程池只有一個線程在工作,也就是相當于單線程串行執行所有任務。
返回單線程的executor,將多個任務交給此exector時,這個線程處理完一個任務后接著處理下一個任務,若該線程出現異常,將會有一個新的線程來替代。此線程池保證所有任務的執行順序按照任務的提交順序執行。
說明:linkedblockingqueue會無限的添加需要執行的runnable。
創建固定大小的線程池newfixedthreadpool
每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
public static executorsevice newfixedthreadpool()
返回一個包含指定數目線程的線程池,如果任務數量多于線程數目,那么沒有沒有執行的任務必須等待,直到有任務完成為止。
可緩存的線程池newcachedthreadpool
如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說jvm)能夠創建的最大線程大小。
newcachedthreadpool方法創建的線程池可以自動的擴展線程池的容量。核心線程數量為0。
synchronousqueue是個特殊的隊列。synchronousqueue隊列的容量為0。當試圖為synchronousqueue添加runnable,則執行會失敗。只有當一邊從synchronousqueue取數據,一邊向synchronousqueue添加數據才可以成功。synchronousqueue僅僅起到數據交換的作用,并不保存線程。但newcachedthreadpool()方法沒有線程上限。runable添加到synchronousqueue會被立刻取出。
根據用戶的任務數創建相應的線程來處理,該線程池不會對線程數目加以限制,完全依賴于jvm能創建線程的數量,可能引起內存不足。
定時任務調度的線程池newscheduledthreadpool
創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。
例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class scheduledthreadpooltest { public static void main(string[] args) { scheduledexecutorservice ses = executors.newscheduledthreadpool( 10 ); ses.schedulewithfixeddelay( new runnable() { @override public void run() { try { thread.sleep( 1000 ); } catch (interruptedexception e) { e.printstacktrace(); } system.out.println( new date()); } }, 1000 , 2000 , timeunit.milliseconds); } } |
單線程的定時任務調度線程池newsinglethreadscheduledexecutor
此線程池支持定時以及周期性執行任務的需求。
executor接口
executor是一個線程執行接口。任務執行的主要抽象不是thead,而是executor。
1
2
3
|
public interface executor{ void executor(runnable command); } |
executor將任務的提交過程與執行過程解耦,并用runnable來表示任務。執行的任務放入run方法中即可,將runnable接口的實現類交給線程池的execute方法,作為它的一個參數。如果需要給任務傳遞參數,可以通過創建一個runnable接口的實現類來完成。
executor可以支持多種不同類型的任務執行策略。
executor基于生產者消費者模式,提交任務的操作相當于生產者,執行任務的線程則相當于消費者。
executorservice接口
線程池接口。executorservice在executor的基礎上增加了一些方法,其中有兩個核心的方法:
1
2
3
|
future<?> submit(runnable task) <t> future<t> submit(callable<t> task) |
這兩個方法都是向線程池中提交任務,它們的區別在于runnable在執行完畢后沒有結果,callable執行完畢后有一個結果。這在多個線程中傳遞狀態和結果是非常有用的。另外他們的相同點在于都返回一個future對象。future對象可以阻塞線程直到運行完畢(獲取結果,如果有的話),也可以取消任務執行,當然也能夠檢測任務是否被取消或者是否執行完畢。
在沒有future之前我們檢測一個線程是否執行完畢通常使用thread.join()或者用一個死循環加狀態位來描述線程執行完畢。現在有了更好的方法能夠阻塞線程,檢測任務執行完畢甚至取消執行中或者未開始執行的任務。
scheduledexecutorservice接口
scheduledexecutorservice描述的功能和timer/timertask類似,解決那些需要任務重復執行的問題。這包括延遲時間一次性執行、延遲時間周期性執行以及固定延遲時間周期性執行等。當然了繼承executorservice的scheduledexecutorservice擁有executorservice的全部特性。
線程池生命周期
線程是有多種執行狀態的,同樣管理線程的線程池也有多種狀態。jvm會在所有線程(非后臺daemon線程)全部終止后才退出,為了節省資源和有效釋放資源關閉一個線程池就顯得很重要。有時候無法正確的關閉線程池,將會阻止jvm的結束。
線程池executor是異步的執行任務,因此任何時刻不能夠直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。因此關閉線程池可能出現一下幾種情況:
- 平緩關閉:已經啟動的任務全部執行完畢,同時不再接受新的任務。
- 立即關閉:取消所有正在執行和未執行的任務。
另外關閉線程池后對于任務的狀態應該有相應的反饋信息。
啟動線程池
線程池在構造前(new操作)是初始狀態,一旦構造完成線程池就進入了執行狀態running。嚴格意義上講線程池構造完成后并沒有線程被立即啟動,只有進行"預啟動"或者接收到任務的時候才會啟動線程。
線程池是處于運行狀態,隨時準備接受任務來執行。
關閉線程池
線程池運行中可以通過shutdown()和shutdownnow()來改變運行狀態。
- shutdown():平緩的關閉線程池。線程池停止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入隊列還沒有開始的任務。shutdown()方法執行過程中,線程池處于shutdown狀態。
- shutdownnow():立即關閉線程池。線程池停止接受新的任務,同時線程池取消所有執行的任務和已經進入隊列但是還沒有執行的任務。shutdownnow()方法執行過程中,線程池處于stop狀態。shutdownnow方法本質是調用thread.interrupt()方法。但我們知道該方法僅僅是讓線程處于interrupted狀態,并不會讓線程真正的停止!所以若只調用或只調用一次shutdownnow()方法,不一定會讓線程池中的線程都關閉掉,線程中必須要有處理interrupt事件的機制。
線程池結束
一旦shutdown()或者shutdownnow()執行完畢,線程池就進入terminated狀態,即線程池就結束了。
- isterminating() 如果關閉后所有任務都已完成,則返回true。
- isshutdown() 如果此執行程序已關閉,則返回true。
例:使用固定大小的線程池。并將任務添加到線程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
import java.util.concurrent.executors; import java.util.concurrent.executorservice; public class javathreadpool { public static void main(string[] args) { // 創建一個可重用固定線程數的線程池 executorservice pool = executors.newfixedthreadpool( 2 ); // 創建實現了runnable接口對象,thread對象當然也實現了runnable接口 thread t1 = new mythread(); thread t2 = new mythread(); thread t3 = new mythread(); thread t4 = new mythread(); thread t5 = new mythread(); // 將線程放入池中進行執行 pool.execute(t1); pool.execute(t2); pool.execute(t3); pool.execute(t4); pool.execute(t5); // 關閉線程池 pool.shutdown(); } } class mythread extends thread { @override public void run() { system.out.println(thread.currentthread().getname() + "正在執行。。。" ); } } |
java線程池擴展
threadpoolexecutor線程池的執行監控
threadpoolexecutor中定義了三個空方法,用于監控線程的執行情況。
threadpoolexecutor源碼:
1
2
3
4
5
|
protected void beforeexecute(thread t, runnable r) { } protected void afterexecute(runnable r, throwable t) { } protected void terminated() { } |
例:使用覆蓋方法,定義新的線程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
public class extthreadpooltest { static class mytask implements runnable { public string name; public mytask(string name) { super (); this .name = name; } @override public void run() { try { thread.sleep( 500 ); system.out.println( "執行中:" + this .name); thread.sleep( 500 ); } catch (interruptedexception e) { e.printstacktrace(); } } } public static void main(string[] args) throws interruptedexception { executorservice es = new threadpoolexecutor( 5 , 5 , 0 ,timeunit.milliseconds, new linkedblockingqueue<runnable>()){ @override protected void beforeexecute(thread t, runnable r) { system.out.println( "準備執行:" + ((mytask)r).name); } @override protected void afterexecute(runnable r, throwable t) { system.out.println( "執行完成:" + ((mytask)r).name); } @override protected void terminated() { system.out.println( "執行退出" ); } }; for ( int i= 0 ;i< 5 ;i++){ mytask task = new mytask( "task-" +i); es.execute(task); } thread.sleep( 10 ); // 等待terminated()執行 es.shutdown(); // 若無該方法,主線程不會結束。 } } |
threadpoolexecutor的拒絕策略
線程池不可能處理無限多的線程。所以一旦線程池中中需要執行的任務過多,線程池對于某些任務就無法處理了。拒絕策略即對這些無法處理的任務進行處理。可能丟棄掉這些不能處理的任務,也可能用其他方式。
threadpoolexecutor類還有另一個構造方法。該構造方法中的rejectedexecutionhandler用于定義拒絕策略。
1
2
3
4
5
6
7
8
9
10
11
|
public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) { ..... } |
jdk內部已經提供一些拒絕策略。
abortpolicy一旦線程不能處理,則拋出異常。
abortpolicy源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
public static class abortpolicy implements rejectedexecutionhandler { /** * creates an {@code abortpolicy}. */ public abortpolicy() { } /** * always throws rejectedexecutionexception. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws rejectedexecutionexception always. */ public void rejectedexecution(runnable r, threadpoolexecutor e) { throw new rejectedexecutionexception( "task " + r.tostring() + " rejected from " + e.tostring()); } } |
discardpolicy 一旦線程不能處理,則丟棄任務。
discardpolicy源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public static class discardpolicy implements rejectedexecutionhandler { /** * creates a {@code discardpolicy}. */ public discardpolicy() { } /** * does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedexecution(runnable r, threadpoolexecutor e) { } } |
callerrunspolicy 一旦線程不能處理,則將任務返回給提交任務的線程處理。
callerrunspolicy源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public static class callerrunspolicy implements rejectedexecutionhandler { /** * creates a {@code callerrunspolicy}. */ public callerrunspolicy() { } /** * executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { r.run(); } } } |
discardoldestpolicy 一旦線程不能處理,丟棄掉隊列中最老的任務。
discardoldestpolicy源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public static class discardoldestpolicy implements rejectedexecutionhandler { /** * creates a {@code discardoldestpolicy} for the given executor. */ public discardoldestpolicy() { } /** * obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { e.getqueue().poll(); e.execute(r); } } } |
例:自定義拒絕策略。打印并丟棄無法處理的任務。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public class rejectedpolicyhandletest { public static void main(string[] args) throws interruptedexception { executorservice es = new threadpoolexecutor( 5 , 5 , 0 ,timeunit.milliseconds, new synchronousqueue<runnable>(),executors.defaultthreadfactory(), new rejectedexecutionhandler() { @override public void rejectedexecution(runnable r, threadpoolexecutor executor) { // 打印并丟棄。 system.out.println(r.tostring()+ " is discard" ); } }); for ( int i= 0 ;i<integer.max_value;i++){ mytask task = new mytask( "task-" +i); es.execute(task); thread.sleep( 10 ); } es.shutdown(); // 若無該方法,主線程不會結束。 } } |
threadfactory 線程工廠
threadpoolexecutor類構造器的參數其中之一即為threadfactory線程工廠。
threadfactory用于創建線程池中的線程。
1
2
3
|
public interface threadfactory { thread newthread(runnable r); } |
threadfactory的實現類中一般定義線程了線程組,線程數與線程名稱。
defaultthreadfactory源碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
static class defaultthreadfactory implements threadfactory { private static final atomicinteger poolnumber = new atomicinteger( 1 ); private final threadgroup group; private final atomicinteger threadnumber = new atomicinteger( 1 ); private final string nameprefix; defaultthreadfactory() { securitymanager s = system.getsecuritymanager(); group = (s != null ) ? s.getthreadgroup() : thread.currentthread().getthreadgroup(); nameprefix = "pool-" + poolnumber.getandincrement() + "-thread-" ; } public thread newthread(runnable r) { thread t = new thread(group, r, nameprefix + threadnumber.getandincrement(), 0 ); if (t.isdaemon()) t.setdaemon( false ); if (t.getpriority() != thread.norm_priority) t.setpriority(thread.norm_priority); return t; } } |
completionservice接口
這里需要稍微提一下的是completionservice接口,它是用于描述順序獲取執行結果的一個線程池包裝器。它依賴一個具體的線程池調度,但是能夠根據任務的執行先后順序得到執行結果,這在某些情況下可能提高并發效率。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://www.cnblogs.com/shijiaqi1066/p/3412300.html