成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

TensorFlow學(xué)習(xí)筆記(9):分布式TensorFlow

PumpkinDylan / 597人閱讀

摘要:本文基于官方教程,實踐了分布式搭建的過程。一般將任務(wù)分為兩類一類叫參數(shù)服務(wù)器,,簡稱為,用于存儲一類就是普通任務(wù),稱為,用于執(zhí)行具體的計算。參數(shù)服務(wù)器是一套分布式存儲,用于保存參數(shù),并提供參數(shù)更新的操作。

簡介

TensorFlow支持使用多臺機器的設(shè)備進(jìn)行計算。本文基于官方教程,實踐了分布式TensorFlow搭建的過程。

TensorFlow入門教程

基本概念 TensorFlow集群

A TensorFlow "cluster" is a set of "tasks" that participate in the distributed execution of a TensorFlow graph. Each task is associated with a TensorFlow "server", which contains a "master" that can be used to create sessions, and a "worker" that executes operations in the graph.

從上面的定義可以看出,所謂的TensorFlow集群就是一組任務(wù),每個任務(wù)就是一個服務(wù)。服務(wù)由兩個部分組成,第一部分是master,用于創(chuàng)建session,第二部分是worker,用于執(zhí)行具體的計算。

TensorFlow一般將任務(wù)分為兩類job:一類叫參數(shù)服務(wù)器,parameter server,簡稱為ps,用于存儲tf.Variable;一類就是普通任務(wù),稱為worker,用于執(zhí)行具體的計算。

首先來理解一下參數(shù)服務(wù)器的概念。一般而言,機器學(xué)習(xí)的參數(shù)訓(xùn)練過程可以劃分為兩個類別:第一個是根據(jù)參數(shù)算算梯度,第二個是根據(jù)梯度更新參數(shù)。對于小規(guī)模訓(xùn)練,數(shù)據(jù)量不大,參數(shù)數(shù)量不多,一個CPU就足夠了,兩類任務(wù)都交給一個CPU來做。對于普通的中等規(guī)模的訓(xùn)練,數(shù)據(jù)量比較大,參數(shù)數(shù)量不多,計算梯度的任務(wù)負(fù)荷較重,參數(shù)更新的任務(wù)負(fù)荷較輕,所以將第一類任務(wù)交給若干個CPU或GPU去做,第二類任務(wù)交給一個CPU即可。對于超大規(guī)模的訓(xùn)練,數(shù)據(jù)量大、參數(shù)多,不僅計算梯度的任務(wù)要部署到多個CPU或GPU上,而且更新參數(shù)的任務(wù)也要部署到多個CPU。如果計算量足夠大,一臺機器能搭載的CPU和GPU數(shù)量有限,就需要多臺機器來進(jìn)行計算能力的擴(kuò)展了。參數(shù)服務(wù)器是一套分布式存儲,用于保存參數(shù),并提供參數(shù)更新的操作。

我們來看一下怎么創(chuàng)建一個TensorFlow集群。每個任務(wù)用一個ip:port表示。TensorFlow用tf.train.ClusterSpec表示一個集群信息,舉例如下:

import tensorflow as tf

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

上面的語句提供了一個TensorFlow集群信息,集群有兩類任務(wù),稱為job,一個job是ps,一個job是worker;ps由2個任務(wù)組成,worker由3個任務(wù)組成。

定義完集群信息后,使用tf.train.Server創(chuàng)建每個任務(wù):

tf.app.flags.DEFINE_string("job_name", "worker", "One of "ps", "worker"")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS

def main(_):
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)
    server.join()

if __name__ == "__main__":
    tf.app.run()

對于本例而言,我們需要在ip:port對應(yīng)的機器上運行每個任務(wù),共需執(zhí)行五次代碼,生成五個任務(wù)。

python worker.py --job_name=ps --task_index=0
python worker.py --job_name=ps --task_index=1
python worker.py --job_name=worker --task_index=0
python worker.py --job_name=worker --task_index=1
python worker.py --job_name=worker --task_index=2

