成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

[譯] Introducing Complex Event Processing (CEP) wit

Yu_Huang / 3694人閱讀

摘要:所有不相關(guān)的數(shù)據(jù)會(huì)立即丟棄,由于查詢都是在一個(gè)無(wú)限的數(shù)據(jù)流中,這樣的優(yōu)勢(shì)顯而易見。基于這些監(jiān)控事件數(shù)據(jù)流,我們想要檢測(cè)出可能要過(guò)熱的機(jī)架,從而調(diào)整負(fù)載和降溫。

原文鏈接

正文

隨著傳感網(wǎng)絡(luò)的普及,智能設(shè)備持續(xù)收集著越來(lái)越多的數(shù)據(jù),分析近乎實(shí)時(shí),不斷增長(zhǎng)的數(shù)據(jù)流是一個(gè)巨大的挑戰(zhàn)。快速應(yīng)對(duì)變化趨勢(shì)、交付最新的 BI 應(yīng)用會(huì)成為一個(gè)公司成敗的關(guān)鍵因素。其中關(guān)鍵問(wèn)題就是數(shù)據(jù)流的事件模型檢測(cè)。

Complex event processing (CEP) 要處理的就是在持續(xù)事件中匹配模式的問(wèn)題。匹配結(jié)果通常就是:從輸入事件中提取的復(fù)雜事件。傳統(tǒng) DBMSs 在固定數(shù)據(jù)上執(zhí)行查詢,而 CEP 在存儲(chǔ)的 query 上執(zhí)行(譯者注:某個(gè)范圍)。所有不相關(guān)的數(shù)據(jù)會(huì)立即丟棄,由于 CEP 查詢都是在一個(gè)無(wú)限的數(shù)據(jù)流中,這樣的優(yōu)勢(shì)顯而易見。更重要的是,輸入實(shí)時(shí)被處理,系統(tǒng)一旦收到某一個(gè)序列的所有數(shù)據(jù),結(jié)果就會(huì)被輸出。CEP 因此有著非常高效的實(shí)時(shí)分析能力。

由此,CEP 的處理范式吸引了很多技術(shù)人員興趣,有著廣泛的應(yīng)用場(chǎng)景。值得注意的是,CEP 現(xiàn)在用在了金融應(yīng)用,例如:股票市場(chǎng)趨勢(shì)、信用卡欺詐檢測(cè)。還有基于 RFID 的追蹤和監(jiān)控,例如:庫(kù)房小偷檢測(cè)。CEP 還可以被用于基于用戶可疑行為的網(wǎng)絡(luò)入侵檢測(cè)。

Apache Flink 有著天生的真正的流處理能力,具有低延遲、高吞吐量的特性,和 CEP 簡(jiǎn)直絕配。因此,F(xiàn)link 社區(qū)在 Flink 1.0 引入了第一個(gè)版本的 CEP library。接下來(lái)我們會(huì)使用一個(gè)數(shù)據(jù)中心監(jiān)控的案例介紹其使用。

假設(shè)這樣一個(gè)場(chǎng)景:數(shù)據(jù)中心有很多機(jī)架,每一個(gè)機(jī)架都有功率和溫度監(jiān)控。監(jiān)控設(shè)備會(huì)不斷產(chǎn)生功率和溫度事件?;谶@些監(jiān)控事件數(shù)據(jù)流,我們想要檢測(cè)出可能要過(guò)熱的機(jī)架,從而調(diào)整負(fù)載和降溫。

針對(duì)這種場(chǎng)景,我們采取兩階段處理方法。首先,監(jiān)控溫度事件,當(dāng)檢測(cè)到連續(xù)兩個(gè)超過(guò)閾值的溫度事件,即生成一個(gè)當(dāng)前平均溫度的警告(warning),溫度報(bào)警不一定意味著過(guò)熱。但是如果看到兩個(gè)連續(xù)的升溫警告事件,則生成機(jī)架過(guò)熱報(bào)警(alert)。此時(shí),需要采取措施冷卻機(jī)架。

首先,定義來(lái)源的監(jiān)控事件流,每一個(gè) message 都包含來(lái)源 rack ID(機(jī)架 ID)。溫度事件包含當(dāng)前溫度,功率事件包含當(dāng)前電壓。我們把事件模型定義為 POJOs.

    public abstract class MonitoringEvent {
        private int rackID;
        ...
    }
    
    public class TemperatureEvent extends MonitoringEvent {
        private double temperature;
        ...
    }
    
    public class PowerEvent extends MonitoringEvent {
        private double voltage;
        ...
    }

