前言:
并行編程勢不可擋,Java從1.7開始就提供了Fork/Join 支持并行處理。java1.8 進一步加強。
并行處理就是將任務拆分子任務,分發給多個處理器同時處理,之后合并。
1、Stream API
Java 8 引入了許多特性,Stream API
是其中重要的一部分。區別 InputStream OutputStream
,Stream API
是處理對象流而不是字節流。
執行原理如下,流分串行和并行兩種執行方式
// 串行執行流 stream().filter(e -> e > 10).count(); // 并行執行流 .parallelStream().filter(e -> e > 10).count()
2、ParallelStreams執行原理
并行執行時,java
將流劃分為多個子流,分散在不同CPU并行處理,然后進行合并。
并行一定比串行更快嗎?這不一定,取決于兩方面條件:
- 處理器核心數量,并行處理核心數越多自然處理效率會更高。
- 處理的數據量越大,優勢越強。這也很好理解,比如十個人干一個人就能完成的活兒會比它自己干更便宜?
3、ParallelStreams注意事項
使用并行流時,不要使用collectors.groupingBy,collectors.toMap
,替代為
collectors.groupingByConcurrent , collectors.toConcurrentMap
,或直接使用串行流。
原因,并行流執行時,通過操作Key來合并多個map的操作比較昂貴。詳細大家可以查看官網介紹。
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html#concurrent_reduction
Map<String, List<Person>> byGender = roster .stream() .collect(Collectors.groupingBy(Person::getGender)); ConcurrentMap<String, List<Person>> byGender = roster .parallelStream() .collect(Collectors.groupingByConcurrent(Person::getGender));
ParallelStreams
默認使用 ForkJoinPool.commonPool()
線程池。
注意:默認情況下,你寫的 ParallelStreams 都是通過該線程池調度執行,整個應用程序都共享這個線程池。
看一個例子,我們查詢一批新聞數據,可以利用并行化來處理遠程新聞下載。
public List<News> queryNews(Stream<String> ids) { return ids.parallel() .map(this::getNews) // 網絡操作,新聞下載 .collect(toList()); }
因為是網絡操作,存在很多不確定性,假如某個任務運行時間較長,導致線程池資源占據,阻塞其它線程,這樣就阻止了其他的并行流任務正常進行。
如果解決這個問題的其中一種方式,進行線程池隔離
。那么如何自定義并行流的線程池呢?
ForkJoinPool
構造參數我們默認設置為CPU核心數。
ForkJoinPool customThreadPool = new ForkJoinPool(4); long actualTotal = customThreadPool .submit(() -> roster.parallelStream().reduce(0, Integer::sum)).get();
總結:
Java 1.8
提供的Stream API
簡化了代碼,很好用。不過在使用過程中應該注意以上問題。
到此這篇關于Java8
中的并行流 ParallelStreams
的文章就介紹到這了,更多相關Java8 ParallelStreams
內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!
原文鏈接:https://www.onlythinking.com/2020/06/05/%E4%BD%A0%E5%9C%A8%E4%BD%BF%E7%94%A8java-8-parallel-streams-%E5%90%97%EF%BC%9F/