摘要:理由是如果到了上,而這個對應(yīng)的操作遲遲不能就緒被出來。但我認(rèn)為這其實(shí)是一個超時處理問題。問題是,原生的是沒有超時支持的。如果是回調(diào)性質(zhì)的,一般的做法是正常就緒給一個,超時給另外一個。只要時間合理,作者之前所說的會引發(fā)的問題并不會出現(xiàn)。
grizzly框架的作者曾經(jīng)提到NIO框架不應(yīng)該使用selection key的attach功能(鏈接)。理由是如果attach到了selection key上,而這個selection key對應(yīng)的操作遲遲不能就緒(被select出來)。那么這些selection key所attach的附件都是被強(qiáng)引用的,從而無法被gc。如果有大量這樣的selection key累積,程序就好像發(fā)生了內(nèi)存泄漏了一樣。
但我認(rèn)為這其實(shí)是一個超時處理問題??蚣軕?yīng)該支持設(shè)置超時,并且可以在超時之后調(diào)用框架用戶預(yù)先設(shè)置的處理邏輯,并且釋放掉對應(yīng)的資源。問題是,原生的NIO 1是沒有超時支持的。它提供的是selector,可以注冊,可以select,可以cancel。但是超時需要自己做記錄,程序自己判斷超時了,也就是select了老半天了仍然沒有就緒,那么就需要去調(diào)用cancel把selection key注銷掉。如果使用netty這樣的封裝庫,它是把selector的api轉(zhuǎn)成回調(diào)的形式,同時也添加了超時的支持。NIO 2除了windows的proactor(OICP)部分之外,對于selector基本上就是一個官方版的netty,也是回調(diào)的形式,也支持了超時。
基于協(xié)程來封裝selector的話,支持超時處理自然也不在話下(代碼在此)。如果是回調(diào)性質(zhì)的api,一般的做法是正常就緒給一個callback,超時給另外一個callback??蚣芨鶕?jù)實(shí)際情況決定調(diào)用哪個callback。如果是協(xié)程的api,最自然的方式自然是拋異常了。
private SocketChannel tryAccept(ServerSocketChannel serverSocketChannel) throws IOException, Pausable { while(true) { try { return scheduler.accept(serverSocketChannel); } catch (TimeoutException e) { System.out.println("time out, try again"); continue; } } }
scheduler.accept會有兩個路徑的返回。一個路徑是正常的return一個socket channel,這表明accept阻塞等待成功,拿到了一個socket channel。另外一個返回是拋出了TimeoutException異常,這表明等待超時了??蚣芤龅木褪且诔瑫r的時候拋出這個異常,同時要確保相關(guān)的資源這個時候已經(jīng)釋放掉了,不會引起內(nèi)存泄漏。
首先,需要在做阻塞調(diào)用之前說明超時時間的長度。
scheduler.timeout = 5000; SocketChannel socketChannel = tryAccept(serverSocketChannel);
這里設(shè)置的是5秒之后超時。根據(jù)這個超時時間可以計算一個dead line:
booking.acceptBlocked(getCurrentTimeMillis() + timeout);
然后拿一個小本子記著這個dead line:
public void acceptBlocked(long deadline) throws Pausable, TimeoutException { if (null != acceptTask) { throw new RuntimeException("multiple accept blocked on same channel"); } acceptDeadline = deadline; updateDeadline(); acceptTask = Task.getCurrentTask(); Task.pause(this); if (acceptDeadline == -1) { acceptUnblocked(); throw new TimeoutException(); } }
這個dead line會用來計算整個selector booking四個操作的earliest dead line:
public void updateDeadline() { earliestDeadline = Long.MAX_VALUE; if (readDeadline > 0 && readDeadline < earliestDeadline) { earliestDeadline = readDeadline; } if (writeDeadline > 0 && writeDeadline < earliestDeadline) { earliestDeadline = writeDeadline; } if (acceptDeadline > 0 && acceptDeadline < earliestDeadline) { earliestDeadline = acceptDeadline; } if (connectDeadline > 0 && connectDeadline < earliestDeadline) { earliestDeadline = connectDeadline; } bookings.remove(this); // when timed out, the booking might be removed already if (earliestDeadline != Long.MAX_VALUE) { // add back in case read timed out, but write is still blocking if (!bookings.offer(this)) { throw new RuntimeException("update booking failed"); } } }
也就是說每個selector booking通過這樣的設(shè)置都會有一個自己的時間戳(earliestDeadline)。用這個時間戳可以對booking進(jìn)行一個時間上的排序:
@Override public int compareTo(SelectorBooking that) { if (that.earliestDeadline > this.earliestDeadline) { return -1; } else if (that.earliestDeadline < this.earliestDeadline) { return 1; } return 0; }
因為可以排序,所以也就可以用一個PriorityQueue來維護(hù)一個鏈表以記錄哪個booking是最近會到期的booking。因為PriorityQueue的排序是發(fā)生在插入時的,所以在這個booking的時間戳發(fā)生變更的時候,需要從鏈表中刪除然后二次插入已達(dá)到更新排序的目的。有了這個排序的鏈表之后,就可以用來做兩個事情:決定selector的select等待時間,以及哪些booking的哪些task是超時了的:
protected int doSelect() throws IOException { SelectorBooking booking = selectorBookings.peek(); if (null == booking) { return selector.select(); } else { long delta = booking.getEarliestDeadline() - getCurrentTimeMillis(); if (delta > 0) { return selector.select(delta); } else { return selector.selectNow(); } } } boolean loopOnce() { try { executeReadyTasks(); doSelect(); Iteratoriterator = selector.selectedKeys().iterator(); ioUnblocked(iterator); while (hasDeadSelectorBooking()) { SelectorBooking booking = selectorBookings.poll(); booking.cancelDeadTasks(getCurrentTimeMillis()); } return true; } catch (Exception e) { LOGGER.error("loop died", e); return false; } }
最后就是一件事情了,如果協(xié)程所阻塞的io操作確實(shí)超時了,如何在超時的調(diào)用處拋出異常,以達(dá)到走不通業(yè)務(wù)邏輯路徑的目的:
public void cancelDeadTasks(long currentTimeMillis) { // ... if (null != acceptTask && currentTimeMillis > acceptDeadline) { selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_ACCEPT); acceptDeadline = -1; updateDeadline(); acceptTask.resume(); if (-1 == acceptDeadline) { throw new RuntimeException("accept deadline unhandled"); } } // ... if (0 == selectionKey.interestOps()) { selectionKey.cancel(); } } public void acceptBlocked(long deadline) throws Pausable, TimeoutException { if (null != acceptTask) { throw new RuntimeException("multiple accept blocked on same channel"); } acceptDeadline = deadline; updateDeadline(); acceptTask = Task.getCurrentTask(); Task.pause(this); if (acceptDeadline == -1) { acceptUnblocked(); throw new TimeoutException(); } }
這里是兩方面的配合。一方面是在io循環(huán)的地方設(shè)置一個-1為標(biāo)志位。然后去喚醒協(xié)程。協(xié)程喚醒了之后立即去檢查-1這個標(biāo)志位有沒有設(shè)置,如果設(shè)置了,則認(rèn)為自己被喚醒是因為超時,而不是io操作就緒了。于是TimeoutException被拋出了。特別注意這行:
if (0 == selectionKey.interestOps()) { selectionKey.cancel(); }
通過在超時之后取消了interestOps,然后在所有interestOps都沒有之后自動cancel對應(yīng)的selection key。這個時候?qū)?yīng)的附件也會被垃圾回收給干掉了。只要time out時間合理,grizzly作者之前所說的attach會引發(fā)的問題并不會出現(xiàn)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/64152.html
摘要:接下來,就看怎么用協(xié)程來實(shí)現(xiàn)異步了。直接拿的原始寫代碼會死人的。引入?yún)f(xié)程就是為了把上下連續(xù)的業(yè)務(wù)邏輯放在一個協(xié)程里,把與業(yè)務(wù)關(guān)系不大的的處理部分放到框架的里。第三部分是放棄掉執(zhí)行權(quán)。這樣一個只能接收打印一行的異步應(yīng)用就寫好了。 前面已經(jīng)準(zhǔn)備好了greenlet對應(yīng)的Java版本了,一個刪減后的kilim(http://segmentfault.com/blog/taowen/11900...
摘要:基本上所有的網(wǎng)絡(luò)應(yīng)用都會示范一個的寫法。除了這些操作的主體是而不是,操作的是,而不是。以為例其過程是這樣的這段代碼就是創(chuàng)建一個,并注冊一個,并把附著到上。關(guān)鍵之一顯然是利用了協(xié)程的和,把回調(diào)轉(zhuǎn)換成順序的邏輯執(zhí)行。 基本上所有的網(wǎng)絡(luò)應(yīng)用都會示范一個tcp的echo寫法。前面我們已經(jīng)看到了如何使用協(xié)程和異步io來做tcp服務(wù)器的第一步,accept。下面是一個完整的echo server的...
摘要:抽象類有一個方法用于使通道處于阻塞模式或非阻塞模式。注意抽象類的方法是由抽象類實(shí)現(xiàn)的,都是直接繼承了抽象類。大家有興趣可以看看的源碼,各種抽象類和抽象類上層的抽象類。 歷史回顧: Java NIO 概覽 Java NIO 之 Buffer(緩沖區(qū)) Java NIO 之 Channel(通道) 其他高贊文章: 面試中關(guān)于Redis的問題看這篇就夠了 一文輕松搞懂redis集群原理及搭建...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會被注冊在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
摘要:后改良為用線程池的方式代替新增線程,被稱為偽異步。最大的問題是阻塞,同步。每次請求都由程序執(zhí)行并返回,這是同步的缺陷。這些都會被注冊在多路復(fù)用器上。多路復(fù)用器提供選擇已經(jīng)就緒狀態(tài)任務(wù)的能力。并沒有采用的多路復(fù)用器,而是使用異步通道的概念。 Netty是一個提供異步事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用框架,用以快速開發(fā)高性能、高可靠的網(wǎng)絡(luò)服務(wù)器和客戶端程序。Netty簡化了網(wǎng)絡(luò)程序的開發(fā),是很多框架和公司...
閱讀 1587·2021-10-18 13:35
閱讀 2370·2021-10-09 09:44
閱讀 825·2021-10-08 10:05
閱讀 2723·2021-09-26 09:47
閱讀 3578·2021-09-22 15:22
閱讀 441·2019-08-29 12:24
閱讀 2005·2019-08-29 11:06
閱讀 2862·2019-08-26 12:23