激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專(zhuān)注于服務(wù)器技術(shù)及軟件下載分享
分類(lèi)導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語(yǔ)言|JavaScript|易語(yǔ)言|vb.net|

服務(wù)器之家 - 編程語(yǔ)言 - Java教程 - 并發(fā)編程之ThreadPoolExecutor線程池原理解析

并發(fā)編程之ThreadPoolExecutor線程池原理解析

2020-12-08 23:51今日頭條一角錢(qián)技術(shù) Java教程

在介紹線程池之前,我們先回顧下線程的基本知識(shí)。其中線程池包括ThreadPoolExecutor 默認(rèn)線程和ScheduledThreadPoolExecutor 定時(shí)線程池 ,本篇重點(diǎn)介紹ThreadPoolExecutor線程池。

 前言

在介紹線程池之前,我們先回顧下線程的基本知識(shí)。其中線程池包括ThreadPoolExecutor 默認(rèn)線程和ScheduledThreadPoolExecutor 定時(shí)線程池 ,本篇重點(diǎn)介紹ThreadPoolExecutor線程池。

線程

線程是調(diào)度CPU資源的最小單位,線程模型分為KLT模型與ULT模型,JVM使用的是KLT模型,Java線程與OS線程保持 1:1 的映射關(guān)系,也就是說(shuō)有一個(gè)Java線程也會(huì)在操作系統(tǒng)里有一個(gè)對(duì)應(yīng)的線程。

內(nèi)核線程模型

并發(fā)編程之ThreadPoolExecutor線程池原理解析

內(nèi)核線程(KLT):系統(tǒng)內(nèi)核管理線程(KLT),內(nèi)核保存線程的狀態(tài)和上下文信息,線程阻塞不會(huì)引起進(jìn)程阻塞。在多處理器系統(tǒng)上,多線程在多處理器上并行運(yùn)行。線程的創(chuàng)建、調(diào)度和管理由內(nèi)核完成,效率比ULT要慢,比進(jìn)程操作快。

用戶(hù)線程模型

并發(fā)編程之ThreadPoolExecutor線程池原理解析

用戶(hù)線程(ULT):用戶(hù)程序?qū)崿F(xiàn),不依賴(lài)操作系統(tǒng)核心,應(yīng)用提供創(chuàng)建、同步、調(diào)度和管理線程的函數(shù)來(lái)控制用戶(hù)線程。不需要用戶(hù)態(tài)/內(nèi)核態(tài)切換,速度快。內(nèi)核對(duì)ULT無(wú)感知,線程阻塞則進(jìn)程(包括它的所有線程)阻塞。

Java線程生命狀態(tài)

Java線程有多種生命狀態(tài):

  • NEW ,新建
  • RUNNABLE ,運(yùn)行
  • BLOCKED ,阻塞
  • WAITING ,等待
  • TIMED_WAITING ,超時(shí)等待
  • TERMINATED,終結(jié)

狀態(tài)切換如下圖所示:

并發(fā)編程之ThreadPoolExecutor線程池原理解析

Java線程實(shí)現(xiàn)方式

Java線程實(shí)現(xiàn)方式主要有四種:

  • 繼承Thread類(lèi)
  • 實(shí)現(xiàn)Runnable接口、
  • 實(shí)現(xiàn)Callable接口通過(guò)FutureTask包裝器來(lái)創(chuàng)建Thread線程、
  • 使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程。

其中前兩種方式線程執(zhí)行完后都沒(méi)有返回值,后兩種是帶返回值的。

繼承Thread類(lèi)創(chuàng)建線程

Thread類(lèi)本質(zhì)上是實(shí)現(xiàn)了Runnable接口的一個(gè)實(shí)例,代表一個(gè)線程的實(shí)例。啟動(dòng)線程的唯一方法就是通過(guò)Thread類(lèi)的start()實(shí)例方法。start()方法是一個(gè)native方法,它將啟動(dòng)一個(gè)新線程,并執(zhí)行run()方法。這種方式實(shí)現(xiàn)多線程很簡(jiǎn)單,通過(guò)自己的類(lèi)直接extend Thread,并復(fù)寫(xiě)run()方法,就可以啟動(dòng)新線程并執(zhí)行自己定義的run()方法。例如:

  1. public class MyThread extends Thread {   
  2.   public void run() {   
  3.    System.out.println("關(guān)注一角錢(qián)技術(shù),獲取Java架構(gòu)資料");   
  4.   }   
  5. }   
  6.   
  7. MyThread myThread1 = new MyThread();   
  8. MyThread myThread2 = new MyThread();   
  9. myThread1.start();   
  10. myThread2.start(); 

實(shí)現(xiàn)Runnable接口創(chuàng)建線程

