摘要:整合到本文更加注重代碼實(shí)踐,對(duì)于配置相關(guān)的知識(shí)會(huì)一筆帶過(guò),不做過(guò)多的詳解。筆者是上傳到私服,然后通過(guò)導(dǎo)入。接口是預(yù)留給開(kāi)發(fā)者根據(jù)不同事件處理業(yè)務(wù)邏輯的接口。改造筆記二優(yōu)化邏輯
Moquette簡(jiǎn)介
Mqtt作為物聯(lián)網(wǎng)比較流行的協(xié)議現(xiàn)在已經(jīng)被大范圍使用,其中也有很多開(kāi)源的MQTT BROKEN。Moquette是用java基于netty實(shí)現(xiàn)的輕量級(jí)的MQTT BROKEN. Moquette基于Netty實(shí)現(xiàn),性能問(wèn)題至少前期可以不用考慮,在使用過(guò)程中還算穩(wěn)定,沒(méi)有出現(xiàn)過(guò)較大的問(wèn)題。github地址:https://github.com/andsel/moq...。
整合到SpringBoot本文更加注重代碼實(shí)踐,對(duì)于配置相關(guān)的知識(shí)會(huì)一筆帶過(guò),不做過(guò)多的詳解。
假設(shè)已經(jīng)搭建好SpringBoot環(huán)境,下載完Moquette。至于怎么引用Moquette,可以在原項(xiàng)目上修改,也可以達(dá)成Jar包添加到lib調(diào)用,也可以上傳到Maven私服后通過(guò)配置pom引用。筆者是上傳到Maven私服,然后通過(guò)maven導(dǎo)入。
自定義包裝類,實(shí)現(xiàn)io.moquette.server.Server的自動(dòng)注入
@Slf4j @Service public class MoquetteServer { @Value("${mqtt-server.config-path}") private String configFilePath; @Autowired private IAuthorizator authorizator; /** * Safety相關(guān)的攔截器,如果有其它業(yè)務(wù),可以再去實(shí)現(xiàn)一個(gè)攔截器處理其它業(yè)務(wù) */ @Autowired @Qualifier("safetyInterceptHandler") private InterceptHandler safetyinterceptHandler; private Server mqttServer; public void startServer() throws IOException { IResourceLoader configFileResourceLoader = new ClasspathResourceLoader(configFilePath); final IConfig config = new ResourceLoaderConfig(configFileResourceLoader); mqttServer = new Server(); /**添加處理Safety相關(guān)的攔截器,如果有其它業(yè)務(wù),可以再去實(shí)現(xiàn)一個(gè)攔截器處理其它業(yè)務(wù),然后也添加上即可*/ ListinterceptHandlers = Arrays.asList(safetyinterceptHandler); /** * Authenticator 不顯示設(shè)置,Server會(huì)默認(rèn)以password_file創(chuàng)建一個(gè)ResourceAuthenticator * 如果需要更靈活的連接驗(yàn)證方案,可以繼承IAuthenticator接口,自定義實(shí)現(xiàn) */ mqttServer.startServer(config, interceptHandlers, null, null, authorizator); } public void stop() { mqttServer.stopServer(); } }
解釋:
(1)添加@Service
(2)configFilePath是Moquette需要的moquette.conf的配置文件路徑,筆者將配置文件放到了resouces目錄下,在application.xml配置的路徑,因此通過(guò)@Value自動(dòng)注入路徑值。
(3)因?yàn)槭菍⑴渲梦募旁诹藃esources目錄下,所以用Moquette提供的ClasspathResourceLoader加載的配置文件
(4)IAuthorizator 是權(quán)限驗(yàn)證接口
(5)InterceptHandler是Moquette訂閱、發(fā)布、建立連接等相關(guān)事件的攔截回調(diào)業(yè)務(wù)處理邏輯接口
2.實(shí)現(xiàn)IAuthorizator接口
@Component public class PermitAllAuthorizator implements IAuthorizator { @Override public boolean canWrite(Topic topic, String user, String client) { /**可以控制某個(gè)用戶的client,是否具有發(fā)布某個(gè)主題的權(quán)限,目前默認(rèn)任何Client可以發(fā)布任主題*/ return true; } @Override public boolean canRead(Topic topic, String user, String client) { /**可以控制某個(gè)用戶的client,是否具有接收某個(gè)主題的權(quán)限,目前默認(rèn)任何Client可以接收任何主題*/ return true; } }
解釋:實(shí)現(xiàn)另一個(gè)很簡(jiǎn)單的授權(quán)接口,允許任何用戶所有的讀寫請(qǐng)求
3.實(shí)現(xiàn)InterceptHandler接口
@Slf4j @Component public class SafetyInterceptHandler extends AbstractInterceptHandler{ @Override public String getID() { return SafetyInterceptHandler.class.getName(); } @Override public void onConnect(InterceptConnectMessage msg) { } @Override public void onConnectionLost(InterceptConnectionLostMessage msg) { } @Override public void onPublish(InterceptPublishMessage msg) { } @Override public void onMessageAcknowledged(InterceptAcknowledgedMessage msg) { } }
解釋:
1.簡(jiǎn)單實(shí)現(xiàn)InterceptHandler,繼承自AbstractInterceptHandler,并重寫了部分方法??梢愿鶕?jù)業(yè)務(wù)需要實(shí)現(xiàn)不同的方法。InterceptHandler接口是Moquette預(yù)留給開(kāi)發(fā)者根據(jù)不同事件處理業(yè)務(wù)邏輯的接口。
4.通過(guò)SpringBoot啟動(dòng)Moquette
@SpringBootApplication public class MqttServiceApplication { public static void main(String[] args) throws IOException { SpringApplication application = new SpringApplication(MqttServiceApplication.class); final ApplicationContext context = application.run(args); MoquetteServer server = context.getBean(MoquetteServer.class); server.startServer(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { server.stop(); log.info("Moquette Server stopped"); } }); } }
如果發(fā)現(xiàn)MoquetteServer無(wú)法啟動(dòng),是否是SpringBoot默認(rèn)的包掃描機(jī)制的問(wèn)題,可以通過(guò)@ComponentScan解決。
通過(guò)以上操作,就可以在任何想要使用MoquetteServer的地方,通過(guò)@Autowired自動(dòng)注入。
當(dāng)然在MoquetteServer中筆者只是簡(jiǎn)單實(shí)現(xiàn)了封裝,并沒(méi)有實(shí)現(xiàn)其它方法,讀者完全可以根據(jù)自己的需要在MoquetteServer中實(shí)現(xiàn)自己需要的功能,但前提是你要對(duì)Moquette的源碼熟悉。
moquette改造筆記(二):優(yōu)化BrokerInterceptor notifyTopicPublished()邏輯
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/77205.html
摘要:修改的實(shí)現(xiàn),實(shí)現(xiàn)接口在改造筆記一整合到中修改實(shí)現(xiàn),添加對(duì)的實(shí)現(xiàn)如下負(fù)載過(guò)大,處理不過(guò)來(lái)時(shí),會(huì)回調(diào)該方法例如可以發(fā)生郵件通知相關(guān)人員改造筆記四解決中的調(diào)用兩次 發(fā)現(xiàn)問(wèn)題 在io.moquette.spi.impl.BrokerInterceptor的構(gòu)造函數(shù)中,新建了一個(gè)線程池,代碼如下: private BrokerInterceptor(int poolSize, List hand...
摘要:發(fā)現(xiàn)問(wèn)題在使用中設(shè)備異常斷開(kāi)中的。在中事件都是在鏈中依次傳遞的。事件最后傳遞到。解決方法添加會(huì)導(dǎo)致調(diào)用兩次解釋會(huì)在該從鏈中移除掉時(shí)被調(diào)用,一般的話沒(méi)有手動(dòng)從鏈中刪除時(shí),會(huì)在連接斷開(kāi)后回調(diào)該方法。 發(fā)現(xiàn)問(wèn)題 在使用中設(shè)備異常斷開(kāi),InterceptHandler中的onConnectionLost()。經(jīng)過(guò)調(diào)試發(fā)現(xiàn)是MoquetteIdleTimeoutHandler中的代碼導(dǎo)致的,代碼...
摘要:優(yōu)化邏輯優(yōu)化方向向啟動(dòng)方法一樣,每次調(diào)用的方法都是在線程池中新建一個(gè)任務(wù)具體代碼解釋新建一個(gè)用來(lái)實(shí)現(xiàn)調(diào)用方法。改造筆記三優(yōu)化中的線程池 發(fā)現(xiàn)問(wèn)題 下面部分是io.moquette.spi.impl.BrokerInterceptor.java部分源碼 @Override public void notifyClientConnected(final MqttConnectMes...
摘要:發(fā)現(xiàn)問(wèn)題在使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個(gè)設(shè)備一樣相互頂替連接的情況下,的和的方法調(diào)用沒(méi)有先后順序,如果在這兩個(gè)方法里面來(lái)記錄設(shè)備上下線狀態(tài),會(huì)造成狀態(tài)不對(duì)。因?yàn)橄嗷ロ斕娴那闆r并不多見(jiàn),因此兩個(gè)也可以接受,在性能上并不會(huì)造成多大影響。 發(fā)現(xiàn)問(wèn)題 在moquette使用中發(fā)現(xiàn)在設(shè)備頻繁上下線和兩個(gè)設(shè)備ClientId一樣相互頂替連接的情況下,InterceptHandler的onConn...
摘要:沒(méi)有顯式的通常調(diào)用需要指定域名才能定位到某個(gè)服務(wù)器上的某個(gè)具體應(yīng)用,而通過(guò)的注冊(cè)中心,在調(diào)用時(shí)只需指定服務(wù)名稱,注冊(cè)中心會(huì)自動(dòng)發(fā)現(xiàn)對(duì)應(yīng)的具體服務(wù)。 起因 公司要做系統(tǒng)間的互通,所以需要程序之間互相調(diào)用接口,這塊一直是其他同事在做,但是今天一個(gè)新項(xiàng)目需要調(diào)用到其他系統(tǒng)的接口,所以看了下他們的調(diào)用方法,發(fā)現(xiàn)都是傳統(tǒng)的httpclient調(diào)用,外面做了一層封裝,類似這樣: HttpGet h...
閱讀 1289·2021-11-19 09:40
閱讀 3153·2021-11-02 14:47
閱讀 3188·2021-10-11 10:58
閱讀 3247·2019-08-30 15:54
閱讀 2711·2019-08-30 12:50
閱讀 1755·2019-08-29 16:54
閱讀 489·2019-08-29 15:38
閱讀 1258·2019-08-29 15:19