成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

徒手?jǐn)]框架--高并發(fā)環(huán)境下的請求合并

劉東 / 584人閱讀

摘要:我們就可以將這些請求合并,達(dá)到一定數(shù)量我們統(tǒng)一提交。總結(jié)一個(gè)比較生動(dòng)的例子給大家講解了一些多線程的具體運(yùn)用。學(xué)習(xí)多線程應(yīng)該多思考多動(dòng)手,才會(huì)有比較好的效果。地址徒手?jǐn)]框架系列文章地址徒手?jǐn)]框架實(shí)現(xiàn)徒手?jǐn)]框架實(shí)現(xiàn)

原文地址:https://www.xilidou.com/2018/01/22/merge-request/

在高并發(fā)系統(tǒng)中,我們經(jīng)常遇到這樣的需求:系統(tǒng)產(chǎn)生大量的請求,但是這些請求實(shí)時(shí)性要求不高。我們就可以將這些請求合并,達(dá)到一定數(shù)量我們統(tǒng)一提交。最大化的利用系統(tǒng)性IO,提升系統(tǒng)的吞吐性能。

所以請求合并框架需要考慮以下兩個(gè)需求:

當(dāng)請求收集到一定數(shù)量時(shí)提交數(shù)據(jù)

一段時(shí)間后如果請求沒有達(dá)到指定的數(shù)量也進(jìn)行提交

我們就聊聊一如何實(shí)現(xiàn)這樣一個(gè)需求。

閱讀這篇文章你將會(huì)了解到:

ScheduledThreadPoolExecutor

阻塞隊(duì)列

線程安全的參數(shù)

LockSuppor的使用

設(shè)計(jì)思路和實(shí)現(xiàn)

我們就聊一聊實(shí)現(xiàn)這個(gè)東西的具體思路是什么。希望大家能夠?qū)W習(xí)到分析問題,設(shè)計(jì)模塊的一些套路。

底層使用什么數(shù)據(jù)結(jié)構(gòu)來持有需要合并的請求?

既然我們的系統(tǒng)是在高并發(fā)的環(huán)境下使用,那我們肯定不能使用,普通的ArrayList來持有。我們可以使用阻塞隊(duì)列來持有需要合并的請求。

我們的數(shù)據(jù)結(jié)構(gòu)需要提供一個(gè) add() 的方法給外部,用于提交數(shù)據(jù)。當(dāng)外部add數(shù)據(jù)以后,需要檢查隊(duì)列里面的數(shù)據(jù)的個(gè)數(shù)是否達(dá)到我們限額?達(dá)到數(shù)量提交數(shù)據(jù),不達(dá)到繼續(xù)等待。

數(shù)據(jù)結(jié)構(gòu)還需要提供一個(gè)timeOut()的方法,外部有一個(gè)計(jì)時(shí)器定時(shí)調(diào)用這個(gè)timeOut方法,如果方法被調(diào)用,則直接向遠(yuǎn)程提交數(shù)據(jù)。

條件滿足的時(shí)候線程執(zhí)行提交動(dòng)作,條件不滿足的時(shí)候線程應(yīng)當(dāng)暫停,等待隊(duì)列達(dá)到提交數(shù)據(jù)的條件。所以我們可以考慮使用 LockSuppor.park()LockSuppor.unpark 來暫停和激活操作線程。

經(jīng)過上面的分析,我們就有了這樣一個(gè)數(shù)據(jù)結(jié)構(gòu):

private static class FlushThread implements Runnable{

        private final String name;

        //隊(duì)列大小
        private final int bufferSize;
        //操作間隔
        private int flushInterval;

        //上一次提交的時(shí)間。
        private volatile long lastFlushTime;
        private volatile Thread writer;

        //持有數(shù)據(jù)的阻塞隊(duì)列
        private final BlockingQueue queue;

        //達(dá)成條件后具體執(zhí)行的方法
        private final Processor processor;

        //構(gòu)造函數(shù)
        public FlushThread(String name, int bufferSize, int flushInterval,int queueSize,Processor processor) {
            this.name = name;
            this.bufferSize = bufferSize;
            this.flushInterval = flushInterval;
            this.lastFlushTime = System.currentTimeMillis();
            this.processor = processor;

            this.queue = new ArrayBlockingQueue<>(queueSize);

        }

        //外部提交數(shù)據(jù)的方法
        public boolean add(Item item){
            boolean result = queue.offer(item);
            flushOnDemand();
            return result;
        }

        //提供給外部的超時(shí)方法
        public void timeOut(){
            //超過兩次提交超過時(shí)間間隔
            if(System.currentTimeMillis() - lastFlushTime >= flushInterval){
                start();
            }
        }
        
