成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專(zhuān)欄INFORMATION COLUMN

100行代碼實(shí)現(xiàn)任務(wù)隊(duì)列

xorpay / 696人閱讀

摘要:最近剛看完多線程,為了加深印象,按照分鐘實(shí)現(xiàn)延遲消息功能的思路,實(shí)現(xiàn)了一個(gè)簡(jiǎn)易版的異步隊(duì)列。讀取任務(wù)時(shí),計(jì)算當(dāng)前和,取出需要執(zhí)行的任務(wù),使用多線程的形式執(zhí)行。加鎖的主要作用是防止多線程同時(shí)操作文件讀寫(xiě),影響數(shù)據(jù)一致性。

最近剛看完python多線程,為了加深印象,按照1分鐘實(shí)現(xiàn)“延遲消息”功能的思路,實(shí)現(xiàn)了一個(gè)簡(jiǎn)易版的異步隊(duì)列。

高效延時(shí)消息,包含兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu):

1.環(huán)形隊(duì)列,例如可以創(chuàng)建一個(gè)包含3600個(gè)slot的環(huán)形隊(duì)列(本質(zhì)是個(gè)數(shù)組)

2.任務(wù)集合,環(huán)上每一個(gè)slot是一個(gè)Set

同時(shí),啟動(dòng)一個(gè)timer,這個(gè)timer每隔1s,在上述環(huán)形隊(duì)列中移動(dòng)一格,有一個(gè)Current Index指針來(lái)標(biāo)識(shí)正在檢測(cè)的slot。

Task結(jié)構(gòu)中有兩個(gè)很重要的屬性:

(1)Cycle-Num:當(dāng)Current Index第幾圈掃描到這個(gè)Slot時(shí),執(zhí)行任務(wù)
(2)Task-Function:需要執(zhí)行的任務(wù)指針

下邊是代碼(代碼不止100行,但是在200行內(nèi),也算100行了。)

#! -*- coding: utf-8 -*-

try:
    import cPickle as pickle
except ImportError:
    import pickle
try:
    import simplejson as json
except ImportError:
    import json

import os
import errno
import Queue
import random
import logging
from functools import wraps
from threading import Timer, RLock, Thread
from time import sleep, time
from base64 import b64encode, b64decode

# json 的數(shù)據(jù)結(jié)構(gòu)
# tasks = {
#     index: {
#         cycle_num: [(func, bargs)]
#     }
# }

logging.basicConfig(level=logging.DEBUG,
                    format="(%(asctime)-15s) %(message)s",)
tasks_file = "tasks.json"
flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY

# 為了防止任務(wù)太多需要生成過(guò)多的線程,我們使用Queue 來(lái)限制生成的線程數(shù)量
WORKER_NUMS = 2
q = Queue.Queue(WORKER_NUMS)

lock = RLock()


def check_file():
    try:
        file_handle = os.open(tasks_file, flags)
    except OSError as e:
        if e.errno == errno.EEXIST:  # Failed as the file already exists.
            pass
        else:
            raise
    else:
        with os.fdopen(file_handle, "w") as file_obj:
            file_obj.write("{}")


def set_delay_task(func_name, *args, **kwargs):
    # 使用鎖來(lái)保證每次只要一個(gè)線程寫(xiě)入文件,防止數(shù)據(jù)出錯(cuò)
    with lock:
        with open(tasks_file, "r+") as json_file:
            count_down = kwargs.pop("count_down", 0)
            tasks = json.load(json_file)
            # 執(zhí)行時(shí)間
            exec_time = int(time()) + count_down
            # 循環(huán)索引
            index = str(exec_time % 3600)
            # 圈數(shù)
            cycle_num = str(exec_time / 3600 + 1)
            dargs = pickle.dumps((args, kwargs))
            bargs = b64encode(dargs)
            index_data = tasks.get(index, {})
            index_data.setdefault(cycle_num, []).append((func_name, bargs))
            tasks[index] = index_data
            json_file.seek(0)
            json.dump(tasks, json_file)
            logging.debug("Received task: %s" % func_name)


