摘要:如果到齊了,就可以開始統(tǒng)計(jì)出這個(gè)時(shí)間窗口內(nèi)的指標(biāo)了。這種里會(huì)遇到兩個(gè)難題多個(gè)流的速度不一樣,如何判斷一個(gè)時(shí)間窗口內(nèi)的都到齊了。
在本文發(fā)出之后不久,老外就寫了一篇類似內(nèi)容的。人家比我寫得好,推薦大家讀這篇
http://radar.oreilly.com/2015/08/the-world-beyond-batch-streaming-101....
流式統(tǒng)計(jì)聽著挺容易的一個(gè)事情,說到底不就是數(shù)數(shù)嘛,每個(gè)告警系統(tǒng)里基本上都有一個(gè)簡單的流式統(tǒng)計(jì)模塊。但是當(dāng)時(shí)基于storm做的時(shí)候,這幾個(gè)問題還是困擾了我很長時(shí)間的。沒有用過spark streaming/flink,不知道下面這些問題在spark streaming/flink里是不是都已經(jīng)解決得很好了。
時(shí)間窗口切分問題做流式統(tǒng)計(jì)首要的問題是把一個(gè)時(shí)間窗口內(nèi)的數(shù)據(jù)統(tǒng)計(jì)到一起。問題是,什么是時(shí)間窗口?有兩種選擇
日志時(shí)間(event timestamp)
墻上時(shí)間(wall clock)
最簡單的時(shí)間窗口統(tǒng)計(jì)的是基于“墻上時(shí)間”的,每過1分鐘就切分出一個(gè)新窗口出來。比如statsd,它的窗口切分就是這樣的。這種基于“墻上時(shí)間”的統(tǒng)計(jì)有一個(gè)非常嚴(yán)重的問題是不能回放數(shù)據(jù)流。當(dāng)數(shù)據(jù)流是實(shí)時(shí)產(chǎn)生的時(shí)候,“墻上時(shí)間”的一分鐘也就只會(huì)有一分鐘的event被產(chǎn)生出來。但是如果統(tǒng)計(jì)的數(shù)據(jù)流是基于歷史event的,那么一分鐘可以產(chǎn)生消費(fèi)的event數(shù)量只受限于數(shù)據(jù)處理速度。另外event在分布式采集的時(shí)候也遇到有快有慢的問題,一分鐘內(nèi)產(chǎn)生的event未必可以在一分鐘內(nèi)精確到達(dá)統(tǒng)計(jì)端,這樣就會(huì)因?yàn)椴杉难舆t波動(dòng)影響統(tǒng)計(jì)數(shù)據(jù)的準(zhǔn)確性。實(shí)際上基于“墻上時(shí)間”統(tǒng)計(jì)需要
collection latency = wall clock - event timestamp
基于“墻上時(shí)間”的統(tǒng)計(jì)需要采集延遲非常小,波動(dòng)也很小才可以工作良好。大部分時(shí)候更現(xiàn)實(shí)的選擇是需要基于“日志時(shí)間”來進(jìn)行窗口統(tǒng)計(jì)的。
使用“日志時(shí)間”就會(huì)引入數(shù)據(jù)亂序的問題,對(duì)于一個(gè)實(shí)時(shí)event stream流,其每個(gè)event的timestamp未必是嚴(yán)格遞增的。這種亂序有兩種因素引入:
event產(chǎn)生的機(jī)器的時(shí)鐘不完全同步(NTP有100ms左右的不同步)
event從采集到到達(dá)kafka的速度不均衡(不同的網(wǎng)絡(luò)線路有快有慢)
我們希望的流式統(tǒng)計(jì)是這樣的:
但是實(shí)際上數(shù)據(jù)只是基本有序的,也就是在時(shí)間窗口的邊緣會(huì)有一些event需要跨到另外一個(gè)窗口去:
最簡單的分發(fā)event到時(shí)間窗口代碼是這樣的
window index = event timestamp / window size
對(duì)1分鐘的時(shí)間窗口 window size 就是60,timestamp除以60為相同window index的event就是在同一個(gè)時(shí)間窗口的。問題的關(guān)鍵是,什么時(shí)候我可以確信這個(gè)時(shí)間窗口內(nèi)的event都已經(jīng)到齊了。如果到齊了,就可以開始統(tǒng)計(jì)出這個(gè)時(shí)間窗口內(nèi)的指標(biāo)了。然后突然又有一個(gè)落后于大伙的event落到這個(gè)已經(jīng)被計(jì)算過的時(shí)間窗口如何處理?
對(duì)于大部分統(tǒng)計(jì)而言,一個(gè)時(shí)間窗口統(tǒng)計(jì)出多條結(jié)果存入db并不是什么大的問題,從db里查詢的時(shí)候把多條結(jié)果再合并就可以了。
對(duì)于一些類型的統(tǒng)計(jì)(非monad),比如平均值,時(shí)間窗口內(nèi)的event分為兩批統(tǒng)計(jì)出來的結(jié)果是沒有辦法被再次匯總的。
實(shí)時(shí)類的計(jì)算對(duì)時(shí)間敏感,來晚了的數(shù)據(jù)就沒有意義了。比如告警,一個(gè)時(shí)間窗過去了就沒有必要再理會(huì)這個(gè)時(shí)間窗口了。
所以對(duì)于來晚了的數(shù)據(jù)就兩種策略:要么再統(tǒng)計(jì)一條結(jié)果出來,要么直接丟棄。要確定什么時(shí)候一個(gè)時(shí)間窗口內(nèi)的event已經(jīng)到齊了,有幾種策略:
sleep 等待一段時(shí)間(墻上時(shí)間)
event timestamp超過了時(shí)間窗口一點(diǎn)點(diǎn)不關(guān)閉當(dāng)前時(shí)間窗口,而是要等event timestamp大幅超出時(shí)間窗口的時(shí)候才關(guān)閉窗口。比如12:05:30秒的event到了才關(guān)閉12:04:00 ~ 12:05:00的時(shí)間窗口。
一兩個(gè)event超出了時(shí)間窗口不關(guān)閉,只有當(dāng)“大量”的event超出時(shí)間窗口才關(guān)閉。比如1個(gè)event超過12:05分不關(guān)閉,如果有100個(gè)event超過了12:05的時(shí)間窗口就關(guān)閉它。
三種策略其實(shí)都是“等”,只是等的依據(jù)不同。實(shí)踐中,第二種策略也就是根據(jù)“日志時(shí)間”的等待是最容易實(shí)現(xiàn)的。如果對(duì)于過期的event不是丟棄,而是要再次統(tǒng)計(jì)一條結(jié)果出來,那么過期的窗口要重新打開,又要經(jīng)過一輪“等待”去判斷這個(gè)過去的窗口什么時(shí)候再被關(guān)閉。
在spark上已經(jīng)有人做類似的嘗試了:Building Big Data Operational Intelligence platform with Apache Spark - Eric Carr (Guavus)
多流合并的問題一個(gè)kafka的partition就是一個(gè)流,一個(gè)kafka topic的多個(gè)partition就是多個(gè)獨(dú)立的流(offset彼此獨(dú)立增長)。多個(gè)kafka topic顯然是多個(gè)獨(dú)立的流。流式統(tǒng)計(jì)經(jīng)常需要把多個(gè)流合并統(tǒng)計(jì)到一起。這種里會(huì)遇到兩個(gè)難題
多個(gè)流的速度不一樣,如何判斷一個(gè)時(shí)間窗口內(nèi)的event都到齊了。如果按照前面的等待策略,可能處理一個(gè)流內(nèi)部的基本有序局部亂序是有效的,但是對(duì)于多個(gè)流速差異很大的流就無能為力了。一個(gè)很快的流很容易把時(shí)間窗口往后推得很遠(yuǎn),把其他流遠(yuǎn)遠(yuǎn)跑到后面。
流速不均不能靠下游兜著,下游的內(nèi)存是有限的。根本上是需要一種“背壓”的機(jī)制,讓下游通知流速過快的上游,你慢點(diǎn)產(chǎn)生新的event,等等其他人。
舉一個(gè)具體的例子:
spout 1 emit 12:05 spout 1 emit 12:06 spout 2 emit 12:04 spout 1 emit 12:07 spout 2 emit 12:05 // this is when 12:05 is ready
要想知道12:05這個(gè)時(shí)間窗的event都到齊了,首先要知道相關(guān)的流有幾個(gè)(在這例子里是spout1和spout2兩個(gè)流),然后要知道什么時(shí)候spout1產(chǎn)生了12:05的數(shù)據(jù),什么時(shí)候spout2產(chǎn)生了12:05的數(shù)據(jù),最后才可以判斷出來12:05的數(shù)據(jù)是到齊了的。在某個(gè)地方要存一份這樣的流速的數(shù)據(jù)去跟蹤,在窗口內(nèi)數(shù)據(jù)到齊之后發(fā)出信號(hào)讓相關(guān)的下游往前推動(dòng)時(shí)間窗口??紤]到一個(gè)分布式的系統(tǒng),這個(gè)跟蹤要放在哪個(gè)地方做,怎么去通知所有的相關(guān)方。
極端一些的例子
spout 1 emit 13:05 spout 2 emit 12:31 spout 1 emit 13:06 spout 2 emit 12:32
多個(gè)流的流速可能會(huì)相差到半個(gè)小時(shí)以上??紤]到如果用歷史的數(shù)據(jù)匯入到實(shí)時(shí)統(tǒng)計(jì)系統(tǒng)里時(shí),很容易因?yàn)橛?jì)算速度不同導(dǎo)致不同節(jié)點(diǎn)之間的處理進(jìn)度不一致。要計(jì)算出正確的結(jié)果,下游需要緩存這些差異的半個(gè)小時(shí)內(nèi)的所有數(shù)據(jù),這樣很容易爆內(nèi)存。但是上游如何感知到下游要處理不過來了呢?多個(gè)上游之間又如何感知彼此之間的速度差異呢?又有誰來仲裁誰應(yīng)該流慢一些呢?
一個(gè)相對(duì)簡單的做法是在整個(gè)流式統(tǒng)計(jì)的分布式系統(tǒng)里引入一個(gè)coordinator的角色。它負(fù)責(zé)跟蹤不同流的流速,在時(shí)間窗口的數(shù)據(jù)到齊之后通知下游flush,在一些上游流速過快的時(shí)候(比如最快的流相比最慢的流差距大于10分鐘)由coordinator發(fā)送backoff指令給流速過快的上游,然后接到指令之后sleep一段時(shí)間。一段基本堪用的跟蹤不同流流速的代碼:https://gist.github.com/taowen/2d0b3bcc0a4bfaecd404
數(shù)據(jù)一致性問題低檔一些的說法是這樣的。假設(shè)統(tǒng)計(jì)出來的曲線是這樣的:
如果中間,比如08:35左右重啟了統(tǒng)計(jì)程序,那么曲線能否還是連續(xù)的?
高檔一些的說法是,可以把流式統(tǒng)計(jì)理解為主數(shù)據(jù)庫與分析數(shù)據(jù)庫之間通過kafka消息隊(duì)列進(jìn)行異步同步。主數(shù)據(jù)庫與分析數(shù)據(jù)庫之間應(yīng)該保持eventual consistency。
要保證數(shù)據(jù)不重不丟,就要做到生產(chǎn)到kafka的時(shí)候,在主數(shù)據(jù)庫和kafka消息隊(duì)列之間保持一個(gè)事務(wù)一致性。舉一個(gè)簡單的例子:
用戶下了一個(gè)訂單 主數(shù)據(jù)庫里插入了一條訂單的數(shù)據(jù)記錄 kafka消息隊(duì)列里多了一條OrderPlaced的event
這個(gè)流程中一個(gè)問題就是,主數(shù)據(jù)插入成功了之后,可能往kafka消息隊(duì)列里enqueue event失敗。如果把這個(gè)操作反過來
用戶下了一個(gè)訂單 kafka消息隊(duì)列里多了一條OrderPlaced的event 主數(shù)據(jù)庫里插入了一條訂單的數(shù)據(jù)記錄
又可能出現(xiàn)kafka消息隊(duì)列里enqueue了,但是主數(shù)據(jù)庫插入失敗的情況。就kafka隊(duì)列的目前的設(shè)計(jì)而言,對(duì)這個(gè)問題是無解的。一旦enqueue的event,除非過期是無法刪除的。
在消費(fèi)端,當(dāng)我們從kafka里取出數(shù)據(jù)之后,去更新分析數(shù)據(jù)庫的過程也要保持一個(gè)分布式事務(wù)的一致性。
取出下一條OrderPlaced evnet(指向的offset+1) 當(dāng)前時(shí)間窗的統(tǒng)計(jì)值+1 重復(fù)以上過程,直到窗口被關(guān)閉,數(shù)據(jù)寫入到分析數(shù)據(jù)庫
kafka的數(shù)據(jù)是可以重放的,只要指定offset就可以把這個(gè)offset以及之后的數(shù)據(jù)讀取出來。所謂消費(fèi)的過程就是把客戶端保存的offset值加1的過程。問題是,這個(gè)offset指針保存在哪里的問題。常規(guī)的做法是把消費(fèi)的offset保存到zookeeper里。那么這就有一個(gè)分布式的一致性問題了,zookeeper里offset+1了,但是分析數(shù)據(jù)庫并沒有實(shí)際把值統(tǒng)計(jì)進(jìn)去??紤]到統(tǒng)計(jì)一般不是每條輸入的event都會(huì)更新分析數(shù)據(jù)庫,而是把中間狀態(tài)緩存在內(nèi)存中的。那么就有可能消費(fèi)了成千上萬個(gè)event,狀態(tài)都在內(nèi)存里,然后“啪”的一下機(jī)器掉電了。如果每次讀取event都移動(dòng)offset的話,這些event就丟掉了。如果不是每次都移動(dòng)offset的話,又可能在重啟的時(shí)候?qū)е轮貜?fù)統(tǒng)計(jì)。
搞統(tǒng)計(jì)的人在乎這么一兩條數(shù)據(jù)嗎?其實(shí)大部分人是不在乎的。不少團(tuán)隊(duì)壓根連offset都不保存,每次開始統(tǒng)計(jì)直接seek到隊(duì)列的尾部開始。實(shí)時(shí)計(jì)算嘛,實(shí)時(shí)最重要了。準(zhǔn)確計(jì)算?重放歷史?這個(gè)讓hadoop搞定就好了。但是如果就是要較這個(gè)真呢?或者我們不追求嚴(yán)格的強(qiáng)一致,只要求重啟之后曲線不斷開那么難看就好了。
別的流式計(jì)算框架不清楚,storm的ack機(jī)制是毫無幫助的。
storm的ack機(jī)制是基于每個(gè)message來做的。這就要求如果做一個(gè)每分鐘100萬個(gè)event的統(tǒng)計(jì),一分鐘就要跟蹤100萬個(gè)message id。就算是100萬個(gè)int,也是一筆相當(dāng)可觀的內(nèi)存開銷。要知道,從kafka里讀出來的event都是順序offset的,處理也是順序,只要記錄一個(gè)offset就可以跟蹤整個(gè)流的消費(fèi)進(jìn)度了。1個(gè)int,相比100萬個(gè)int,storm的per message ack的機(jī)制對(duì)于流式處理的進(jìn)度跟蹤來說,沒有利用消息處理的有序性(storm根本上假設(shè)message之間是彼此獨(dú)立處理的),而變得效率低下。
要做到強(qiáng)一致是很困難的,它需要把
更新保存的offset
更新插入分析數(shù)據(jù)庫
變成一個(gè)原子事務(wù)來完成。大部分分析數(shù)據(jù)庫都沒有原子性事務(wù)的能力,連插入三條數(shù)據(jù)都不能保持同時(shí)變?yōu)榭梢?,且不說還要用它來記錄offset了。考慮到kafka在生產(chǎn)端都無法提供分布式事務(wù),event從生產(chǎn)出來就不是完全一致的(多產(chǎn)生了或者少產(chǎn)生了),真正高一致的計(jì)費(fèi)場景還是用其他的技術(shù)棧。所以值得解決的問題是,如何在重啟之后,把之前重啟的時(shí)候丟棄掉的內(nèi)存狀態(tài)重新恢復(fù)出來,使得統(tǒng)計(jì)出來的曲線仍然是連續(xù)的。
解決思路有三點(diǎn):
上游備份策略:重啟的時(shí)候重放kafka的歷史數(shù)據(jù),恢復(fù)內(nèi)存狀態(tài)
中間狀態(tài)持久化:把統(tǒng)計(jì)的狀態(tài)放到外部的持久的數(shù)據(jù)庫里,不放內(nèi)存里
同時(shí)跑兩份:同時(shí)有兩個(gè)完全一樣的統(tǒng)計(jì)任務(wù),重啟一個(gè),另外一個(gè)還能正常運(yùn)行。
內(nèi)存狀態(tài)管理的問題做流式統(tǒng)計(jì)的有兩種做法:
依賴于外部存儲(chǔ)管理狀態(tài):比如沒收到一個(gè)event,就往redis里發(fā)incr增1
純內(nèi)存統(tǒng)計(jì):在內(nèi)存里設(shè)置一個(gè)counter,每收到一個(gè)event就+1
基于外部存儲(chǔ)會(huì)把整個(gè)壓力全部壓到數(shù)據(jù)庫上。一般來說流式統(tǒng)計(jì)的流速是很快的,遠(yuǎn)大于普通的關(guān)系型數(shù)據(jù)庫,甚至可能會(huì)超過單臺(tái)redis的承載。這就使得基于純內(nèi)存的統(tǒng)計(jì)非常有吸引力。大部分的時(shí)候都是在更新時(shí)間窗口內(nèi)的內(nèi)存狀態(tài),只有當(dāng)時(shí)間窗口關(guān)閉的時(shí)候才把數(shù)據(jù)刷到分析數(shù)據(jù)庫里去。刷數(shù)據(jù)出去的同時(shí)記錄一下當(dāng)前流消費(fèi)到的位置(offset)。
這種純內(nèi)存的狀態(tài)相對(duì)來說容易管理一些。計(jì)算直接是基于這個(gè)內(nèi)存狀態(tài)做的。如果重啟丟失了,重放一段歷史數(shù)據(jù)就可以重建出來。
但是內(nèi)存的問題是它總是不夠用的。當(dāng)統(tǒng)計(jì)的維度組合特別多的時(shí)候,比如其中某個(gè)字段是用戶的id,那么很快這個(gè)內(nèi)存狀態(tài)就會(huì)超過單機(jī)的內(nèi)存上限。這種情況有兩種辦法:
利用partition把輸入的input分割,一個(gè)流分成多個(gè)流,每個(gè)統(tǒng)計(jì)程序需要跟蹤的維度組合就變少了
把存儲(chǔ)移到外邊去
簡單地在流式統(tǒng)計(jì)程序里開關(guān)數(shù)據(jù)庫連接是可以解決這個(gè)容量問題的:
但是這種對(duì)外部數(shù)據(jù)庫使用不小心就會(huì)導(dǎo)致兩個(gè)問題:
處理速度慢。不用一些批量的操作,數(shù)據(jù)庫操作很快就會(huì)變成瓶頸
數(shù)據(jù)庫的狀態(tài)不一直。內(nèi)存的狀態(tài)重啟了就丟失了,外部的狀態(tài)重啟之后不丟失。重放數(shù)據(jù)流就可能導(dǎo)致數(shù)據(jù)的重復(fù)統(tǒng)計(jì)
但是這種把窗口統(tǒng)計(jì)的中間狀態(tài)落地的好處也是顯而易見的。重啟之后不用通過重算來恢復(fù)內(nèi)存狀態(tài)。如果一個(gè)時(shí)間窗口有24小時(shí),重算24小時(shí)的歷史數(shù)據(jù)可能是很昂貴的操作。
版本跟蹤,批量等都不應(yīng)該是具體的統(tǒng)計(jì)邏輯的實(shí)現(xiàn)者的責(zé)任。理論上框架應(yīng)該負(fù)責(zé)把冷熱數(shù)據(jù)分離,自動(dòng)把冷數(shù)據(jù)下沉到外部的存儲(chǔ),以把本地內(nèi)存空閑出來。同時(shí)每次小批量處理event的時(shí)候都要記錄處理的offset,而不是要等到窗口關(guān)閉等待時(shí)候。
數(shù)據(jù)庫狀態(tài)和內(nèi)存狀態(tài)要變成一個(gè)緊密結(jié)合的整體??梢园褍烧叩年P(guān)系想象成操作系統(tǒng)的filesystem page cache。用mmap把狀態(tài)映射到內(nèi)存里,由框架負(fù)責(zé)什么時(shí)候把內(nèi)存里的變更持久化到外部存儲(chǔ)里。
總結(jié)基于storm做流式統(tǒng)計(jì)缺乏對(duì)以下四個(gè)基本問題的成熟解決方案。其trident框架可能可以提供一些答案,但是實(shí)踐中好像使用的人并不多,資料也太少了??梢员容^自信的說,不僅僅是storm,對(duì)于大多數(shù)流式計(jì)算平臺(tái)都是如此。
時(shí)間窗口切分的問題
多流合并的問題
數(shù)據(jù)一致性問題(重啟之后曲線斷開的問題)
內(nèi)存狀態(tài)管理問題
這些問題要好好解決,還是需要一番功夫的。新一代的流式計(jì)算框架比如spark streaming/flink應(yīng)該有很多改進(jìn)。即便底層框架提供了支持,從這四個(gè)角度去考察一下它們是如何支持的也是非常有裨益的事情。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/17474.html
摘要:為了適應(yīng)流式渲染技術(shù)對(duì)網(wǎng)絡(luò)高吞吐零緩沖的特點(diǎn),可能需要對(duì)現(xiàn)有網(wǎng)絡(luò)協(xié)議進(jìn)行改造主要針對(duì)。視頻基于的,視頻在客戶端的播放會(huì)相對(duì)較為容易。輸入信號(hào)各自隔離處理即可,瀏覽器端對(duì)常見的輸入信號(hào)幾乎都有支持。 本文首發(fā)于我的博客(點(diǎn)此查看),歡迎關(guān)注。 流式渲染技術(shù),不同于傳統(tǒng)意義上前端領(lǐng)域的服務(wù)端渲染(即 SSR),指的是云端性能強(qiáng)勁的機(jī)器進(jìn)行畫面渲染,將渲染完成的數(shù)據(jù)傳送至客戶端,客戶端只負(fù)責(zé)...
閱讀 884·2023-04-25 19:40
閱讀 3522·2023-04-25 17:41
閱讀 3035·2021-11-11 11:01
閱讀 2677·2019-08-30 15:55
閱讀 3244·2019-08-30 15:44
閱讀 1379·2019-08-29 14:07
閱讀 507·2019-08-29 11:23
閱讀 1344·2019-08-27 10:54