摘要:年月日上午阿里云消息服,隊(duì)列消息發(fā)送以及消費(fèi)的并發(fā)測(cè)試解析配置文件二者等價(jià)線程數(shù)并發(fā)數(shù)程序入口準(zhǔn)備工作發(fā)送消息線程池一個(gè)計(jì)數(shù)信號(hào)量。但是,不使用實(shí)際的許可對(duì)象,只對(duì)可用許可的號(hào)碼進(jìn)行計(jì)數(shù),并采取相應(yīng)的行動(dòng)。
package com.study.mq.aliyunmns; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URL; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import org.apache.commons.lang3.SystemUtils; import org.apache.log4j.Logger; import org.apache.log4j.PropertyConfigurator; import com.aliyun.mns.client.CloudAccount; import com.aliyun.mns.client.CloudQueue; import com.aliyun.mns.client.MNSClient; import com.aliyun.mns.common.http.ClientConfiguration; import com.aliyun.mns.model.Message; /** * * @author wangkai * @2016年11月22日 上午11:27:14 * @desc:阿里云消息服(MNS),隊(duì)列消息發(fā)送以及消費(fèi)的并發(fā)測(cè)試 * https://www.aliyun.com/product/mns?spm=5176.8142029 * .388261.80.fNnCkg */ public class MnsQueueAppV2 { private static Logger LOG = Logger.getLogger(MnsQueueAppV2.class.getName()); private static MNSClient client = null; // private static AtomicLong totalCount = new AtomicLong(0); private static String endpoint = null; private static String accessId = null; private static String accessKey = null; private static String queueName = "articlepricinglog"; private static int threadNum = 100; private static int clientNum = 10000; // private static int totalSeconds = 180; private static String log4jConfPath = "./log4j.properties"; static { PropertyConfigurator.configureAndWatch(log4jConfPath); } /** * 解析配置文件 * * @return */ @SuppressWarnings("unused") protected static boolean parseConf() { // URL resource = // MnsQueueAppV2.class.getClassLoader().getResource("name.properties"); String confFilePath = SystemUtils.getUserDir() + SystemUtils.FILE_SEPARATOR + "src/main/resources/mns.properties"; URL resource = MnsQueueAppV2.class.getResource("/mns.properties"); URL resource2 = MnsQueueAppV2.class.getClassLoader().getResource( "mns.properties");// 二者等價(jià) BufferedInputStream bis = null; try { bis = new BufferedInputStream(new FileInputStream(confFilePath)); if (bis == null) { LOG.info("ConfFile not opened: " + confFilePath); return false; } } catch (FileNotFoundException e) { LOG.error("ConfFile not found: " + confFilePath, e); return false; } // load file Properties properties = new Properties(); try { properties.load(bis); } catch (IOException e) { LOG.error("Load ConfFile Failed: " + e.getMessage()); return false; } finally { try { bis.close(); } catch (Exception e) { // do nothing } } // init the member parameters endpoint = properties.getProperty("Endpoint"); LOG.info("Endpoint: " + endpoint); accessId = properties.getProperty("AccessId"); LOG.info("AccessId: " + accessId); accessKey = properties.getProperty("AccessKey"); queueName = properties.getProperty("QueueName", queueName); LOG.info("QueueName: " + queueName); threadNum = Integer.parseInt(properties.getProperty("ThreadNum", String.valueOf(threadNum))); LOG.info("ThreadNum: 線程數(shù)" + threadNum); clientNum = Integer.parseInt(properties.getProperty("ClientNum", String.valueOf(clientNum))); LOG.info("ClientNum: 并發(fā)數(shù)" + clientNum); // totalSeconds = // Integer.parseInt(properties.getProperty("TotalSeconds", // String.valueOf(totalSeconds))); // LOG.info("TotalSeconds: " + totalSeconds); return true; } /** * 程序入口 * * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // 準(zhǔn)備工作 if (!parseConf()) { return; } ClientConfiguration clientConfiguration = new ClientConfiguration(); clientConfiguration.setMaxConnections(threadNum); clientConfiguration.setMaxConnectionsPerRoute(threadNum); CloudAccount cloudAccount = new CloudAccount(accessId, accessKey, endpoint, clientConfiguration); client = cloudAccount.getMNSClient(); LOG.info("發(fā)送消息"); // 線程池 ExecutorService exec = Executors.newFixedThreadPool(500); /** * Semaphore 一個(gè)計(jì)數(shù)信號(hào)量。從概念上講,信號(hào)量維護(hù)了一個(gè)許可集。如有必要,在許可可用前會(huì)阻塞每一個(gè) * acquire(),然后再獲取該許可。每個(gè) release() * 添加一個(gè)許可,從而可能釋放一個(gè)正在阻塞的獲取者。但是,不使用實(shí)際的許可對(duì)象,Semaphore 只對(duì)可用許可的號(hào)碼進(jìn)行計(jì)數(shù) * ,并采取相應(yīng)的行動(dòng)。拿到信號(hào)量的線程可以進(jìn)入代碼,否則就等待。通過(guò)acquire()和release()獲取和釋放訪問(wèn)許可。 */ final Semaphore semp = new Semaphore(threadNum);// ["sem?f??] final Semaphore semaphore = new Semaphore(10, true); // 拿到信號(hào)量的線程可以進(jìn)入代碼,否則就等待 // Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,它通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源 // 輔助理解 :很多年以來(lái),我都覺(jué)得從字面上很難理解Semaphore所表達(dá)的含義,只能把它比作是控制流量的紅綠燈, // 比如XX馬路要限制流量,只允許同時(shí)有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會(huì)看到綠燈, // 可以開(kāi)進(jìn)這條馬路,后面的車會(huì)看到紅燈,不能駛?cè)隭X馬路,但是如果前一百輛中有五輛車已經(jīng)離開(kāi)了XX馬路, // 那么后面就允許有5輛車駛?cè)腭R路,這個(gè)例子里說(shuō)的車就是線程,駛?cè)腭R路就表示線程在執(zhí)行,離開(kāi)馬路就表示線程執(zhí)行完成,看見(jiàn)紅燈就表示線程被阻塞,不能執(zhí)行。 long startTime = System.currentTimeMillis(); // 開(kāi)啟時(shí)間 /** * 原理: * 更進(jìn)一步,信號(hào)量的特性如下:信號(hào)量是一個(gè)非負(fù)整數(shù)(車位數(shù)),所有通過(guò)它的線程(車輛)都會(huì)將該整數(shù)減一(通過(guò)它當(dāng)然是為了使用資源), * 當(dāng)該整數(shù)值為零時(shí),所有試圖通過(guò)它的線程都將處于等待狀態(tài)。在信號(hào)量上我們定義兩種操作: Wait(等待) 和 Release(釋放)。 * 當(dāng)一個(gè)線程調(diào)用Wait * (等待)操作時(shí),它要么通過(guò)然后將信號(hào)量減一,要么一直等下去,直到信號(hào)量大于一或超時(shí)。Release(釋放)實(shí)際上是在信號(hào)量上執(zhí)行加操作 * ,對(duì)應(yīng)于車輛離開(kāi)停車場(chǎng),該操作之所以叫做“釋放”是因?yàn)榧硬僮鲗?shí)際上是釋放了由信號(hào)量守護(hù)的資源。 */ // 開(kāi)始 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp.acquire();// 獲取許可 try { // 獲取queue CloudQueue queue = client.getQueueRef(queueName); // 組裝消息 Message message = new Message(); message.setMessageBody("Test"); // 發(fā)送消息 queue.putMessage(message); } catch (Exception e) { e.printStackTrace(); } semp.release();// 歸還許可 } catch (Exception e) { e.printStackTrace(); } } }; exec.submit(task); } long endTime = System.currentTimeMillis(); // 開(kāi)啟時(shí)間 exec.shutdown(); LOG.info(clientNum + " 的并發(fā)發(fā)送消息總耗時(shí):>>>" + (endTime - startTime) + " ms"); LOG.info(clientNum + " 的并發(fā)發(fā)送消息 QPS為:>>>" + (clientNum * 1000) / (endTime - startTime) + " q/s"); LOG.info("接收消息"); Thread.sleep(3000); ExecutorService exec2 = Executors.newFixedThreadPool(500); final Semaphore semp2 = new Semaphore(threadNum); long startTime2 = System.currentTimeMillis(); // 開(kāi)啟時(shí)間 for (int index = 0; index < clientNum; index++) { // final int NO = index; Runnable task = new Runnable() { public void run() { try { semp2.acquire(); try { // 獲取queue CloudQueue queue = client.getQueueRef(queueName); // 獲取消息 Message message = queue.popMessage(); // 刪掉消息 if (message != null) queue.deleteMessage(message.getReceiptHandle()); } catch (Exception e) { e.printStackTrace(); } semp2.release(); } catch (Exception e) { e.printStackTrace(); } } }; exec2.submit(task); } long endTime2 = System.currentTimeMillis(); // 開(kāi)啟時(shí)間 exec2.shutdown(); // 忽略線程切換的耗時(shí) 精確的做法? LOG.info(clientNum + " 的并發(fā)接收消息總耗時(shí):>>>" + (endTime2 - startTime2)
+ " ms"); LOG.info(clientNum + " 的并發(fā)接收消息 QPS為:>>>" + (clientNum * 1000) / (endTime2 - startTime2) + " q/s"); }
}
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66396.html
摘要:多線程工具箱之前言這一篇談一下信號(hào)量。信息信息信息信息信息信息信息信息信息信息信息小結(jié)適用于多線程請(qǐng)求數(shù)量資源的場(chǎng)景,但無(wú)法解決單多個(gè)線程對(duì)同一資源訪問(wèn)的競(jìng)爭(zhēng)性訪問(wèn)。在后面我們?cè)谖覀兊亩嗑€程工具箱里面陸續(xù)會(huì)提到。 Java多線程工具箱之Semaphore 前言 這一篇談一下Semaphore:信號(hào)量。 將Semaphore類比為為信號(hào)燈,被繼承Runable的線程類比為列車:理解信號(hào)量...
摘要:在每個(gè)線程獲取之前,必須先從信號(hào)量獲取許可。注意,因?yàn)橥瑫r(shí)可能發(fā)生取消,所以返回并不保證有其他線程等待獲取許可。該值僅是估計(jì)的數(shù)字,因?yàn)樵诖朔椒ū闅v內(nèi)部數(shù)據(jù)結(jié)構(gòu)的同時(shí),線程的數(shù)目可能動(dòng)態(tài)地變化。 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github.com/kco1989/kco代碼已經(jīng)全部托...
摘要:將屏障重置為其初始狀態(tài)。注意,在由于其他原因造成損壞之后,實(shí)行重置可能會(huì)變得很復(fù)雜此時(shí)需要使用其他方式重新同步線程,并選擇其中一個(gè)線程來(lái)執(zhí)行重置。 安全共享對(duì)象策略 1.線程限制 : 一個(gè)被線程限制的對(duì)象,由線程獨(dú)占,并且只能被占有它的線程修改2.共享只讀 : 一個(gè)共享只讀的對(duì)象,在沒(méi)有額外同步的情況下,可以被多個(gè)線程并發(fā)訪問(wèn),但是任何線程都不能修改它3.線程安全對(duì)象 : 一個(gè)線程安全...
摘要:前言之前學(xué)多線程的時(shí)候沒(méi)有學(xué)習(xí)線程的同步工具類輔助類。而其它線程完成自己的操作后,調(diào)用使計(jì)數(shù)器減。信號(hào)量控制一組線程同時(shí)執(zhí)行。 前言 之前學(xué)多線程的時(shí)候沒(méi)有學(xué)習(xí)線程的同步工具類(輔助類)。ps:當(dāng)時(shí)覺(jué)得暫時(shí)用不上,認(rèn)為是挺高深的知識(shí)點(diǎn)就沒(méi)去管了.. 在前幾天,朋友發(fā)了一篇比較好的Semaphore文章過(guò)來(lái),然后在瀏覽博客的時(shí)候又發(fā)現(xiàn)面試還會(huì)考,那還是挺重要的知識(shí)點(diǎn)。于是花了點(diǎn)時(shí)間去了解...
摘要:所以得出結(jié)論需要分配較多的線程進(jìn)行讀數(shù)據(jù),較少的線程進(jìn)行寫(xiě)數(shù)據(jù)。注意多線程編程對(duì)實(shí)際環(huán)境和需求有很大的依賴,需要根據(jù)實(shí)際的需求情況對(duì)各個(gè)參數(shù)做調(diào)整。 背景 最近對(duì)于 Java 多線程做了一段時(shí)間的學(xué)習(xí),筆者一直認(rèn)為,學(xué)習(xí)東西就是要應(yīng)用到實(shí)際的業(yè)務(wù)需求中的。否則要么無(wú)法深入理解,要么硬生生地套用技術(shù)只是達(dá)到炫技的效果。 不過(guò)筆者仍舊認(rèn)為自己對(duì)于多線程掌握不夠熟練,不敢輕易應(yīng)用到生產(chǎn)代碼中...
閱讀 1901·2021-11-22 09:34
閱讀 3039·2021-09-28 09:35
閱讀 13475·2021-09-09 11:34
閱讀 3603·2019-08-29 16:25
閱讀 2834·2019-08-29 15:23
閱讀 2049·2019-08-28 17:55
閱讀 2438·2019-08-26 17:04
閱讀 3053·2019-08-26 12:21