如果自己的類(lèi)已經(jīng)extends另一個(gè)類(lèi),就無(wú)法直接extends Thread,此時(shí),可以實(shí)現(xiàn)一個(gè)Runnable接口,如下:

  1. // 實(shí)現(xiàn)Runnable接口的類(lèi)將被Thread執(zhí)行,表示一個(gè)基本的任務(wù) 
  2. public interface Runnable { 
  3.     // run方法就是它所有的內(nèi)容,就是實(shí)際執(zhí)行的任務(wù) 
  4.     public abstract void run(); 

  1. public class MyThread implements Runnable {   
  2.   public void run() {   
  3.    System.out.println("關(guān)注一角錢(qián)技術(shù),獲取Java架構(gòu)資料");   
  4.   }   

為了啟動(dòng)MyThread,需要首先實(shí)例化一個(gè)Thread,并傳入自己的MyThread實(shí)例:

  1. MyThread myThread = new MyThread();   
  2. Thread thread = new Thread(myThread);   
  3. thread.start();   

事實(shí)上,當(dāng)傳入一個(gè)Runnable target參數(shù)給Thread后,Thread的run()方法就會(huì)調(diào)用target.run(),參考JDK源代碼:

  1. public void run() {   
  2.   if (target != null) {   
  3.    target.run();   
  4.   }   

實(shí)現(xiàn)Callable接口通過(guò)FutureTask包裝器來(lái)創(chuàng)建Thread線程

Callable接口(也只有一個(gè)方法)定義如下:

  1. public interface Callable<V> {  
  2.  V call() throws Exception;    
  3. }  

  1. //Callable同樣是任務(wù),與Runnable接口的區(qū)別在于它接收泛型,同時(shí)它執(zhí)行任務(wù)后帶有返回內(nèi)容 
  2. public class SomeCallable<V> implements Callable<V> { 
  3.  // 相對(duì)于run方法的帶有返回值的call方法 
  4.     @Override 
  5.     public V call() throws Exception { 
  6.         // TODO Auto-generated method stub 
  7.         return null
  8.     } 
  9.  

  1. Callable<V> oneCallable = new SomeCallable<V>();    
  2. //由Callable<Integer>創(chuàng)建一個(gè)FutureTask<Integer>對(duì)象:    
  3. FutureTask<V> oneTask = new FutureTask<V>(oneCallable); 
  4. //注釋?zhuān)篎utureTask<Integer>是一個(gè)包裝器,它通過(guò)接受Callable<Integer>來(lái)創(chuàng)建,它同時(shí)實(shí)現(xiàn)了Future和Runnable接口。 
  5. //由FutureTask<Integer>創(chuàng)建一個(gè)Thread對(duì)象:    
  6. Thread oneThread = new Thread(oneTask);    
  7. oneThread.start();    
  8. //至此,一個(gè)線程就創(chuàng)建完成了。 

使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的線程

ExecutorService、Callable、Future三個(gè)接口實(shí)際上都是屬于Executor框架。返回結(jié)果的線程是在JDK1.5中引入的新特征,有了這種特征就不需要再為了得到返回值而大費(fèi)周折了。而且自己實(shí)現(xiàn)了也可能漏洞百出。(下部分來(lái)講線程池了)

  • 可返回值的任務(wù)必須實(shí)現(xiàn)Callable接口。
  • 類(lèi)似的,無(wú)返回值的任務(wù)必須實(shí)現(xiàn)Runnable接口。

執(zhí)行Callable任務(wù)后,可以獲取一個(gè)Future的對(duì)象,在該對(duì)象上調(diào)用get就可以獲取到Callable任務(wù)返回的Object了。

  • 注意:get方法是阻塞的,即:線程無(wú)返回結(jié)果,get方法會(huì)一直等待。

再結(jié)合線程池接口ExecutorService就可以實(shí)現(xiàn)傳說(shuō)中有返回結(jié)果的多線程了。

下面提供了一個(gè)完整的有返回結(jié)果的多線程測(cè)試?yán)印4a如下:

  1. package com.niuh.thread.v4; 
  2.  
  3. import java.util.ArrayList; 
  4. import java.util.Date
  5. import java.util.List; 
  6. import java.util.concurrent.Callable; 
  7. import java.util.concurrent.ExecutionException; 
  8. import java.util.concurrent.ExecutorService; 
  9. import java.util.concurrent.Executors; 
  10. import java.util.concurrent.Future; 
  11.  
  12. /** 
  13.  * <p> 
  14.  * 使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的線程 
  15.  * </p> 
  16.  */ 
  17. public class MyThread { 
  18.      
  19.     public static void main(String[] args) throws ExecutionException, 
  20.             InterruptedException { 
  21.  
  22.         System.out.println(("----程序開(kāi)始運(yùn)行----")); 
  23.         Date date1 = new Date(); 
  24.  
  25.         int taskSize = 5; 
  26.         // 創(chuàng)建一個(gè)線程池 
  27.         ExecutorService pool = Executors.newFixedThreadPool(taskSize); 
  28.         // 創(chuàng)建多個(gè)有返回值的任務(wù) 
  29.         List<Future> list = new ArrayList<Future>(); 
  30.         for (int i = 0; i < taskSize; i++) { 
  31.             Callable c = new MyCallable(i + " "); 
  32.             // 執(zhí)行任務(wù)并獲取Future對(duì)象 
  33.             Future f = pool.submit(c); 
  34.             // System.out.println(">>>" + f.get().toString()); 
  35.             list.add(f); 
  36.         } 
  37.         // 關(guān)閉線程池 
  38.         pool.shutdown(); 
  39.  
  40.         // 獲取所有并發(fā)任務(wù)的運(yùn)行結(jié)果 
  41.         for (Future f : list) { 
  42.             // 從Future對(duì)象上獲取任務(wù)的返回值,并輸出到控制臺(tái) 
  43.             System.out.println(">>>" + f.get().toString()); 
  44.         } 
  45.  
  46.         Date date2 = new Date(); 
  47.         System.out.println("----程序結(jié)束運(yùn)行----,程序運(yùn)行時(shí)間【" 
  48.                 + (date2.getTime() - date1.getTime()) + "毫秒】"); 
  49.     } 
  50.  
  51. class MyCallable implements Callable<Object> { 
  52.     private String taskNum; 
  53.  
  54.     MyCallable(String taskNum) { 
  55.         this.taskNum = taskNum; 
  56.     } 
  57.  
  58.     public Object call() throws Exception { 
  59.         System.out.println(">>>" + taskNum + "任務(wù)啟動(dòng)"); 
  60.         Date dateTmp1 = new Date(); 
  61.         Thread.sleep(1000); 
  62.         Date dateTmp2 = new Date(); 
  63.         long time = dateTmp2.getTime() - dateTmp1.getTime(); 
  64.         System.out.println(">>>" + taskNum + "任務(wù)終止"); 
  65.         return taskNum + "任務(wù)返回運(yùn)行結(jié)果,當(dāng)前任務(wù)時(shí)間【" + time + "毫秒】"
  66.     } 

協(xié)程

協(xié)程(纖程,用戶(hù)級(jí)線程),目的是為了追求最大力度的發(fā)揮硬件性能和提升軟件的速度,協(xié)程基本原理是:在某個(gè)點(diǎn)掛起當(dāng)前的任務(wù),并且保存棧信息,去執(zhí)行另一個(gè)任務(wù);等完成或達(dá)到某個(gè)條件時(shí),再還原原來(lái)的棧信息并繼續(xù)執(zhí)行(整個(gè)過(guò)程不需要上下文切換)。

協(xié)程的概念很早就提出來(lái)了,但直到最近幾年才在某些語(yǔ)言(如Lua)中得到廣泛應(yīng)用。

協(xié)程的目的:當(dāng)我們?cè)谑褂枚嗑€程的時(shí)候,如果存在長(zhǎng)時(shí)間的I/O操作。這個(gè)時(shí)候線程一直處于阻塞狀態(tài),如果線程很多的時(shí)候,會(huì)存在很多線程處于空閑狀態(tài),造成了資源應(yīng)用不徹底。相對(duì)的協(xié)程不一樣了,在單線程中多個(gè)任務(wù)來(lái)回執(zhí)行如果出現(xiàn)長(zhǎng)時(shí)間的I/O操作,讓其讓出目前的協(xié)程調(diào)度,執(zhí)行下一個(gè)任務(wù)。當(dāng)然可能所有任務(wù),全部卡在同一個(gè)點(diǎn)上,但是這只是針對(duì)于單線程而言,當(dāng)所有數(shù)據(jù)正常返回時(shí),會(huì)同時(shí)處理當(dāng)前的I/O操作。

Java原生不支持協(xié)程,在純java代碼里需要使用協(xié)程的話(huà)需要引入第三方包,如:quasar

  1. <dependency> 
  2.  <groupId>co.paralleluniverse</groupId> 
  3.  <artifactId>quasar-core</artifactId> 
  4.  <version>0.8.0</version> 
  5.  <classifier>jdk8</classifier> 
  6. </dependency> 

線程池

“線程池”,顧名思義就是一個(gè)線程緩存,線程是稀缺資源,如果被無(wú)限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,因此 Java 中提供線程池對(duì)線程進(jìn)行同一分配、調(diào)優(yōu)和監(jiān)控。

線程池介紹

在web開(kāi)發(fā)中,服務(wù)器需要接受并處理請(qǐng)求,所以會(huì)為一個(gè)請(qǐng)求分配一個(gè)線程來(lái)進(jìn)行處理。如果每次請(qǐng)求都創(chuàng)建一個(gè)線程的話(huà)實(shí)現(xiàn)起來(lái)非常簡(jiǎn)單,但是存在一個(gè)問(wèn)題:如果并發(fā)的請(qǐng)求數(shù)量非常多,但每個(gè)線程執(zhí)行的時(shí)間很短,這樣就會(huì)頻繁的創(chuàng)建和銷(xiāo)毀線程,如此一來(lái)會(huì)大大降低系統(tǒng)的效率。可能出現(xiàn)服務(wù)器在為每個(gè)請(qǐng)求創(chuàng)建新線程和銷(xiāo)毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比處理實(shí)際的用戶(hù)請(qǐng)求的時(shí)間和資源更多。

那么有沒(méi)有一種辦法使執(zhí)行完一個(gè)任務(wù),并不被銷(xiāo)毀,而是可以繼續(xù)執(zhí)行其他的任務(wù)呢?

這就是線程池的目的。線程池為線程生命周期的開(kāi)銷(xiāo)和資源不足問(wèn)題提供了解決方案。通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷(xiāo)被分?jǐn)偟蕉鄠€(gè)任務(wù)上。

什么時(shí)候使用線程池?

  • 單個(gè)任務(wù)處理時(shí)間比較短;
  • 需要處理的任務(wù)數(shù)量很大。

線程池優(yōu)勢(shì)

  • 重用存在的線程。減少線程黃金、消亡的開(kāi)銷(xiāo),提高性能;
  • 提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等待線程創(chuàng)建就能立即執(zhí)行;
  • 提高線程的可管理性。線程是稀缺資源,如果無(wú)限制的創(chuàng)建,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行同一的分配、調(diào)優(yōu)和監(jiān)控。

Executor框架

Executor接口是線程池框架中最基礎(chǔ)的部分,定義來(lái)一個(gè)用于執(zhí)行 Runnable 的 execute 方法。下面為它的繼承與實(shí)現(xiàn)

并發(fā)編程之ThreadPoolExecutor線程池原理解析

ExecutorService接口

從圖中可以看出 Executor 下有一個(gè)重要的子接口 ExecutorService ,其中定義來(lái)線程池的具體行為

并發(fā)編程之ThreadPoolExecutor線程池原理解析
  • execute(Runnable command):履行Ruannable類(lèi)型的任務(wù);
  • submit(task):可用來(lái)提交Callable或Runnable任務(wù),并返回代表此任務(wù)的Future對(duì)象;
  • shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù);
  • shutdownNow():停止所有正在履行的任務(wù)并封閉辦事;
  • isTerminated():測(cè)試是否所有任務(wù)都履行完畢了;
  • isShutdown():測(cè)試是否該ExecutorService已被關(guān)閉;
  • awaitTermination(long,TimeUnit):接收timeout和TimeUnit兩個(gè)參數(shù),用于設(shè)定超時(shí)時(shí)間及單位。當(dāng)?shù)却^(guò)設(shè)定時(shí)間時(shí),會(huì)監(jiān)測(cè)ExecutorService是否已經(jīng)關(guān)閉,若關(guān)閉則返回true,否則返回false。一般情況下會(huì)和shutdown方法組合使用;
  • invokeAll :作用是等待所有的任務(wù)執(zhí)行完成后統(tǒng)一返回;
  • invokeAny :將第一個(gè)得到的結(jié)果作為返回值,然后立刻終止所有的線程。如果設(shè)置了超時(shí)時(shí)間,未超時(shí)完成則正常返回結(jié)果,如果超時(shí)未完成則報(bào)超時(shí)異常。

AbstractExcutorService抽象類(lèi)

此類(lèi)的定義并沒(méi)有特殊的意義僅僅是實(shí)現(xiàn)了ExecutorService接口

并發(fā)編程之ThreadPoolExecutor線程池原理解析
  1. public abstract class AbstractExecutorService implements ExecutorService { 
  2.     //此方法很簡(jiǎn)單就是對(duì)runnable保證,將其包裝為一個(gè)FutureTask 
  3.     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
  4.         return new FutureTask<T>(runnable, value); 
  5.     } 
  6.     //包裝callable為FutureTask 
  7.     //FutureTask其實(shí)就是對(duì)Callable的一個(gè)封裝 
  8.     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
  9.         return new FutureTask<T>(callable); 
  10.     } 
  11.     //提交一個(gè)Runnable類(lèi)型的任務(wù) 
  12.     public Future<?> submit(Runnable task) { 
  13.         //如果為null則拋出NPE 
  14.         if (task == null) throw new NullPointerException(); 
  15.         //包裝任務(wù)為一個(gè)Future 
  16.         RunnableFuture<Void> ftask = newTaskFor(task, null); 
  17.         //將任務(wù)丟給執(zhí)行器,而此處會(huì)拋出拒絕異常,在講述ThreadPoolExecutor的時(shí)候有講述,不記得的讀者可以去再看看 
  18.         execute(ftask); 
  19.         return ftask; 
  20.     } 
  21.  
  22.     //與上方方法相同只不過(guò)指定了返回結(jié)果 
  23.     public <T> Future<T> submit(Runnable task, T result) { 
  24.         if (task == null) throw new NullPointerException(); 
  25.         RunnableFuture<T> ftask = newTaskFor(task, result); 
  26.         execute(ftask); 
  27.         return ftask; 
  28.     } 
  29.     //與上方方法相同只是換成了callable 
  30.     public <T> Future<T> submit(Callable<T> task) { 
  31.         if (task == null) throw new NullPointerException(); 
  32.         RunnableFuture<T> ftask = newTaskFor(task); 
  33.         execute(ftask); 
  34.         return ftask; 
  35.     } 
  36.  
  37.     //執(zhí)行集合tasks結(jié)果是最后一個(gè)執(zhí)行結(jié)束的任務(wù)結(jié)果 
  38.     //可以設(shè)置超時(shí) timed為true并且nanos是未來(lái)的一個(gè)時(shí)間 
  39.     //任何一個(gè)任務(wù)完成都將會(huì)返回結(jié)果 
  40.     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, 
  41.                               boolean timed, long nanos) 
  42.         throws InterruptedException, ExecutionException, TimeoutException { 
  43.         //傳入的任務(wù)集合不能為null 
  44.         if (tasks == null
  45.             throw new NullPointerException(); 
  46.         //傳入的任務(wù)數(shù)不能是0 
  47.         int ntasks = tasks.size(); 
  48.         if (ntasks == 0) 
  49.             throw new IllegalArgumentException(); 
  50.         //滿(mǎn)足上面的校驗(yàn)后將任務(wù)分裝到一個(gè)ArrayList中 
  51.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks); 
  52.         //并且創(chuàng)建一個(gè)執(zhí)行器傳入this 
  53.         //這里簡(jiǎn)單講述他的執(zhí)行原理,傳入this會(huì)使用傳入的this(類(lèi)型為Executor)作為執(zhí)行器用于執(zhí)行任務(wù),當(dāng)submit提交任務(wù)的時(shí)候回將任務(wù) 
  54.         //封裝為一個(gè)內(nèi)部的Future并且重寫(xiě)他的done而此方法就是在future完成的時(shí)候調(diào)用的,而他的寫(xiě)法則是將當(dāng)前完成的future添加到esc 
  55.         //維護(hù)的結(jié)果隊(duì)列中 
  56.         ExecutorCompletionService<T> ecs = 
  57.             new ExecutorCompletionService<T>(this); 
  58.  
  59.         try { 
  60.             //創(chuàng)建一個(gè)執(zhí)行異常,以便后面拋出 
  61.             ExecutionException ee = null
  62.             //如果開(kāi)啟了超時(shí)則計(jì)算死線時(shí)間如果時(shí)間是0則代表沒(méi)有開(kāi)啟執(zhí)行超時(shí) 
  63.             final long deadline = timed ? System.nanoTime() + nanos : 0L; 
  64.             //獲取任務(wù)的迭代器 
  65.             Iterator<? extends Callable<T>> it = tasks.iterator(); 
  66.             //先獲取迭代器中的第一個(gè)任務(wù)提交給前面創(chuàng)建的ecs執(zhí)行器 
  67.             futures.add(ecs.submit(it.next())); 
  68.             //前面記錄的任務(wù)數(shù)減一 
  69.             --ntasks; 
  70.             //當(dāng)前激活數(shù)為1 
  71.             int active = 1; 
  72.             //進(jìn)入死循環(huán) 
  73.             for (;;) { 
  74.                 //獲取剛才提價(jià)的任務(wù)是否完成如果完成則f不是null否則為null 
  75.                 Future<T> f = ecs.poll(); 
  76.                 //如果為null則代表任務(wù)還在繼續(xù) 
  77.                 if (f == null) { 
  78.                     //如果當(dāng)前任務(wù)大于0 說(shuō)明除了剛才的任務(wù)還有別的任務(wù)存在 
  79.                     if (ntasks > 0) { 
  80.                         //則任務(wù)數(shù)減一 
  81.                         --ntasks; 
  82.                         //并且再次提交新的任務(wù) 
  83.                         futures.add(ecs.submit(it.next())); 
  84.                         //當(dāng)前的存活的執(zhí)行任務(wù)加一 
  85.                         ++active; 
  86.                     } 
  87.                     //如果當(dāng)前存活任務(wù)數(shù)是0則代表沒(méi)有任務(wù)在執(zhí)行了從而跳出循環(huán) 
  88.                     else if (active == 0) 
  89.                         break; 
  90.                     //如果當(dāng)前任務(wù)執(zhí)行設(shè)置了超時(shí)時(shí)間 
  91.                     else if (timed) { 
  92.                         //則設(shè)置指定的超時(shí)時(shí)間獲取 
  93.                         f = ecs.poll(nanos, TimeUnit.NANOSECONDS); 
  94.                         //等待執(zhí)行超時(shí)還沒(méi)有獲取到則拋出超時(shí)異常 
  95.                         if (f == null
  96.                             throw new TimeoutException(); 
  97.                         //否則使用當(dāng)前時(shí)間計(jì)算剩下的超時(shí)時(shí)間用于下一個(gè)循環(huán)使用 
  98.                         nanos = deadline - System.nanoTime(); 
  99.                     } 
  100.                     //如果沒(méi)有設(shè)置超時(shí)則直接獲取任務(wù) 
  101.                     else 
  102.                         f = ecs.take(); 
  103.                 } 
  104.                 //如果獲取到了任務(wù)結(jié)果f!=null 
  105.                 if (f != null) { 
  106.                     //激活數(shù)減一 
  107.                     --active; 
  108.                     try { 
  109.                         //返回獲取到的結(jié)果 
  110.                         return f.get(); 
  111.                         //如果獲取結(jié)果出錯(cuò)則包裝異常 
  112.                     } catch (ExecutionException eex) { 
  113.                         ee = eex; 
  114.                     } catch (RuntimeException rex) { 
  115.                         ee = new ExecutionException(rex); 
  116.                     } 
  117.                 } 
  118.             } 
  119.             //如果異常不是null則拋出如果是則創(chuàng)建一個(gè) 
  120.             if (ee == null
  121.                 ee = new ExecutionException(); 
  122.             throw ee; 
  123.  
  124.         } finally { 
  125.             //其他任務(wù)則設(shè)置取消 
  126.             for (int i = 0, size = futures.size(); i < size; i++) 
  127.                 futures.get(i).cancel(true); 
  128.         } 
  129.     } 
  130.     //對(duì)上方方法的封裝 
  131.     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 
  132.         throws InterruptedException, ExecutionException { 
  133.         try { 
  134.             return doInvokeAny(tasks, false, 0); 
  135.         } catch (TimeoutException cannotHappen) { 
  136.             assert false
  137.             return null
  138.         } 
  139.     } 
  140.     //對(duì)上方法的封裝 
  141.     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, 
  142.                            long timeout, TimeUnit unit) 
  143.         throws InterruptedException, ExecutionException, TimeoutException { 
  144.         return doInvokeAny(tasks, true, unit.toNanos(timeout)); 
  145.     } 
  146.     //相對(duì)于上一個(gè)方法執(zhí)行成功任何一個(gè)則返回結(jié)果而此方法是全部執(zhí)行完然后統(tǒng)一返回結(jié)果 
  147.     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
  148.         throws InterruptedException { 
  149.         //傳入的任務(wù)集合不能是null 
  150.         if (tasks == null
  151.             throw new NullPointerException(); 
  152.         //創(chuàng)建一個(gè)集合用來(lái)保存獲取到的執(zhí)行future 
  153.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
  154.         //任務(wù)是否執(zhí)行完成 
  155.         boolean done = false
  156.         try { 
  157.             //遍歷傳入的任務(wù)并且調(diào)用執(zhí)行方法將創(chuàng)建的future添加到集合中 
  158.             for (Callable<T> t : tasks) { 
  159.                 RunnableFuture<T> f = newTaskFor(t); 
  160.                 futures.add(f); 
  161.                 execute(f); 
  162.             } 
  163.             //遍歷獲取到的future 
  164.             for (int i = 0, size = futures.size(); i < size; i++) { 
  165.                 Future<T> f = futures.get(i); 
  166.                 //如果當(dāng)前任務(wù)沒(méi)有成功則進(jìn)行f.get方法等待此方法執(zhí)行成功,如果方法執(zhí)行異常或者被取消將忽略異常 
  167.                 if (!f.isDone()) { 
  168.                     try { 
  169.                         f.get(); 
  170.                     } catch (CancellationException ignore) { 
  171.                     } catch (ExecutionException ignore) { 
  172.                     } 
  173.                 } 
  174.             } 
  175.             //到這一步則代表所有的任務(wù)都已經(jīng)有了確切的結(jié)果 
  176.             done = true
  177.             //返回任務(wù)結(jié)果集合 
  178.             return futures; 
  179.         } finally { 
  180.             //如果不是truefalse 則代表執(zhí)行過(guò)程中被中斷了則需要對(duì)任務(wù)進(jìn)行取消操作,如果正常完成則不會(huì)被取消 
  181.             if (!done) 
  182.                 for (int i = 0, size = futures.size(); i < size; i++) 
  183.                     futures.get(i).cancel(true); 
  184.         } 
  185.     } 
  186.     //與上方方法的區(qū)別在于對(duì)于任務(wù)集合可以設(shè)置超時(shí)時(shí)間 
  187.     //這里會(huì)針對(duì)差異進(jìn)行講解 
  188.     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, 
  189.                                          long timeout, TimeUnit unit) 
  190.         throws InterruptedException { 
  191.         if (tasks == null
  192.             throw new NullPointerException(); 
  193.         //計(jì)算設(shè)置時(shí)長(zhǎng)的納秒時(shí)間 
  194.         long nanos = unit.toNanos(timeout); 
  195.         ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 
  196.         boolean done = false
  197.         try { 
  198.             for (Callable<T> t : tasks) 
  199.                 futures.add(newTaskFor(t)); 
  200.             //計(jì)算最終計(jì)算的確切時(shí)間點(diǎn),運(yùn)行時(shí)長(zhǎng)不能超過(guò)此時(shí)間也就是時(shí)間死線 
  201.             //這里是個(gè)細(xì)節(jié)future創(chuàng)建的時(shí)間并沒(méi)有算作執(zhí)行時(shí)間 
  202.             final long deadline = System.nanoTime() + nanos; 
  203.             //獲取當(dāng)前結(jié)果數(shù) 
  204.             final int size = futures.size(); 
  205.             //遍歷將任務(wù)進(jìn)行執(zhí)行 
  206.             for (int i = 0; i < size; i++) { 
  207.                 execute((Runnable)futures.get(i)); 
  208.                 //并且每次都計(jì)算死線 
  209.                 nanos = deadline - System.nanoTime(); 
  210.                 //如果時(shí)間已經(jīng)超過(guò)則返回結(jié)果 
  211.                 if (nanos <= 0L) 
  212.                     return futures; 
  213.             } 
  214.             //否則遍歷future確定每次執(zhí)行都獲取到了結(jié)果 
  215.             for (int i = 0; i < size; i++) { 
  216.                 Future<T> f = futures.get(i); 
  217.                 if (!f.isDone()) { 
  218.                     //如果在等待過(guò)程中已經(jīng)超時(shí)則返回當(dāng)前等待結(jié)合 
  219.                     if (nanos <= 0L) 
  220.                         return futures; 
  221.                     try { 
  222.                         //如果沒(méi)有超過(guò)死線則設(shè)置從future中獲取結(jié)果的時(shí)間如果超過(guò)則會(huì)派出timeout 
  223.                         f.get(nanos, TimeUnit.NANOSECONDS); 
  224.                     } catch (CancellationException ignore) { 
  225.                     } catch (ExecutionException ignore) { 
  226.                     } catch (TimeoutException toe) { 
  227.                         //拋出了異常則會(huì)返回當(dāng)前的列表 
  228.                         return futures; 
  229.                     } 
  230.                     //計(jì)算最新的超時(shí)時(shí)間 
  231.                     nanos = deadline - System.nanoTime(); 
  232.                 } 
  233.             } 
  234.             //之前的返回都沒(méi)有設(shè)置為true所以在finally中都會(huì)設(shè)置為取消唯獨(dú)正常執(zhí)行完成到此處返回的結(jié)果才是最終的結(jié)果 
  235.             done = true
  236.             return futures; 
  237.         } finally { 
  238.             if (!done) 
  239.                 for (int i = 0, size = futures.size(); i < size; i++) 
  240.                     futures.get(i).cancel(true); 
  241.         } 
  242.     } 
  243.  

線程池的具體實(shí)現(xiàn)

并發(fā)編程之ThreadPoolExecutor線程池原理解析
  • ThreadPoolExecutor 默認(rèn)線程池
  • ScheduledThreadPoolExecutor 定時(shí)線程池 (下篇再做介紹)

ThreadPoolExecutor

線程池重點(diǎn)屬性

  1. //用來(lái)標(biāo)記線程池狀態(tài)(高3位),線程個(gè)數(shù)(低29位) 
  2. //默認(rèn)是RUNNING狀態(tài),線程個(gè)數(shù)為0 
  3. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 
  4.  
  5. //線程個(gè)數(shù)掩碼位數(shù),并不是所有平臺(tái)int類(lèi)型是32位,所以準(zhǔn)確說(shuō)是具體平臺(tái)下Integer的二進(jìn)制位數(shù)-3后的剩余位數(shù)才是線程的個(gè)數(shù), 
  6. private static final int COUNT_BITS = Integer.SIZE - 3; 
  7.  
  8. //線程最大個(gè)數(shù)(低29位)000 11111111111111111111111111111 
  9. private static final int CAPACITY   = (1 << COUNT_BITS) - 1; 

ctl 是對(duì)線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個(gè)字段, 它包含兩部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這里可以看到,使用了Integer類(lèi)型來(lái)保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個(gè)1),這個(gè)常量表示workerCount的上限值,大約是5億。

ctl相關(guān)方法

  • runStateOf:獲取運(yùn)行狀態(tài);
  • workerCountOf:獲取活動(dòng)線程數(shù);
  • ctlOf:獲取運(yùn)行狀態(tài)和活動(dòng)線程數(shù)的值。
  1. / 獲取高三位 運(yùn)行狀態(tài) 
  2. private static int runStateOf(int c)     { return c & ~CAPACITY; } 
  3.  
  4. //獲取低29位 線程個(gè)數(shù) 
  5. private static int workerCountOf(int c)  { return c & CAPACITY; } 
  6.  
  7. //計(jì)算ctl新值,線程狀態(tài) 與 線程個(gè)數(shù) 
  8. private static int ctlOf(int rs, int wc) { return rs | wc; } 

線程池存在5種狀態(tài)

  1. //運(yùn)行中 111 00000000000000000000000000000 
  2. private static final int RUNNING    = -1 << COUNT_BITS; 
  3. //關(guān)閉 000 00000000000000000000000000000 
  4. private static final int SHUTDOWN   =  0 << COUNT_BITS; 
  5. //停止 001 00000000000000000000000000000 
  6. private static final int STOP       =  1 << COUNT_BITS; 
  7. //整理 010 00000000000000000000000000000 
  8. private static final int TIDYING    =  2 << COUNT_BITS; 
  9. //終止 011 00000000000000000000000000000 
  10. private static final int TERMINATED =  3 << COUNT_BITS; 

使用一個(gè)整形,前3位表示狀態(tài),后29位表示線程容量,也就是說(shuō)線程最多有 230−1 個(gè)

并發(fā)編程之ThreadPoolExecutor線程池原理解析

也可以看出當(dāng)ctl小于零表示線程池仍在運(yùn)行

RUNNING

  • 狀態(tài)說(shuō)明:線程池處在RUNNING狀態(tài)時(shí),能夠接收新任務(wù),以及對(duì)已添加的任務(wù)進(jìn)行處理。
  • 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話(huà)說(shuō),線程池被一旦被創(chuàng)建,就處于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0!

SHUTDOWN

  • 狀態(tài)說(shuō)明:線程池處在SHUTDOWN狀態(tài)時(shí),不接收新任務(wù),但能處理已添加的任務(wù)。
  • 狀態(tài)切換:調(diào)用線程池的shutdown()接口時(shí),線程池由RUNNING -> SHUTDOWN。

STOP

  • 狀態(tài)說(shuō)明:線程池處在STOP狀態(tài)時(shí),不接收新任務(wù),不處理已添加的任務(wù),并且會(huì)中斷正在處理的任務(wù)。
  • 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時(shí),線程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING

  • 狀態(tài)說(shuō)明:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會(huì)變?yōu)門(mén)IDYING狀態(tài)。當(dāng)線程池變?yōu)門(mén)IDYING狀態(tài)時(shí),會(huì)執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類(lèi)中是空的,若用戶(hù)想在線程池變?yōu)門(mén)IDYING時(shí),進(jìn)行相應(yīng)的處理;可以通過(guò)重載terminated()函數(shù)來(lái)實(shí)現(xiàn)。
  • 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊(duì)列為空并且線程池中執(zhí)行的任務(wù)也為空時(shí),就會(huì)由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的任務(wù)為空時(shí),就會(huì)由STOP -> TIDYING。

