摘要:什么是協(xié)程大多數(shù)的開發(fā)人員可能對(duì)進(jìn)程,線程這兩個(gè)名字比較熟悉。但是為了追求最大力度的發(fā)揮硬件的性能和提升軟件的速度,出現(xiàn)了協(xié)程或者叫纖程,或者綠色線程。原理利用字節(jié)碼增強(qiáng),將普通的代碼轉(zhuǎn)換為支持協(xié)程的代碼。
什么是協(xié)程
大多數(shù)的開發(fā)人員可能對(duì)進(jìn)程,線程這兩個(gè)名字比較熟悉。但是為了追求最大力度的發(fā)揮硬件的性能和提升軟件的速度,出現(xiàn)了協(xié)程或者叫纖程(Fiber),或者綠色線程(GreenThread)。那我們來聊下什么是協(xié)程,以及在java中是怎么體現(xiàn)和運(yùn)用協(xié)程的。
在說協(xié)程之前,我們先來回想下,現(xiàn)在大多數(shù)的程序中,都是使用了多線程技術(shù)來解決一些需要長(zhǎng)時(shí)間阻塞的場(chǎng)景。JAVA中每個(gè)線程棧默認(rèn)1024K,沒有辦法開成千上萬個(gè)線程,而且就算通過JVM參數(shù)調(diào)小,CPU也無法分配時(shí)間片給每個(gè)線程,大多數(shù)的線程還是在等待中,所以我們一般會(huì)使用
Runtime.getRuntime().availableProcessors()來配置線程數(shù)的大小(或者會(huì)根據(jù)實(shí)際情況調(diào)整,就不展開討論了),但是就算是我們開了新的線程,該線程也可能是在等待系統(tǒng)IO的返回或者網(wǎng)絡(luò)IO的返回,而且線程的切換有著大量的開銷。
為了解決上面說的問題,大家可能會(huì)想到回調(diào)?,F(xiàn)在很多框架都是基于回調(diào)來解決那些耗時(shí)的操作。但層數(shù)嵌套多了反而會(huì)引起反人類的回調(diào)地獄,并且回調(diào)后就丟失原函數(shù)的上下文。其中的代表呢就比如說nodeJs。
終于可以來聊聊協(xié)程。它的基本原理是:在某個(gè)點(diǎn)掛起當(dāng)前的任務(wù),并且保存棧信息,去執(zhí)行另一個(gè)任務(wù);等完成或達(dá)到某個(gè)條件時(shí),在還原原來的棧信息并繼續(xù)執(zhí)行。上面提到的幾個(gè)點(diǎn)大家會(huì)想到JVM的結(jié)構(gòu),棧, 程序計(jì)數(shù)器等等,但是JVM原生是不支持這樣的操作的(至少java是不支持的,kotlin是可以的)。因此如果要在純java代碼里需要使用協(xié)程的話需要引入第三方包,如kilim,Quasar。而kilim已經(jīng)很久未更新了,那么我們來看看Quasar。
Quasar原理
1、利用字節(jié)碼增強(qiáng),將普通的java代碼轉(zhuǎn)換為支持協(xié)程的代碼。
2、在調(diào)用pausable方法的時(shí)候,如果pause了就保存當(dāng)前方法棧的State,停止執(zhí)行當(dāng)前協(xié)程,將控制權(quán)交給調(diào)度器
3、調(diào)度器負(fù)責(zé)調(diào)度就緒的協(xié)程
4、協(xié)程resume的時(shí)候,自動(dòng)恢復(fù)State,根據(jù)協(xié)程的pc計(jì)數(shù)跳轉(zhuǎn)到上次執(zhí)行的位置,繼續(xù)執(zhí)行。
這些第三方的框架大部分實(shí)現(xiàn)是一致的。通過對(duì)字節(jié)碼直接操作,在編譯期把你寫的代碼變?yōu)橹С謪f(xié)程的版本,并在運(yùn)行時(shí)把你所有需要用到協(xié)程的部分由他來控制和調(diào)度,同時(shí)也支持在運(yùn)行期這樣做。
Quasar中使用了拋異常的方式來中斷線程,但是 實(shí)際上如果我們捕獲了這個(gè)異常就會(huì)產(chǎn)生問題,所以一般是以這種方式來注冊(cè):
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//這里不應(yīng)該捕獲到異常.
throw new AssertionError(s);
}
}
在調(diào)度方面,Quasar中默認(rèn)使用了JDK7以上才有的ForkJoinPool,它的優(yōu)勢(shì)就在于空閑的線程會(huì)去從其他線程任務(wù)隊(duì)列尾部”偷取”任務(wù)來自己處理,因此也叫work-stealing功能。這個(gè)功能可以大大的利用CPU資源,不讓線程白白空閑著。
Quasar模塊
Fiber
Fiber可以認(rèn)為是一個(gè)微線程,使用方式基本上和Thread相同,啟動(dòng)start:
new Fiber
@Override
protected V run() throws SuspendExecution, InterruptedException {
// your code
}
}.start();
new Fiber
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();
其實(shí)它更類似于一個(gè)CallBack,是可以攜帶返回值的,并且可以拋異常SuspendExecution,InterruptedException。你也可以向其中傳遞SuspendableRunnable 或 SuspendableCallable 給Fiber的構(gòu)造函數(shù)。你甚至可以像線程一樣調(diào)用join(),或者get()來阻塞線程等待他完成。
當(dāng)Fiber比較大的時(shí)候,F(xiàn)iber可以在調(diào)用parkAndSerialize 方法時(shí)被序列化,在調(diào)用unparkSerialized時(shí)被反序列化。
從以上我們可以看出Fiber與Thread非常類似,極大的減少了遷移的成本。
FiberScheduler
FiberScheduler是Quasar框架中核心的任務(wù)調(diào)度器,負(fù)責(zé)管理任務(wù)的工作者線程WorkerThread,之前提到的他是一個(gè)FiberForkJoinScheduler。
ForkJoinPool的默認(rèn)初始化個(gè)數(shù)為Runtime.getRuntime().availableProcessors()。
instrumentation
當(dāng)一個(gè)類被加載時(shí),Quasar的instrumentation模塊 (使用 Java agent時(shí)) 搜索suspendable 方法。每一個(gè)suspendable 方法 f通過下面的方式 instrument:
它搜索對(duì)其它suspendable方法的調(diào)用。對(duì)suspendable方法g的調(diào)用,一些代碼會(huì)在這個(gè)調(diào)用g的前后被插入,它們會(huì)保存和恢復(fù)fiber棧本地變量的狀態(tài),記錄這個(gè)暫停點(diǎn)。在這個(gè)“suspendable function chain”的最后,我們會(huì)發(fā)現(xiàn)對(duì)Fiber.park的調(diào)用。park暫停這個(gè)fiber,扔出 SuspendExecution異常。
當(dāng)g block的時(shí)候,SuspendExecution異常會(huì)被Fiber捕獲。 當(dāng)Fiber被喚醒(使用unpark), 方法f會(huì)被調(diào)用, 執(zhí)行記錄顯示它被block在g的調(diào)用上,所以程序會(huì)立即跳到f調(diào)用g的那一行,然后調(diào)用它。最終我們會(huì)到達(dá)暫停點(diǎn),然后繼續(xù)執(zhí)行。當(dāng)g返回時(shí), f中插入的代碼會(huì)恢復(fù)f的本地變量。
過程聽起來很復(fù)雜,但是它只會(huì)帶來3% ~ 5%的性能的損失。
下面看一個(gè)簡(jiǎn)單的例子, 方法m2聲明拋出SuspendExecution異常,方法m1調(diào)用m2和m3,所以也聲明拋出這個(gè)異常,最后這個(gè)異常會(huì)被Fiber所捕獲:
public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {
String m = "m1"; System.out.println("m1 begin"); m = m2(); m = m3(); System.out.println("m1 end"); System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
return "m2";
}
static String m3() throws SuspendExecution, InterruptedException {
return "m3";
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
new Fiber("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { m1(); } }).start();
}
}
// 反編譯后的代碼
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}
我并沒有更深入的去了解Quasar的實(shí)現(xiàn)細(xì)節(jié)以及調(diào)度算法,有興趣的讀者可以翻翻它的代碼。
實(shí)戰(zhàn)
public class Helloworld {
@Suspendable
static void m1() throws InterruptedException, SuspendExecution {
String m = "m1"; //System.out.println("m1 begin"); m = m2(); //System.out.println("m1 end"); //System.out.println(m);
}
static String m2() throws SuspendExecution, InterruptedException {
String m = m3(); Strand.sleep(1000); return m;
}
//or define in META-INF/suspendables
@Suspendable
static String m3() {
List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList()); return l.toString();
}
static public void main(String[] args) throws ExecutionException, InterruptedException {
int count = 10000; testThreadpool(count); testFiber(count);
}
static void testThreadpool(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); ExecutorService es = Executors.newFixedThreadPool(200); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { es.submit(() -> { long start = System.currentTimeMillis(); try { m1(); } catch (InterruptedException e) { e.printStackTrace(); } catch (SuspendExecution suspendExecution) { suspendExecution.printStackTrace(); } start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); }); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("thread pool took: " + t + ", latency: " + l + " ms"); es.shutdownNow();
}
static void testFiber(int count) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(count); LongAdder latency = new LongAdder(); long t = System.currentTimeMillis(); for (int i =0; i< count; i++) { new Fiber("Caller", new SuspendableRunnable() { @Override public void run() throws SuspendExecution, InterruptedException { long start = System.currentTimeMillis(); m1(); start = System.currentTimeMillis() - start; latency.add(start); latch.countDown(); } }).start(); } latch.await(); t = System.currentTimeMillis() - t; long l = latency.longValue() / count; System.out.println("fiber took: " + t + ", latency: " + l + " ms");
}
}
OUTPUT:
1
2
thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms
可以看到很明顯的時(shí)間差距,存在多線程阻塞的情況下,協(xié)程的性能非常的好,但是。如果把sleep這段去掉,F(xiàn)iber的性能反而更差:
這說明Fiber并不意味著它可以在所有的場(chǎng)景中都可以替換Thread。當(dāng)fiber的代碼經(jīng)常會(huì)被等待其它fiber阻塞的時(shí)候,就應(yīng)該使用fiber。
對(duì)于那些需要CPU長(zhǎng)時(shí)間計(jì)算的代碼,很少遇到阻塞的時(shí)候,就應(yīng)該首選thread
擴(kuò)展
其實(shí)協(xié)程這個(gè)概念在其他的語言中有原生的支持,如:
kotlin 1.30之后已經(jīng)穩(wěn)定
: https://www.kotlincn.net/docs...
golang : https://gobyexample.com/gorou...
python : http://www.gevent.org/content...~
在這些語言中協(xié)程就看起來至少?zèng)]這么奇怪或者難以理解了,而且開發(fā)起開也相比java簡(jiǎn)單很多。
總結(jié)
協(xié)程的概念也不算是很新了,但是在像Java這樣的語言或者特定的領(lǐng)域并不是很火,也并沒有完全普及。不是很明白是它的學(xué)習(xí)成本高,還是說應(yīng)用場(chǎng)景是在太小了。但是當(dāng)我聽到這個(gè)概念的時(shí)候確實(shí)是挺好奇,也挺好奇的。也希望之后會(huì)有更多的框架和特性來簡(jiǎn)化我們苦逼程序員的開發(fā)~~
參考文獻(xiàn)
http://docs.paralleluniverse....
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74561.html
摘要:本周提交的一份增強(qiáng)建議草案要求將虛擬線程作為標(biāo)準(zhǔn)版的一部分進(jìn)行預(yù)覽。虛擬線程目的是更好地支持編寫和維護(hù)高吞吐量并發(fā)應(yīng)用程序。該提案指出,使用虛擬線程不需要學(xué)習(xí)新的編程模型。我們知道 Go 語言最大亮點(diǎn)之一就是原生支持并發(fā),這得益于 Go 語言的協(xié)程機(jī)制。一個(gè) go 語句就可以發(fā)起一個(gè)協(xié)程 (goroutin)。 協(xié)程本質(zhì)上是一種用戶態(tài)線程,它不需要操作系統(tǒng)來進(jìn)行調(diào)度,而是由用戶程序自行管理...
摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項(xiàng)就是,其上下文切換和模式切換的時(shí)間消耗非常少。因?yàn)槎嗑€程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時(shí)至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...
摘要:因?yàn)槎嗑€程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線程的使用。舉個(gè)例子如果說服務(wù)器的帶寬只有,某個(gè)資源的下載速度是,系統(tǒng)啟動(dòng)個(gè)線程下載該資源并不會(huì)導(dǎo)致下載速度編程,所以在并發(fā)編程時(shí),需要考慮這些資源的限制。 最近私下做一項(xiàng)目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項(xiàng)目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Jav...
摘要:很長(zhǎng)一段時(shí)間,我都很天真的認(rèn)為,特別是以為代表的庫,才是協(xié)程的樂土。里是沒法實(shí)現(xiàn)協(xié)程,更別說實(shí)現(xiàn)這樣可以的協(xié)程的。咱真的是太井底之蛙了。不完全列表如下還有一個(gè)據(jù)作者說是最的這些協(xié)程庫的實(shí)現(xiàn)方式都是類似的,都是通過字節(jié)碼生成達(dá)到的目的。 很長(zhǎng)一段時(shí)間,我都很天真的認(rèn)為python,特別是以gevent為代表的庫,才是協(xié)程的樂土。Java里是沒法實(shí)現(xiàn)協(xié)程,更別說實(shí)現(xiàn)stackless py...
摘要:線程線程,是程序執(zhí)行流的最小單元。由于線程之間的相互制約,致使線程在運(yùn)行中呈現(xiàn)出間斷性。線程的狀態(tài)機(jī)線程也有就緒阻塞和運(yùn)行三種基本狀態(tài)。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程??梢砸暈椴煌€程競(jìng)爭(zhēng)一把鎖。 進(jìn)程線程協(xié)程 進(jìn)程 進(jìn)程是一個(gè)實(shí)體。每一個(gè)進(jìn)程都有它自己的地址空間, 文本區(qū)域(text region) 數(shù)據(jù)區(qū)域(data region) 堆棧(stack re...
閱讀 1895·2021-11-17 09:33
閱讀 6489·2021-10-12 10:20
閱讀 2310·2021-09-22 15:50
閱讀 1798·2021-09-22 15:10
閱讀 631·2021-09-10 10:51
閱讀 636·2021-09-10 10:50
閱讀 3059·2021-08-11 11:19
閱讀 1788·2019-08-30 15:55