成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

Java-協(xié)程

CoffeX / 2265人閱讀

摘要:什么是協(xié)程大多數(shù)的開發(fā)人員可能對(duì)進(jìn)程,線程這兩個(gè)名字比較熟悉。但是為了追求最大力度的發(fā)揮硬件的性能和提升軟件的速度,出現(xiàn)了協(xié)程或者叫纖程,或者綠色線程。原理利用字節(jié)碼增強(qiáng),將普通的代碼轉(zhuǎn)換為支持協(xié)程的代碼。

什么是協(xié)程
大多數(shù)的開發(fā)人員可能對(duì)進(jìn)程,線程這兩個(gè)名字比較熟悉。但是為了追求最大力度的發(fā)揮硬件的性能和提升軟件的速度,出現(xiàn)了協(xié)程或者叫纖程(Fiber),或者綠色線程(GreenThread)。那我們來聊下什么是協(xié)程,以及在java中是怎么體現(xiàn)和運(yùn)用協(xié)程的。

在說協(xié)程之前,我們先來回想下,現(xiàn)在大多數(shù)的程序中,都是使用了多線程技術(shù)來解決一些需要長(zhǎng)時(shí)間阻塞的場(chǎng)景。JAVA中每個(gè)線程棧默認(rèn)1024K,沒有辦法開成千上萬個(gè)線程,而且就算通過JVM參數(shù)調(diào)小,CPU也無法分配時(shí)間片給每個(gè)線程,大多數(shù)的線程還是在等待中,所以我們一般會(huì)使用
Runtime.getRuntime().availableProcessors()來配置線程數(shù)的大小(或者會(huì)根據(jù)實(shí)際情況調(diào)整,就不展開討論了),但是就算是我們開了新的線程,該線程也可能是在等待系統(tǒng)IO的返回或者網(wǎng)絡(luò)IO的返回,而且線程的切換有著大量的開銷。

為了解決上面說的問題,大家可能會(huì)想到回調(diào)?,F(xiàn)在很多框架都是基于回調(diào)來解決那些耗時(shí)的操作。但層數(shù)嵌套多了反而會(huì)引起反人類的回調(diào)地獄,并且回調(diào)后就丟失原函數(shù)的上下文。其中的代表呢就比如說nodeJs。

終于可以來聊聊協(xié)程。它的基本原理是:在某個(gè)點(diǎn)掛起當(dāng)前的任務(wù),并且保存棧信息,去執(zhí)行另一個(gè)任務(wù);等完成或達(dá)到某個(gè)條件時(shí),在還原原來的棧信息并繼續(xù)執(zhí)行。上面提到的幾個(gè)點(diǎn)大家會(huì)想到JVM的結(jié)構(gòu),棧, 程序計(jì)數(shù)器等等,但是JVM原生是不支持這樣的操作的(至少java是不支持的,kotlin是可以的)。因此如果要在純java代碼里需要使用協(xié)程的話需要引入第三方包,如kilim,Quasar。而kilim已經(jīng)很久未更新了,那么我們來看看Quasar。

Quasar原理
1、利用字節(jié)碼增強(qiáng),將普通的java代碼轉(zhuǎn)換為支持協(xié)程的代碼。
2、在調(diào)用pausable方法的時(shí)候,如果pause了就保存當(dāng)前方法棧的State,停止執(zhí)行當(dāng)前協(xié)程,將控制權(quán)交給調(diào)度器
3、調(diào)度器負(fù)責(zé)調(diào)度就緒的協(xié)程
4、協(xié)程resume的時(shí)候,自動(dòng)恢復(fù)State,根據(jù)協(xié)程的pc計(jì)數(shù)跳轉(zhuǎn)到上次執(zhí)行的位置,繼續(xù)執(zhí)行。
這些第三方的框架大部分實(shí)現(xiàn)是一致的。通過對(duì)字節(jié)碼直接操作,在編譯期把你寫的代碼變?yōu)橹С謪f(xié)程的版本,并在運(yùn)行時(shí)把你所有需要用到協(xié)程的部分由他來控制和調(diào)度,同時(shí)也支持在運(yùn)行期這樣做。
Quasar中使用了拋異常的方式來中斷線程,但是 實(shí)際上如果我們捕獲了這個(gè)異常就會(huì)產(chǎn)生問題,所以一般是以這種方式來注冊(cè):
@Suspendable
public int f() {
try {
// do some stuff
return g() * 2;
} catch(SuspendExecution s) {
//這里不應(yīng)該捕獲到異常.
throw new AssertionError(s);
}
}
在調(diào)度方面,Quasar中默認(rèn)使用了JDK7以上才有的ForkJoinPool,它的優(yōu)勢(shì)就在于空閑的線程會(huì)去從其他線程任務(wù)隊(duì)列尾部”偷取”任務(wù)來自己處理,因此也叫work-stealing功能。這個(gè)功能可以大大的利用CPU資源,不讓線程白白空閑著。

