摘要:進(jìn)程線程與協(xié)程它們都是并行機(jī)制的解決方案。選擇是任意性的,并在對(duì)實(shí)現(xiàn)做出決定時(shí)發(fā)生。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。
并發(fā)與并行的概念
并發(fā)(Concurrency): 問(wèn)題域中的概念—— 程序需要被設(shè)計(jì)成能夠處理多個(gè)同時(shí)(或者幾乎同時(shí))發(fā)生的事件
并行(Parallelism): 方法域中的概念——通過(guò)將問(wèn)題中的多個(gè)部分 并行執(zhí)行,來(lái)加速解決問(wèn)題。
進(jìn)程、線程與協(xié)程它們都是并行機(jī)制的解決方案。
進(jìn)程: 進(jìn)程是什么呢?直白地講,進(jìn)程就是應(yīng)用程序的啟動(dòng)實(shí)例。比如我們運(yùn)行一個(gè)游戲,打開一個(gè)軟件,就是開啟了一個(gè)進(jìn)程。進(jìn)程擁有代碼和打開的文件資源、數(shù)據(jù)資源、獨(dú)立的內(nèi)存空間。啟動(dòng)一個(gè)進(jìn)程非常消耗資源,一般一臺(tái)機(jī)器最多啟動(dòng)數(shù)百個(gè)進(jìn)程。
線程: 線程從屬于進(jìn)程,是程序的實(shí)際執(zhí)行者。一個(gè)進(jìn)程至少包含一個(gè)主線程,也可以有更多的子線程。線程擁有自己的??臻g。在進(jìn)程內(nèi)啟動(dòng)線程也要消耗一定的資源,一般一個(gè)進(jìn)程最多啟動(dòng)數(shù)千個(gè)線程。操作系統(tǒng)能夠調(diào)度的最小單位就是線程了。
協(xié)程: 協(xié)程又從屬于線程,它不屬于操作系統(tǒng)管轄,完全由程序控制,一個(gè)線程內(nèi)可以啟動(dòng)數(shù)萬(wàn)甚至數(shù)百萬(wàn)協(xié)程。但也正是因?yàn)樗沙绦蚩刂?,它?duì)編寫代碼的風(fēng)格改變也最多。
Java的并行執(zhí)行實(shí)現(xiàn) JVM中的線程主線程: 獨(dú)立生命周期的線程
守護(hù)線程: 被主線程創(chuàng)建,隨著創(chuàng)建線程結(jié)束而結(jié)束
線程狀態(tài)要注意的是,線程不是調(diào)用start之后馬上進(jìn)入運(yùn)行中的狀態(tài),而是在"可運(yùn)行"狀態(tài),由操作系統(tǒng)來(lái)決定調(diào)度哪個(gè)線程來(lái)運(yùn)行。
Jetty中的線程Web服務(wù)器都有自己管理的線程池, 比如輕量級(jí)的Jetty, 就有以下三種類型的線程:
Acceptor
Selector
Worker
最原始的多線程——Thread類 繼承類 vs 實(shí)現(xiàn)接口繼承Thread類
實(shí)現(xiàn)Runnable接口
實(shí)際使用中顯然實(shí)現(xiàn)接口更好, 避免了單繼承限制。
Runnable vs CallableRunnable:實(shí)現(xiàn)run方法,無(wú)法拋出受檢查的異常,運(yùn)行時(shí)異常會(huì)中斷主線程,但主線程無(wú)法捕獲,所以子線程應(yīng)該自己處理所有異常
Callable:實(shí)現(xiàn)call方法,可以拋出受檢查的異常,可以被主線程捕獲,但主線程無(wú)法捕獲運(yùn)行時(shí)異常,也不會(huì)被打斷。
需要返回值的話,就用Callable接口
一個(gè)實(shí)現(xiàn)了Callable接口的對(duì)象,需要被包裝為RunnableFuture對(duì)象, 然后才能被新線程執(zhí)行, 而RunnableFuture其實(shí)還是實(shí)現(xiàn)了Runnable接口。
Future, Runnable 和FutureTask的關(guān)系如下:
可以看出FutureTask其實(shí)是RunnableFuture接口的實(shí)現(xiàn)類,下面是使用Future的示例代碼
public class Callee implements Callable { AtomicInteger counter = new AtomicInteger(0); private Integer seq=null; public Callee() { super(); } public Callee(int seq) { this.seq = seq; } /** * call接口可以拋出受檢查的異常 * @return * @throws InterruptedException */ @Override public Person call() throws InterruptedException { Person p = new Person("person"+ counter.incrementAndGet(), RandomUtil.random(0,150)); System.out.println("In thread("+seq+"), create a Person: "+p.toString()); Thread.sleep(1000); return p; } }
Callee callee1 = new Callee(); FutureTask線程調(diào)度ft= new FutureTask (callee1); Thread thread = new Thread(ft); thread.start(); try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); return; } System.out.println("ft.isDone: "+ft.isDone()); Person result1; try { result1 = ((Future ) ft).get(); } catch (InterruptedException e) { e.printStackTrace(); result1 = null; } catch (ExecutionException e) { e.printStackTrace(); result1 = null; } Person result = result1; System.out.println("main thread get result: "+result.toString());
Thread.yield() 方法:調(diào)用這個(gè)方法,會(huì)讓當(dāng)前線程退回到可運(yùn)行狀態(tài),而不是阻塞狀態(tài),這樣就留給其他同級(jí)線程一些運(yùn)行機(jī)會(huì)
Thread.sleep(long millis):調(diào)用這個(gè)方法,真的會(huì)讓當(dāng)前線程進(jìn)入阻塞狀態(tài),直到時(shí)間結(jié)束
線程對(duì)象的join():這個(gè)方法讓當(dāng)前線程進(jìn)入阻塞狀態(tài),直到要等待的線程結(jié)束。
線程對(duì)象的interrupt():不要以為它是中斷某個(gè)線程!它只是線線程發(fā)送一個(gè)中斷信號(hào),讓線程在無(wú)限等待時(shí)(如死鎖時(shí))能拋出異常,從而結(jié)束線程,但是如果你吃掉了這個(gè)異常,那么這個(gè)線程還是不會(huì)中斷的!
Object類中的wait():線程進(jìn)入等待狀態(tài),直到其他線程調(diào)用此對(duì)象的 notify() 方法或 notifyAll() 喚醒方法。這個(gè)狀態(tài)跟加鎖有關(guān),所以是Object的方法。
Object類中的notify():?jiǎn)拘言诖藢?duì)象監(jiān)視器上等待的單個(gè)線程。如果所有線程都在此對(duì)象上等待,則會(huì)選擇喚醒其中一個(gè)線程。選擇是任意性的,并在對(duì)實(shí)現(xiàn)做出決定時(shí)發(fā)生。線程通過(guò)調(diào)用其中一個(gè) wait 方法,在對(duì)象的監(jiān)視器上等待。 直到當(dāng)前的線程放棄此對(duì)象上的鎖定,才能繼續(xù)執(zhí)行被喚醒的線程。被喚醒的線程將以常規(guī)方式與在該對(duì)象上主動(dòng)同步的其他所有線程進(jìn)行競(jìng)爭(zhēng);類似的方法還有一個(gè)notifyAll(),喚醒在此對(duì)象監(jiān)視器上等待的所有線程。
同步與鎖 內(nèi)存一致性錯(cuò)誤由于線程在并行時(shí),可能會(huì)"同時(shí)"訪問(wèn)一個(gè)變量, 所以共享變量的時(shí)候,會(huì)出現(xiàn)值處于一個(gè)不確定的狀況, 例如下面的代碼, c是一個(gè)實(shí)例變量, 多個(gè)線程同時(shí)訪問(wèn)increment或decrement方法時(shí),就可能出現(xiàn)一致性錯(cuò)誤,最終讓c變成"奇怪"的值。
public class Counter { private int c = 0; public void increment() { c++; } public void decrement() { c--; } public int value() { return c; } }volatile
public class Foo { private int x = -1; private volatile boolean v = false; public void setX(int x) { this.x = x; v = true; } public int getX() { if (v == true) { return x; } return 0; } }
volatile關(guān)鍵字實(shí)際上指定了變量不使用寄存器, 并且對(duì)變量的訪問(wèn)不會(huì)亂序執(zhí)行,從而避免了并行訪問(wèn)的不一致問(wèn)題。但這個(gè)方案僅僅對(duì)原始類型變量本身生效,如果是++或者--這種“非原子”操作,則不能保證多線程操作的正確性了
原子類型JDK提供了一系列對(duì)基本類型的封裝,形成原子類型(Atomic Variables),特別適合用來(lái)做計(jì)數(shù)器
import java.util.concurrent.atomic.AtomicInteger; class AtomicCounter { private AtomicInteger c = new AtomicInteger(0); public void increment() { c.incrementAndGet(); } public void decrement() { c.decrementAndGet(); } public int value() { return c.get(); } }
原子操作的實(shí)現(xiàn)原理,在Java8之前和之后不同
Java7
public final int getAndIncrement() { for (;;) { int current = get(); int next = current + 1; if (compareAndSet(current, next)) return current; } }
Java8
public final int getAndIncrement() { return unsafe.getAndAddInt(this, valueOffset, 1); }
至于Compare-and-Swap,以及Fetch-and-Add兩種算法,是依賴機(jī)器底層機(jī)制實(shí)現(xiàn)的。
線程安全的集合類BlockingQueue: 定義了一個(gè)先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),當(dāng)你嘗試往滿隊(duì)列中添加元素,或者從空隊(duì)列中獲取元素時(shí),將會(huì)阻塞或者超時(shí)
ConcurrentMap: 是 java.util.Map 的子接口,定義了一些有用的原子操作。移除或者替換鍵值對(duì)的操作只有當(dāng) key 存在時(shí)才能進(jìn)行,而新增操作只有當(dāng) key 不存在時(shí)。使這些操作原子化,可以避免同步。ConcurrentMap 的標(biāo)準(zhǔn)實(shí)現(xiàn)是 ConcurrentHashMap,它是 HashMap 的并發(fā)模式。
ConcurrentNavigableMap: 是 ConcurrentMap 的子接口,支持近似匹配。ConcurrentNavigableMap 的標(biāo)準(zhǔn)實(shí)現(xiàn)是 ConcurrentSkipListMap,它是 TreeMap 的并發(fā)模式。
ThreadLocal-只有本線程才能訪問(wèn)的變量ThreadLoal 變量,它的基本原理是,同一個(gè) ThreadLocal 所包含的對(duì)象(對(duì)ThreadLocal< String >而言即為 String 類型變量),在不同的 Thread 中有不同的副本(實(shí)際是不同的實(shí)例,后文會(huì)詳細(xì)闡述)。這里有幾點(diǎn)需要注意
因?yàn)槊總€(gè) Thread 內(nèi)有自己的實(shí)例副本,且該副本只能由當(dāng)前 Thread 使用。這是也是 ThreadLocal 命名的由來(lái)
既然每個(gè) Thread 有自己的實(shí)例副本,且其它 Thread 不可訪問(wèn),那就不存在多線程間共享的問(wèn)題。
它與普通變量的區(qū)別在于,每個(gè)使用該變量的線程都會(huì)初始化一個(gè)完全獨(dú)立的實(shí)例副本。ThreadLocal 變量通常被private static修飾。當(dāng)一個(gè)線程結(jié)束時(shí),它所使用的所有 ThreadLocal 相對(duì)的實(shí)例副本都可被回收。
總的來(lái)說(shuō),ThreadLocal 適用于每個(gè)線程需要自己獨(dú)立的實(shí)例且該實(shí)例需要在多個(gè)方法中被使用,也即變量在線程間隔離而在方法或類間共享的場(chǎng)景。后文會(huì)通過(guò)實(shí)例詳細(xì)闡述該觀點(diǎn)。另外,該場(chǎng)景下,并非必須使用 ThreadLocal ,其它方式完全可以實(shí)現(xiàn)同樣的效果,只是 ThreadLocal 使得實(shí)現(xiàn)更簡(jiǎn)潔。
synchronized關(guān)鍵字方法加鎖:其實(shí)不是加在指定的方法上,而是在指定的對(duì)象上,只不過(guò)在方法開始前會(huì)檢查這個(gè)鎖
靜態(tài)方法鎖:加在類上,它和加在對(duì)象上的鎖互補(bǔ)干擾
代碼區(qū)塊鎖:其實(shí)不是加在指定的代碼塊上,而是加在指定的對(duì)象上,只不過(guò)在代碼塊開始前會(huì)檢查這個(gè)鎖。一個(gè)對(duì)象只會(huì)有一個(gè)鎖,所以代碼塊鎖和實(shí)例方法鎖是會(huì)互相影響的
需要注意的是:無(wú)論synchronized關(guān)鍵字加在方法上還是對(duì)象上,它取得的鎖都是對(duì)象,而不是把一段代碼或函數(shù)當(dāng)作鎖――而且同步方法很可能還會(huì)被其他線程的對(duì)象訪問(wèn),每個(gè)對(duì)象只有一個(gè)鎖(lock)與之相關(guān)聯(lián)
加鎖不慎可能會(huì)造成死鎖
線程池(Java 5) 用途真正的多線程使用,是從線程池開始的,Callable接口,基本上也是被線程池調(diào)用的。
線程池全景圖ExecutorService pool = Executors.newFixedThreadPool(3); Callable線程池要解決的問(wèn)題worker1 = new Callee(); Future ft1 = pool.submit(worker1); Callable worker2 = new Callee(); Future ft2 = pool.submit(worker2); Callable worker3 = new Callee(); Future ft3 = pool.submit(worker3); System.out.println("準(zhǔn)備通知線程池shutdown..."); pool.shutdown(); System.out.println("已通知線程池shutdown"); try { pool.awaitTermination(2L, TimeUnit.SECONDS); System.out.println("線程池完全結(jié)束"); } catch (InterruptedException e) { e.printStackTrace(); }
任務(wù)排隊(duì):當(dāng)前能并發(fā)執(zhí)行的線程數(shù)總是有限的,但任務(wù)數(shù)可以很大
線程調(diào)度:線程的創(chuàng)建是比較消耗資源的,需要一個(gè)池來(lái)維持活躍線程
結(jié)果收集:每個(gè)任務(wù)完成以后,其結(jié)果需要統(tǒng)一采集
線程池類型newSingleThreadExecutor:創(chuàng)建一個(gè)單線程的線程池。這個(gè)線程池只有一個(gè)線程在工作,也就是相當(dāng)于單線程串行執(zhí)行所有任務(wù)。如果這個(gè)唯一的線程因?yàn)楫惓=Y(jié)束,那么會(huì)有一個(gè)新的線程來(lái)替代它。此線程池保證所有任務(wù)的執(zhí)行順序按照任務(wù)的提交順序執(zhí)行。
newFixedThreadPool:創(chuàng)建固定大小的線程池。每次提交一個(gè)任務(wù)就創(chuàng)建一個(gè)線程,直到線程達(dá)到線程池的最大大小。線程池的大小一旦達(dá)到最大值就會(huì)保持不變,如果某個(gè)線程因?yàn)閳?zhí)行異常而結(jié)束,那么線程池會(huì)補(bǔ)充一個(gè)新線程。
newCachedThreadPool:創(chuàng)建一個(gè)可緩存的線程池。如果線程池的大小超過(guò)了處理任務(wù)所需要的線程,那么就會(huì)回收部分空閑(60秒不執(zhí)行任務(wù))的線程,當(dāng)任務(wù)數(shù)增加時(shí),此線程池又可以智能的添加新線程來(lái)處理任務(wù)。此線程池不會(huì)對(duì)線程池大小做限制,線程池大小完全依賴于操作系統(tǒng)(或者說(shuō)JVM)能夠創(chuàng)建的最大線程大小。
newScheduledThreadPool:創(chuàng)建一個(gè)大小無(wú)限的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。
newSingleThreadScheduledExecutor:創(chuàng)建一個(gè)單線程的線程池。此線程池支持定時(shí)以及周期性執(zhí)行任務(wù)的需求。
線程池狀態(tài)線程池在構(gòu)造前(new操作)是初始狀態(tài),一旦構(gòu)造完成線程池就進(jìn)入了執(zhí)行狀態(tài)RUNNING。嚴(yán)格意義上講線程池構(gòu)造完成后并沒有線程被立即啟動(dòng),只有進(jìn)行“預(yù)啟動(dòng)”或者接收到任務(wù)的時(shí)候才會(huì)啟動(dòng)線程。這個(gè)會(huì)后面線程池的原理會(huì)詳細(xì)分析。但是線程池是出于運(yùn)行狀態(tài),隨時(shí)準(zhǔn)備接受任務(wù)來(lái)執(zhí)行。
線程池運(yùn)行中可以通過(guò)shutdown()和shutdownNow()來(lái)改變運(yùn)行狀態(tài)。shutdown()是一個(gè)平緩的關(guān)閉過(guò)程,線程池停止接受新的任務(wù),同時(shí)等待已經(jīng)提交的任務(wù)執(zhí)行完畢,包括那些進(jìn)入隊(duì)列還沒有開始的任務(wù),這時(shí)候線程池處于SHUTDOWN狀態(tài);shutdownNow()是一個(gè)立即關(guān)閉過(guò)程,線程池停止接受新的任務(wù),同時(shí)線程池取消所有執(zhí)行的任務(wù)和已經(jīng)進(jìn)入隊(duì)列但是還沒有執(zhí)行的任務(wù),這時(shí)候線程池處于STOP狀態(tài)。
一旦shutdown()或者shutdownNow()執(zhí)行完畢,線程池就進(jìn)入TERMINATED狀態(tài),此時(shí)線程池就結(jié)束了。
isTerminating()描述的是SHUTDOWN和STOP兩種狀態(tài)。
isShutdown()描述的是非RUNNING狀態(tài),也就是SHUTDOWN/STOP/TERMINATED三種狀態(tài)。
任務(wù)拒絕策略 Fork/Join模型(Java7) 用途計(jì)算密集型的任務(wù),最好很少有IO等待,也沒有Sleep之類的,最好是本身就適合遞歸處理的算法
分析在給定的線程數(shù)內(nèi),盡可能地最大化利用CPU資源,但又不會(huì)導(dǎo)致其他資源過(guò)載(比如內(nèi)存),或者大量空線程等待。
ForkJoinPool主要用來(lái)使用分治法(Divide-and-Conquer Algorithm)來(lái)解決問(wèn)題。典型的應(yīng)用比如快速排序算法。
這里的要點(diǎn)在于,F(xiàn)orkJoinPool需要使用相對(duì)少的線程來(lái)處理大量的任務(wù)。
比如要對(duì)1000萬(wàn)個(gè)數(shù)據(jù)進(jìn)行排序,那么會(huì)將這個(gè)任務(wù)分割成兩個(gè)500萬(wàn)的排序任務(wù)和一個(gè)針對(duì)這兩組500萬(wàn)數(shù)據(jù)的合并任務(wù)。以此類推,對(duì)于500萬(wàn)的數(shù)據(jù)也會(huì)做出同樣的分割處理,到最后會(huì)設(shè)置一個(gè)閾值來(lái)規(guī)定當(dāng)數(shù)據(jù)規(guī)模到多少時(shí),停止這樣的分割處理。比如,當(dāng)元素的數(shù)量小于10時(shí),會(huì)停止分割,轉(zhuǎn)而使用插入排序?qū)λ鼈冞M(jìn)行排序。
那么到最后,所有的任務(wù)加起來(lái)會(huì)有大概2000000+個(gè)。問(wèn)題的關(guān)鍵在于,對(duì)于一個(gè)任務(wù)而言,只有當(dāng)它所有的子任務(wù)完成之后,它才能夠被執(zhí)行。
所以當(dāng)使用ThreadPoolExecutor時(shí),使用分治法會(huì)存在問(wèn)題,因?yàn)門hreadPoolExecutor中的線程無(wú)法像任務(wù)隊(duì)列中再添加一個(gè)任務(wù)并且在等待該任務(wù)完成之后再繼續(xù)執(zhí)行。而使用ForkJoinPool時(shí),就能夠讓其中的線程創(chuàng)建新的任務(wù),并掛起當(dāng)前的任務(wù),此時(shí)線程就能夠從隊(duì)列中選擇子任務(wù)執(zhí)行。
以上程序的關(guān)鍵是fork()和join()方法。在ForkJoinPool使用的線程中,會(huì)使用一個(gè)內(nèi)部隊(duì)列來(lái)對(duì)需要執(zhí)行的任務(wù)以及子任務(wù)進(jìn)行操作來(lái)保證它們的執(zhí)行順序。
那么使用ThreadPoolExecutor或者ForkJoinPool,會(huì)有什么性能的差異呢?
首先,使用ForkJoinPool能夠使用數(shù)量有限的線程來(lái)完成非常多的具有父子關(guān)系的任務(wù),比如使用4個(gè)線程來(lái)完成超過(guò)200萬(wàn)個(gè)任務(wù)。但是,使用ThreadPoolExecutor時(shí),是不可能完成的,因?yàn)門hreadPoolExecutor中的Thread無(wú)法選擇優(yōu)先執(zhí)行子任務(wù),需要完成200萬(wàn)個(gè)具有父子關(guān)系的任務(wù)時(shí),也需要200萬(wàn)個(gè)線程,顯然這是不可行的。
ps:ForkJoinPool在執(zhí)行過(guò)程中,會(huì)創(chuàng)建大量的子任務(wù),導(dǎo)致GC進(jìn)行垃圾回收,這些是需要注意的。
原理與使用ForkJoinPool首先是ExecutorService的實(shí)現(xiàn)類,因此是特殊的線程池。
創(chuàng)建了ForkJoinPool實(shí)例之后,就可以調(diào)用ForkJoinPool的submit(ForkJoinTask
其中ForkJoinTask代表一個(gè)可以并行、合并的任務(wù)。ForkJoinTask是一個(gè)抽象類,它還有兩個(gè)抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任務(wù),而RecusiveAction代表沒有返回值的任務(wù)。
個(gè)人認(rèn)為ForkJoinPool設(shè)計(jì)不太好的地方在于,F(xiàn)orkJoinTask不是個(gè)接口,而是抽象類,實(shí)際使用時(shí)基本上不是繼承RecursiveAction就是繼承RecursiveTask,對(duì)業(yè)務(wù)類有限制。
示例典型的一個(gè)例子,就是一串?dāng)?shù)組求和
public interface Calculator { long sumUp(long[] numbers); }
public class ForkJoinCalculator implements Calculator { private ForkJoinPool pool; private static class SumTask extends RecursiveTask{ private long[] numbers; private int from; private int to; public SumTask(long[] numbers, int from, int to) { this.numbers = numbers; this.from = from; this.to = to; } @Override protected Long compute() { // 當(dāng)需要計(jì)算的數(shù)字小于6時(shí),直接計(jì)算結(jié)果 if (to - from < 6) { long total = 0; for (int i = from; i <= to; i++) { total += numbers[i]; } return total; // 否則,把任務(wù)一分為二,遞歸計(jì)算 } else { int middle = (from + to) / 2; SumTask taskLeft = new SumTask(numbers, from, middle); SumTask taskRight = new SumTask(numbers, middle+1, to); taskLeft.fork(); taskRight.fork(); return taskLeft.join() + taskRight.join(); } } } public ForkJoinCalculator() { // 也可以使用公用的 ForkJoinPool: // pool = ForkJoinPool.commonPool() pool = new ForkJoinPool(); } @Override public long sumUp(long[] numbers) { return pool.invoke(new SumTask(numbers, 0, numbers.length-1)); } }
這個(gè)例子展示了當(dāng)數(shù)組被拆分得足夠小(<6)之后,就不需要并行處理了,而更大的數(shù)組就拆為兩半,分別處理。
Stream(Java 8) 概念別搞混了,跟IO的Stream完全不是一回事,可以把它看做是集合處理的聲明式語(yǔ)法,類似數(shù)據(jù)庫(kù)操作語(yǔ)言SQL。當(dāng)然也有跟IO類似的地方,就是Stream只能消費(fèi)一次,不能重復(fù)使用。
看個(gè)例子:
int sum = widgets.stream() .filter(w -> w.getColor() == RED) .mapToInt(w -> w.getWeight()) .sum();
流提供了一個(gè)能力,任何一個(gè)流,只要獲取一次并行流,后面的操作就都可以并行了。
例如:
Stream流操作 生成流stream = Stream.of("a", "b", "c","d","e","f","g"); String str = stream.parallel().reduce((a, b) -> a + "," + b).get(); System.out.println(str);
Collection.stream()
Collection.parallelStream()
Arrays.stream(T array) or Stream.of()
java.io.BufferedReader.lines()
java.util.stream.IntStream.range()
java.nio.file.Files.walk()
java.util.Spliterator
Random.ints()
BitSet.stream()
Pattern.splitAsStream(java.lang.CharSequence)
JarFile.stream()
示例
// 1. Individual values Stream stream = Stream.of("a", "b", "c"); // 2. Arrays String [] strArray = new String[] {"a", "b", "c"}; stream = Stream.of(strArray); stream = Arrays.stream(strArray); // 3. Collections Listlist = Arrays.asList(strArray); stream = list.stream();
需要注意的是,對(duì)于基本數(shù)值型,目前有三種對(duì)應(yīng)的包裝類型 Stream:
IntStream、LongStream、DoubleStream。當(dāng)然我們也可以用 Stream
一個(gè)流可以后面跟隨零個(gè)或多個(gè) intermediate 操作。其目的主要是打開流,做出某種程度的數(shù)據(jù)映射/過(guò)濾,然后返回一個(gè)新的流,交給下一個(gè)操作使用。這類操作都是惰性化的(lazy),就是說(shuō),僅僅調(diào)用到這類方法,并沒有真正開始流的遍歷。
已知的Intermediate操作包括:map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered。
Terminal一個(gè)流只能有一個(gè) terminal操作,當(dāng)這個(gè)操作執(zhí)行后,流就被使用“光”了,無(wú)法再被操作。所以這必定是流的最后一個(gè)操作。Terminal 操作的執(zhí)行,才會(huì)真正開始流的遍歷,并且會(huì)生成一個(gè)結(jié)果,或者一個(gè) side effect。
已知的Terminal操作包括:forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator
reduce解析: reduce本質(zhì)上是個(gè)聚合方法,它的作用是用流里面的元素生成一個(gè)結(jié)果,所以用來(lái)做累加,字符串拼接之類的都非常合適。它有三個(gè)參數(shù)
初始值:最終結(jié)果的初始化值,可以是一個(gè)空的對(duì)象
聚合函數(shù):一個(gè)二元函數(shù)(有兩個(gè)參數(shù)),第一個(gè)參數(shù)是上一次聚合的結(jié)果,第二個(gè)參數(shù)是某個(gè)元素
多個(gè)部分結(jié)果的合并函數(shù):如果流并發(fā)了,那么聚合操作會(huì)分為多段進(jìn)行,這里顯示了多段之間如何配合
collect: collect比reduce更強(qiáng)大:reduce最終只能得到一個(gè)跟流里數(shù)據(jù)類型相同的值, 但collect的結(jié)果可以是任何對(duì)象。簡(jiǎn)單的collect也有三個(gè)參數(shù):
最終要返回的數(shù)據(jù)容器
把元素并入返回值的方法
多個(gè)部分結(jié)果的合并
兩個(gè)collect示例
//和reduce相同的合并字符操作 String concat = stringStream.collect(StringBuilder::new, StringBuilder::append,StringBuilder::append).toString(); //等價(jià)于上面,這樣看起來(lái)應(yīng)該更加清晰 String concat = stringStream.collect(() -> new StringBuilder(),(l, x) -> l.append(x), (r1, r2) -> r1.append(r2)).toString();
//把stream轉(zhuǎn)成map Stream stream = Stream.of(1, 2, 3, 4).filter(p -> p > 2); List result = stream.collect(() -> new ArrayList<>(), (list, item) -> list.add(item), (one, two) -> one.addAll(two)); /* 或者使用方法引用 */ result = stream.collect(ArrayList::new, List::add, List::addAll);協(xié)程
協(xié)程,英文Coroutines,也叫纖程(Fiber)是一種比線程更加輕量級(jí)的存在。正如一個(gè)進(jìn)程可以擁有多個(gè)線程一樣,一個(gè)線程也可以擁有多個(gè)協(xié)程。
協(xié)程實(shí)際上是在語(yǔ)言底層(或者框架)對(duì)需要等待的程序進(jìn)行調(diào)度,從而充分利用CPU的方法, 其實(shí)這完全可以通過(guò)回調(diào)來(lái)實(shí)現(xiàn), 但是深層回調(diào)的代碼太{{BANNED}}了,所以發(fā)明了協(xié)程的寫法。理論上多個(gè)協(xié)程不會(huì)真的"同時(shí)"執(zhí)行,也就不會(huì)引起共享變量操作的不確定性,不需要加鎖(待確認(rèn))。
pythone協(xié)程示例
Pythone, Golang和C#都內(nèi)置了協(xié)程的語(yǔ)法,但Java沒有,只能通過(guò)框架實(shí)現(xiàn),常見的框架包括:Quasar,kilim和ea-async。
Java ea-async 協(xié)程示例
import static com.ea.async.Async.await; import static java.util.concurrent.CompletableFuture.completedFuture; public class Store { //購(gòu)物操作, 傳一個(gè)商品id和一個(gè)價(jià)格 public CompletableFuture參考資料buyItem(String itemTypeId, int cost) { //銀行扣款(長(zhǎng)時(shí)間操作) if(!await(bank.decrement(cost))) { return completedFuture(false); } try { //商品出庫(kù)(長(zhǎng)時(shí)間操作) await(inventory.giveItem(itemTypeId)); return completedFuture(true); } catch (Exception ex) { await(bank.refund(cost)); throw new AppException(ex); } } }
《七周七并發(fā)模型》電子書
深入淺出Java Concurrency——線程池
Java多線程學(xué)習(xí)(吐血超詳細(xì)總結(jié))
Jetty基礎(chǔ)之線程模型
Jetty-server高性能,多線程特性的源碼分析
Java 編程要點(diǎn)之并發(fā)(Concurrency)詳解
Java Concurrency in Depth (Part 1)
Java進(jìn)階(七)正確理解Thread Local的原理與適用場(chǎng)景
Java 并發(fā)編程筆記:如何使用 ForkJoinPool 以及原理
ForkJoinPool簡(jiǎn)介
多線程 ForkJoinPool
Java 8 中的 Streams API 詳解
Java中的協(xié)程實(shí)現(xiàn)
漫畫:什么是協(xié)程
學(xué)習(xí)源碼
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72823.html
摘要:實(shí)戰(zhàn)高并發(fā)程序設(shè)計(jì)推薦豆瓣評(píng)分書的質(zhì)量沒的說(shuō),推薦大家好好看一下。推薦,豆瓣評(píng)分,人評(píng)價(jià)本書介紹了在編程中條極具實(shí)用價(jià)值的經(jīng)驗(yàn)規(guī)則,這些經(jīng)驗(yàn)規(guī)則涵蓋了大多數(shù)開發(fā)人員每天所面臨的問(wèn)題的解決方案。 很早就想把JavaGuide的書單更新一下了,昨晚加今天早上花了幾個(gè)時(shí)間對(duì)之前的書單進(jìn)行了分類和補(bǔ)充完善。雖是終極版,但一定還有很多不錯(cuò)的 Java 書籍我沒有添加進(jìn)去,會(huì)繼續(xù)完善下去。希望這篇...
摘要:程序正常運(yùn)行,輸出了預(yù)期容量的大小這是正常運(yùn)行結(jié)果,未發(fā)生多線程安全問(wèn)題,但這是不確定性的,不是每次都會(huì)達(dá)到正常預(yù)期的。另外,像等都有類似多線程安全問(wèn)題,在多線程并發(fā)環(huán)境下避免使用這種集合。 這個(gè)問(wèn)題是 Java 程序員面試經(jīng)常會(huì)遇到的吧。 工作一兩年的應(yīng)該都知道 ArrayList 是線程不安全的,要使用線程安全的就使用 Vector,這也是各種 Java 面試寶典里面所提及的,可能...
摘要:相關(guān)推薦,豆瓣評(píng)分,人評(píng)價(jià)本書介紹了在編程中條極具實(shí)用價(jià)值的經(jīng)驗(yàn)規(guī)則,這些經(jīng)驗(yàn)規(guī)則涵蓋了大多數(shù)開發(fā)人員每天所面臨的問(wèn)題的解決方案。實(shí)戰(zhàn)高并發(fā)程序設(shè)計(jì)推薦豆瓣評(píng)分,書的質(zhì)量沒的說(shuō),推薦大家好好看一下。 該文已加入開源文檔:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí))。地址:https://github.com/Snailclimb... 【強(qiáng)烈推薦!非廣告!】...
本文是公眾號(hào)讀者jianfeng投稿的面試經(jīng)驗(yàn)恭喜該同學(xué)成功轉(zhuǎn)型目錄:毅然轉(zhuǎn)型,沒頭蒼蠅制定目標(biāo),系統(tǒng)學(xué)習(xí)面試經(jīng)歷毅然轉(zhuǎn)崗,沒頭蒼蠅首先,介紹一下我的背景。本人坐標(biāo)廣州,2016年畢業(yè)于一個(gè)普通二本大學(xué),曾經(jīng)在某機(jī)構(gòu)培訓(xùn)過(guò)Android。2018年初的時(shí)候已經(jīng)在兩家小公司工作干了兩年的android開發(fā),然后會(huì)一些Tomcat、Servlet之類的技術(shù),當(dāng)時(shí)的年薪大概也就15萬(wàn)這樣子。由于個(gè)人發(fā)展...
閱讀 2188·2023-04-25 19:06
閱讀 1388·2021-11-17 09:33
閱讀 1776·2019-08-30 15:53
閱讀 2598·2019-08-30 14:20
閱讀 3553·2019-08-29 12:58
閱讀 3551·2019-08-26 13:27
閱讀 512·2019-08-26 12:23
閱讀 493·2019-08-26 12:22