我們找到集群的某一臺機器,執(zhí)行下面的代碼:

# -*- coding=utf-8 -*-

import tensorflow as tf
import numpy as np

train_X = np.random.rand(100).astype(np.float32)
train_Y = train_X * 0.1 + 0.3

# 選擇變量存儲位置和op執(zhí)行位置,這里全部放在worker的第一個task上
with tf.device("/job:worker/task:0"):
    X = tf.placeholder(tf.float32)
    Y = tf.placeholder(tf.float32)
    w = tf.Variable(0.0, name="weight")
    b = tf.Variable(0.0, name="reminder")
    y = w * X + b
    loss = tf.reduce_mean(tf.square(y - Y))

    init_op = tf.global_variables_initializer()
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)

# 選擇創(chuàng)建session使用的master
with tf.Session("grpc://xx.xxx.xx.xxxx:oooo") as sess:
    sess.run(init_op)
    for i in range(500):
        sess.run(train_op, feed_dict={X: train_Y, Y: train_Y})
        if i % 50 == 0:
            print i, sess.run(w), sess.run(b)

    print sess.run(w)
    print sess.run(b)

執(zhí)行結(jié)果如下:

0 0.00245265 0.00697793
50 0.0752466 0.213145
100 0.0991397 0.279267
150 0.107308 0.30036
200 0.110421 0.306972
250 0.111907 0.308929
300 0.112869 0.309389
350 0.113663 0.309368
400 0.114402 0.309192
450 0.115123 0.308967
0.115824
0.30873

其實ps和worker本質(zhì)上是一個東西,就是名字不同,我們將上例中的with tf.device("/job:worker/task:0"):改為with tf.device("/job:psr/task:0"):,一樣能夠執(zhí)行。之所以在創(chuàng)建集群時要分為兩個類別的任務(wù),是因為TensorFlow提供了一些工具函數(shù),會根據(jù)名字不同賦予task不同的任務(wù),ps的用于存儲變量,worker的用于計算。

同步與異步更新

同步更新:將數(shù)據(jù)拆分成多份,每份基于參數(shù)計算出各自部分的梯度;當(dāng)每一份的部分梯度計算完成后,收集到一起算出總梯度,再用總梯度去更新參數(shù)。

異步更新:同步更新模式下,每次都要等各個部分的梯度計算完后才能進(jìn)行參數(shù)更新操作,處理速度取決于計算梯度最慢的那個部分,其他部分存在大量的等待時間浪費;異步更新模式下,所有的部分只需要算自己的梯度,根據(jù)自己的梯度更新參數(shù),不同部分之間不存在通信和等待。

分布式訓(xùn)練案例
import tensorflow as tf
import numpy as np

# Configuration of cluster 
ps_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
worker_hosts = [ "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo", "xx.xxx.xx.xxxx:oooo" ]
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS

def main(_):
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):
        
        x_data = tf.placeholder(tf.float32, [100])
        y_data = tf.placeholder(tf.float32, [100])

        W = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
        b = tf.Variable(tf.zeros([1]))
        y = W * x_data + b
        loss = tf.reduce_mean(tf.square(y - y_data))
        
        global_step = tf.Variable(0, name="global_step", trainable=False)
        optimizer = tf.train.GradientDescentOptimizer(0.1)
        train_op = optimizer.minimize(loss, global_step=global_step)
        
        tf.summary.scalar("cost", loss)
        summary_op = tf.summary.merge_all()
        init_op = tf.global_variables_initializer()
    # The StopAtStepHook handles stopping after running given steps.
    hooks = [ tf.train.StopAtStepHook(last_step=1000000)]
    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master="grpc://" + worker_hosts[FLAGS.task_index],
                                           is_chief=(FLAGS.task_index==0), # 我們制定task_index為0的任務(wù)為主任務(wù),用于負(fù)責(zé)變量初始化、做checkpoint、保存summary和復(fù)原
                                           checkpoint_dir="/tmp/tf_train_logs",
                                           save_checkpoint_secs=None,
                                           hooks=hooks) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step asynchronously.
            # See `tf.train.SyncReplicasOptimizer` for additional details on how to
            # perform *synchronous* training.
            # mon_sess.run handles AbortedError in case of preempted PS.
            train_x = np.random.rand(100).astype(np.float32)
            train_y = train_x * 0.1 + 0.3
            _, step, loss_v, weight, biase = mon_sess.run([train_op, global_step, loss, W, b], feed_dict={x_data: train_x, y_data: train_y})
            if step % 100 == 0:
                print "step: %d, weight: %f, biase: %f, loss: %f" %(step, weight, biase, loss_v)
        print "Optimization finished."

