摘要:正常情況下,一個(gè)流在執(zhí)行一次終端操作之后便結(jié)束了。本文通過復(fù)制流內(nèi)數(shù)據(jù)的方式,曲折的實(shí)現(xiàn)了同一個(gè)流上執(zhí)行多次操作。只是思路,其性能并不一定高效,尤其是數(shù)據(jù)都在內(nèi)存中處理時(shí)復(fù)制的開銷很大。但如果流涉及大量,也許性能會有提高。
正常情況下,一個(gè)流在執(zhí)行一次終端操作之后便結(jié)束了。本文通過復(fù)制流內(nèi)數(shù)據(jù)的方式,曲折的實(shí)現(xiàn)了同一個(gè)流上執(zhí)行多次操作。
Demo只是思路,其性能并不一定高效,尤其是數(shù)據(jù)都在內(nèi)存中處理時(shí)復(fù)制的開銷很大。但如果流涉及大量I/O,也許性能會有提高。
public class StreamForker{ private final Stream stream; private final Map
accept方法將原始流中所有的數(shù)據(jù)添加到各個(gè)BlockingQueue內(nèi),此處實(shí)現(xiàn)了復(fù)制
class ForkingStreamConsumerimplements Consumer , Results { static final Object END_OF_STREAM = new Object(); private final List > queues; private final Map > actions; public ForkingStreamConsumer(List > queues, Map > actions) { this.queues = queues; this.actions = actions; } @Override public void accept(T t) { queues.forEach(q -> q.add(t)); } @SuppressWarnings("unchecked") void finish() { accept((T) END_OF_STREAM); } @SuppressWarnings("unchecked") @Override public R get(Object key) { try { return ((Future ) actions.get(key)).get(); } catch (Exception e) { throw new RuntimeException(e); } } }
此處重寫了tryAdvance接口,只是簡單的從BlockingQueue中取出數(shù)據(jù),執(zhí)行action。業(yè)務(wù)邏輯中復(fù)制流是為了做什么事情,action就是這件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中數(shù)據(jù)結(jié)束的標(biāo)示
class BlockingQueueSpliteratorimplements Spliterator { private final BlockingQueue q; BlockingQueueSpliterator(BlockingQueue q) { this.q = q; } @Override public boolean tryAdvance(Consumer super T> action) { T t; while (true) { try { t = q.take(); break; } catch (InterruptedException e) { } } if (t != ForkingStreamConsumer.END_OF_STREAM) { action.accept(t); return true; } return false; } @Override public Spliterator trySplit() { return null; } @Override public long estimateSize() { return 0; } @Override public int characteristics() { return 0; } }
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/77289.html
摘要:輸出流只有在所有的輸入流都完成以后才能完成,任何一條輸入流上的錯(cuò)誤都將立即推送到輸出流上。如果沒有轉(zhuǎn)入輸入流,輸出流將會立即發(fā)出結(jié)束通知。返回值以數(shù)組形式獲取到的每一個(gè)輸入流的值,或者來自映射函數(shù)的值。返回值僅從最新的內(nèi)部流上取值的流。 接著上一節(jié)的操作符繼續(xù),讓我們直奔主題。 組合類操作符 組合類的操作符可以將不同流數(shù)據(jù)按一定的規(guī)則進(jìn)行合并,從而獲得所需要的完整數(shù)據(jù)。 combine...
摘要:使用的操作符這條從左到右的橫線代表經(jīng)過操作符轉(zhuǎn)換后的輸出流。返回值通過判定函數(shù)檢測的值組成的流。返回值持續(xù)發(fā)出輸入流上的值,直到通知流上發(fā)出值為止。 上期介紹過了rxjs中的三大件,Observable,subscription,subject,但是在開發(fā)過程我們最常接觸到的東西非操作符莫屬。比如上期代碼中曾出現(xiàn)過的from就是一個(gè)操作符。rxjs中的操作符大致上可以分為幾類,創(chuàng)建類,...
摘要:場景實(shí)際業(yè)務(wù)中可能出現(xiàn)重復(fù)消費(fèi)一個(gè)可讀流的情況,比如在前置過濾器解析請求體,拿到進(jìn)行相關(guān)權(quán)限及身份認(rèn)證認(rèn)證通過后框架或者后置過濾器再次解析請求體傳遞給業(yè)務(wù)上下文。 場景 實(shí)際業(yè)務(wù)中可能出現(xiàn)重復(fù)消費(fèi)一個(gè)可讀流的情況,比如在前置過濾器解析請求體,拿到body進(jìn)行相關(guān)權(quán)限及身份認(rèn)證;認(rèn)證通過后框架或者后置過濾器再次解析請求體傳遞給業(yè)務(wù)上下文。因此,重復(fù)消費(fèi)同一個(gè)流的需求并不奇葩,這類似于js...
摘要:但不同的是,在的遍歷調(diào)用過程中,如果一個(gè)事件還沒有觸發(fā)完畢獲取到返回值,就觸發(fā)了下一個(gè)事件,則將忽略返回的值。這樣,我們就可以避免異步的返回值因?yàn)榉祷剌^慢,反而覆蓋了之后異步的返回值。 Steam in ReactiveX showImg(https://segmentfault.com/img/bVFReX?w=100&h=100); ReactiveX,又稱 Reactive Ex...
閱讀 2030·2021-09-29 09:35
閱讀 1957·2019-08-30 14:15
閱讀 2981·2019-08-30 10:56
閱讀 967·2019-08-29 16:59
閱讀 581·2019-08-29 14:04
閱讀 1315·2019-08-29 12:30
閱讀 1033·2019-08-28 18:19
閱讀 517·2019-08-26 11:51