摘要:遠(yuǎn)程通訊目標(biāo)介紹的相關(guān)實(shí)現(xiàn)邏輯介紹中的包內(nèi)的源碼解析。源碼分析一處理對(duì)應(yīng)的命令命令該接口上命令處理器接口,是一個(gè)可擴(kuò)展接口。關(guān)閉通道五該類實(shí)現(xiàn)了接口,封裝了命令的實(shí)現(xiàn)。下一篇我會(huì)講解基于實(shí)現(xiàn)遠(yuǎn)程通信部分。
遠(yuǎn)程通訊——Telnet
目標(biāo):介紹telnet的相關(guān)實(shí)現(xiàn)邏輯、介紹dubbo-remoting-api中的telnet包內(nèi)的源碼解析。前言
從dubbo 2.0.5開始,dubbo開始支持通過 telnet 命令來進(jìn)行服務(wù)治理。本文就是講解一些公用的telnet命令的實(shí)現(xiàn)。下面來看一下telnet實(shí)現(xiàn)的類圖:
可以看到,實(shí)現(xiàn)了TelnetHandler接口的有六個(gè)類,除了TelnetHandlerAdapter是以外,其他五個(gè)分別對(duì)應(yīng)了clear、exit、help、log、status命令的實(shí)現(xiàn),具體用來干嘛,請(qǐng)看官方文檔的介紹。
源碼分析 (一)TelnetHandler@SPI public interface TelnetHandler { /** * telnet. * 處理對(duì)應(yīng)的telnet命令 * @param channel * @param message telnet命令 */ String telnet(Channel channel, String message) throws RemotingException; }
該接口上telnet命令處理器接口,是一個(gè)可擴(kuò)展接口。它定義了一個(gè)方法,就是處理相關(guān)的telnet命令。
(二)TelnetHandlerAdapter該類繼承了ChannelHandlerAdapter,實(shí)現(xiàn)了TelnetHandler接口,是TelnetHandler的適配器類,負(fù)責(zé)在接收到HeaderExchangeHandler發(fā)來的telnet命令后分發(fā)給對(duì)應(yīng)的TelnetHandler實(shí)現(xiàn)類去實(shí)現(xiàn),并且返回命令結(jié)果。
public class TelnetHandlerAdapter extends ChannelHandlerAdapter implements TelnetHandler { /** * 擴(kuò)展加載器 */ private final ExtensionLoaderextensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class); @Override public String telnet(Channel channel, String message) throws RemotingException { // 獲得提示鍵配置,用于nc獲取信息時(shí)不顯示提示符 String prompt = channel.getUrl().getParameterAndDecoded(Constants.PROMPT_KEY, Constants.DEFAULT_PROMPT); boolean noprompt = message.contains("--no-prompt"); message = message.replace("--no-prompt", ""); StringBuilder buf = new StringBuilder(); // 刪除頭尾空白符的字符串 message = message.trim(); String command; // 獲得命令 if (message.length() > 0) { int i = message.indexOf(" "); if (i > 0) { // 獲得命令 command = message.substring(0, i).trim(); // 獲得參數(shù) message = message.substring(i + 1).trim(); } else { command = message; message = ""; } } else { command = ""; } if (command.length() > 0) { // 如果有該命令的擴(kuò)展實(shí)現(xiàn)類 if (extensionLoader.hasExtension(command)) { try { // 執(zhí)行相應(yīng)命令的實(shí)現(xiàn)類的telnet String result = extensionLoader.getExtension(command).telnet(channel, message); if (result == null) { return null; } // 返回結(jié)果 buf.append(result); } catch (Throwable t) { buf.append(t.getMessage()); } } else { buf.append("Unsupported command: "); buf.append(command); } } if (buf.length() > 0) { buf.append(" "); } // 添加 telnet 提示語 if (prompt != null && prompt.length() > 0 && !noprompt) { buf.append(prompt); } return buf.toString(); } }
該類只實(shí)現(xiàn)了telnet方法,其中的邏輯還是比較清晰,就是根據(jù)對(duì)應(yīng)的命令去讓對(duì)應(yīng)的實(shí)現(xiàn)類產(chǎn)生命令結(jié)果。
(三)ClearTelnetHandler該類實(shí)現(xiàn)了TelnetHandler接口,封裝了clear命令的實(shí)現(xiàn)。
@Activate @Help(parameter = "[lines]", summary = "Clear screen.", detail = "Clear screen.") public class ClearTelnetHandler implements TelnetHandler { @Override public String telnet(Channel channel, String message) { // 清除屏幕上的內(nèi)容行數(shù) int lines = 100; if (message.length() > 0) { // 如果不是一個(gè)數(shù)字 if (!StringUtils.isInteger(message)) { return "Illegal lines " + message + ", must be integer."; } lines = Integer.parseInt(message); } StringBuilder buf = new StringBuilder(); // 一行一行清除 for (int i = 0; i < lines; i++) { buf.append(" "); } return buf.toString(); } }(四)ExitTelnetHandler
該類實(shí)現(xiàn)了TelnetHandler接口,封裝了exit命令的實(shí)現(xiàn)。
@Activate @Help(parameter = "", summary = "Exit the telnet.", detail = "Exit the telnet.") public class ExitTelnetHandler implements TelnetHandler { @Override public String telnet(Channel channel, String message) { // 關(guān)閉通道 channel.close(); return null; } }(五)HelpTelnetHandler
該類實(shí)現(xiàn)了TelnetHandler接口,封裝了help命令的實(shí)現(xiàn)。
@Activate @Help(parameter = "[command]", summary = "Show help.", detail = "Show help.") public class HelpTelnetHandler implements TelnetHandler { /** * 擴(kuò)展加載器 */ private final ExtensionLoaderextensionLoader = ExtensionLoader.getExtensionLoader(TelnetHandler.class); @Override public String telnet(Channel channel, String message) { // 如果需要查看某一個(gè)命令的幫助 if (message.length() > 0) { if (!extensionLoader.hasExtension(message)) { return "No such command " + message; } // 獲得對(duì)應(yīng)的擴(kuò)展實(shí)現(xiàn)類 TelnetHandler handler = extensionLoader.getExtension(message); Help help = handler.getClass().getAnnotation(Help.class); StringBuilder buf = new StringBuilder(); // 生成命令和幫助信息 buf.append("Command: "); buf.append(message + " " + help.parameter().replace(" ", " ").replace(" ", " ")); buf.append(" Summary: "); buf.append(help.summary().replace(" ", " ").replace(" ", " ")); buf.append(" Detail: "); buf.append(help.detail().replace(" ", " ").replace(" ", " ")); return buf.toString(); // 如果查看所有命令的幫助 } else { List > table = new ArrayList
>(); // 獲得所有命令的提示信息 List
handlers = extensionLoader.getActivateExtension(channel.getUrl(), "telnet"); if (handlers != null && !handlers.isEmpty()) { for (TelnetHandler handler : handlers) { Help help = handler.getClass().getAnnotation(Help.class); List row = new ArrayList (); String parameter = " " + extensionLoader.getExtensionName(handler) + " " + (help != null ? help.parameter().replace(" ", " ").replace(" ", " ") : ""); row.add(parameter.length() > 50 ? parameter.substring(0, 50) + "..." : parameter); String summary = help != null ? help.summary().replace(" ", " ").replace(" ", " ") : ""; row.add(summary.length() > 50 ? summary.substring(0, 50) + "..." : summary); table.add(row); } } return "Please input "help [command]" show detail. " + TelnetUtils.toList(table); } } }
help分為了需要查看某一個(gè)命令的幫助還是查看全部命令的幫助。
(六)LogTelnetHandler該類實(shí)現(xiàn)了TelnetHandler接口,封裝了log命令的實(shí)現(xiàn)。
@Activate @Help(parameter = "level", summary = "Change log level or show log ", detail = "Change log level or show log") public class LogTelnetHandler implements TelnetHandler { public static final String SERVICE_KEY = "telnet.log"; @Override public String telnet(Channel channel, String message) { long size = 0; File file = LoggerFactory.getFile(); StringBuffer buf = new StringBuffer(); if (message == null || message.trim().length() == 0) { buf.append("EXAMPLE: log error / log 100"); } else { String str[] = message.split(" "); if (!StringUtils.isInteger(str[0])) { // 設(shè)置日志級(jí)別 LoggerFactory.setLevel(Level.valueOf(message.toUpperCase())); } else { // 獲得日志長度 int SHOW_LOG_LENGTH = Integer.parseInt(str[0]); if (file != null && file.exists()) { try { FileInputStream fis = new FileInputStream(file); try { FileChannel filechannel = fis.getChannel(); try { size = filechannel.size(); ByteBuffer bb; if (size <= SHOW_LOG_LENGTH) { // 分配緩沖區(qū) bb = ByteBuffer.allocate((int) size); // 讀日志數(shù)據(jù) filechannel.read(bb, 0); } else { int pos = (int) (size - SHOW_LOG_LENGTH); // 分配緩沖區(qū) bb = ByteBuffer.allocate(SHOW_LOG_LENGTH); // 讀取日志數(shù)據(jù) filechannel.read(bb, pos); } bb.flip(); String content = new String(bb.array()).replace("<", "<") .replace(">", ">").replace(" ", "
"); buf.append(" content:" + content); buf.append(" modified:" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date(file.lastModified())))); buf.append(" size:" + size + " "); } finally { filechannel.close(); } } finally { fis.close(); } } catch (Exception e) { buf.append(e.getMessage()); } } else { size = 0; buf.append(" MESSAGE: log file not exists or log appender is console ."); } } } buf.append(" CURRENT LOG LEVEL:" + LoggerFactory.getLevel()) .append(" CURRENT LOG APPENDER:" + (file == null ? "console" : file.getAbsolutePath())); return buf.toString(); } }
log命令實(shí)現(xiàn)原理就是從日志文件中把日志信息讀取出來。
(七)StatusTelnetHandler該類實(shí)現(xiàn)了TelnetHandler接口,封裝了status命令的實(shí)現(xiàn)。
@Activate @Help(parameter = "[-l]", summary = "Show status.", detail = "Show status.") public class StatusTelnetHandler implements TelnetHandler { private final ExtensionLoader(八)HelpextensionLoader = ExtensionLoader.getExtensionLoader(StatusChecker.class); @Override public String telnet(Channel channel, String message) { // 顯示狀態(tài)列表 if (message.equals("-l")) { List checkers = extensionLoader.getActivateExtension(channel.getUrl(), "status"); String[] header = new String[]{"resource", "status", "message"}; List > table = new ArrayList
>(); Map
statuses = new HashMap (); if (checkers != null && !checkers.isEmpty()) { // 遍歷各個(gè)資源的狀態(tài),如果一個(gè)當(dāng)全部 OK 時(shí)則顯示 OK,只要有一個(gè) ERROR 則顯示 ERROR,只要有一個(gè) WARN 則顯示 WARN for (StatusChecker checker : checkers) { String name = extensionLoader.getExtensionName(checker); Status stat; try { stat = checker.check(); } catch (Throwable t) { stat = new Status(Status.Level.ERROR, t.getMessage()); } statuses.put(name, stat); if (stat.getLevel() != null && stat.getLevel() != Status.Level.UNKNOWN) { List row = new ArrayList (); row.add(name); row.add(String.valueOf(stat.getLevel())); row.add(stat.getMessage() == null ? "" : stat.getMessage()); table.add(row); } } } Status stat = StatusUtils.getSummaryStatus(statuses); List row = new ArrayList (); row.add("summary"); row.add(String.valueOf(stat.getLevel())); row.add(stat.getMessage()); table.add(row); return TelnetUtils.toTable(header, table); } else if (message.length() > 0) { return "Unsupported parameter " + message + " for status."; } String status = channel.getUrl().getParameter("status"); Map statuses = new HashMap (); if (status != null && status.length() > 0) { String[] ss = Constants.COMMA_SPLIT_PATTERN.split(status); for (String s : ss) { StatusChecker handler = extensionLoader.getExtension(s); Status stat; try { stat = handler.check(); } catch (Throwable t) { stat = new Status(Status.Level.ERROR, t.getMessage()); } statuses.put(s, stat); } } Status stat = StatusUtils.getSummaryStatus(statuses); return String.valueOf(stat.getLevel()); } }
該接口是幫助文檔接口
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface Help { String parameter() default ""; String summary(); String detail() default ""; }
可以看上在每個(gè)命令的實(shí)現(xiàn)類上都加上了@Help注解,為了添加一些幫助文案。
(九)TelnetUtils該類是Telnet命令的工具類,其中邏輯我就不介紹了。
(十)TelnetCodec該類繼承了TransportCodec,是telnet的編解碼類。
1.屬性private static final Logger logger = LoggerFactory.getLogger(TelnetCodec.class); /** * 歷史命令列表 */ private static final String HISTORY_LIST_KEY = "telnet.history.list"; /** * 歷史命令位置,就是用上下鍵來找歷史命令 */ private static final String HISTORY_INDEX_KEY = "telnet.history.index"; /** * 向上鍵 */ private static final byte[] UP = new byte[]{27, 91, 65}; /** * 向下鍵 */ private static final byte[] DOWN = new byte[]{27, 91, 66}; /** * 回車 */ private static final List> ENTER = Arrays.asList(new Object[]{new byte[]{" ", " "} /* Windows Enter */, new byte[]{" "} /* Linux Enter */}); /** * 退出 */ private static final List> EXIT = Arrays.asList(new Object[]{new byte[]{3} /* Windows Ctrl+C */, new byte[]{-1, -12, -1, -3, 6} /* Linux Ctrl+C */, new byte[]{-1, -19, -1, -3, 6} /* Linux Pause */});2.getCharset
private static Charset getCharset(Channel channel) { if (channel != null) { // 獲得屬性設(shè)置 Object attribute = channel.getAttribute(Constants.CHARSET_KEY); // 返回指定字符集的charset對(duì)象。 if (attribute instanceof String) { try { return Charset.forName((String) attribute); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } else if (attribute instanceof Charset) { return (Charset) attribute; } URL url = channel.getUrl(); if (url != null) { String parameter = url.getParameter(Constants.CHARSET_KEY); if (parameter != null && parameter.length() > 0) { try { return Charset.forName(parameter); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } } // 默認(rèn)的編碼是utf-8 try { return Charset.forName(Constants.DEFAULT_CHARSET); } catch (Throwable t) { logger.warn(t.getMessage(), t); } return Charset.defaultCharset(); }
該方法是獲得通道的字符集,根據(jù)url中編碼來獲得字符集,默認(rèn)是utf-8。
3.encode@Override public void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException { // 如果需要編碼的是 telnet 命令結(jié)果 if (message instanceof String) { //如果為客戶端側(cè)的通道m(xù)essage直接返回 if (isClientSide(channel)) { message = message + " "; } // 獲得字節(jié)數(shù)組 byte[] msgData = ((String) message).getBytes(getCharset(channel).name()); // 寫入緩沖區(qū) buffer.writeBytes(msgData); } else { super.encode(channel, buffer, message); } }
該方法是編碼方法。
4.decode@Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException { // 獲得緩沖區(qū)可讀的字節(jié) int readable = buffer.readableBytes(); byte[] message = new byte[readable]; // 從緩沖區(qū)讀數(shù)據(jù) buffer.readBytes(message); return decode(channel, buffer, readable, message); } @SuppressWarnings("unchecked") protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException { // 如果是客戶端側(cè),直接返回結(jié)果 if (isClientSide(channel)) { return toString(message, getCharset(channel)); } // 檢驗(yàn)消息長度 checkPayload(channel, readable); if (message == null || message.length == 0) { return DecodeResult.NEED_MORE_INPUT; } // 如果回退 if (message[message.length - 1] == "") { // Windows backspace echo try { boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8}, getCharset(channel).name())); } catch (RemotingException e) { throw new IOException(StringUtils.toString(e)); } return DecodeResult.NEED_MORE_INPUT; } // 如果命令是退出 for (Object command : EXIT) { if (isEquals(message, (byte[]) command)) { if (logger.isInfoEnabled()) { logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command))); } // 關(guān)閉通道 channel.close(); return null; } } boolean up = endsWith(message, UP); boolean down = endsWith(message, DOWN); // 如果用上下鍵找歷史命令 if (up || down) { LinkedListhistory = (LinkedList ) channel.getAttribute(HISTORY_LIST_KEY); if (history == null || history.isEmpty()) { return DecodeResult.NEED_MORE_INPUT; } Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY); Integer old = index; if (index == null) { index = history.size() - 1; } else { // 向上 if (up) { index = index - 1; if (index < 0) { index = history.size() - 1; } } else { // 向下 index = index + 1; if (index > history.size() - 1) { index = 0; } } } // 獲得歷史命令,并發(fā)送給客戶端 if (old == null || !old.equals(index)) { // 設(shè)置當(dāng)前命令位置 channel.setAttribute(HISTORY_INDEX_KEY, index); // 獲得歷史命令 String value = history.get(index); // 清除客戶端原有命令,用查到的歷史命令替代 if (old != null && old >= 0 && old < history.size()) { String ov = history.get(old); StringBuilder buf = new StringBuilder(); for (int i = 0; i < ov.length(); i++) { buf.append(""); } for (int i = 0; i < ov.length(); i++) { buf.append(" "); } for (int i = 0; i < ov.length(); i++) { buf.append(""); } value = buf.toString() + value; } try { channel.send(value); } catch (RemotingException e) { throw new IOException(StringUtils.toString(e)); } } // 返回,需要更多指令 return DecodeResult.NEED_MORE_INPUT; } // 關(guān)閉命令 for (Object command : EXIT) { if (isEquals(message, (byte[]) command)) { if (logger.isInfoEnabled()) { logger.info(new Exception("Close channel " + channel + " on exit command " + command)); } channel.close(); return null; } } byte[] enter = null; // 如果命令是回車 for (Object command : ENTER) { if (endsWith(message, (byte[]) command)) { enter = (byte[]) command; break; } } if (enter == null) { return DecodeResult.NEED_MORE_INPUT; } LinkedList history = (LinkedList ) channel.getAttribute(HISTORY_LIST_KEY); Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY); // 移除歷史命令 channel.removeAttribute(HISTORY_INDEX_KEY); // 將歷史命令拼接 if (history != null && !history.isEmpty() && index != null && index >= 0 && index < history.size()) { String value = history.get(index); if (value != null) { byte[] b1 = value.getBytes(); byte[] b2 = new byte[b1.length + message.length]; System.arraycopy(b1, 0, b2, 0, b1.length); System.arraycopy(message, 0, b2, b1.length, message.length); message = b2; } } // 將命令字節(jié)數(shù)組,轉(zhuǎn)成具體的一條命令 String result = toString(message, getCharset(channel)); if (result.trim().length() > 0) { if (history == null) { history = new LinkedList (); channel.setAttribute(HISTORY_LIST_KEY, history); } if (history.isEmpty()) { history.addLast(result); } else if (!result.equals(history.getLast())) { history.remove(result); // 添加當(dāng)前命令到歷史尾部 history.addLast(result); // 超過上限,移除歷史的頭部 if (history.size() > 10) { history.removeFirst(); } } } return result; }
該方法是編碼。
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了telnet的相關(guān)實(shí)現(xiàn)邏輯,本文有興趣的朋友可以看看。下一篇我會(huì)講解基于grizzly實(shí)現(xiàn)遠(yuǎn)程通信部分。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/72810.html
摘要:而存在的意義就是保證請(qǐng)求或響應(yīng)對(duì)象可在線程池中被解碼,解碼完成后,就會(huì)分發(fā)到的。 2.7大揭秘——服務(wù)端處理請(qǐng)求過程 目標(biāo):從源碼的角度分析服務(wù)端接收到請(qǐng)求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費(fèi)端發(fā)送請(qǐng)求的過程,該篇就要將服務(wù)端處理請(qǐng)求的過程。也就是當(dāng)服務(wù)端收到請(qǐng)求數(shù)據(jù)包后的一系列處理以及如何返回最終結(jié)果。我們也知道消費(fèi)端在發(fā)送請(qǐng)求的時(shí)候已經(jīng)做了編碼,所以我...
摘要:而編碼器是講應(yīng)用程序的數(shù)據(jù)轉(zhuǎn)化為網(wǎng)絡(luò)格式,解碼器則是講網(wǎng)絡(luò)格式轉(zhuǎn)化為應(yīng)用程序,同時(shí)具備這兩種功能的單一組件就叫編解碼器。在中是老的編解碼器接口,而是新的編解碼器接口,并且已經(jīng)用把適配成了。 遠(yuǎn)程通訊——開篇 目標(biāo):介紹之后解讀遠(yuǎn)程通訊模塊的內(nèi)容如何編排、介紹dubbo-remoting-api中的包結(jié)構(gòu)設(shè)計(jì)以及最外層的的源碼解析。 前言 服務(wù)治理框架中可以大致分為服務(wù)通信和服務(wù)管理兩個(gè)...
摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...
摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對(duì)于異步化的改造原理??丛创a解析四十六消費(fèi)端發(fā)送請(qǐng)求過程講到的十四的,在以前的邏輯會(huì)直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對(duì)于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:和斷開,處理措施不一樣,會(huì)分別做出重連和關(guān)閉通道的操作。取消定時(shí)器取消大量已排隊(duì)任務(wù),用于回收空間該方法是停止現(xiàn)有心跳,也就是停止定時(shí)器,釋放空間。做到異步處理返回結(jié)果時(shí)能給準(zhǔn)確的返回給對(duì)應(yīng)的請(qǐng)求。 遠(yuǎn)程通訊——Exchange層 目標(biāo):介紹Exchange層的相關(guān)設(shè)計(jì)和邏輯、介紹dubbo-remoting-api中的exchange包內(nèi)的源碼解析。 前言 上一篇文章我講的是dubb...
閱讀 3250·2021-11-15 11:37
閱讀 2464·2021-09-29 09:48
閱讀 3828·2021-09-22 15:55
閱讀 3025·2021-09-22 10:02
閱讀 2649·2021-08-25 09:40
閱讀 3240·2021-08-03 14:03
閱讀 1708·2019-08-29 13:11
閱讀 1581·2019-08-29 12:49