if __name__ == "__main__":
    tf.app.run()

代碼中,tf.train.replica_device_setter()會根據(jù)job名,將with內(nèi)的Variable op放到ps tasks,將其他計算op放到worker tasks。默認(rèn)分配策略是輪詢。

在屬于集群的一臺機器中執(zhí)行上面的代碼,屏幕會開始輸出每輪迭代的訓(xùn)練參數(shù)和損失

python train.py --task_index=0

在另一臺機器上執(zhí)行下面你的代碼,再啟動一個任務(wù),會看到屏幕開始輸出每輪迭代的訓(xùn)練參數(shù)和損失,注意,step不再是從0開始,而是在啟動時刻上一個啟動任務(wù)的step后繼續(xù)。此時觀察兩個任務(wù),會發(fā)現(xiàn)他們同時在對同一參數(shù)進(jìn)行更新。

python train.py --task_index=2
思考

分布式TensorFlow與Spark對比:

分布式的級別不同:TensorFlow的Tensor、Variable和Op不是分布式的,分布式執(zhí)行的是subgraph. Spark的op和變量都是構(gòu)建在RDD上,RDD本身是分布式的。

異步訓(xùn)練:TensorFlow支持同步和異步的分布式訓(xùn)練;Spark原生的API只支持同步訓(xùn)練

分布式存儲:Spark在底層封裝好了worker和分布式數(shù)據(jù)之間的關(guān)系;TensorFlow需要自行維護(hù)。

Parameter Server:TensorFlow支持,Spark暫不支持。

TF分布式部署起來還是比較繁瑣的,需要定義好每個任務(wù)的ip:port,手工啟動每個task,不提供一個界面可以對集群進(jìn)行維護(hù)。

參考資料

白話tensorflow分布式部署和開發(fā)

理解和實現(xiàn)分布式TensorFlow集群完整教程

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/38424.html

相關(guān)文章

  • ApacheCN 人工智能知識樹 v1.0

    摘要:貢獻(xiàn)者飛龍版本最近總是有人問我,把這些資料看完一遍要用多長時間,如果你一本書一本書看的話,的確要用很長時間。為了方便大家,我就把每本書的章節(jié)拆開,再按照知識點合并,手動整理了這個知識樹。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 貢獻(xiàn)者:飛龍版...

    劉厚水 評論0 收藏0
  • TensorFlow學(xué)習(xí)筆記(11):數(shù)據(jù)操作指南

    摘要:本文的目的是聚焦于數(shù)據(jù)操作能力,講述中比較重要的一些,幫助大家實現(xiàn)各自的業(yè)務(wù)邏輯。傳入輸入值,指定輸出的基本數(shù)據(jù)類型。 引言 用TensorFlow做好一個機器學(xué)習(xí)項目,需要具備多種代碼能力: 工程開發(fā)能力:怎么讀取數(shù)據(jù)、怎么設(shè)計與運行Computation Graph、怎么保存與恢復(fù)變量、怎么保存統(tǒng)計結(jié)果、怎么共享變量、怎么分布式部署 數(shù)據(jù)操作能力:怎么將原始數(shù)據(jù)一步步轉(zhuǎn)化為模型需...

    jsbintask 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<