激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

服務(wù)器之家 - 編程語言 - JAVA教程 - 支持生產(chǎn)阻塞的Java線程池

支持生產(chǎn)阻塞的Java線程池

2019-11-19 14:39java教程網(wǎng) JAVA教程

在各種并發(fā)編程模型中,生產(chǎn)者-消費(fèi)者模式大概是最常用的了。在實(shí)際工作中,對于生產(chǎn)消費(fèi)的速度,通常需要做一下權(quán)衡

通常來說,生產(chǎn)任務(wù)的速度要大于消費(fèi)的速度。一個(gè)細(xì)節(jié)問題是,隊(duì)列長度,以及如何匹配生產(chǎn)和消費(fèi)的速度。

一個(gè)典型的生產(chǎn)者-消費(fèi)者模型如下:

支持生產(chǎn)阻塞的Java線程池

在并發(fā)環(huán)境下利用J.U.C提供的Queue實(shí)現(xiàn)可以很方便地保證生產(chǎn)和消費(fèi)過程中的線程安全。這里需要注意的是,Queue必須設(shè)置初始容量,防止生產(chǎn)者生產(chǎn)過快導(dǎo)致隊(duì)列長度暴漲,最終觸發(fā)OutOfMemory。

 

對于一般的生產(chǎn)快于消費(fèi)的情況。當(dāng)隊(duì)列已滿時(shí),我們并不希望有任何任務(wù)被忽略或得不到執(zhí)行,此時(shí)生產(chǎn)者可以等待片刻再提交任務(wù),更好的做法是,把生產(chǎn)者阻塞在提交任務(wù)的方法上,待隊(duì)列未滿時(shí)繼續(xù)提交任務(wù),這樣就沒有浪費(fèi)的空轉(zhuǎn)時(shí)間了。阻塞這一點(diǎn)也很容易,BlockingQueue就是為此打造的,ArrayBlockingQueue和LinkedBlockingQueue在構(gòu)造時(shí)都可以提供容量做限制,其中LinkedBlockingQueue是在實(shí)際操作隊(duì)列時(shí)在每次拿到鎖以后判斷容量。

更進(jìn)一步,當(dāng)隊(duì)列為空時(shí),消費(fèi)者拿不到任務(wù),可以等一會兒再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,當(dāng)有任務(wù)時(shí)便可以立即獲得執(zhí)行,建議調(diào)用take的帶超時(shí)參數(shù)的重載方法,超時(shí)后線程退出。這樣當(dāng)生產(chǎn)者事實(shí)上已經(jīng)停止生產(chǎn)時(shí),不至于讓消費(fèi)者無限等待。

于是一個(gè)高效的支持阻塞的生產(chǎn)消費(fèi)模型就實(shí)現(xiàn)了。

等一下,既然J.U.C已經(jīng)幫我們實(shí)現(xiàn)了線程池,為什么還要采用這一套東西?直接用ExecutorService不是更方便?

我們來看一下ThreadPoolExecutor的基本結(jié)構(gòu):

支持生產(chǎn)阻塞的Java線程池

可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已經(jīng)幫我們實(shí)現(xiàn)好了,并且直接采用線程池的實(shí)現(xiàn)還有很多優(yōu)勢,例如線程數(shù)的動(dòng)態(tài)調(diào)整等。

 

但問題在于,即便你在構(gòu)造ThreadPoolExecutor時(shí)手動(dòng)指定了一個(gè)BlockingQueue作為隊(duì)列實(shí)現(xiàn),事實(shí)上當(dāng)隊(duì)列滿時(shí),execute方法并不會阻塞,原因在于ThreadPoolExecutor調(diào)用的是BlockingQueue非阻塞的offer方法:

 

復(fù)制代碼代碼如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

 

這時(shí)候就需要做一些事情來達(dá)成一個(gè)結(jié)果:當(dāng)生產(chǎn)者提交任務(wù),而隊(duì)列已滿時(shí),能夠讓生產(chǎn)者阻塞住,等待任務(wù)被消費(fèi)。

