成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

airflow探索篇

leap_frog / 2188人閱讀

摘要:調(diào)度和監(jiān)控工作流的平臺(tái),用于用來(lái)創(chuàng)建監(jiān)控和調(diào)整。安裝以及方式啟動(dòng)重要說(shuō)明使用需要安裝配置說(shuō)明上篇在中配置的。負(fù)責(zé)調(diào)度,只支持單節(jié)點(diǎn),多節(jié)點(diǎn)啟動(dòng)可能會(huì)掛掉負(fù)責(zé)執(zhí)行具體中的。輪詢查詢狀態(tài)是成功失敗。如是則繼續(xù)輪詢,成功失敗操作相應(yīng)后續(xù)操作。

airflow是一個(gè) Airbnb 的 Workflow 開源項(xiàng)目,在Github 上已經(jīng)有超過(guò)兩千星。data pipeline調(diào)度和監(jiān)控工作流的平臺(tái),用于用來(lái)創(chuàng)建、監(jiān)控和調(diào)整data pipeline。類似的產(chǎn)品有:Azkaban、oozie 
pip方式安裝

默認(rèn)已經(jīng)安裝python >= 2.7 以及 pip
安裝可以參考這篇,比較詳細(xì)。airflow安裝以及celery方式啟動(dòng)

重要說(shuō)明 使用mysql需要安裝
python 2 : pip install MySQL-python
python 3 : pip install PyMySQL
AIRFLOW_HOME配置說(shuō)明

上篇在.bashrc中配置的export AIRFLOW_HOME=/home/airflow/airflow01。AIRFLOW_HOME設(shè)置目錄在airflow initdb的時(shí)候初始化,存放airflow的配置文件airflow.cfg及相關(guān)文件。

DAG說(shuō)明-管理建議

默認(rèn)$AIRFLOW_HOME/dags存放定義的dag,可以分目錄管理dag。常用管理dag做法,dag存放另一個(gè)目錄通過(guò)git管理,并設(shè)置軟連接映射到$AIRFLOW_HOME/dag。好處方便dag編輯變更,同時(shí)dag變更不會(huì)出現(xiàn)編輯到一半的時(shí)候就加載到airflow中。

plugins說(shuō)明-算子定義

默認(rèn)$AIRFLOW_HOME/plugins存放定義的plugins,自定義組件??梢宰远xoperator,hook等等。我們希望可以直接使用這種模式定義機(jī)器學(xué)習(xí)的一個(gè)算子。下面定義了一個(gè)簡(jiǎn)單的加法算子。

# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Will show up under airflow.operators.plus_plugin.PluginOperator
class PlusOperator(BaseOperator):

    @apply_defaults
    def __init__(self, op_args=None, params=None, provide_context=False, set_context=False, *args, **kwargs):
        super(PlusOperator, self).__init__(*args, **kwargs)
        self.params = params or {}
        self.set_context = set_context

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            self.op_kwargs = context

        puls = self.op_kwargs["a"] + self.op_kwargs["b"]
        print "a =", self.op_kwargs["a"], ". b=", self.op_kwargs["a"]
        return_value = self.main()
        context[self.task_id].xcom_push(key="return_value", value=return_value)
        return puls


# Defining the plugin class
class PlusPlugin(AirflowPlugin):
    name = "plus_plugin"
    operators = [PlusOperator]

在dag中使用案例如下

from airflow.operators.plus_plugin import PlusOperator
plus_task = PlusOperator(task_id="plus_task", provide_context=True, params={"a": 1,"b":2},dag=dag)
一些命令說(shuō)明
命令 說(shuō)明
airflow webserver -p 8091 8091啟動(dòng)webserver,通過(guò)頁(yè)面查詢不需要可以不啟動(dòng)
airflow scheduler 調(diào)度器,必須啟動(dòng),不然dag沒法run起來(lái)(使用CeleryExecutor、LocalExecutor時(shí))
airflow run dagid [time] run task instance
airflow backfill [dagid] -s[startTime] -e [endTime] run a backfill over 2 days
run的demo
# run your first task instance
airflow run example_bash_operator runme_0 2018-01-11

# run a backfill over 2 days
airflow backfill example_bash_operator -s 2018-01-10 -e 2018-01-11
基于CeleryExecutor方式的系統(tǒng)架構(gòu)

使用celery方式的系統(tǒng)架構(gòu)圖(官方推薦使用這種方式,同時(shí)支持mesos方式部署)。turing為外部系統(tǒng),GDags服務(wù)幫助拼接成dag,可以忽略。

1.master節(jié)點(diǎn)webui管理dags、日志等信息。scheduler負(fù)責(zé)調(diào)度,只支持單節(jié)點(diǎn),多節(jié)點(diǎn)啟動(dòng)scheduler可能會(huì)掛掉