Quasar模塊

Fiber
Fiber可以認(rèn)為是一個(gè)微線程,使用方式基本上和Thread相同,啟動(dòng)start:
new Fiber() {
@Override
protected V run() throws SuspendExecution, InterruptedException {

   // your code

}
}.start();
new Fiber(new SuspendableRunnable() {
public void run() throws SuspendExecution, InterruptedException {
// your code
}
}).start();

其實(shí)它更類似于一個(gè)CallBack,是可以攜帶返回值的,并且可以拋異常SuspendExecution,InterruptedException。你也可以向其中傳遞SuspendableRunnable 或 SuspendableCallable 給Fiber的構(gòu)造函數(shù)。你甚至可以像線程一樣調(diào)用join(),或者get()來阻塞線程等待他完成。
當(dāng)Fiber比較大的時(shí)候,F(xiàn)iber可以在調(diào)用parkAndSerialize 方法時(shí)被序列化,在調(diào)用unparkSerialized時(shí)被反序列化。
從以上我們可以看出Fiber與Thread非常類似,極大的減少了遷移的成本。

FiberScheduler
FiberScheduler是Quasar框架中核心的任務(wù)調(diào)度器,負(fù)責(zé)管理任務(wù)的工作者線程WorkerThread,之前提到的他是一個(gè)FiberForkJoinScheduler。
ForkJoinPool的默認(rèn)初始化個(gè)數(shù)為Runtime.getRuntime().availableProcessors()。

instrumentation
當(dāng)一個(gè)類被加載時(shí),Quasar的instrumentation模塊 (使用 Java agent時(shí)) 搜索suspendable 方法。每一個(gè)suspendable 方法 f通過下面的方式 instrument:

它搜索對(duì)其它suspendable方法的調(diào)用。對(duì)suspendable方法g的調(diào)用,一些代碼會(huì)在這個(gè)調(diào)用g的前后被插入,它們會(huì)保存和恢復(fù)fiber棧本地變量的狀態(tài),記錄這個(gè)暫停點(diǎn)。在這個(gè)“suspendable function chain”的最后,我們會(huì)發(fā)現(xiàn)對(duì)Fiber.park的調(diào)用。park暫停這個(gè)fiber,扔出 SuspendExecution異常。

當(dāng)g block的時(shí)候,SuspendExecution異常會(huì)被Fiber捕獲。 當(dāng)Fiber被喚醒(使用unpark), 方法f會(huì)被調(diào)用, 執(zhí)行記錄顯示它被block在g的調(diào)用上,所以程序會(huì)立即跳到f調(diào)用g的那一行,然后調(diào)用它。最終我們會(huì)到達(dá)暫停點(diǎn),然后繼續(xù)執(zhí)行。當(dāng)g返回時(shí), f中插入的代碼會(huì)恢復(fù)f的本地變量。

過程聽起來很復(fù)雜,但是它只會(huì)帶來3% ~ 5%的性能的損失。

下面看一個(gè)簡(jiǎn)單的例子, 方法m2聲明拋出SuspendExecution異常,方法m1調(diào)用m2和m3,所以也聲明拋出這個(gè)異常,最后這個(gè)異常會(huì)被Fiber所捕獲:

public class Helloworld {
static void m1() throws SuspendExecution, InterruptedException {

   String m = "m1";
   System.out.println("m1 begin");
   m = m2();
   m = m3();
   System.out.println("m1 end");
   System.out.println(m);

}
static String m2() throws SuspendExecution, InterruptedException {

   return "m2";

}
static String m3() throws SuspendExecution, InterruptedException {

   return "m3";

}
static public void main(String[] args) throws ExecutionException, InterruptedException {

   new Fiber("Caller", new SuspendableRunnable() {
       @Override
       public void run() throws SuspendExecution, InterruptedException {
           m1();
       }
   }).start();

}
}

