成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

java多線程信號(hào)量-semaphore

zzbo / 2986人閱讀

摘要:年月日上午阿里云消息服,隊(duì)列消息發(fā)送以及消費(fèi)的并發(fā)測(cè)試解析配置文件二者等價(jià)線程數(shù)并發(fā)數(shù)程序入口準(zhǔn)備工作發(fā)送消息線程池一個(gè)計(jì)數(shù)信號(hào)量。但是,不使用實(shí)際的許可對(duì)象,只對(duì)可用許可的號(hào)碼進(jìn)行計(jì)數(shù),并采取相應(yīng)的行動(dòng)。

package com.study.mq.aliyunmns;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

import org.apache.commons.lang3.SystemUtils;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.http.ClientConfiguration;
import com.aliyun.mns.model.Message;

/**
 * 
 * @author wangkai
 * @2016年11月22日 上午11:27:14
 * @desc:阿里云消息服(MNS),隊(duì)列消息發(fā)送以及消費(fèi)的并發(fā)測(cè)試 
 *                                   https://www.aliyun.com/product/mns?spm=5176.8142029
 *                                   .388261.80.fNnCkg
 */
public class MnsQueueAppV2 {

    private static Logger LOG = Logger.getLogger(MnsQueueAppV2.class.getName());

    private static MNSClient client = null;
    // private static AtomicLong totalCount = new AtomicLong(0);
    private static String endpoint = null;
    private static String accessId = null;
    private static String accessKey = null;
    private static String queueName = "articlepricinglog";
    private static int threadNum = 100;
    private static int clientNum = 10000;
    // private static int totalSeconds = 180;
    private static String log4jConfPath = "./log4j.properties";

    static {
        PropertyConfigurator.configureAndWatch(log4jConfPath);
    }

    /**
     * 解析配置文件
     * 
     * @return
     */
    @SuppressWarnings("unused")
    protected static boolean parseConf() {
        // URL resource =
        // MnsQueueAppV2.class.getClassLoader().getResource("name.properties");
        String confFilePath = SystemUtils.getUserDir()
                + SystemUtils.FILE_SEPARATOR
                + "src/main/resources/mns.properties";
        URL resource = MnsQueueAppV2.class.getResource("/mns.properties");
        URL resource2 = MnsQueueAppV2.class.getClassLoader().getResource(
                "mns.properties");// 二者等價(jià)
        BufferedInputStream bis = null;
        try {
            bis = new BufferedInputStream(new FileInputStream(confFilePath));
            if (bis == null) {
                LOG.info("ConfFile not opened: " + confFilePath);
                return false;
            }
        } catch (FileNotFoundException e) {
            LOG.error("ConfFile not found: " + confFilePath, e);
            return false;
        }
        // load file
        Properties properties = new Properties();
        try {
            properties.load(bis);
        } catch (IOException e) {
            LOG.error("Load ConfFile Failed: " + e.getMessage());
            return false;
        } finally {
            try {
                bis.close();
            } catch (Exception e) {
                // do nothing
            }
        }
        // init the member parameters
        endpoint = properties.getProperty("Endpoint");
        LOG.info("Endpoint: " + endpoint);
        accessId = properties.getProperty("AccessId");
        LOG.info("AccessId: " + accessId);
        accessKey = properties.getProperty("AccessKey");
        queueName = properties.getProperty("QueueName", queueName);
        LOG.info("QueueName: " + queueName);
        threadNum = Integer.parseInt(properties.getProperty("ThreadNum",
                String.valueOf(threadNum)));
        LOG.info("ThreadNum: 線程數(shù)" + threadNum);
        clientNum = Integer.parseInt(properties.getProperty("ClientNum",
                String.valueOf(clientNum)));
        LOG.info("ClientNum: 并發(fā)數(shù)" + clientNum);
        // totalSeconds =
        // Integer.parseInt(properties.getProperty("TotalSeconds",
        // String.valueOf(totalSeconds)));
        // LOG.info("TotalSeconds: " + totalSeconds);
        return true;
    }