2.worker負(fù)責(zé)執(zhí)行具體dag中的task。這樣不同的task可以在不同的環(huán)境中執(zhí)行。

基于LocalExecutor方式的系統(tǒng)架構(gòu)圖

另一種啟動(dòng)方式的思考,一個(gè)dag分配到1臺(tái)機(jī)器上執(zhí)行。如果task不復(fù)雜同時(shí)task環(huán)境相同,可以采用這種方式,方便擴(kuò)容、管理,同時(shí)沒有master單點(diǎn)問(wèn)題。

基于源碼的啟動(dòng)以及二次開發(fā)

很多情況airflow是不滿足我們需求,就需要自己二次開發(fā),這時(shí)候就需要基于源碼方式啟動(dòng)。比如日志我們期望通過(guò)http的方式提供出來(lái),同其他系統(tǒng)查看。airflow自動(dòng)的webserver只提供頁(yè)面查詢的方式。

下載源碼

github源碼地址 : [https://github.com/apache/inc...]
git clone [email protected]:apache/incubator-airflow.git

切換分支

master分支的表初始化有坑,mysql設(shè)置的sql校驗(yàn)安全級(jí)別過(guò)高一直建表不成功。這個(gè)坑被整的有點(diǎn)慘。v1-8-stable或者v1-9-stable分支都可以。
git checkout v1-8-stable

安裝必要Python包

進(jìn)入incubator-airflow,python setup.py install (沒啥文檔說(shuō)明,又是一個(gè)坑。找了半天)

初始化

直接輸入airflow initdb(python setup.py install這個(gè)命令會(huì)將airflow安裝進(jìn)去)

修改配置

進(jìn)入$AIRFLOE_HOME (默認(rèn)在~/airflow),修改airflow.cfg,修改mysql配置。可以查看上面推薦的文章以及上面的[使用mysql需要安裝]

啟動(dòng)

airflow webserver -p 8085
airflow scheduler

獲取日志信息的改造

1.進(jìn)入incubator-airflow/airflow/www/
2.修改views.py
在 class Airflow(BaseView)中添加下面代碼

@expose("/logs")
    @login_required
    @wwwutils.action_logging
    def logs(self):
        BASE_LOG_FOLDER = os.path.expanduser(
            conf.get("core", "BASE_LOG_FOLDER"))
        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        execution_date = request.args.get("execution_date")
        dag = dagbag.get_dag(dag_id)
        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
            **locals())
        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
        loc = loc.format(**locals())
        log = ""
        TI = models.TaskInstance
        session = Session()
        dttm = dateutil.parser.parse(execution_date)
        ti = session.query(TI).filter(
            TI.dag_id == dag_id, TI.task_id == task_id,
            TI.execution_date == dttm).first()
        dttm = dateutil.parser.parse(execution_date)
        form = DateTimeForm(data={"execution_date": dttm})

        if ti:
            host = ti.hostname
            log_loaded = False

            if os.path.exists(loc):
                try:
                    f = open(loc)
                    log += "".join(f.readlines())
                    f.close()
                    log_loaded = True
                except:
                    log = "*** Failed to load local log file: {0}.
".format(loc)
            else:
                WORKER_LOG_SERVER_PORT = 
                    conf.get("celery", "WORKER_LOG_SERVER_PORT")
                url = os.path.join(
                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
                ).format(**locals())
                log += "*** Log file isn"t local.
"
                log += "*** Fetching here: {url}
".format(**locals())
                try:
                    import requests
                    timeout = None  # No timeout
                    try:
                        timeout = conf.getint("webserver", "log_fetch_timeout_sec")
                    except (AirflowConfigException, ValueError):
                        pass

                    response = requests.get(url, timeout=timeout)
                    response.raise_for_status()
                    log += "
" + response.text
                    log_loaded = True
                except:
                    log += "*** Failed to fetch log file from worker.
".format(
                        **locals())

            if not log_loaded:
                # load remote logs
                remote_log_base = conf.get("core", "REMOTE_BASE_LOG_FOLDER")
                remote_log = os.path.join(remote_log_base, log_relative)
                log += "
*** Reading remote logs...
"

                # S3
                if remote_log.startswith("s3:/"):
                    log += log_utils.S3Log().read(remote_log, return_error=True)

                # GCS
                elif remote_log.startswith("gs:/"):
                    log += log_utils.GCSLog().read(remote_log, return_error=True)

                # unsupported
                elif remote_log:
                    log += "*** Unsupported remote log location."

            session.commit()
            session.close()

        if PY2 and not isinstance(log, unicode):
            log = log.decode("utf-8")

        title = "Log"

        return wwwutils.json_response(log)

3.重啟服務(wù),訪問(wèn)url如:

http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11

就可以拿到這個(gè)任務(wù)在execution_date=2018-01-11的日志

異步任務(wù)思考