我們可以使用 Flink 的 connector(比如:Kafka, RabbitMQ 等),生成 DataStream inputEventStream 給 Flink 的 CEP 算子提供輸入。首先,我們需要定義檢測(cè)溫度警告的事件模式 (pattern),CEP library 提供了非常直觀的 Pattern API 來(lái)定義復(fù)雜的模式。

每個(gè)模式都包含了一個(gè)可以定義過(guò)濾 (filter) 條件的事件序列。模式 (pattern) 的第一個(gè)事件通常都命名為"First Event"。

    Pattern.begin("First Event");

這句話會(huì)匹配每一個(gè)輸入的監(jiān)控事件(monitoring event),而我們只需要溫度大于一定閾值的溫度事件(TemperatureEvents),所以我們需要添加 subtype 和 where 語(yǔ)句限制。

    Pattern.begin("First Event")
        .subtype(TemperatureEvent.class)
        .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD);

之前說(shuō):對(duì)于同一個(gè)機(jī)架,當(dāng)看到兩個(gè)連續(xù)的高溫事件(超過(guò)閾值)就產(chǎn)生一個(gè)溫度報(bào)警(TemperatureWarning),Pattern API 提供了 next 調(diào)用方法,來(lái)添加事件到模式定義中。next 添加的事件發(fā)生時(shí)間必須緊跟著第一個(gè)匹配事件之后,才能觸發(fā)整個(gè)模式的匹配。

Pattern warningPattern = Pattern.begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

最后模式的定義包含有一個(gè) within 的 API 調(diào)用,用來(lái)定義兩個(gè)連續(xù) TemperatureEvents 必須在 10s 內(nèi)發(fā)生才能匹配。時(shí)間基于 time characteristic 設(shè)置,可以是:處理時(shí)間、輸入時(shí)間或者事件時(shí)間。(譯者注 Event Time / Processing Time / Ingestion Time 解釋)

定義好事件模型之后,可以將其應(yīng)用到輸入數(shù)據(jù)流中。

    PatternStream tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);

由于告警是針對(duì)單個(gè)機(jī)架的告警,必須使用 keyBy 通過(guò) rackID 字段對(duì)輸入事件流分流。即匹配出的事件都是同一個(gè)機(jī)架的。

PatternStream 可以訪問(wèn)匹配的事件序列。通過(guò)使用 select API 可以訪問(wèn)其上數(shù)據(jù),給 select API 傳入 PatternSelectFunction,PatternSelectFunction 會(huì)在每一個(gè)匹配上的事件序列上執(zhí)行。事件序列通過(guò) Map 訪問(wèn),MonitoringEvent 通過(guò)之前分配的事件名稱來(lái)定位。這里我們通過(guò) select function 針對(duì)每一個(gè)匹配的模式產(chǎn)生一個(gè) TemperatureWarning 事件。

    public class TemperatureWarning {
        private int rackID;
        private double averageTemperature;
        ...
    }
    
    DataStream warnings = tempPatternStream.select(
        (Map pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
            TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");
    
            return new TemperatureWarning(
                first.getRackID(), 
                (first.getTemperature() + second.getTemperature()) / 2);
        }
    );

現(xiàn)在我們從原始監(jiān)控事件流(monitoring event stream)生成了一個(gè)復(fù)雜事件流 DataStream 警告。這個(gè)復(fù)雜事件流可以再次被用作其他復(fù)雜事件處理的輸入。當(dāng)同一個(gè)機(jī)架產(chǎn)生兩個(gè)連續(xù)升溫警告時(shí),我們使用 TemperatureWarnings 來(lái)生成 TemperatureAlerts。TemperatureAlerts 定義如下:

    public class TemperatureAlert {
        private int rackID;
        ...
    }

首先定義報(bào)警事件

    Pattern alertPattern = Pattern.begin("First Event")
        .next("Second Event")
        .within(Time.seconds(20));

