摘要:的使用方法就不多介紹了,本文的主要內(nèi)容是剖析的源代碼。而又有一個私有的靜態(tài)變量,以及獲取這個私有靜態(tài)變量的靜態(tài)方法,顯然,這是一個單例設(shè)計模式,使程序運行的時候全局只有一個對象。
簡介
當(dāng)業(yè)務(wù)的數(shù)據(jù)量和訪問量急劇增加的情況下,我們需要對數(shù)據(jù)進行水平拆分,從而降低單庫的壓力,并且數(shù)據(jù)的水平拆分需要對業(yè)務(wù)透明,屏蔽掉水平拆分的細節(jié)。并且,前端業(yè)務(wù)的高并發(fā)會導(dǎo)致后端的數(shù)據(jù)庫連接過多,從而DB的性能低下。
Cobar就是解決這些問題的一款分庫分表中間件,Cobar以proxy的形式位于前端應(yīng)用和后端數(shù)據(jù)庫之間,Cobar對前端暴露的接口是MySQL通信協(xié)議,其將前端傳輸過來的SQL語句按照sharding規(guī)則路由到后端的數(shù)據(jù)庫實例上,再合并多個實例返回的結(jié)果,從而模擬單庫下的數(shù)據(jù)庫行為。
Cobar的使用方法就不多介紹了,本文的主要內(nèi)容是剖析Cobar的源代碼。
Cobar的前端連接模型結(jié)構(gòu)圖如下:
我們先來看CobarServer的代碼:
private CobarServer() { this.config = new CobarConfig(); SystemConfig system = config.getSystem(); MySQLLexer.setCStyleCommentVersion(system.getParserCommentVersion()); this.timer = new Timer(NAME + "Timer", true); this.initExecutor = ExecutorUtil.create("InitExecutor", system.getInitExecutor()); this.timerExecutor = ExecutorUtil.create("TimerExecutor", system.getTimerExecutor()); this.managerExecutor = ExecutorUtil.create("ManagerExecutor", system.getManagerExecutor()); this.sqlRecorder = new SQLRecorder(system.getSqlRecordCount()); this.isOnline = new AtomicBoolean(true); this.startupTime = TimeUtil.currentTimeMillis(); }
上面是CobarServer的構(gòu)造函數(shù),它的限定是private的。
private static final CobarServer INSTANCE = new CobarServer(); public static final CobarServer getInstance() { return INSTANCE; }
而CobarServer又有一個私有的靜態(tài)變量INSTANCE,以及獲取這個私有靜態(tài)變量的靜態(tài)方法,顯然,這是一個單例設(shè)計模式,使程序運行的時候全局只有一個CobarServer對象。
我們再來看CobarServer的startup()方法,此方法中構(gòu)造了一個NIOAcceptor(綁定服務(wù)器端口,接受客戶端的連接),
server = new NIOAcceptor(NAME + "Server", system.getServerPort(), sf);
構(gòu)造了一個接收前端連接的非阻塞Acceptor,讓我們在來看NIOAcceptor類的代碼。
public final class NIOAcceptor extends Thread { private static final Logger LOGGER = Logger.getLogger(NIOAcceptor.class); private static final AcceptIdGenerator ID_GENERATOR = new AcceptIdGenerator(); private final int port; private final Selector selector; private final ServerSocketChannel serverChannel; private final FrontendConnectionFactory factory; private NIOProcessor[] processors; private int nextProcessor; private long acceptCount; public NIOAcceptor(String name, int port, FrontendConnectionFactory factory) throws IOException { super.setName(name); this.port = port; this.selector = Selector.open(); # 生成選擇器 this.serverChannel = ServerSocketChannel.open(); this.serverChannel.socket().bind(new InetSocketAddress(port)); # 綁定服務(wù)器端口 this.serverChannel.configureBlocking(false); # 設(shè)置非阻塞模式 this.serverChannel.register(selector, SelectionKey.OP_ACCEPT); # 監(jiān)聽ACCEPT事件, this.factory = factory; # 設(shè)置前端連接的工廠 } }
以上的代碼都是NIO編程中很常見的操作。下面我們看run()方法,
@Override public void run() { final Selector selector = this.selector; for (;;) { ++acceptCount; try { selector.select(1000L); # select操作是阻塞的,若沒有監(jiān)聽到相應(yīng)的事件,則一直阻塞,直到超過1000毫秒,則返回 Setkeys = selector.selectedKeys(); try { for (SelectionKey key : keys) { if (key.isValid() && key.isAcceptable()) { accept(); # 接受連接,這個方法很關(guān)鍵 } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(getName(), e); } } }
以上的run方法也是常見的NIO中監(jiān)聽事件的套路,其中accept()方法是定義的私有函數(shù),accept方法是為了將channel與selector綁定,代碼如下,
private void accept() { SocketChannel channel = null; try { channel = serverChannel.accept(); # 為新的連接分配socket channel.configureBlocking(false); # 設(shè)置為非阻塞模式 # factory將channel進行封裝,進行相應(yīng)的設(shè)置,返回一個FrontendConnection,connection本質(zhì)上就是一個封裝好的channel FrontendConnection c = factory.make(channel); c.setAccepted(true); c.setId(ID_GENERATOR.getId()); # 為連接設(shè)置ID NIOProcessor processor = nextProcessor(); # 為連接分配processor,NIOAcceptor中包含了一個NIOProcessor數(shù)組,分配的策略即根據(jù)下標(biāo)不斷后移,到達數(shù)組末尾后又從數(shù)組的起始位置開始分配 c.setProcessor(processor); # 回調(diào)NIOProcessor的postRegister方法,而processor的postRegister調(diào)用的是NIOReactor類的postRegister方法 processor.postRegister(c); } catch (Throwable e) { closeChannel(channel); LOGGER.warn(getName(), e); } }
讓我來看NIOProcessor的postRegister方法,
public void postRegister(NIOConnection c) { reactor.postRegister(c); }
NIOProcessor類中定義了一個NIOReactor類的成員變量reactor,而postRegister調(diào)用的是NIOReactor的postRegister方法。下面讓我們來看NIOReactor的postRegister代碼,
final void postRegister(NIOConnection c) { # 只是先將前端連接插入R線程的阻塞隊列中,并沒有立刻將channel與selector進行綁定 reactorR.registerQueue.offer(c); # 喚醒R線程的selector,若之前的select操作沒有返回的話則立即返回 reactorR.selector.wakeup(); }
既然channel與selector沒有立刻進行綁定,那它們是什么時候綁定的呢?我們來看NIOReactor中內(nèi)部類R的run()方法,
@Override public void run() { final Selector selector = this.selector; for (;;) { ++reactCount; try { selector.select(1000L); # 將connection與selector進行綁定 register(selector); Setkeys = selector.selectedKeys(); try { for (SelectionKey key : keys) { Object att = key.attachment(); if (att != null && key.isValid()) { int readyOps = key.readyOps(); if ((readyOps & SelectionKey.OP_READ) != 0) { read((NIOConnection) att); } else if ((readyOps & SelectionKey.OP_WRITE) != 0) { write((NIOConnection) att); } else { key.cancel(); } } else { key.cancel(); } } } finally { keys.clear(); } } catch (Throwable e) { LOGGER.warn(name, e); } } }
在run方法中,當(dāng)select方法返回的時候,就會進行channel和selector的綁定,因為當(dāng)connection插入到阻塞隊列中的時候,會對selector進行wakeup(),即select(1000L)方法會立即返回,所以不必擔(dān)心channel會卡一秒鐘才會和selector進行綁定。
我們再來看R線程的register方法,
private void register(Selector selector) { NIOConnection c = null; # 將R線程阻塞隊列中的所有連接都輪詢?nèi)〕?,與selector進行綁定 while ((c = registerQueue.poll()) != null) { try { c.register(selector); } catch (Throwable e) { c.error(ErrorCode.ERR_REGISTER, e); } } }總結(jié)
關(guān)于NIOAcceptor為何先將connection放入Reactor的阻塞隊列,而不是直接綁定。筆者的觀點是,如果由NIOAcceptor負責(zé)綁定則會造成鎖競爭,selector的register方法會爭用鎖,會導(dǎo)致NIOAcceptor線程和R、W線程競爭selector的鎖,若acceptor中處理綁定connection的邏輯,則NIOAcceptor就不能快速地處理大量的連接,整個系統(tǒng)的吞吐就會降低。所以Cobar中的設(shè)計是將connection的綁定放到R線程的阻塞隊列中去,讓R線程來完成connection的綁定工作。
圖就隨意看看吧-.-,有點丑。
以上。
原文鏈接https://segmentfault.com/a/11...
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/70404.html
摘要:如果數(shù)據(jù)庫檢測到是連續(xù)的,則表明沒有串包,如果不連續(xù),則表示串包,數(shù)據(jù)庫會直接丟棄該連接。源碼分析上一節(jié)我們分析到,當(dāng)一個前端連接過來,并不是直接和綁定,而是先插入到線程的注冊隊列中這樣能釋放的壓力處理更多前端連接。 報文格式 這一節(jié)我們來講Cobar Handshake的過程。 MySQL服務(wù)端和客戶端交互的所有的包格式都是統(tǒng)一的,報文格式如下圖: showImg(https://s...
摘要:淘寶定制基于,是國內(nèi)第一個優(yōu)化定制且開源的服務(wù)器版虛擬機。數(shù)據(jù)庫開源數(shù)據(jù)庫是基于官方版本的一個分支,由阿里云數(shù)據(jù)庫團隊維護,目前也應(yīng)用于阿里巴巴集團業(yè)務(wù)以及阿里云數(shù)據(jù)庫服務(wù)。淘寶服務(wù)器是由淘寶網(wǎng)發(fā)起的服務(wù)器項目。 Java JAVA 研發(fā)框架 SOFAStack SOFAStack(Scalable Open Financial Architecture Stack)是用于快速構(gòu)建金融...
閱讀 3352·2021-11-22 15:22
閱讀 2877·2021-10-12 10:12
閱讀 2171·2021-08-21 14:10
閱讀 3838·2021-08-19 11:13
閱讀 2856·2019-08-30 15:43
閱讀 3238·2019-08-29 16:52
閱讀 456·2019-08-29 16:41
閱讀 1444·2019-08-29 12:53