def get_delay_tasks():
    with open(tasks_file, "r+") as json_file:
        tasks = json.load(json_file)
        # 執(zhí)行時(shí)間
        current_time = int(time())
        # 循環(huán)索引
        index = str(current_time % 3600)
        # 圈數(shù)
        cycle_num = str(current_time / 3600 + 1)
        current_tasks = tasks.get(index, {}).get(cycle_num, [])
    tasks = []
    for func, bargs in current_tasks:
        dargs = b64decode(bargs)
        args, kwargs = pickle.loads(dargs)
        tasks.append((func, (args, kwargs)))
    return tasks


def get_method_by_name(method_name):
    possibles = globals().copy()
    possibles.update(locals())
    method = possibles.get(method_name)
    return method


def create_task(task_class, func, task_name=None, **kwargs):

    def execute(self):
        args, kwargs = self.data or ((), {})
        return func(*args, **kwargs)

    attrs = {
        "execute": execute,
        "func_name": func.__name__,
        "__module__": func.__module__,
        "__doc__": func.__doc__
    }
    attrs.update(kwargs)

    klass = type(
        task_name or func.__name__,
        (task_class,),
        attrs
    )

    return klass


class Hu(object):

    def __init__(self, func_name=None):
        self.func_name = func_name
        check_file()

    def task(self):
        def deco(func):
            self.func_name = func.__name__
            klass = create_task(Hu, func, self.func_name)
            func.delay = klass(func_name=klass.func_name).delay
            @wraps(func)
            def wrapper(*args, **kwargs):
                return func(*args, **kwargs)
            return wrapper
        return deco

    def delay(self, *args, **kwargs):
        _args = [self.func_name]
        _args.extend(args)
        Timer(0, set_delay_task, _args, kwargs).start()
        return True


def boss():
    while True:
        current_tasks = get_delay_tasks()
        for func, params in current_tasks:
            # Task accepted: auth.tasks.send_msg
            logging.debug("Task accepted: %s" % func)
            q.put((func, params))
        sleep(1)


def worker():
    while True:
        func, params = q.get()
        print "get task: %s
" % func
        method = get_method_by_name(func)
        args, kwargs = params
        # Task auth.tasks.send_msgsucceeded in
        start_time = time()
        method(*args, **kwargs)
        end_time = time()
        logging.debug("Task %s succeeded in %s" % (str(func), end_time - start_time))
        q.task_done()


def main():
    check_file()
    print("starting at:", time())
    for target in (boss, worker):
        t = Thread(target=target)
        t.start()
    print("all DONE at:", time())

hu = Hu()

# 使用方式如下:

@hu.task()
def test(num):
    sleep(2)
    print "test: %s" % num


if __name__ == "__main__":
    for i in range(10):
        test.delay(i, count_down=random.randint(1, 10))
    main()

# output

(2017-03-21 15:59:20,394) Received task: test
(2017-03-21 15:59:20,396) Received task: test
(2017-03-21 15:59:20,397) Received task: test
(2017-03-21 15:59:20,398) Received task: test
(2017-03-21 15:59:20,400) Received task: test
(2017-03-21 15:59:20,401) Received task: test
(2017-03-21 15:59:20,403) Received task: test
(2017-03-21 15:59:20,404) Received task: test
(2017-03-21 15:59:20,406) Received task: test
(2017-03-21 15:59:20,408) Received task: test
get task: test

(2017-03-21 15:59:21,395) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
(2017-03-21 15:59:22,397) Task accepted: test
test: 2
get task: test

(2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796
(2017-03-21 15:59:24,404) Task accepted: test
test: 1
get task: test

按照1分鐘實(shí)現(xiàn)“延遲消息”功能的思路。隊(duì)列的數(shù)據(jù)結(jié)構(gòu)為

{
    index: {
        cycle_num: [(func, bargs)]
    }
}

index的值為 1-3600。每小時(shí)一個(gè)循環(huán)。
cycle_num 則是 由 (時(shí)間戳 / 3600 + 1) 計(jì)算得到的值,是圈數(shù)。

每當(dāng)有任務(wù)加入,我們計(jì)算出index和cycle_num 將參數(shù)和方法名寫(xiě)入json文件。
讀取任務(wù)時(shí),計(jì)算當(dāng)前 index和cycle_num, 取出需要執(zhí)行的任務(wù),使用多線程的形式執(zhí)行。

為了防止任務(wù)太多需要生成過(guò)多的線程,我們使用Queue 來(lái)限制生成的線程數(shù)量。

加鎖的主要作用是防止多線程同時(shí)操作文件讀寫(xiě),影響數(shù)據(jù)一致性。

當(dāng)然,也可以使用redis 存儲(chǔ)隊(duì)列,因?yàn)?redis 是單線程操作,可以防止多線程操作影響數(shù)據(jù)一致性的問(wèn)題。
這一部分有需要的可以自己實(shí)現(xiàn)。