TERMINATED

  • 狀態(tài)說(shuō)明:線程池徹底終止,就變成TERMINATED狀態(tài)。
  • 狀態(tài)切換:線程池處在TIDYING狀態(tài)時(shí),執(zhí)行完terminated()之后,就會(huì)由 TIDYING -> TERMINATED。

進(jìn)入TERMINATED的條件如下:

  • 線程池不是RUNNING狀態(tài);
  • 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài);
  • 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空;
  • workerCount為0;
  • 設(shè)置TIDYING狀態(tài)成功。
并發(fā)編程之ThreadPoolExecutor線程池原理解析

線程池參數(shù)

corePoolSize

線程池中的核心線程數(shù),當(dāng)提交一個(gè)任務(wù)時(shí),線程池創(chuàng)建一個(gè)新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有核心線程。

maximumPoolSize

線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿(mǎn)了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;

keepAliveTim

線程池維護(hù)線程所允許的空閑時(shí)間。當(dāng)線程池中的線程數(shù)量大于corePoolSize的時(shí)候,如果這時(shí)沒(méi)有新的任務(wù)提交,核心線程外的線程不會(huì)立即銷(xiāo)毀,而是會(huì)等待,直到等待的時(shí)間超過(guò)了keepAliveTime;

unit

keepAliveTime的單位;