// 反編譯后的代碼
@Instrumented(suspendableCallSites={16, 17}, methodStart=13, methodEnd=21, methodOptimized=false)
static void m1()
throws SuspendExecution, InterruptedException
{
// Byte code:
// 0: aconst_null
// 1: astore_3
// 2: invokestatic 88 co/paralleluniverse/fibers/Stack:getStack ()Lco/paralleluniverse/fibers/Stack;
// 5: dup
// 6: astore_1
// 7: ifnull +42 -> 49
// 10: aload_1
// 11: iconst_1
// 12: istore_2
// 13: invokevirtual 92 co/paralleluniverse/fibers/Stack:nextMethodEntry ()I
// 16: tableswitch default:+24->40, 1:+64->80, 2:+95->111
// 40: aload_1
// 41: invokevirtual 96 co/paralleluniverse/fibers/Stack:isFirstInStackOrPushed ()Z
// 44: ifne +5 -> 49
// 47: aconst_null
// 48: astore_1
// 49: iconst_0
// 50: istore_2
// 51: ldc 2
// 53: astore_0
// 54: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 57: ldc 4
// 59: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 62: aload_1
// 63: ifnull +26 -> 89
// 66: aload_1
// 67: iconst_1
// 68: iconst_1
// 69: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 72: aload_0
// 73: aload_1
// 74: iconst_0
// 75: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 78: iconst_0
// 79: istore_2
// 80: aload_1
// 81: iconst_0
// 82: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 85: checkcast 110 java/lang/String
// 88: astore_0
// 89: invokestatic 6 com/colobu/fiber/Helloworld:m2 ()Ljava/lang/String;
// 92: astore_0
// 93: aload_1
// 94: ifnull +26 -> 120
// 97: aload_1
// 98: iconst_2
// 99: iconst_1
// 100: invokevirtual 100 co/paralleluniverse/fibers/Stack:pushMethod (II)V
// 103: aload_0
// 104: aload_1
// 105: iconst_0
// 106: invokestatic 104 co/paralleluniverse/fibers/Stack:push (Ljava/lang/Object;Lco/paralleluniverse/fibers/Stack;I)V
// 109: iconst_0
// 110: istore_2
// 111: aload_1
// 112: iconst_0
// 113: invokevirtual 108 co/paralleluniverse/fibers/Stack:getObject (I)Ljava/lang/Object;
// 116: checkcast 110 java/lang/String
// 119: astore_0
// 120: invokestatic 7 com/colobu/fiber/Helloworld:m3 ()Ljava/lang/String;
// 123: astore_0
// 124: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 127: ldc 8
// 129: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 132: getstatic 3 java/lang/System:out Ljava/io/PrintStream;
// 135: aload_0
// 136: invokevirtual 5 java/io/PrintStream:println (Ljava/lang/String;)V
// 139: aload_1
// 140: ifnull +7 -> 147
// 143: aload_1
// 144: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 147: return
// 148: aload_1
// 149: ifnull +7 -> 156
// 152: aload_1
// 153: invokevirtual 113 co/paralleluniverse/fibers/Stack:popMethod ()V
// 156: athrow
// Line number table:
// Java source line #13 -> byte code offset #51
// Java source line #15 -> byte code offset #54
// Java source line #16 -> byte code offset #62
// Java source line #17 -> byte code offset #93
// Java source line #18 -> byte code offset #124
// Java source line #19 -> byte code offset #132
// Java source line #21 -> byte code offset #139
// Local variable table:
// start length slot name signature
// 53 83 0 m String
// 6 147 1 localStack co.paralleluniverse.fibers.Stack
// 12 99 2 i int
// 1 1 3 localObject Object
// 156 1 4 localSuspendExecution SuspendExecution
// Exception table:
// from to target type
// 49 148 148 finally
// 49 148 156 co/paralleluniverse/fibers/SuspendExecution
// 49 148 156 co/paralleluniverse/fibers/RuntimeSuspendExecution
}
我并沒有更深入的去了解Quasar的實(shí)現(xiàn)細(xì)節(jié)以及調(diào)度算法,有興趣的讀者可以翻翻它的代碼。