        //解除線程的阻塞
        private void start(){
            LockSupport.unpark(writer);
        }

        //當(dāng)前的數(shù)據(jù)是否大于提交的條件
        private void flushOnDemand(){
            if(queue.size() >= bufferSize){
                start();
            }
        }

        //執(zhí)行提交數(shù)據(jù)的方法
        public void flush(){
            lastFlushTime = System.currentTimeMillis();
            List temp = new ArrayList<>(bufferSize);
            int size = queue.drainTo(temp,bufferSize);
            if(size > 0){
                try {
                    processor.process(temp);
                }catch (Throwable e){
                    log.error("process error",e);
                }
            }
        }

        //根據(jù)數(shù)據(jù)的尺寸和時(shí)間間隔判斷是否提交
        private boolean canFlush(){
            return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
        }

        @Override
        public void run() {
            writer = Thread.currentThread();
            writer.setName(name);

            while (!writer.isInterrupted()){
                while (!canFlush()){
                    //如果線程沒有被打斷,且不達(dá)到執(zhí)行的條件,則阻塞線程
                    LockSupport.park(this);
                }
                flush();
            }

        }

    }

如何實(shí)現(xiàn)定時(shí)提交呢?

通常我們遇到定時(shí)相關(guān)的需求,首先想到的應(yīng)該是使用 ScheduledThreadPoolExecutor定時(shí)來調(diào)用FlushThread 的 timeOut 方法,如果你想到的是 Thread.sleep()...那需要再努力學(xué)習(xí),多看源碼了。

怎樣進(jìn)一步的提升系統(tǒng)的吞吐量?

我們使用的FlushThread 實(shí)現(xiàn)了 Runnable 所以我們可以考慮使用線程池來持有多個(gè)FlushThread

所以我們就有這樣的代碼:

public class Flusher {

    private final FlushThread[] flushThreads;

    private AtomicInteger index;

    //防止多個(gè)線程同時(shí)執(zhí)行。增加一個(gè)隨機(jī)數(shù)間隔
    private static final Random r = new Random();

    private static final int delta = 50;

    private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

    private static ExecutorService POOL = Executors.newCachedThreadPool();

    public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor processor) {

        this.flushThreads = new FlushThread[threads];


        if(threads > 1){
            index = new AtomicInteger();
        }

        for (int i = 0; i < threads; i++) {
            final FlushThread flushThread = new FlushThread(name+ "-" + i,bufferSiz,flushInterval,queueSize,processor);
            flushThreads[i] = flushThread;
            POOL.submit(flushThread);
            //定時(shí)調(diào)用 timeOut()方法。
            TIMER.scheduleAtFixedRate(flushThread::timeOut, r.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
        }
    }

    // 對 index 取模,保證多線程都能被add
    public boolean add(Item item){
        int len = flushThreads.length;
        if(len == 1){
            return flushThreads[0].add(item);
        }

        int mod = index.incrementAndGet() % len;
        return flushThreads[mod].add(item);

    }

    //上文已經(jīng)描述
    private static class FlushThread implements Runnable{
        ...省略
    }
}

面向接口編程,提升系統(tǒng)擴(kuò)展性:

public interface Processor {
    void process(List list);
}
使用

我們寫個(gè)測試方法測試一下:

//實(shí)現(xiàn) Processor 將 String 全部輸出
public class PrintOutProcessor implements Processor{
    @Override
    public void process(List list) {

        System.out.println("start flush");

        list.forEach(System.out::println);

        System.out.println("end flush");
    }
}
public class Test {

    public static void main(String[] args) throws InterruptedException {

        Flusher stringFlusher = new Flusher<>("test",5,1000,30,1,new PrintOutProcessor());

        int index = 1;
        while (true){
            stringFlusher.add(String.valueOf(index++));
            Thread.sleep(1000);
        }
    }
}

執(zhí)行的結(jié)果:

start flush
1
2
3
end flush
start flush
4
5
6
7
end flush

我們發(fā)現(xiàn)并沒有達(dá)到10個(gè)數(shù)字就觸發(fā)了flush。因?yàn)槌霭l(fā)了超時(shí)提交,雖然還沒有達(dá)到規(guī)定的5
個(gè)數(shù)據(jù),但還是執(zhí)行了 flush。

如果我們?nèi)コ?Thread.sleep(1000); 再看看結(jié)果:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush

每5個(gè)數(shù)一次提交。完美。。。。

總結(jié)

