成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

7-并發(fā)編程

Cheriselalala / 3398人閱讀

摘要:進(jìn)程線程切換都需要使用一定的時(shí)間。子進(jìn)程在中,如果要運(yùn)行系統(tǒng)命令,會(huì)使用來(lái)運(yùn)行,官方建議使用方法來(lái)運(yùn)行系統(tǒng)命令,更高級(jí)的用法是直接使用其接口。

多線程 簡(jiǎn)單示例

對(duì)于CPU計(jì)算密集型的任務(wù),python的多線程跟單線程沒什么區(qū)別,甚至有可能會(huì)更慢,但是對(duì)于IO密集型的任務(wù),比如http請(qǐng)求這類任務(wù),python的多線程還是有用處。在日常的使用中,經(jīng)常會(huì)結(jié)合多線程和隊(duì)列一起使用,比如,以爬取simpledestops 網(wǎng)站壁紙為例:

import os 
from datetime import datetime 
from queue import Queue
from threading import Thread
import requests
requests.packages.urllib3.disable_warnings()

from bs4 import BeautifulSoup
import re

if not os.path.exists("img"):
    os.mkdir("img")

# 聲明一個(gè)隊(duì)列
Q = Queue()

def producer(pages):
    for page in range(1,pages+1):
        # 提取每一頁(yè)的圖片 url 加入隊(duì)列
        print("[-] 收集第 {} 頁(yè)".format(str(page)))
        url = "http://simpledesktops.com/browse/"+str(page)+"/"
        r = requests.get(url,verify=False)
        html = r.text
        soup = BeautifulSoup(html,"html.parser")
        try:
            imgs = soup.find_all("img")
            for img in imgs:
                img_url = img["src"]
                Q.put(img_url)
        except:
            pass

def worker(i):
   # 取出隊(duì)列的值,按順序取,下載圖片
    while not Q.empty():
        img_url = Q.get()
        text = re.search("(http://static.simpledesktops.com/uploads/desktops/d+/d+/d+/(.*?png)).*?png",img_url)
        new_img_url = text.group(1)

        r = requests.get(new_img_url,verify=False)
        path = "img/"+text.group(2)
        print("[-] 線程 {} 開始下載 {} 開始時(shí)間:{}".format(i,text.group(2),datetime.now()))

        with open(path,"wb") as f:
            f.write(r.content)
    
    Q.all_tasks_done


if __name__ =="__main__":
    # 一定要將數(shù)據(jù)加入隊(duì)列,否則是啟動(dòng)不了的,因?yàn)殛?duì)列為空 
    producer(50)
    # 線程的聲明
    ts = [Thread(target=worker,args=(i,)) for i in range(50)]
    for t in ts:
        t.start()

    for t in ts:
        t.join()

我們使用start啟動(dòng)多線程,使用 join 防止主線程退出的時(shí)候結(jié)束所有的線程,使用隊(duì)列有序的且并發(fā)的下載壁紙。 仔細(xì)觀察就會(huì)發(fā)現(xiàn)代碼其實(shí)有跡可循,更改其中的爬取內(nèi)容的部分代碼后,我們就可以應(yīng)用于爬取別的網(wǎng)站。

ThreadLocal

按照道理來(lái)說(shuō),多線程中,每個(gè)線程的處理邏輯應(yīng)該是相同的,但是其處理的數(shù)據(jù),卻不一定是相同的,如果數(shù)據(jù)是全局的,那么我們就需要加鎖,防止數(shù)據(jù)混亂,這樣一來(lái)就會(huì)麻煩很多,所以線程處理的數(shù)據(jù)最好是局部的、其他線程不能干擾的。

代碼示例:

# coding: utf-8 

import threading,time
import requests
requests.packages.urllib3.disable_warnings()
from datetime import datetime 

local_variable = threading.local()

# 邏輯處理函數(shù)
def worker():
    print("每個(gè)線程啟動(dòng)的時(shí)間: ",datetime.now())
    time.sleep(10)
    url = local_variable.url
    r = requests.get(url,verify=False)
    print(r.url,datetime.strftime(datetime.now(),"%H:%M:%S"),threading.current_thread().name)