關(guān)鍵在于,在并發(fā)環(huán)境下,隊(duì)列滿不能由生產(chǎn)者去判斷,不能調(diào)用ThreadPoolExecutor.getQueue().size()來判斷隊(duì)列是否滿。

線程池的實(shí)現(xiàn)中,當(dāng)隊(duì)列滿時(shí)會調(diào)用構(gòu)造時(shí)傳入的RejectedExecutionHandler去拒絕任務(wù)的處理。默認(rèn)的實(shí)現(xiàn)是AbortPolicy,直接拋出一個(gè)RejectedExecutionException。

幾種拒絕策略在這里就不贅述了,這里和我們的需求比較接近的是CallerRunsPolicy,這種策略會在隊(duì)列滿時(shí),讓提交任務(wù)的線程去執(zhí)行任務(wù),相當(dāng)于讓生產(chǎn)者臨時(shí)去干了消費(fèi)者干的活兒,這樣生產(chǎn)者雖然沒有被阻塞,但提交任務(wù)也會被暫停。

 

復(fù)制代碼代碼如下:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a <tt>CallerRunsPolicy</tt>.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

 

但這種策略也有隱患,當(dāng)生產(chǎn)者較少時(shí),生產(chǎn)者消費(fèi)任務(wù)的時(shí)間里,消費(fèi)者可能已經(jīng)把任務(wù)都消費(fèi)完了,隊(duì)列處于空狀態(tài),當(dāng)生產(chǎn)者執(zhí)行完任務(wù)后才能再繼續(xù)生產(chǎn)任務(wù),這個(gè)過程中可能導(dǎo)致消費(fèi)者線程的饑餓。

參考類似的思路,最簡單的做法,我們可以直接定義一個(gè)RejectedExecutionHandler,當(dāng)隊(duì)列滿時(shí)改為調(diào)用BlockingQueue.put來實(shí)現(xiàn)生產(chǎn)者的阻塞:

 

復(fù)制代碼代碼如下:

new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                        try {
                                executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                                // should not be interrupted
                        }
                }
        }
};

 

這樣,我們就無需再關(guān)心Queue和Consumer的邏輯,只要把精力集中在生產(chǎn)者和消費(fèi)者線程的實(shí)現(xiàn)邏輯上,只管往線程池提交任務(wù)就行了。

相比最初的設(shè)計(jì),這種方式的代碼量能減少不少,而且能避免并發(fā)環(huán)境的很多問題。當(dāng)然,你也可以采用另外的手段,例如在提交時(shí)采用信號量做入口限制等,但是如果僅僅是要讓生產(chǎn)者阻塞,那就顯得復(fù)雜了。

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 青青草成人自拍 | 91丨九色丨国产在线观看 | 娇妻被各种姿势c到高潮小说 | 欧美日韩免费看 | 欧美另类视频一区 | 欧美特级黄色 | 在线中文字幕观看 | 极品销魂一区二区三区 | www69xxxxx| 色综合久久久久综合99 | 欧美日韩免费在线观看视频 | 久国产精品视频 | 亚洲欧美日韩精品久久亚洲区 | 91美女视频在线观看 | 久久人人爽人人爽人人片av免费 | 一区二区精品视频在线观看 | 97香蕉超级碰碰久久免费软件 | 成人不卡 | 中文字幕爱爱视频 | 国产亚洲精品视频中文字幕 | 精品无码久久久久久国产 | 粉嫩粉嫩一区二区三区在线播放 | 欧美成人精品不卡视频在线观看 | av免费在线网 | 午夜视频在线观看免费视频 | 真人一级毛片免费 | 亚洲电影免费观看国语版 | 国产免费观看一区二区三区 | 午夜精品一区二区三区免费 | 欧美一级黄色免费看 | 国产精品久久99精品毛片三a | 免费中文视频 | 12av毛片 | 12av毛片| 成人在线精品视频 | av电影在线观看网站 | 一级在线视频 | 国产xxxxx在线观看 | 午夜精品福利视频 | 少妇一级淫片免费放正片 | 亚洲爱爱图 |