參考:

python線程筆記

1分鐘實(shí)現(xiàn)“延遲消息”功能

>歡迎關(guān)注 >請(qǐng)我喝芬達(dá)

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/38539.html

相關(guān)文章

  • Laravel Telescope:優(yōu)雅的應(yīng)用調(diào)試工具

    摘要:文章轉(zhuǎn)自視頻教程優(yōu)雅的應(yīng)用調(diào)試工具新擴(kuò)展是由和開(kāi)源的應(yīng)用的調(diào)試工具。計(jì)劃任務(wù)列出已運(yùn)行的計(jì)劃任務(wù)。該封閉函數(shù)會(huì)被序列化為一個(gè)長(zhǎng)字符串,加上他的哈希與簽名如出一轍該功能將記錄所有異常,并可查看具體異常情況。事件顯示所有事件的列表。 文章轉(zhuǎn)自:https://laravel-china.org/topics/19013視頻教程:047. 優(yōu)雅的應(yīng)用調(diào)試工具--laravel/telesco...

    MasonEast 評(píng)論0 收藏0
  • 時(shí)間切片(Time Slicing)

    摘要:所以回來(lái)后就想著補(bǔ)一篇文章針對(duì)時(shí)間切片展開(kāi)詳細(xì)的討論。所以時(shí)間切片的目的是不阻塞主線程,而實(shí)現(xiàn)目的的技術(shù)手段是將一個(gè)長(zhǎng)任務(wù)拆分成很多個(gè)不超過(guò)的小任務(wù)分散在宏任務(wù)隊(duì)列中執(zhí)行。上周我在FDConf的分享《讓你的網(wǎng)頁(yè)更絲滑》中提到了時(shí)間切片,由于時(shí)間關(guān)系當(dāng)時(shí)并沒(méi)有對(duì)時(shí)間切片展開(kāi)更細(xì)致的討論。所以回來(lái)后就想著補(bǔ)一篇文章針對(duì)時(shí)間切片展開(kāi)詳細(xì)的討論。 從用戶(hù)的輸入,再到顯示器在視覺(jué)上給用戶(hù)的輸出,這一過(guò)...

    Freeman 評(píng)論0 收藏0
  • 300ABAP代碼實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的區(qū)塊鏈原型

    摘要:我的這篇文章沒(méi)有任何高大上的術(shù)語(yǔ),就是行代碼,實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的區(qū)塊鏈原型。檢查該區(qū)塊鏈?zhǔn)欠裼行А6ㄟ^(guò)在循環(huán)里不斷嘗試最終得到一個(gè)合法的哈希值的這一過(guò)程,就是區(qū)塊鏈圈內(nèi)俗稱(chēng)的挖礦。 不知從什么時(shí)候起,區(qū)塊鏈在網(wǎng)上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門(mén)弄斧了,網(wǎng)上...

    cikenerd 評(píng)論0 收藏0
  • 300ABAP代碼實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的區(qū)塊鏈原型

    摘要:我的這篇文章沒(méi)有任何高大上的術(shù)語(yǔ),就是行代碼,實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的區(qū)塊鏈原型。檢查該區(qū)塊鏈?zhǔn)欠裼行?。而通過(guò)在循環(huán)里不斷嘗試最終得到一個(gè)合法的哈希值的這一過(guò)程,就是區(qū)塊鏈圈內(nèi)俗稱(chēng)的挖礦。 不知從什么時(shí)候起,區(qū)塊鏈在網(wǎng)上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門(mén)弄斧了,網(wǎng)上...

    DangoSky 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<