摘要:基本上所有的網(wǎng)絡(luò)應(yīng)用都會示范一個的寫法。除了這些操作的主體是而不是,操作的是,而不是。以為例其過程是這樣的這段代碼就是創(chuàng)建一個,并注冊一個,并把附著到上。關(guān)鍵之一顯然是利用了協(xié)程的和,把回調(diào)轉(zhuǎn)換成順序的邏輯執(zhí)行。
基本上所有的網(wǎng)絡(luò)應(yīng)用都會示范一個tcp的echo寫法。前面我們已經(jīng)看到了如何使用協(xié)程和異步io來做tcp服務(wù)器的第一步,accept。下面是一個完整的echo server的實現(xiàn)(完整代碼):
package org.github.taowen.daili; import kilim.Pausable; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; public class Main { public static void main(String[] args) throws Exception { Scheduler scheduler = new Scheduler(); DailiTask task = new DailiTask(scheduler) { @Override public void execute() throws Pausable, Exception { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(9090)); serverSocketChannel.configureBlocking(false); System.out.println("listening..."); scheduler.timeout = 5000; SocketChannel socketChannel = scheduler.accept(serverSocketChannel); socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024); while (scheduler.read(socketChannel, byteBuffer) > 0) { byteBuffer.flip(); scheduler.write(socketChannel, byteBuffer); byteBuffer.clear(); } } }; scheduler.callSoon(task); scheduler.loop(); } }
從上面的代碼來看,完全沒有異步IO的感覺,代碼寫出來和傳統(tǒng)Java同步網(wǎng)絡(luò)編碼是一樣的。除了scheduler.accept,scheduler.read這些操作的主體是scheduler而不是socket,操作的是byte buffer,而不是input/output stream。
這段代碼中最關(guān)鍵的是其中的那個task,是一個協(xié)程。scheduler.accept,read和accept三處會引起task的跳出執(zhí)行,跳出的時候會把task當前在做的IO等待記錄到內(nèi)部的一個叫SelectorBooking的身上。以readBlocked為例其過程是這樣的:
public int read(SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException, Pausable { int bytesCount = socketChannel.read(byteBuffer); if (bytesCount > 0) { return bytesCount; } SelectionKey selectionKey = socketChannel.keyFor(selector); if (null == selectionKey) { selectionKey = socketChannel.register(selector, SelectionKey.OP_READ); SelectorBooking booking = addSelectorBooking(selectionKey); selectionKey.attach(booking); } else { selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ); } SelectorBooking booking = (SelectorBooking) selectionKey.attachment(); booking.readBlocked(getCurrentTimeMillis() + timeout); return socketChannel.read(byteBuffer); }
這段代碼就是創(chuàng)建一個SelectorBooking,并注冊一個SelectionKey,并把booking附著到selection key上。這樣selection key被select出來之后,就可以根據(jù)booking找到對應(yīng)喚醒的task。注意的是selection key是一個socket一個的,但是可能對應(yīng)的有四個操作(accept/connect/read/write),所以booking上可能會有四個被阻塞掛起的task分別對應(yīng)不同的操作。
而booking和task的交互發(fā)生在booking.readBlocked這個調(diào)用內(nèi)部:
public void readBlocked(long deadline) throws Pausable { if (null != readTask) { throw new RuntimeException("multiple read blocked on same channel"); } readDeadline = deadline; updateDeadline(); readTask = Task.getCurrentTask(); Task.pause(this); if (readDeadline == -1) { readUnblocked(); throw new RuntimeException("timeout"); } }
其中 Task.getCurrentTask 是一個神奇的調(diào)用。它可以得到當前的“協(xié)程”。得到的這個協(xié)程可以在掛起之后調(diào)用resume重新喚醒。
Task.pause 是另外一處神奇的調(diào)用。它使得當前執(zhí)行的協(xié)程掛起。等到下面那行if被執(zhí)行到,已經(jīng)是別的地方調(diào)用resume之后的事情了。
通過這樣的一些列操作,就完成一個協(xié)程的掛起,并把協(xié)程和異步io等信息注冊到selector上的過程。
主循環(huán)只需要調(diào)用selector,找到就緒了的selection key,然后根據(jù)之前attach的附件找到booking,通過booking找到需要喚醒的協(xié)程,然后調(diào)用resume就可以讓協(xié)程上的業(yè)務(wù)邏輯繼續(xù)往下執(zhí)行了:
public void loop() { while (loopOnce()) { } } boolean loopOnce() { try { executeReadyTasks(); doSelect(); Iteratoriterator = selector.selectedKeys().iterator(); ioUnblocked(iterator); while (hasDeadSelectorBooking()) { SelectorBooking booking = selectorBookings.poll(); booking.cancelDeadTasks(getCurrentTimeMillis()); } return true; } catch (Exception e) { LOGGER.error("loop died", e); return false; } }
這種做法非常經(jīng)典。關(guān)鍵之一顯然是利用了協(xié)程的pause和resume,把回調(diào)轉(zhuǎn)換成順序的邏輯執(zhí)行。關(guān)鍵之二就是利用了selection key的附件功能,把協(xié)程附著到了selection key上從而在select出來之后可以迅速恢復到阻塞之前的程序狀態(tài)(resume是一個局部上下文恢復的過程)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/64153.html
摘要:基于的和的,比和之流好看實在太多了。而且同樣是異步實現(xiàn)的,應(yīng)該性能不差的。支持多個客戶端同時連接的。這個是配套的文件特別提一下,使用非常方便,直接可以打開的項目。 現(xiàn)代的Java開發(fā)真的和我當年認識的很不一樣了,這三篇文章非常值得一讀: http://blog.paralleluniverse.co/2014/05/01/modern-java/http://blog.paralle...
摘要:在本文中我將會介紹應(yīng)用性能優(yōu)化的一般原則。性能優(yōu)化的流程圖摘取自和合著的性能,描述了應(yīng)用性能優(yōu)化的處理流程。例如,對每臺服務(wù)器,你面臨著為單個分配堆內(nèi)存和運行個并為每個分配堆內(nèi)存的選擇。不過位能使用堆內(nèi)存最大理論值只有。 原文鏈接:http://www.cubrid.org/blog/dev-platform/the-principles-of-java-application-per...
摘要:強引用中最常見的引用,引用計數(shù)算法的就是典型的強引用,只要強引用還存在,垃圾收集器永遠不會回收掉被引用的對象。 概述 早在半個世紀以前,第一個使用了內(nèi)存動態(tài)分配和垃圾收集技術(shù)的語言Lisp就已經(jīng)誕生了,從那時,人們就在思考關(guān)于gc需要完成的三件事請: 哪些內(nèi)存需要回收 什么時候回收 如何回收 直到今天已經(jīng)有越來越多的語言開始內(nèi)置內(nèi)存動態(tài)分配和垃圾收集技術(shù)。經(jīng)過長時間的發(fā)展,這些技術(shù)...
閱讀 2891·2021-08-20 09:37
閱讀 1617·2019-08-30 12:47
閱讀 1101·2019-08-29 13:27
閱讀 1693·2019-08-28 18:02
閱讀 758·2019-08-23 18:15
閱讀 3095·2019-08-23 16:51
閱讀 939·2019-08-23 14:13
閱讀 2156·2019-08-23 13:05