實(shí)戰(zhàn)

maven-dependency-plugin
2.5.1


getClasspathFilenames

properties



  public class Helloworld {

@Suspendable
static void m1() throws InterruptedException, SuspendExecution {

   String m = "m1";
   //System.out.println("m1 begin");
   m = m2();
   //System.out.println("m1 end");
   //System.out.println(m);

}
static String m2() throws SuspendExecution, InterruptedException {

   String m = m3();
   Strand.sleep(1000);
   return m;

}
//or define in META-INF/suspendables
@Suspendable
static String m3() {

   List l = Stream.of(1,2,3).filter(i -> i%2 == 0).collect(Collectors.toList());
   return l.toString();

}
static public void main(String[] args) throws ExecutionException, InterruptedException {

   int count = 10000;
   testThreadpool(count);
   testFiber(count);

}
static void testThreadpool(int count) throws InterruptedException {

   final CountDownLatch latch = new CountDownLatch(count);
   ExecutorService es = Executors.newFixedThreadPool(200);
   LongAdder latency = new LongAdder();
   long t = System.currentTimeMillis();
   for (int i =0; i< count; i++) {
       es.submit(() -> {
           long start = System.currentTimeMillis();
           try {
               m1();
           } catch (InterruptedException e) {
               e.printStackTrace();
           } catch (SuspendExecution suspendExecution) {
               suspendExecution.printStackTrace();
           }
           start = System.currentTimeMillis() - start;
           latency.add(start);
           latch.countDown();
       });
   }
   latch.await();
   t = System.currentTimeMillis() - t;
   long l = latency.longValue() / count;
   System.out.println("thread pool took: " + t + ", latency: " + l + " ms");
   es.shutdownNow();

}
static void testFiber(int count) throws InterruptedException {

   final CountDownLatch latch = new CountDownLatch(count);
   LongAdder latency = new LongAdder();
   long t = System.currentTimeMillis();
   for (int i =0; i< count; i++) {
       new Fiber("Caller", new SuspendableRunnable() {
           @Override
           public void run() throws SuspendExecution, InterruptedException {
               long start = System.currentTimeMillis();
               m1();
               start = System.currentTimeMillis() - start;
               latency.add(start);
               latch.countDown();
           }
       }).start();
   }
   latch.await();
   t = System.currentTimeMillis() - t;
   long l = latency.longValue() / count;
   System.out.println("fiber took: " + t  + ", latency: " + l + " ms");

}
}

OUTPUT:
1
2
thread pool took: 50341, latency: 1005 ms
fiber took: 1158, latency: 1000 ms

可以看到很明顯的時(shí)間差距,存在多線程阻塞的情況下,協(xié)程的性能非常的好,但是。如果把sleep這段去掉,F(xiàn)iber的性能反而更差:

這說明Fiber并不意味著它可以在所有的場(chǎng)景中都可以替換Thread。當(dāng)fiber的代碼經(jīng)常會(huì)被等待其它fiber阻塞的時(shí)候,就應(yīng)該使用fiber。

對(duì)于那些需要CPU長(zhǎng)時(shí)間計(jì)算的代碼,很少遇到阻塞的時(shí)候,就應(yīng)該首選thread

擴(kuò)展
其實(shí)協(xié)程這個(gè)概念在其他的語言中有原生的支持,如:
kotlin 1.30之后已經(jīng)穩(wěn)定
: https://www.kotlincn.net/docs...
golang : https://gobyexample.com/gorou...
python : http://www.gevent.org/content...~
在這些語言中協(xié)程就看起來至少?zèng)]這么奇怪或者難以理解了,而且開發(fā)起開也相比java簡(jiǎn)單很多。

總結(jié)
協(xié)程的概念也不算是很新了,但是在像Java這樣的語言或者特定的領(lǐng)域并不是很火,也并沒有完全普及。不是很明白是它的學(xué)習(xí)成本高,還是說應(yīng)用場(chǎng)景是在太小了。但是當(dāng)我聽到這個(gè)概念的時(shí)候確實(shí)是挺好奇,也挺好奇的。也希望之后會(huì)有更多的框架和特性來簡(jiǎn)化我們苦逼程序員的開發(fā)~~