    /**
     * 程序入口
     * 
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        // 準(zhǔn)備工作
        if (!parseConf()) {
            return;
        }

        ClientConfiguration clientConfiguration = new ClientConfiguration();
        clientConfiguration.setMaxConnections(threadNum);
        clientConfiguration.setMaxConnectionsPerRoute(threadNum);
        CloudAccount cloudAccount = new CloudAccount(accessId, accessKey,
                endpoint, clientConfiguration);
        client = cloudAccount.getMNSClient();
        LOG.info("發(fā)送消息");
        // 線程池
        ExecutorService exec = Executors.newFixedThreadPool(500);
        /**
         * Semaphore 一個(gè)計(jì)數(shù)信號(hào)量。從概念上講,信號(hào)量維護(hù)了一個(gè)許可集。如有必要,在許可可用前會(huì)阻塞每一個(gè)
         * acquire(),然后再獲取該許可。每個(gè) release()
         * 添加一個(gè)許可,從而可能釋放一個(gè)正在阻塞的獲取者。但是,不使用實(shí)際的許可對(duì)象,Semaphore 只對(duì)可用許可的號(hào)碼進(jìn)行計(jì)數(shù)
         * ,并采取相應(yīng)的行動(dòng)。拿到信號(hào)量的線程可以進(jìn)入代碼,否則就等待。通過(guò)acquire()和release()獲取和釋放訪問(wèn)許可。
         */
        final Semaphore semp = new Semaphore(threadNum);// ["sem?f??]
        final Semaphore semaphore = new Semaphore(10, true);
                                                        // 拿到信號(hào)量的線程可以進(jìn)入代碼,否則就等待
        // Semaphore(信號(hào)量)是用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,它通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用公共資源
        // 輔助理解 :很多年以來(lái),我都覺(jué)得從字面上很難理解Semaphore所表達(dá)的含義,只能把它比作是控制流量的紅綠燈,
        // 比如XX馬路要限制流量,只允許同時(shí)有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會(huì)看到綠燈,
        // 可以開(kāi)進(jìn)這條馬路,后面的車會(huì)看到紅燈,不能駛?cè)隭X馬路,但是如果前一百輛中有五輛車已經(jīng)離開(kāi)了XX馬路,
        // 那么后面就允許有5輛車駛?cè)腭R路,這個(gè)例子里說(shuō)的車就是線程,駛?cè)腭R路就表示線程在執(zhí)行,離開(kāi)馬路就表示線程執(zhí)行完成,看見(jiàn)紅燈就表示線程被阻塞,不能執(zhí)行。
        long startTime = System.currentTimeMillis(); // 開(kāi)啟時(shí)間
        /**
         * 原理:
         * 更進(jìn)一步,信號(hào)量的特性如下:信號(hào)量是一個(gè)非負(fù)整數(shù)(車位數(shù)),所有通過(guò)它的線程(車輛)都會(huì)將該整數(shù)減一(通過(guò)它當(dāng)然是為了使用資源),
         * 當(dāng)該整數(shù)值為零時(shí),所有試圖通過(guò)它的線程都將處于等待狀態(tài)。在信號(hào)量上我們定義兩種操作: Wait(等待) 和 Release(釋放)。
         * 當(dāng)一個(gè)線程調(diào)用Wait
         * (等待)操作時(shí),它要么通過(guò)然后將信號(hào)量減一,要么一直等下去,直到信號(hào)量大于一或超時(shí)。Release(釋放)實(shí)際上是在信號(hào)量上執(zhí)行加操作
         * ,對(duì)應(yīng)于車輛離開(kāi)停車場(chǎng),該操作之所以叫做“釋放”是因?yàn)榧硬僮鲗?shí)際上是釋放了由信號(hào)量守護(hù)的資源。
         */
        // 開(kāi)始
        for (int index = 0; index < clientNum; index++) {

            // final int NO = index;

            Runnable task = new Runnable() {
                public void run() {
                    try {
                        semp.acquire();// 獲取許可
                        try {
                            // 獲取queue
                            CloudQueue queue = client.getQueueRef(queueName);
                            // 組裝消息
                            Message message = new Message();
                            message.setMessageBody("Test");
                            // 發(fā)送消息
                            queue.putMessage(message);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        semp.release();// 歸還許可
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            exec.submit(task);
        }
        long endTime = System.currentTimeMillis(); // 開(kāi)啟時(shí)間
        exec.shutdown();
        LOG.info(clientNum + " 的并發(fā)發(fā)送消息總耗時(shí):>>>" + (endTime - startTime) + " ms");
        LOG.info(clientNum + " 的并發(fā)發(fā)送消息 QPS為:>>>" + (clientNum * 1000)
                / (endTime - startTime) + " q/s");
        LOG.info("接收消息");
        Thread.sleep(3000);
        ExecutorService exec2 = Executors.newFixedThreadPool(500);
        final Semaphore semp2 = new Semaphore(threadNum);
        long startTime2 = System.currentTimeMillis(); // 開(kāi)啟時(shí)間
        for (int index = 0; index < clientNum; index++) {
            // final int NO = index;
            Runnable task = new Runnable() {
                public void run() {
                    try {
                        semp2.acquire();
                        try {
                            // 獲取queue
                            CloudQueue queue = client.getQueueRef(queueName);
                            // 獲取消息
                            Message message = queue.popMessage();
                            // 刪掉消息
                            if (message != null)
                                queue.deleteMessage(message.getReceiptHandle());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        semp2.release();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            exec2.submit(task);
        }
        long endTime2 = System.currentTimeMillis(); // 開(kāi)啟時(shí)間
        exec2.shutdown();
        // 忽略線程切換的耗時(shí) 精確的做法?
        LOG.info(clientNum + " 的并發(fā)接收消息總耗時(shí):>>>" + (endTime2 - startTime2)
            + " ms");
    LOG.info(clientNum + " 的并發(fā)接收消息 QPS為:>>>" + (clientNum * 1000)
            / (endTime2 - startTime2) + " q/s");
}

}

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/66396.html

相關(guān)文章

  • Java線程工具箱之Semaphore

    摘要:多線程工具箱之前言這一篇談一下信號(hào)量。信息信息信息信息信息信息信息信息信息信息信息小結(jié)適用于多線程請(qǐng)求數(shù)量資源的場(chǎng)景,但無(wú)法解決單多個(gè)線程對(duì)同一資源訪問(wèn)的競(jìng)爭(zhēng)性訪問(wèn)。在后面我們?cè)谖覀兊亩嗑€程工具箱里面陸續(xù)會(huì)提到。 Java多線程工具箱之Semaphore 前言 這一篇談一下Semaphore:信號(hào)量。 將Semaphore類比為為信號(hào)燈,被繼承Runable的線程類比為列車:理解信號(hào)量...

    FleyX 評(píng)論0 收藏0
  • (八)java線程Semaphore

    摘要:在每個(gè)線程獲取之前,必須先從信號(hào)量獲取許可。注意,因?yàn)橥瑫r(shí)可能發(fā)生取消,所以返回并不保證有其他線程等待獲取許可。該值僅是估計(jì)的數(shù)字,因?yàn)樵诖朔椒ū闅v內(nèi)部數(shù)據(jù)結(jié)構(gòu)的同時(shí),線程的數(shù)目可能動(dòng)態(tài)地變化。 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github.com/kco1989/kco代碼已經(jīng)全部托...

    DesGemini 評(píng)論0 收藏0
  • Java并發(fā)線程 - 并發(fā)工具類JUC

    摘要:將屏障重置為其初始狀態(tài)。注意,在由于其他原因造成損壞之后,實(shí)行重置可能會(huì)變得很復(fù)雜此時(shí)需要使用其他方式重新同步線程,并選擇其中一個(gè)線程來(lái)執(zhí)行重置。 安全共享對(duì)象策略 1.線程限制 : 一個(gè)被線程限制的對(duì)象,由線程獨(dú)占,并且只能被占有它的線程修改2.共享只讀 : 一個(gè)共享只讀的對(duì)象,在沒(méi)有額外同步的情況下,可以被多個(gè)線程并發(fā)訪問(wèn),但是任何線程都不能修改它3.線程安全對(duì)象 : 一個(gè)線程安全...

    wuyumin 評(píng)論0 收藏0
  • Java線程打輔助的三個(gè)小伙子

    摘要:前言之前學(xué)多線程的時(shí)候沒(méi)有學(xué)習(xí)線程的同步工具類輔助類。而其它線程完成自己的操作后,調(diào)用使計(jì)數(shù)器減。信號(hào)量控制一組線程同時(shí)執(zhí)行。 前言 之前學(xué)多線程的時(shí)候沒(méi)有學(xué)習(xí)線程的同步工具類(輔助類)。ps:當(dāng)時(shí)覺(jué)得暫時(shí)用不上,認(rèn)為是挺高深的知識(shí)點(diǎn)就沒(méi)去管了.. 在前幾天,朋友發(fā)了一篇比較好的Semaphore文章過(guò)來(lái),然后在瀏覽博客的時(shí)候又發(fā)現(xiàn)面試還會(huì)考,那還是挺重要的知識(shí)點(diǎn)。于是花了點(diǎn)時(shí)間去了解...

    pingink 評(píng)論0 收藏0
  • Java線程編程實(shí)戰(zhàn):模擬大量數(shù)據(jù)同步

    摘要:所以得出結(jié)論需要分配較多的線程進(jìn)行讀數(shù)據(jù),較少的線程進(jìn)行寫(xiě)數(shù)據(jù)。注意多線程編程對(duì)實(shí)際環(huán)境和需求有很大的依賴,需要根據(jù)實(shí)際的需求情況對(duì)各個(gè)參數(shù)做調(diào)整。 背景 最近對(duì)于 Java 多線程做了一段時(shí)間的學(xué)習(xí),筆者一直認(rèn)為,學(xué)習(xí)東西就是要應(yīng)用到實(shí)際的業(yè)務(wù)需求中的。否則要么無(wú)法深入理解,要么硬生生地套用技術(shù)只是達(dá)到炫技的效果。 不過(guò)筆者仍舊認(rèn)為自己對(duì)于多線程掌握不夠熟練,不敢輕易應(yīng)用到生產(chǎn)代碼中...

    elliott_hu 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<