摘要:如果數(shù)據(jù)庫檢測到是連續(xù)的,則表明沒有串包,如果不連續(xù),則表示串包,數(shù)據(jù)庫會(huì)直接丟棄該連接。源碼分析上一節(jié)我們分析到,當(dāng)一個(gè)前端連接過來,并不是直接和綁定,而是先插入到線程的注冊隊(duì)列中這樣能釋放的壓力處理更多前端連接。
報(bào)文格式
這一節(jié)我們來講Cobar Handshake的過程。
MySQL服務(wù)端和客戶端交互的所有的包格式都是統(tǒng)一的,報(bào)文格式如下圖:
MySQL報(bào)文的消息頭共有4個(gè)字節(jié),前3字節(jié)表示的是實(shí)際數(shù)據(jù)的長度(不包含消息頭),并且字節(jié)是按照小端模式排放的。
第四個(gè)字節(jié)MySQL為了防止串包用的,其原理是每收到一個(gè)報(bào)文,都在sequence id上加1。如果數(shù)據(jù)庫檢測到sequence id是連續(xù)的,則表明沒有串包,如果不連續(xù),則表示串包,數(shù)據(jù)庫會(huì)直接丟棄該連接。
小端模式就是低位字節(jié)排放在內(nèi)存的低地址端,高位字節(jié)排放在內(nèi)存的高地址端。 大端模式則相反。
下面是Handshake包的結(jié)構(gòu),括號(hào)內(nèi)表示該字段的字節(jié)數(shù):
seed部分是加密種子,分為前后兩個(gè)部分,通過隨機(jī)數(shù)生成。
源碼分析上一節(jié)我們分析到,當(dāng)一個(gè)前端連接過來,并不是直接和selector綁定,而是先插入到R線程的注冊隊(duì)列中,這樣能釋放NIOAcceptor的壓力,處理更多前端連接。所以,連接和selector的綁定過程是在R線程中進(jìn)行的,由register方法實(shí)現(xiàn),代碼如下:
private void register(Selector selector) { NIOConnection c = null; while ((c = registerQueue.poll()) != null) { try { c.register(selector); } catch (Throwable e) { c.error(ErrorCode.ERR_REGISTER, e); } } }
實(shí)際的綁定操作是由NIOConnection的register方法實(shí)現(xiàn)的,NIOConnection接口的抽象類是AbstractConnection,我們來看它實(shí)現(xiàn)的register方法:
@Override public void register(Selector selector) throws IOException { try { // 該連接只監(jiān)聽socket可讀事件 processKey = channel.register(selector, SelectionKey.OP_READ, this); isRegistered = true; } finally { if (isClosed.get()) { clearSelectionKey(); } } }
我們發(fā)現(xiàn),前端連接注冊選擇器時(shí),只監(jiān)聽了可讀事件。這是考慮到,Java的NIO屬于水平觸發(fā)LT(只要滿足條件,就觸發(fā)一個(gè)事件),使用水平觸發(fā)時(shí),如果應(yīng)用程序不需要寫就不要關(guān)注socket可寫的事件,否則就會(huì)無限次地立即返回write ready notification,長期關(guān)注socket可寫事件會(huì)出現(xiàn)CPU打滿的情況,所以在使用JDK的NIO編程時(shí),如果沒有數(shù)據(jù)往外寫,就取消寫事件,有數(shù)據(jù)往外寫時(shí)再注冊寫事件。
FrontendConnection繼承了AbstractConnection,它又重新實(shí)現(xiàn)了register方法,代碼如下:
@Override public void register(Selector selector) throws IOException { // 調(diào)用父類的register方法 super.register(selector); if (!isClosed.get()) { // 生成認(rèn)證數(shù)據(jù) byte[] rand1 = RandomUtil.randomBytes(8); byte[] rand2 = RandomUtil.randomBytes(12); // 保存認(rèn)證數(shù)據(jù) byte[] seed = new byte[rand1.length + rand2.length]; System.arraycopy(rand1, 0, seed, 0, rand1.length); System.arraycopy(rand2, 0, seed, rand1.length, rand2.length); this.seed = seed; // 發(fā)送握手?jǐn)?shù)據(jù)包 HandshakePacket hs = new HandshakePacket(); hs.packetId = 0; hs.protocolVersion = Versions.PROTOCOL_VERSION; hs.serverVersion = Versions.SERVER_VERSION; hs.threadId = id; hs.seed = rand1; hs.serverCapabilities = getServerCapabilities(); hs.serverCharsetIndex = (byte) (charsetIndex & 0xff); hs.serverStatus = 2; hs.restOfScrambleBuff = rand2; // 異步寫入Handshake包 hs.write(this); } }
該方法生成了HandShake包,和上面結(jié)構(gòu)圖相一致,關(guān)鍵是最后異步寫入HandShake包的write方法,代碼如下:
public void write(FrontendConnection c) { // 分配緩存 ByteBuffer buffer = c.allocate(); // 將HandShake包寫入緩存 BufferUtil.writeUB3(buffer, calcPacketSize()); buffer.put(packetId); buffer.put(protocolVersion); BufferUtil.writeWithNull(buffer, serverVersion); BufferUtil.writeUB4(buffer, threadId); BufferUtil.writeWithNull(buffer, seed); BufferUtil.writeUB2(buffer, serverCapabilities); buffer.put(serverCharsetIndex); BufferUtil.writeUB2(buffer, serverStatus); buffer.put(FILLER_13); // buffer.position(buffer.position() + 13); BufferUtil.writeWithNull(buffer, restOfScrambleBuff); // 將ByteBuffer中的數(shù)據(jù)異步寫入Socket c.write(buffer); }
我們再來看最后一行的write方法:
@Override public void write(ByteBuffer buffer) { // 檢查連接是否關(guān)閉,若關(guān)閉則將緩存回收 if (isClosed.get()) { processor.getBufferPool().recycle(buffer); return; } if (isRegistered) { try { // 將緩存先插入對(duì)隊(duì)列中,其實(shí)就是一個(gè)循環(huán)數(shù)組,如數(shù)組已滿,則 wait; // 這個(gè)隊(duì)列是AbstractConnection的一個(gè)成員變量 writeQueue.put(buffer); } catch (InterruptedException e) { error(ErrorCode.ERR_PUT_WRITE_QUEUE, e); return; } // 插入隊(duì)列后,調(diào)用NIOProcessor的postWrite方法,其實(shí)就是NIOReacor的postWrite方法 processor.postWrite(this); } else { // 若連接未注冊,也回收緩存 processor.getBufferPool().recycle(buffer); close(); } }
我們看NIOReactor的postWrite方法:
final void postWrite(NIOConnection c) { reactorW.writeQueue.offer(c); }
其實(shí)是將連接插入到W線程的writeQueue阻塞隊(duì)列中,我們再來看W線程的run方法,
@Override public void run() { NIOConnection c = null; for (;;) { try { if ((c = writeQueue.take()) != null) { write(c); } } catch (Throwable e) { LOGGER.warn(name, e); } } } private void write(NIOConnection c) { try { c.writeByQueue(); } catch (Throwable e) { c.error(ErrorCode.ERR_WRITE_BY_QUEUE, e); } }
輪詢阻塞隊(duì)列,若隊(duì)列不為空,則取出連接,基于隊(duì)列寫方法writeByQueue將緩存中的數(shù)據(jù)寫入socket,下一節(jié)再分析writeByQueue方法。
總結(jié)閱讀源碼后,發(fā)現(xiàn)Cobar從前端連接的accept并注冊selector到發(fā)送Handshake包都是異步,本質(zhì)是將連接插入到R線程和W線程的阻塞隊(duì)列中,不立即進(jìn)行注冊和寫操作,從而實(shí)現(xiàn)整個(gè)過程的異步化,提高了Cobar的吞吐量。
以上。
原文鏈接https://segmentfault.com/a/11...
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70582.html
摘要:的使用方法就不多介紹了,本文的主要內(nèi)容是剖析的源代碼。而又有一個(gè)私有的靜態(tài)變量,以及獲取這個(gè)私有靜態(tài)變量的靜態(tài)方法,顯然,這是一個(gè)單例設(shè)計(jì)模式,使程序運(yùn)行的時(shí)候全局只有一個(gè)對(duì)象。 簡介 當(dāng)業(yè)務(wù)的數(shù)據(jù)量和訪問量急劇增加的情況下,我們需要對(duì)數(shù)據(jù)進(jìn)行水平拆分,從而降低單庫的壓力,并且數(shù)據(jù)的水平拆分需要對(duì)業(yè)務(wù)透明,屏蔽掉水平拆分的細(xì)節(jié)。并且,前端業(yè)務(wù)的高并發(fā)會(huì)導(dǎo)致后端的數(shù)據(jù)庫連接過多,從而DB...
摘要:淘寶定制基于,是國內(nèi)第一個(gè)優(yōu)化定制且開源的服務(wù)器版虛擬機(jī)。數(shù)據(jù)庫開源數(shù)據(jù)庫是基于官方版本的一個(gè)分支,由阿里云數(shù)據(jù)庫團(tuán)隊(duì)維護(hù),目前也應(yīng)用于阿里巴巴集團(tuán)業(yè)務(wù)以及阿里云數(shù)據(jù)庫服務(wù)。淘寶服務(wù)器是由淘寶網(wǎng)發(fā)起的服務(wù)器項(xiàng)目。 Java JAVA 研發(fā)框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構(gòu)建金融...
閱讀 1790·2021-11-24 09:39
閱讀 1711·2021-11-22 15:22
閱讀 1034·2021-09-27 13:36
閱讀 3332·2021-09-24 10:34
閱讀 3365·2021-07-26 23:38
閱讀 2656·2019-08-29 16:44
閱讀 996·2019-08-29 16:39
閱讀 1133·2019-08-29 16:20