案例:task通過(guò)http請(qǐng)求大數(shù)據(jù)操作,拆分一些數(shù)據(jù),存入一些臨時(shí)表。
方案:
1.新建一張task實(shí)例的狀態(tài)表如:task_instance_state。
2.擴(kuò)展一個(gè)plugins,如:AsyncHttpOperator。AsyncHttpOperator實(shí)現(xiàn)邏輯:

在task_instance_state插入一條running狀態(tài)記錄running。

發(fā)送http請(qǐng)求給大數(shù)據(jù)平臺(tái),操作數(shù)據(jù)。

輪詢查詢task_instance_state狀態(tài)是成功、失敗、running。如是running則繼續(xù)輪詢,成功、失敗操作相應(yīng)后續(xù)操作。

3.提供一個(gè)restful api update task_instance_state,供大數(shù)據(jù)平臺(tái)回調(diào),修改任務(wù)實(shí)例狀態(tài)。

不錯(cuò)的文章推薦

瓜子云的任務(wù)調(diào)度系統(tǒng)
Get started developing workflows with Apache Airflow
官網(wǎng)地址
生產(chǎn)環(huán)境使用可能遇到的坑
初探airflow
焦油坑
系統(tǒng)研究Airbnb開源項(xiàng)目airflow

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/41221.html

相關(guān)文章

  • 數(shù)據(jù)科學(xué)部門如何使用Python和R組合完成任務(wù)

    摘要:數(shù)據(jù)科學(xué)項(xiàng)目的完整流程通常是這樣的五步驟需求定義數(shù)據(jù)獲取數(shù)據(jù)治理數(shù)據(jù)分析數(shù)據(jù)可視化一需求定義需求定義是數(shù)據(jù)科學(xué)項(xiàng)目和數(shù)據(jù)科學(xué)比賽的最大不同之處,在真實(shí)情景下,我們往往對(duì)目標(biāo)函數(shù)自變量約束條件都并不清晰。 概述 和那些數(shù)據(jù)科學(xué)比賽不同,在真實(shí)的數(shù)據(jù)科學(xué)中,我們可能更多的時(shí)間不是在做算法的開發(fā),而是對(duì)需求的定義和數(shù)據(jù)的治理。所以,如何更好的結(jié)合現(xiàn)實(shí)業(yè)務(wù),讓數(shù)據(jù)真正產(chǎn)生價(jià)值成了一個(gè)更有意義的...

    Apollo 評(píng)論0 收藏0
  • [原]數(shù)據(jù)科學(xué)教程:如何使用Airflow調(diào)度數(shù)據(jù)科學(xué)工作流

    摘要:概述是一個(gè)我們正在用的工作流調(diào)度器,相對(duì)于傳統(tǒng)的任務(wù)管理,很好的為我們理清了復(fù)雜的任務(wù)依賴關(guān)系監(jiān)控任務(wù)執(zhí)行的情況。步驟三修改默認(rèn)數(shù)據(jù)庫(kù)找到配置文件修改配置注意到,之前使用的的方式是行不通的。微信號(hào)商業(yè)使用請(qǐng)聯(lián)系作者。 showImg(https://segmentfault.com/img/remote/1460000006760428?w=1918&h=1556); 概述 Airfl...

    v1 評(píng)論0 收藏0
  • 一個(gè)適合小公司用的 data pipeline 工具

    摘要:有了自己的系統(tǒng)我覺得就很安心了,以后能夠做數(shù)據(jù)處理和機(jī)器學(xué)習(xí)方面就相對(duì)方便一些。隆重推薦的工具是我很喜歡的公司,他們有很多開源的工具,我覺得是最實(shí)用的代表。是,在很多機(jī)器學(xué)習(xí)里有應(yīng)用,也就是所謂的有向非循環(huán)。 最近在Prettyyes一直想建立起非常專業(yè)的data pipeline系統(tǒng),然后沒有很多時(shí)間,這幾個(gè)禮拜正好app上線,有時(shí)間開始建立自己的 data pipeline,能夠很...

    2i18ns 評(píng)論0 收藏0
  • [譯] 解密 Airbnb 的數(shù)據(jù)流編程神器:Airflow 中的技巧和陷阱

    摘要:顯然,這單獨(dú)執(zhí)行不起作用這將通過(guò)子操作符被作為像是自己的調(diào)度任務(wù)中那樣運(yùn)行。子也必須有個(gè)可用調(diào)度即使子作為其父的一部分被觸發(fā)子也必須有一個(gè)調(diào)度如果他們的調(diào)度是設(shè)成,這個(gè)子操作符將不會(huì)觸發(fā)任何任務(wù)。這兩個(gè)例子都是緣起子操作符被當(dāng)做了回填工作。 showImg(https://segmentfault.com/img/remote/1460000006768714); 前言 Airbnb的...

    zsy888 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<