# 線程處理函數(shù)
def process_thread(url):
    local_variable.url = url
    worker()


if __name__ == "__main__":
    ts = [threading.Thread(target=process_thread,args=(url,))for url in ["https://www.baidu.com","https://www.google.com","https://www.bing.com"]]
    for t in ts:
        t.start()

    for t in ts:

        t.join()

輸出:

線程Thread-1 啟動(dòng)的時(shí)間:2019-01-09 11:25:18.339631
線程Thread-2 啟動(dòng)的時(shí)間:2019-01-09 11:25:18.340646
線程Thread-3 啟動(dòng)的時(shí)間:2019-01-09 11:25:18.342635
https://www.baidu.com/ 11:25:28 Thread-1
https://cn.bing.com/ 11:25:29 Thread-3
https://www.google.com/ 11:25:29 Thread-2
多進(jìn)程 進(jìn)程池

python中使用 multiprocessing 來(lái)創(chuàng)建多進(jìn)程,如果要?jiǎng)?chuàng)建多個(gè)子進(jìn)程,則需要使用 進(jìn)程池 Pool 來(lái)創(chuàng)建,一個(gè)簡(jiǎn)單的例子:

from multiprocessing import Pool
import os 
from datetime import datetime 


"""
@param {type} int
@return: None
"""
def print_num(i):
    print("進(jìn)程{} 打印 {}".format(os.getpid(),i))


if __name__ == "__main__":
    p = Pool(4)
    for i in range(100):
        p.apply_async(print_num,args=(i,))
    # 關(guān)閉進(jìn)程池,不再加入進(jìn)程
    p.close()
    # 防止主進(jìn)程結(jié)束,子進(jìn)程無(wú)法繼續(xù)運(yùn)行
    p.join()
    

輸出:

進(jìn)程2624 打印 0
進(jìn)程2625 打印 1
進(jìn)程2626 打印 3
進(jìn)程2627 打印 2
進(jìn)程2624 打印 4
進(jìn)程2625 打印 5
進(jìn)程2626 打印 6
進(jìn)程2627 打印 7
進(jìn)程2624 打印 8
...

進(jìn)程可以實(shí)現(xiàn)并行運(yùn)行代碼,但是一旦進(jìn)程太多,CPU運(yùn)行不過(guò)來(lái)也是需要進(jìn)行等待,用了多進(jìn)程以后,就可以不使用隊(duì)列了,也可以實(shí)現(xiàn)多線程的效果

除此之外,還可以多進(jìn)程和多線程結(jié)合起來(lái)使用,一個(gè)簡(jiǎn)單的例子

from multiprocessing import Pool
import threading
import os,time 
import queue 
from datetime import datetime 

def producer(i):
    Q = queue.Queue()
    start = 25*(i-1)
    end = 100 * int(i / 4)

    for x in range(start,end):
        Q.put(x)
  
    
    return Q

def process_thread(Q,j):
    while not Q.empty():
        item = Q.get()
        print("進(jìn)程{}: 線程{} 正在消耗:{} 時(shí)間:{}".format(os.getpid(),j,item,datetime.now()))

    Q.all_tasks_done


def tasks(i):
    Q = producer(i)
    ts = [threading.Thread(target=process_thread,args=(Q,j)) for j in range(10)]
    for t in ts:
        t.start()
    for t in ts:
        t.join()


if __name__ == "__main__":
    start = datetime.now()
    p = Pool(4)
    for i in range(1,5):
        print(i)
        p.apply_async(tasks,args=(i,))
    p.close()
    p.join()
    end = datetime.now()
    waste = end-start
    print("一共花費(fèi)了: {}".format(waste))
    

先將要處理的數(shù)據(jù),填進(jìn)隊(duì)列,然后創(chuàng)建4個(gè)進(jìn)程,10個(gè)線程運(yùn)行。 其輸出為:

"""
(venv) C:projectlibraries-python>python bulit-in-libraries	hreadingmultithreading.py
進(jìn)程17020: 線程0 正在消耗:1 時(shí)間:2019-01-09 12:50:48.701523
進(jìn)程17020: 線程1 正在消耗:2 時(shí)間:2019-01-09 12:50:48.703521
進(jìn)程17020: 線程3 正在消耗:4 時(shí)間:2019-01-09 12:50:48.704365
進(jìn)程17020: 線程2 正在消耗:3 時(shí)間:2019-01-09 12:50:48.704365

進(jìn)程2804: 線程0 正在消耗:5 時(shí)間:2019-01-09 12:50:48.706349
進(jìn)程2804: 線程1 正在消耗:6 時(shí)間:2019-01-09 12:50:48.707352
進(jìn)程2804: 線程4 正在消耗:9 時(shí)間:2019-01-09 12:50:48.708355
進(jìn)程2804: 線程3 正在消耗:8 時(shí)間:2019-01-09 12:50:48.708355
進(jìn)程2804: 線程2 正在消耗:7 時(shí)間:2019-01-09 12:50:48.708355

進(jìn)程16060: 線程0 正在消耗:10 時(shí)間:2019-01-09 12:50:48.728409
進(jìn)程16060: 線程1 正在消耗:11 時(shí)間:2019-01-09 12:50:48.730413
進(jìn)程16060: 線程4 正在消耗:14 時(shí)間:2019-01-09 12:50:48.732418
進(jìn)程16060: 線程3 正在消耗:13 時(shí)間:2019-01-09 12:50:48.732418
進(jìn)程16060: 線程2 正在消耗:12 時(shí)間:2019-01-09 12:50:48.732418

進(jìn)程7588: 線程3 正在消耗:18 時(shí)間:2019-01-09 12:50:48.761808
進(jìn)程7588: 線程4 正在消耗:19 時(shí)間:2019-01-09 12:50:48.761808
進(jìn)程7588: 線程0 正在消耗:15 時(shí)間:2019-01-09 12:50:48.761808
進(jìn)程7588: 線程1 正在消耗:16 時(shí)間:2019-01-09 12:50:48.761808
進(jìn)程7588: 線程2 正在消耗:17 時(shí)間:2019-01-09 12:50:48.761808

后來(lái)實(shí)驗(yàn)了打印出10萬(wàn)個(gè)數(shù),4個(gè)進(jìn)程,每個(gè)進(jìn)程400個(gè)線程,花費(fèi)了39秒。而400個(gè)線程,只花費(fèi)了17秒。所以有時(shí)候,也并不是多就是好。進(jìn)程線程切換都需要使用一定的時(shí)間。

子進(jìn)程

在python中,如果要運(yùn)行系統(tǒng)命令,會(huì)使用 subprocess 來(lái)運(yùn)行,官方建議使用run 方法來(lái)運(yùn)行系統(tǒng)命令,更高級(jí)的用法是直接使用其 Popen 接口。
其函數(shù)格式為:

subprocess.run(args,?*,?stdin=None,?input=None,?stdout=None,?stderr=None,?capture_output=False,?shell=False,?cwd=None,?timeout=None,?check=False,?encoding=None,?errors=None,?text=None,?env=None,?universal_newlines=None)

可以看幾個(gè)簡(jiǎn)單的例子:

直接使用
import subprocess
subprocess.run(["ls","-al"])

在python3.7 之前,默認(rèn)系統(tǒng)命令執(zhí)行的結(jié)果(輸出/錯(cuò)誤)不存在stdout/stderr 里面,需要設(shè)置 capture_output=True,而在python3.6 版本,如果你需要使用執(zhí)行的結(jié)果,你就需要設(shè)置 stdout. 如下所示

# python 3.6
>>> a = subprocess.run(["ls","-al"],stdout=subprocess.PIPE)
>>> a.stdout

# python3.7 
>>> a = subprocess.run(["ls","-al"],capture_output=True)
>>> a.stdout

所以可以看出python3.7 又做了一層封裝,為了讓大家使用更上一層的接口。可以看一下幾個(gè)參數(shù)的含義為:

args 列表,為shell命令
shell boolean值, 設(shè)置后,args可以直接接受shell命令
capture_output = True , 設(shè)置后,stdout/stderr會(huì)存儲(chǔ)值
check=True, 設(shè)置后,如果程序異常退出,會(huì)跑出一個(gè)CalledProcessError異常
cwd 是工作目錄,可以為str,或者path-like 類
高級(jí)使用

Popen的構(gòu)造函數(shù):

class?subprocess.Popen(args,?bufsize=-1,?executable=None,?stdin=None,?stdout=None,?stderr=None,?preexec_fn=None,?close_fds=True,?shell=False,?cwd=None,?env=None,?universal_newlines=False,?startupinfo=None,?creationflags=0,?restore_signals=True,?start_new_session=False,?pass_fds=(),?*,?encoding=None,?errors=None)

一個(gè)簡(jiǎn)單的例子

p = subprocess.Popen(["ls","-al"],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

其次,通過(guò)Popen.communicate() ,子進(jìn)程可以在啟動(dòng)了以后,還可以進(jìn)行參數(shù)的輸入

import subprocess

print("$ nslookup")
p = subprocess.Popen(["nslookup"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b"set q=mx
python.org
exit
")
print(output.decode("utf-8"))
print("Exit code:", p.returncode)
其輸出:
$ nslookup
Server:        192.168.19.4
Address:    192.168.19.4#53

Non-authoritative answer:
python.org    mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
mail.python.org    internet address = 82.94.164.166
mail.python.org    has AAAA address 2001:888:2000:d::a6

Exit code: 0
分布式多進(jìn)程

python的分布式接口簡(jiǎn)單,使用起來(lái)也十分簡(jiǎn)單,可以參考廖雪峰的教程,需要的時(shí)候,修改代碼,即可完成屬于自己的分布式程序

這里貼出代碼:

# master
import random,time,queue
from multiprocessing.managers import BaseManager

task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register("get_task_queue",callable=lambda:task_queue)
QueueManager.register("get_result_queue",callable=lambda:result_queue)

manager = QueueManager(address=("",5000),authkey=b"abc")
manager.start()

tasks = manager.get_task_queue()
results = manager.get_result_queue()

for i in range(10):
    n = random.randint(0,10000)
    print("put task {}".format(n))
    tasks.put(n)

print("try get results...")
for i in range(10):
    r = results.get(timeout=100)
    print("result:{}".format(r))

manager.shutdown()
print("master exit")

# worker
import time,sys,queue
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass


QueueManager.register("get_task_queue")
QueueManager.register("get_result_queue")

# master的主機(jī)地址
server_addr = "127.0.0.1"
print("connect to server...")
m = QueueManager(address=(server_addr,5000),authkey=b"abc")
m.connect()

tasks = m.get_task_queue()
results = m.get_result_queue()

for i in range(10):
    try:
        n = tasks.get(timeout=1)
        print("run task %d * %d..." % (n, n))

        r = "{} * {} = {}".format(n,n,n*n)
        time.sleep(1)
        results.put(r)
    except Queue.Empty:
        print("task queue is empty.")

print("worker exit.")
參考

https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000
https://docs.python.org/3.6/library/subprocess.html
https://docs.python.org/3.7/library/subprocess.html

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/42965.html

相關(guān)文章

  • java并發(fā)編程學(xué)習(xí)之ConcurrentHashMap(JDK1.7)

    摘要:之前中提過(guò),并發(fā)的時(shí)候,可能造成死循環(huán),那么在多線程中可以用來(lái)避免這一情況。默認(rèn),當(dāng)容量大于時(shí),開始擴(kuò)容并發(fā)數(shù),默認(rèn),直接影響和的值,以及的初始化數(shù)量。初始化的數(shù)量,為最接近且大于的辦等于的次方的值,比如,數(shù)量為,,數(shù)量為。 之前HashMap中提過(guò),并發(fā)的時(shí)候,可能造成死循環(huán),那么在多線程中可以用ConcurrentHashMap來(lái)避免這一情況。 Segment Concurrent...

    piglei 評(píng)論0 收藏0
  • java并發(fā)編程學(xué)習(xí)7--同步--synchronized關(guān)鍵字

    摘要:如果兩個(gè)線程存取相同的對(duì)象,并且每一個(gè)線程都調(diào)用一個(gè)修改該對(duì)象狀態(tài)的方法,根據(jù)線程訪問(wèn)數(shù)據(jù)的順序,可能會(huì)出現(xiàn)錯(cuò)誤的數(shù)據(jù)結(jié)果,這種現(xiàn)象成為條件競(jìng)爭(zhēng)。而問(wèn)題往往就是有多個(gè)線程同時(shí)在執(zhí)行步驟。內(nèi)部鎖有如下的特點(diǎn)不能中斷正在試圖獲得鎖的線程。 【條件競(jìng)爭(zhēng) 在多線程的開發(fā)中,兩個(gè)及其以上的線程需要共享統(tǒng)一數(shù)據(jù)的存取。如果兩個(gè)線程存取相同的對(duì)象,并且每一個(gè)線程都調(diào)用一個(gè)修改該對(duì)象狀態(tài)的方法,根據(jù)線...

    zzzmh 評(píng)論0 收藏0
  • <java并發(fā)編程實(shí)戰(zhàn)>學(xué)習(xí)一

    摘要:無(wú)狀態(tài)的是線程安全的,當(dāng)無(wú)狀態(tài)變?yōu)橛袪顟B(tài)時(shí)就是不安全的破壞了線程的安全性,非原子性操作競(jìng)態(tài)條件在并發(fā)編程中,由于不恰當(dāng)?shù)膱?zhí)行時(shí)序而出現(xiàn)的不正確結(jié)果是一種非常重要的情況,被稱之為競(jìng)態(tài)條件。重入意味著獲取鎖的操作的粒度是線程,而不是調(diào)用。 這本書的內(nèi)容是什么? 本書提供了各種實(shí)用的設(shè)計(jì)規(guī)則,用于幫助開發(fā)人員創(chuàng)建安全的和高性能的并發(fā)類。 什么類是線程安全的? 當(dāng)多個(gè)線程訪問(wèn)某...

    xiaoqibTn 評(píng)論0 收藏0
  • 從小白程序員一路晉升為大廠高級(jí)技術(shù)專家我看過(guò)哪些書籍?(建議收藏)

    摘要:大家好,我是冰河有句話叫做投資啥都不如投資自己的回報(bào)率高。馬上就十一國(guó)慶假期了,給小伙伴們分享下,從小白程序員到大廠高級(jí)技術(shù)專家我看過(guò)哪些技術(shù)類書籍。 大家好,我是...

    sf_wangchong 評(píng)論0 收藏0
  • [Java并發(fā)-1]入門:并發(fā)編程Bug的源頭

    摘要:所以這情況下,當(dāng)線程操作變量的時(shí)候,變量并不對(duì)線程可見??偨Y(jié),緩存引發(fā)的可見性問(wèn)題,切換線程帶來(lái)的原子性問(wèn)題,編譯帶來(lái)的有序性問(wèn)題深刻理解這些前因后果,可以診斷大部分并發(fā)的問(wèn)題 背景介紹 如何解決并發(fā)問(wèn)題,首先要理解并發(fā)問(wèn)題的實(shí)際源頭怎么發(fā)生的。 現(xiàn)代計(jì)算機(jī)的不同硬件的運(yùn)行速度是差異很大的,這個(gè)大家應(yīng)該都是知道的。 計(jì)算機(jī)數(shù)據(jù)傳輸運(yùn)行速度上的快慢比較: CPU > 緩存 > I/O ...

    xiguadada 評(píng)論0 收藏0
  • 對(duì)python并發(fā)編程的思考

    摘要:我們以請(qǐng)求網(wǎng)絡(luò)服務(wù)為例,來(lái)實(shí)際測(cè)試一下加入多線程之后的效果。所以,執(zhí)行密集型操作時(shí),多線程是有用的,對(duì)于密集型操作,則每次只能使用一個(gè)線程。說(shuō)到這里,對(duì)于密集型,可以使用多線程或者多進(jìn)程來(lái)提高效率。 為了提高系統(tǒng)密集型運(yùn)算的效率,我們常常會(huì)使用到多個(gè)進(jìn)程或者是多個(gè)線程,python中的Threading包實(shí)現(xiàn)了線程,multiprocessing 包則實(shí)現(xiàn)了多進(jìn)程。而在3.2版本的py...

    sshe 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<