定義描述了在 20s 內(nèi)有兩個(gè) TemperatureWarnings 事件,并且第一個(gè)事件名稱為 "First Event",緊接著的第二個(gè)為 “Second Event”。這來(lái)了個(gè)事件都沒(méi)有 where 語(yǔ)句,因?yàn)樾枰L問(wèn)兩個(gè)事件才能判斷溫度時(shí)候增長(zhǎng)。因此,下面我們需要在 select 語(yǔ)句中使用 filter 條件來(lái)提取。這里我們只是生成了 PatternStream。

    PatternStream alertPatternStream = CEP.pattern(
        warnings.keyBy("rackID"),
        alertPattern);

同樣,我們需要 keyBy 對(duì)輸入的告警數(shù)據(jù)流針對(duì)同一個(gè)機(jī)架進(jìn)行分流。然后使用 flatSelect 方法訪問(wèn)匹配的事件序列,當(dāng)判斷溫度上升時(shí)生成 TemperatureAlert 告警。

    DataStream alerts = alertPatternStream.flatSelect(
        (Map pattern, Collector out) -> {
            TemperatureWarning first = pattern.get("First Event");
            TemperatureWarning second = pattern.get("Second Event");
    
            if (first.getAverageTemperature() < second.getAverageTemperature()) {
                out.collect(new TemperatureAlert(first.getRackID()));
            }
        });

DataStream 告警是針對(duì)同一個(gè)機(jī)架的數(shù)據(jù)流,基于這個(gè)數(shù)據(jù)我們現(xiàn)在可以調(diào)整負(fù)載和降溫。源代碼地址(譯者注:注意閱讀 readme)

總結(jié):

本文描述了使用 Flink CEP library 可以很容易處理事件流。我們通過(guò)數(shù)據(jù)中心的監(jiān)控和報(bào)警案例,完成了服務(wù)器機(jī)架過(guò)熱報(bào)警的小程序。
未來(lái) Flink 社區(qū)會(huì)持續(xù)擴(kuò)展 CEP library 的功能和表述能力。接下來(lái)的 road map 是支持類正則表達(dá)式的模式實(shí)現(xiàn),包括 *, 上下限制和否定。此外,還計(jì)劃允許 where 語(yǔ)句訪問(wèn)之前匹配的事件字段。這個(gè)特性可以讓我們提前刪除不需要的事件序列。

閱讀材料:

本內(nèi)容為譯者添加

官網(wǎng):Apache Flink

概念:Event Time / Processing Time / Ingestion Time

案例:Apache Flink example CEP program to monitor data center temperatures

API 介紹:FlinkCEP - Complex event processing for Flink

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/35812.html

相關(guān)文章

  • Flux再進(jìn)化:Introducing Relay and GraphQL

    摘要:它的設(shè)計(jì)使得即使是大型團(tuán)隊(duì)也能以高度隔離的方式應(yīng)對(duì)功能變更。獲取數(shù)據(jù)數(shù)據(jù)變更性能,都是讓人頭痛的問(wèn)題。通過(guò)維護(hù)組件與數(shù)據(jù)間的依賴在依賴的數(shù)據(jù)就緒前組件不會(huì)被渲染為開發(fā)者提供更加可預(yù)測(cè)的開發(fā)環(huán)境。這杜絕了隱式的數(shù)據(jù)依賴導(dǎo)致的潛在。 關(guān)于Relay與GraphQL的介紹 原文:Introducing Relay and GraphQL 視頻地址(強(qiáng)烈建議觀看):https://www.y...

    cncoder 評(píng)論0 收藏0
  • 時(shí)序列數(shù)據(jù)庫(kù)武斗大會(huì)之TSDB名錄 Part 2

    摘要:在前面時(shí)序列數(shù)據(jù)庫(kù)武斗大會(huì)之名錄我們已經(jīng)介紹了一些常見的,這里我們?cè)賹?duì)剩余的一些做些簡(jiǎn)單介紹。是一個(gè)多租戶的時(shí)間序列和資源數(shù)據(jù)庫(kù)。是基于的時(shí)序列數(shù)據(jù)庫(kù)。 【編者按】劉斌,OneAPM后端研發(fā)工程師,擁有10多年編程經(jīng)驗(yàn),參與過(guò)大型金融、通信以及Android手機(jī)操作系的開發(fā),熟悉Linux及后臺(tái)開發(fā)技術(shù)。曾參與翻譯過(guò)《第一本Docker書》、《GitHub入門與實(shí)踐》、《Web應(yīng)用安全...

    luodongseu 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<