參考文獻(xiàn)
http://docs.paralleluniverse....

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/74561.html

相關(guān)文章

  • Java將增加虛擬線程,挑戰(zhàn)Go協(xié)程

    摘要:本周提交的一份增強(qiáng)建議草案要求將虛擬線程作為標(biāo)準(zhǔn)版的一部分進(jìn)行預(yù)覽。虛擬線程目的是更好地支持編寫和維護(hù)高吞吐量并發(fā)應(yīng)用程序。該提案指出,使用虛擬線程不需要學(xué)習(xí)新的編程模型。我們知道 Go 語言最大亮點(diǎn)之一就是原生支持并發(fā),這得益于 Go 語言的協(xié)程機(jī)制。一個(gè) go 語句就可以發(fā)起一個(gè)協(xié)程 (goroutin)。 協(xié)程本質(zhì)上是一種用戶態(tài)線程,它不需要操作系統(tǒng)來進(jìn)行調(diào)度,而是由用戶程序自行管理...

    why_rookie 評(píng)論0 收藏0
  • Java多線程學(xué)習(xí)(七)并發(fā)編程中一些問題

    摘要:相比與其他操作系統(tǒng)包括其他類系統(tǒng)有很多的優(yōu)點(diǎn),其中有一項(xiàng)就是,其上下文切換和模式切換的時(shí)間消耗非常少。因?yàn)槎嗑€程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線程的使用。很多編程語言中都有協(xié)程。所以如何避免死鎖的產(chǎn)生,在我們使用并發(fā)編程時(shí)至關(guān)重要。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Java多線程學(xué)習(xí)(二)synchronized關(guān)鍵字(1) java多線程學(xué)習(xí)(二)syn...

    dingding199389 評(píng)論0 收藏0
  • Java多線程學(xué)習(xí)(七)并發(fā)編程中一些問題

    摘要:因?yàn)槎嗑€程競(jìng)爭(zhēng)鎖時(shí)會(huì)引起上下文切換。減少線程的使用。舉個(gè)例子如果說服務(wù)器的帶寬只有,某個(gè)資源的下載速度是,系統(tǒng)啟動(dòng)個(gè)線程下載該資源并不會(huì)導(dǎo)致下載速度編程,所以在并發(fā)編程時(shí),需要考慮這些資源的限制。 最近私下做一項(xiàng)目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項(xiàng)目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學(xué)習(xí)(一)Java多線程入門 Jav...

    yimo 評(píng)論0 收藏0
  • 協(xié)程原理】 - Java中的協(xié)程

    摘要:很長(zhǎng)一段時(shí)間,我都很天真的認(rèn)為,特別是以為代表的庫,才是協(xié)程的樂土。里是沒法實(shí)現(xiàn)協(xié)程,更別說實(shí)現(xiàn)這樣可以的協(xié)程的。咱真的是太井底之蛙了。不完全列表如下還有一個(gè)據(jù)作者說是最的這些協(xié)程庫的實(shí)現(xiàn)方式都是類似的,都是通過字節(jié)碼生成達(dá)到的目的。 很長(zhǎng)一段時(shí)間,我都很天真的認(rèn)為python,特別是以gevent為代表的庫,才是協(xié)程的樂土。Java里是沒法實(shí)現(xiàn)協(xié)程,更別說實(shí)現(xiàn)stackless py...

    dongfangyiyu 評(píng)論0 收藏0
  • JAVA并發(fā)編程--1.基礎(chǔ)概念

    摘要:線程線程,是程序執(zhí)行流的最小單元。由于線程之間的相互制約,致使線程在運(yùn)行中呈現(xiàn)出間斷性。線程的狀態(tài)機(jī)線程也有就緒阻塞和運(yùn)行三種基本狀態(tài)。在單個(gè)程序中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程??梢砸暈椴煌€程競(jìng)爭(zhēng)一把鎖。 進(jìn)程線程協(xié)程 進(jìn)程 進(jìn)程是一個(gè)實(shí)體。每一個(gè)進(jìn)程都有它自己的地址空間, 文本區(qū)域(text region) 數(shù)據(jù)區(qū)域(data region) 堆棧(stack re...

    abson 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<