workQueue

用來(lái)保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊(duì)列:

1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);

2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene;

4、priorityBlockingQuene:具有優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列;

threadFactory

它是ThreadFactory類(lèi)型的變量,用來(lái)創(chuàng)建新線程。默認(rèn)使用Executors.defaultThreadFactory() 來(lái)創(chuàng)建線程。使用默認(rèn)的ThreadFactory來(lái)創(chuàng)建線程時(shí),會(huì)使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級(jí)并且是非守護(hù)線程,同時(shí)也設(shè)置了線程的名稱(chēng)。

handler

線程池的飽和策略,當(dāng)阻塞隊(duì)列滿(mǎn)了,且沒(méi)有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:

  1. AbortPolicy:直接拋出異常,默認(rèn)策略;
  2. CallerRunsPolicy:用調(diào)用者所在的線程來(lái)執(zhí)行任務(wù);
  3. DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
  4. DiscardPolicy:直接丟棄任務(wù);

上面的4種策略都是ThreadPoolExecutor的內(nèi)部類(lèi)。

并發(fā)編程之ThreadPoolExecutor線程池原理解析

當(dāng)然也可以根據(jù)應(yīng)用場(chǎng)景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲(chǔ)不能處理的任務(wù)。

線程池的創(chuàng)建

有四個(gè)構(gòu)造函數(shù),其他三個(gè)都是調(diào)用下面代碼中的這個(gè)構(gòu)造函數(shù)

  1. public ThreadPoolExecutor(int corePoolSize, 
  2.                           int maximumPoolSize, 
  3.                           long keepAliveTime, 
  4.                           TimeUnit unit, 
  5.                           BlockingQueue<Runnable> workQueue, 
  6.                           ThreadFactory threadFactory, 
  7.                           RejectedExecutionHandler handler)  

