成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

如何以并發(fā)方式在同一個(gè)流上執(zhí)行多種操作?--復(fù)制流

王晗 / 3496人閱讀

摘要:正常情況下,一個(gè)流在執(zhí)行一次終端操作之后便結(jié)束了。本文通過復(fù)制流內(nèi)數(shù)據(jù)的方式,曲折的實(shí)現(xiàn)了同一個(gè)流上執(zhí)行多次操作。只是思路,其性能并不一定高效,尤其是數(shù)據(jù)都在內(nèi)存中處理時(shí)復(fù)制的開銷很大。但如果流涉及大量,也許性能會有提高。

正常情況下,一個(gè)流在執(zhí)行一次終端操作之后便結(jié)束了。本文通過復(fù)制流內(nèi)數(shù)據(jù)的方式,曲折的實(shí)現(xiàn)了同一個(gè)流上執(zhí)行多次操作。
Demo只是思路,其性能并不一定高效,尤其是數(shù)據(jù)都在內(nèi)存中處理時(shí)復(fù)制的開銷很大。但如果流涉及大量I/O,也許性能會有提高。

public class StreamForker {
    private final Stream stream;
    private final Map, ?>> forks = new HashMap<>();

    public StreamForker(Stream stream) {
        this.stream = stream;
    }

    public StreamForker fork(Object key, Function, ?> f) {
        forks.put(key, f);
        return this;
    }

    public Results getResults() {
        ForkingStreamConsumer consumer = build();
        try {
            stream.sequential().forEach(consumer);
        } finally {
            consumer.finish();
        }
        return consumer;
    }

    private ForkingStreamConsumer build() {
        List> queues = new ArrayList<>();

        Map> actions = forks.entrySet().stream().reduce(new HashMap>(),
                (map, e) -> {
                    map.put(e.getKey(), getOperationResult(queues, e.getValue()));
                    return map;
                }, (m1, m2) -> {
                    m1.putAll(m2);
                    return m1;
                });

        return new ForkingStreamConsumer<>(queues, actions);
    }

    private Future getOperationResult(List> queues, Function, ?> f) {
        BlockingQueue queue = new LinkedBlockingQueue<>();
        queues.add(queue);
        Spliterator spliterator = new BlockingQueueSpliterator<>(queue);
        Stream source = StreamSupport.stream(spliterator, false);
        return CompletableFuture.supplyAsync(() -> f.apply(source));
    }
}

accept方法將原始流中所有的數(shù)據(jù)添加到各個(gè)BlockingQueue內(nèi),此處實(shí)現(xiàn)了復(fù)制

class ForkingStreamConsumer implements Consumer, Results {
    static final Object END_OF_STREAM = new Object();

    private final List> queues;
    private final Map> actions;

    public ForkingStreamConsumer(List> queues, Map> actions) {
        this.queues = queues;
        this.actions = actions;
    }

    @Override
    public void accept(T t) {
        queues.forEach(q -> q.add(t));
    }

    @SuppressWarnings("unchecked")
    void finish() {
        accept((T) END_OF_STREAM);
    }

    @SuppressWarnings("unchecked")
    @Override
    public  R get(Object key) {
        try {
            return ((Future) actions.get(key)).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

此處重寫了tryAdvance接口,只是簡單的從BlockingQueue中取出數(shù)據(jù),執(zhí)行action。業(yè)務(wù)邏輯中復(fù)制流是為了做什么事情,action就是這件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中數(shù)據(jù)結(jié)束的標(biāo)示

class BlockingQueueSpliterator implements Spliterator {
    private final BlockingQueue q;

    BlockingQueueSpliterator(BlockingQueue q) {
        this.q = q;
    }

    @Override
    public boolean tryAdvance(Consumer action) {
        T t;
        while (true) {
            try {
                t = q.take();
                break;
            } catch (InterruptedException e) {
            }
        }

        if (t != ForkingStreamConsumer.END_OF_STREAM) {
            action.accept(t);
            return true;
        }

        return false;
    }

    @Override
    public Spliterator trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return 0;
    }

    @Override
    public int characteristics() {
        return 0;
    }
}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77289.html

相關(guān)文章

  • 從命令式到響應(yīng)式(五)

    摘要:輸出流只有在所有的輸入流都完成以后才能完成,任何一條輸入流上的錯(cuò)誤都將立即推送到輸出流上。如果沒有轉(zhuǎn)入輸入流,輸出流將會立即發(fā)出結(jié)束通知。返回值以數(shù)組形式獲取到的每一個(gè)輸入流的值,或者來自映射函數(shù)的值。返回值僅從最新的內(nèi)部流上取值的流。 接著上一節(jié)的操作符繼續(xù),讓我們直奔主題。 組合類操作符 組合類的操作符可以將不同流數(shù)據(jù)按一定的規(guī)則進(jìn)行合并,從而獲得所需要的完整數(shù)據(jù)。 combine...

    CoderBear 評論0 收藏0
  • 從命令式到響應(yīng)式(四)

    摘要:使用的操作符這條從左到右的橫線代表經(jīng)過操作符轉(zhuǎn)換后的輸出流。返回值通過判定函數(shù)檢測的值組成的流。返回值持續(xù)發(fā)出輸入流上的值,直到通知流上發(fā)出值為止。 上期介紹過了rxjs中的三大件,Observable,subscription,subject,但是在開發(fā)過程我們最常接觸到的東西非操作符莫屬。比如上期代碼中曾出現(xiàn)過的from就是一個(gè)操作符。rxjs中的操作符大致上可以分為幾類,創(chuàng)建類,...

    jaysun 評論0 收藏0
  • 巧妙復(fù)制一個(gè)

    摘要:場景實(shí)際業(yè)務(wù)中可能出現(xiàn)重復(fù)消費(fèi)一個(gè)可讀流的情況,比如在前置過濾器解析請求體,拿到進(jìn)行相關(guān)權(quán)限及身份認(rèn)證認(rèn)證通過后框架或者后置過濾器再次解析請求體傳遞給業(yè)務(wù)上下文。 場景 實(shí)際業(yè)務(wù)中可能出現(xiàn)重復(fù)消費(fèi)一個(gè)可讀流的情況,比如在前置過濾器解析請求體,拿到body進(jìn)行相關(guān)權(quán)限及身份認(rèn)證;認(rèn)證通過后框架或者后置過濾器再次解析請求體傳遞給業(yè)務(wù)上下文。因此,重復(fù)消費(fèi)同一個(gè)流的需求并不奇葩,這類似于js...

    wenzi 評論0 收藏0
  • 探索 RxJS - Core Concept

    摘要:但不同的是,在的遍歷調(diào)用過程中,如果一個(gè)事件還沒有觸發(fā)完畢獲取到返回值,就觸發(fā)了下一個(gè)事件,則將忽略返回的值。這樣,我們就可以避免異步的返回值因?yàn)榉祷剌^慢,反而覆蓋了之后異步的返回值。 Steam in ReactiveX showImg(https://segmentfault.com/img/bVFReX?w=100&h=100); ReactiveX,又稱 Reactive Ex...

    Neilyo 評論0 收藏0

發(fā)表評論

0條評論

王晗

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<