摘要:底層淺析簡介是官方提供的接口,同時也是中的一個程序。這里一提,對于大部分機器學(xué)習(xí)算法,你都會看到模塊與模塊都提供了接口,它們的區(qū)別在于模塊接受格式的數(shù)據(jù)而模塊接受格式的數(shù)據(jù)。
pyspark底層淺析 pyspark簡介
pyspark是Spark官方提供的API接口,同時pyspark也是Spark中的一個程序。
在terminal中輸入pyspark指令,可以打開python的shell,同時其中默認(rèn)初始化了SparkConf和SparkContext.
在編寫Spark應(yīng)用的.py文件時,可以通過import pyspark引入該模塊,并通過SparkConf對Spark的啟動參數(shù)進(jìn)行設(shè)置。不過,如果你僅完成了Spark的安裝,直接用python指令運行py文件并不能檢索到pyspark模塊。你可以通過pip等包管理工具安裝該模塊,也可以直接使用pyspark(新版本已不支持)或spark-submit直接提交.py文件的作業(yè)。
pyspark program這里指的是spark中的bin/pyspark,github地址 。
實際上pyspark只不過解析了命令行中的參數(shù),并進(jìn)行了python方面的設(shè)置,然后調(diào)用spark-submit
exec "${SPARK_HOME}"/bin/spark-submit pyspark-shell-main --name "PySparkShell" "$@"
在較新一些的版本如Spark2.2中,已經(jīng)不支持用pyspark運行py腳本文件,一切spark作業(yè)都應(yīng)該使用spark-submit提交。
pyspark moduleSpark是用scala編寫的框架,不過考慮到主要是機器學(xué)習(xí)的應(yīng)用場景,Spark官方提供了可以用python的API。但是,一方面,python的API是不全的,即不是所有的scala的函數(shù)都可以用pyspark調(diào)用到,雖然新的API也在隨著版本迭代不斷開放;另一方面,pyspark模塊,對于很多復(fù)雜算法,是通過反射機制調(diào)用的Spark中JVM里正在運行的scala編寫的類、方法。所以,如果你將頻繁應(yīng)用spark于業(yè)務(wù)或研究,建議學(xué)習(xí)直接使用scala語言編寫程序,而不是python。
這篇博客并不會講述如何去使用pyspark來編寫python的spark應(yīng)用。各類API以及模塊如何使用,你完全可以前往官方文檔查看。這里的鏈接是最新版pyspark的文檔,如果你的機器上的spark不是最新版,請去找對應(yīng)版本的pyspark文檔。因為正如我上面所說,不同版本的pyspark逐步開放了新的API并有對舊API進(jìn)行改進(jìn),你在最新版本看到的類、函數(shù),不一定能在舊版本使用。這里一提,對于大部分機器學(xué)習(xí)算法,你都會看到ml模塊與mllib模塊都提供了接口,它們的區(qū)別在于ml模塊接受DataFrame格式的數(shù)據(jù)而mllib模塊接受RDD格式的數(shù)據(jù)。
關(guān)于pyspark底層,這里主要探索兩個地方。一個是其初始化時的工作,一個是其對JVM中scala代碼的調(diào)用
SparkContextSparkContext類在pyspark/context.py中,在python代碼里通過初試化該類的實例來完成Spark的啟動與初始化。這個類的__init__方法中執(zhí)行了下面幾行代碼
self._callsite = first_spark_call() or CallSite(None, None, None) SparkContext._ensure_initialized(self, gateway=gateway) try: self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, jsc, profiler_cls) except: # If an error occurs, clean up in order to allow future SparkContext creation: self.stop() raise
first_spark_call和CallSite方法都是用來獲取JAVA虛擬機中的堆棧,它們在pyspark/traceback_util.py中。
之后調(diào)用了類函數(shù)_ensure_initialized函數(shù),對Spark的Java的gate_way和jvm進(jìn)行設(shè)置。
最后調(diào)用了類中的_do_init_函數(shù),從函數(shù)就可以看出是對內(nèi)部類成員SparkConf的實例_conf函數(shù)進(jìn)行設(shè)置,判斷各參數(shù)值是否為None,非空的話就進(jìn)行設(shè)置,并讀取一些本地的python環(huán)境參數(shù),啟動Spark。
以mllib庫為例,主要邏輯都在pyspark/mllib/common.py中。你去查看mllib模塊中機器學(xué)習(xí)算法的類與函數(shù),你會發(fā)現(xiàn)基本都是使用self.call或者callMLlibFunc,將函數(shù)名與參數(shù)傳入。
各類模型的Model類都繼承自common.JavaModelWrapper,這個類代碼很短:
class JavaModelWrapper(object): """ Wrapper for the model in JVM """ def __init__(self, java_model): self._sc = SparkContext._active_spark_context self._java_model = java_model def __del__(self): self._sc._gateway.detach(self._java_model) def call(self, name, *a): """Call method of java_model""" return callJavaFunc(self._sc, getattr(self._java_model, name), *a)
_java_model是來自Java或Scala的類的實例,在調(diào)用對應(yīng)的訓(xùn)練算法時由對應(yīng)的scala代碼在末尾將這些類初始化并返回,其關(guān)鍵的類方法call,同callMLLibFunc方法一樣,都是調(diào)用了callJavaFunc的方法。對于調(diào)用某一類的方法,是運用python的getattr函數(shù),將類實例與方法名傳入,使用反射機制獲取函數(shù);而對于調(diào)用一些不屬于類的方法,即使用callMLLibFunc時,是傳入的PythonMLLibAPI類的實例以及方法名,來獲取函數(shù):
def callMLlibFunc(name, *args): """ Call API in PythonMLLibAPI """ sc = SparkContext.getOrCreate() api = getattr(sc._jvm.PythonMLLibAPI(), name) return callJavaFunc(sc, api, *args)
最終callJavaFunc做的也很簡單,將python的參數(shù)*a,使用_py2java方法轉(zhuǎn)換為java的數(shù)據(jù)類型,并執(zhí)行函數(shù),再將結(jié)果使用_java2py方法轉(zhuǎn)換為python的數(shù)據(jù)類型返回:
def callJavaFunc(sc, func, *args): """ Call Java Function """ args = [_py2java(sc, a) for a in args] return _java2py(sc, func(*args))
這里的_java2py,對很多數(shù)據(jù)格式的支持不是很好,所以當(dāng)你嘗試用底層的call方法調(diào)用一些pyspark尚未支持但scala中已經(jīng)有的函數(shù)時,可能在scala部分可以執(zhí)行,但是python的返回結(jié)果卻不盡如人意。
ml模塊的調(diào)用機制與mllib的機制有些許的不同,但本質(zhì)上都還是去調(diào)用在Spark的JVM中scala代碼的class。
總結(jié)本篇博客其實說的非常簡單,pyspark即使是不涉及具體算法的部分,也還有很多內(nèi)容尚未討論。這里僅是對pyspark產(chǎn)生一個初步的認(rèn)識,同時簡單分析了一下底層對scala的調(diào)用過程。
你興許會有這樣的疑問--“去看這些源代碼有什么用呢?好像就算知道這些,實際使用時不還是用一下API就好了嗎?”。
實際上,看源代碼首先的就是滿足一下好奇心,對Spark有一個更充分的了解;其次關(guān)于具體用途,我舉個例子,很多情況你使用的集群可能不是最新版本的,因為復(fù)雜的配置導(dǎo)致一般而言也不可能有一個新版本就更新一次,這時你想用新版本的API怎么辦?看了這篇博客想必你也會有一些“大膽的想法”。后一篇博客會舉例說明我在實際工作中相關(guān)的一個問題,以及如何利用這些源碼去解決的。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/40956.html
摘要:大數(shù)據(jù)除了體積和速度外,數(shù)據(jù)的多樣性和準(zhǔn)確性也是大數(shù)據(jù)的一大特點。這些也被稱為大數(shù)據(jù)的特征。介紹是一個解決大數(shù)據(jù)問題的分布式可伸縮的框架。介紹計算的模型最早出現(xiàn)在谷歌的一篇研究論文中。相關(guān)鏈接介紹是一個通用的分布式編程框架。 本文作者:foochane?本文鏈接:https://foochane.cn/article/2019060601.html 1 大數(shù)據(jù)簡介 大數(shù)據(jù)是這個時代最...
摘要:篇分布計算提高效率的庫及庫函數(shù),比如的庫就有一大堆函數(shù),本質(zhì)上和的分布式計算的底層思想是一致的。篇特別適用于搭,比如的用于和在用的,其實根本上都是用了的腳本特性,串聯(lián)起來。的種常見操作增刪找值相當(dāng)于執(zhí)行了這個命令然后可以用函數(shù)來, 持續(xù)更新。--------------------C++篇------------------------ 分布計算提高效率的庫及庫函數(shù),比如FB的foll...
摘要:篇分布計算提高效率的庫及庫函數(shù),比如的庫就有一大堆函數(shù),本質(zhì)上和的分布式計算的底層思想是一致的。篇特別適用于搭,比如的用于和在用的,其實根本上都是用了的腳本特性,串聯(lián)起來。的種常見操作增刪找值相當(dāng)于執(zhí)行了這個命令然后可以用函數(shù)來, 持續(xù)更新。--------------------C++篇------------------------ 分布計算提高效率的庫及庫函數(shù),比如FB的foll...
摘要:由于使用的是天河二號,版本是,同樣,所以獲取主題時還不能使用在中才開放對的接口,只能使用的方法。本來做并行化就是希望效率更高,卻在調(diào)用代碼,同時進(jìn)行了很多數(shù)據(jù)轉(zhuǎn)換。 在pyspark中調(diào)用scala代碼 情境說明 問題 我們這邊是要使用Spark去并行一個自然語言處理的算法,其中使用到了LDA主題模型。由于使用的是天河二號,Spark版本是1.5.1,pyspark同樣,所以獲取主題時...
摘要:如何改變智能城市物聯(lián)網(wǎng)來源愿碼內(nèi)容編輯愿碼連接每個程序員的故事網(wǎng)站愿碼愿景打造全學(xué)科系統(tǒng)免費課程,助力小白用戶初級工程師成本免費系統(tǒng)學(xué)習(xí)低成本進(jìn)階,幫助一線資深工程師成長并利用自身優(yōu)勢創(chuàng)造睡后收入。 AI如何改變智能城市物聯(lián)網(wǎng)? showImg(https://segmentfault.com/img/remote/1460000018768732); 來源 | 愿碼(ChainDe...
閱讀 1364·2021-09-28 09:43
閱讀 4169·2021-09-04 16:41
閱讀 1932·2019-08-30 15:44
閱讀 3763·2019-08-30 15:43
閱讀 790·2019-08-30 14:21
閱讀 2050·2019-08-30 11:00
閱讀 3333·2019-08-29 16:20
閱讀 1934·2019-08-29 14:21