線程池監(jiān)控

  1. public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù) 
  2. public long getCompletedTaskCount() //已完成的任務(wù)數(shù) 
  3. public int getPoolSize() //線程池當(dāng)前的線程數(shù) 
  4. public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量 

線程池原理

并發(fā)編程之ThreadPoolExecutor線程池原理解析

核心方法分析

由于篇幅有限,核心方法解析請(qǐng)閱讀文末的擴(kuò)展鏈接。

PS:以上代碼提交在 Github :

https://github.com/Niuh-Study/niuh-juc-final.git

原文地址:https://www.toutiao.com/i6903537887346819595/

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91短视频在线播放 | 91 在线免费观看 | 关键词 | 久久99国产精品免费网站 | 国产88久久久国产精品免费二区 | 黄色男女视频 | chinese-xvideos| 久草视频手机在线观看 | 日韩免费黄色 | 精品国产成人 | 国产免费网站视频 | 牛牛碰在线| 国产亚洲精品综合一区91 | 亚洲3atv精品一区二区三区 | 羞羞色院91精品网站 | 蜜桃av网 | 91看片在线观看视频 | 黄色免费不卡视频 | av在线网站观看 | 成人黄色在线免费观看 | 中国美女一级黄色大片 | a集毛片 | 韩国一级免费视频 | 最近中文字幕一区二区 | 曰韩毛片| 亚洲午夜天堂吃瓜在线 | 久久人体| 精品国产一区二区三区久久久 | 久久亚洲线观看视频 | 黄在线观看在线播放720p | 国产91久久精品一区二区 | 国产精品99久久久久久久vr | 自拍偷拍亚洲图片 | 亚洲成人午夜精品 | av在线播放亚洲 | 在线成人免费av | 日韩视频一区二区三区在线观看 | 国产精品一区二区免费在线观看 | 亚洲精品7777xxxx青睐 | 99热99精品 | 亚洲射情 |