摘要:中使用首先,我們需要寫一個(gè)文件,假設(shè)我們就是把某一列按格式放的一列轉(zhuǎn)成格式。這里我們指定了函數(shù)名叫,主類使我們上傳的文件里的類?,F(xiàn)在我們就可以在中調(diào)用這個(gè)了。這樣我們就完成了在中使用的整個(gè)過程。
摘要: PyODPS 中使用 Python UDF 包含兩方面,一個(gè)是直接使用,也就是在 MaxCompute SQL 中使用;一個(gè)是間接的方式,也就是 PyODPS DataFrame,這種方式你不需要直接寫 Python UDF,而是寫普通的 Python 函數(shù)或者類。
點(diǎn)此查看原文:http://click.aliyun.com/m/41092/
PyODPS 中使用 Python UDF 包含兩方面,一個(gè)是直接使用,也就是在 MaxCompute SQL 中使用;一個(gè)是間接的方式,也就是 PyODPS DataFrame,這種方式你不需要直接寫 Python UDF,而是寫普通的 Python 函數(shù)或者類。下面我們分開說明。
作為準(zhǔn)備工作,我們需要 ODPS 入口,可以通過直接初始化,或者使用 room 機(jī)制 加載。
from odps import ODPS o = ODPS("your-access-id", "your-access-key", "your-project")
MaxCompute SQL 中使用 Python UDF
首先,我們需要寫一個(gè) Python 文件,假設(shè)我們就是把某一列按 csv 格式放的一列轉(zhuǎn)成 json 格式。
import json from odps.udf import annotate @annotate("string->string") class Transform(object): def evaluate(self, x): columns = list("abc") d = dict(zip(columns, x.split(","))) return json.dumps(d)
假設(shè)這個(gè)文件叫 my.py,接下來我們就需要創(chuàng)建 py 資源。
r = o.create_resource("csv_to_json.py", "py", fileobj=open("my.py"))
fileobj 參數(shù)也可以是 str 類型,就是表示文件的內(nèi)容
接著我們就可以創(chuàng)建 Python UDF 了。
o.create_function("csv_to_json", class_type="csv_to_json.Transform", resources=[r])
這里我們指定了函數(shù)名叫 csv_to_json,主類使我們上傳的 csv_to_json.py 文件里的 Transform 類。
現(xiàn)在我們就可以在 MaxCompute SQL 中調(diào)用這個(gè) UDF 了。
o.execute_sql("select csv_to_json(raw) from pyodps_test_udf")
這樣我們就完成了在 PyODPS 中使用 MaxCompute SQL + Python UDF 的整個(gè)過程。
PyODPS DataFrame
對于 PyODPS DataFrame 來說,用戶只需要寫普通的 Python 函數(shù)或者類,在函數(shù)或者類里,甚至可以讀取全局變量,這樣給開發(fā)帶來了極大的方便。
和上面的例子目標(biāo)相同,我們定義一個(gè) transform 函數(shù)即可。然后我們對于 DataFrame 的一列調(diào)用 map 方法來應(yīng)用這個(gè)函數(shù)。
passed_columns = list("abc") # 可以從數(shù)據(jù)庫中讀取或者寫死 def transform(x): import json d = dict(zip(passed_columns, x.split(","))) return json.dumps(d) df.raw.map(transform)
In [30]: df raw 0 1,2,3 1 4,5,6 2 7,8,9 In [31]: df.raw.map(transform) raw 0 {"a": "1", "c": "3", "b": "2"} 1 {"a": "4", "c": "6", "b": "5"} 2 {"a": "7", "c": "9", "b": "8"}
實(shí)際上,PyODPS DataFrame 在用 MaxCompute 執(zhí)行的時(shí)候,也會創(chuàng)建 Python UDF 來實(shí)現(xiàn)這個(gè)功能,但用戶不需要去創(chuàng)建文件、資源和函數(shù)這些過程,一切都是 Python 原生函數(shù)和類,整個(gè)過程相當(dāng)順暢。
另外可以看到,在上面的 my.py 里,我們也是定義了一個(gè) columns 參數(shù)的,而如果這個(gè)參數(shù)是通過變量傳進(jìn)去的話,在 Python UDF 里非常麻煩,可能常常需要用一些 tricky 的方法,比如寫到某個(gè)文件資源,然后在 UDF 里讀取之類的。而對于 DataFrame 來說,完全沒有這個(gè)問題,我們可以自由讀取全局變量。
不過要注意的是,這個(gè)全局變量是被序列化到各個(gè)機(jī)器上的,所以你修改它不會全局生效。
好了,還有什么問題可以隨時(shí)和我們?nèi)〉寐?lián)系。
文檔:http://pyodps.readthedocs.io/...
代碼:https://github.com/aliyun/ali... ,歡迎提 issue 和 merge request
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/41326.html
摘要:中使用首先,我們需要寫一個(gè)文件,假設(shè)我們就是把某一列按格式放的一列轉(zhuǎn)成格式。這里我們指定了函數(shù)名叫,主類使我們上傳的文件里的類。現(xiàn)在我們就可以在中調(diào)用這個(gè)了。這樣我們就完成了在中使用的整個(gè)過程。 摘要: PyODPS 中使用 Python UDF 包含兩方面,一個(gè)是直接使用,也就是在 MaxCompute SQL 中使用;一個(gè)是間接的方式,也就是 PyODPS DataFrame,這種...
摘要:摘要北京云棲大會上阿里云發(fā)布了最新的功能,萬眾期待的功能終于支持啦,我怎么能不一試為快,今天就分享如何通過進(jìn)行開發(fā)。注冊函數(shù)在腳本中編輯試用好了,一個(gè)簡單完整的通過開發(fā)實(shí)踐分享完成。 摘要: 2017/12/20 北京云棲大會上阿里云MaxCompute發(fā)布了最新的功能Python UDF,萬眾期待的功能終于支持啦,我怎么能不一試為快,今天就分享如何通過Studio進(jìn)行Python u...
摘要:編寫完成后,將代碼保存為,并在中執(zhí)行此后創(chuàng)建函數(shù)。執(zhí)行創(chuàng)建后,便可以在中執(zhí)行查詢暫不支持,因而需禁用其他如果包依賴了其他包,需要一并上傳并同時(shí)加入到依賴中。 摘要: 新版 MaxCompute Isolation Session 支持 Python UDF。也就是說,Python UDF 中已經(jīng)可以跑二進(jìn)制包。剛才以 Scipy 為例踩了一下坑,把相關(guān)的過程分享出來。 新版 MaxCo...
摘要:摘要支持用來對對象進(jìn)行操作,它提供了來用類似的接口進(jìn)行大規(guī)模數(shù)據(jù)分析以及預(yù)處理,并且可以用模塊來執(zhí)行機(jī)器學(xué)習(xí)算法。現(xiàn)在為了讓大家能更好地使用,我們總結(jié)開發(fā)過程中的最佳實(shí)踐,來讓大家更高效地開發(fā)程序。 摘要: PyODPS支持用 Python 來對 MaxCompute 對象進(jìn)行操作,它提供了 DataFrame API 來用類似 pandas 的接口進(jìn)行大規(guī)模數(shù)據(jù)分析以及預(yù)處理,并且可...
閱讀 465·2023-04-25 23:00
閱讀 3496·2021-11-22 13:54
閱讀 1899·2021-10-27 14:14
閱讀 1487·2019-08-30 13:59
閱讀 3512·2019-08-23 16:15
閱讀 1959·2019-08-23 16:06
閱讀 3328·2019-08-23 15:26
閱讀 1258·2019-08-23 13:48