一個(gè)比較生動(dòng)的例子給大家講解了一些多線程的具體運(yùn)用。學(xué)習(xí)多線程應(yīng)該多思考多動(dòng)手,才會(huì)有比較好的效果。希望這篇文章大家讀完以后有所收獲,歡迎交流。

github地址:https://github.com/diaozxin007/framework

徒手?jǐn)]框架系列文章地址:

徒手?jǐn)]框架--實(shí)現(xiàn)IoC

徒手?jǐn)]框架--實(shí)現(xiàn)Aop

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/68354.html

相關(guān)文章

  • JAVA 中的 CAS

    摘要:我們繼續(xù)看代碼的意思是這個(gè)是一段內(nèi)嵌匯編代碼。也就是在語言中使用匯編代碼。就是匯編版的比較并交換。就是保證在多線程情況下,不阻塞線程的填充和消費(fèi)。微觀上看匯編的是實(shí)現(xiàn)操作系統(tǒng)級別的原子操作的基石。 原文地址:https://www.xilidou.com/2018/02/01/java-cas/ CAS 是現(xiàn)代操作系統(tǒng),解決并發(fā)問題的一個(gè)重要手段,最近在看 eureka 的源碼的時(shí)候。...

    CocoaChina 評論0 收藏0
  • 徒手一個(gè)簡單的RPC框架

    摘要:徒手?jǐn)]一個(gè)簡單的框架之前在牛逼哄哄的框架,底層到底什么原理得知了遠(yuǎn)程過程調(diào)用簡單來說就是調(diào)用遠(yuǎn)程的服務(wù)就像調(diào)用本地方法一樣,其中用到的知識(shí)有序列化和反序列化動(dòng)態(tài)代理網(wǎng)絡(luò)傳輸動(dòng)態(tài)加載反射這些知識(shí)點(diǎn)。 徒手?jǐn)]一個(gè)簡單的RPC框架 之前在牛逼哄哄的 RPC 框架,底層到底什么原理得知了RPC(遠(yuǎn)程過程調(diào)用)簡單來說就是調(diào)用遠(yuǎn)程的服務(wù)就像調(diào)用本地方法一樣,其中用到的知識(shí)有序列化和反序列化、動(dòng)態(tài)...

    Gemini 評論0 收藏0
  • 徒手一個(gè) Spring Boot 中的 Starter ,解密自動(dòng)化配置黑魔法!

    摘要:先來看代碼吧,一會(huì)松哥再慢慢解釋關(guān)于這一段自動(dòng)配置,解釋如下首先注解表明這是一個(gè)配置類。本文的案例,松哥已經(jīng)上傳到上了,地址。我們使用 Spring Boot,基本上都是沉醉在它 Stater 的方便之中。Starter 為我們帶來了眾多的自動(dòng)化配置,有了這些自動(dòng)化配置,我們可以不費(fèi)吹灰之力就能搭建一個(gè)生產(chǎn)級開發(fā)環(huán)境,有的小伙伴會(huì)覺得這個(gè) Starter 好神奇呀!其實(shí) Starter 也都...

    xiaochao 評論0 收藏0
  • 徒手UI之Paginator

    摘要:是一個(gè)組件庫目前擁有的組件語法編寫,無依賴原生模塊化,以上支持,請開啟靜態(tài)服務(wù)器預(yù)覽效果,靜態(tài)服務(wù)器傳送門采用變量配置樣式辛苦造輪子,歡迎來倉庫四月份找工作,求內(nèi)推,坐標(biāo)深圳寫在前面去年年底項(xiàng)目中嘗試著寫過一個(gè)分頁的組件,然后就有了寫的想法 QingUI是一個(gè)UI組件庫目前擁有的組件:DatePicker, TimePicker, Paginator, Tree, Cascader, ...

    liuhh 評論0 收藏0
  • 徒手框架--實(shí)現(xiàn)IoC

    摘要:從而能夠進(jìn)一步深入了解框架。至此我們框架開發(fā)完成。雖然說閱讀源碼是了解框架的最終手段。但是框架作為一個(gè)生產(chǎn)框架,為了保證通用和穩(wěn)定,源碼必定是高度抽象,且處理大量細(xì)節(jié)。下一篇文章應(yīng)該會(huì)是徒手?jǐn)]框架實(shí)現(xiàn)。 原文地址:https://www.xilidou.com/2018/... Spring 作為 J2ee 開發(fā)事實(shí)上的標(biāo)準(zhǔn),是每個(gè)Java開發(fā)人員都需要了解的框架。但是Spring 的...

    rottengeek 評論0 收藏0

發(fā)表評論

0條評論

最新活動(dòng)
閱讀需要支付1元查看
<