摘要:最近剛看完多線程,為了加深印象,按照分鐘實(shí)現(xiàn)延遲消息功能的思路,實(shí)現(xiàn)了一個(gè)簡(jiǎn)易版的異步隊(duì)列。讀取任務(wù)時(shí),計(jì)算當(dāng)前和,取出需要執(zhí)行的任務(wù),使用多線程的形式執(zhí)行。加鎖的主要作用是防止多線程同時(shí)操作文件讀寫(xiě),影響數(shù)據(jù)一致性。
最近剛看完python多線程,為了加深印象,按照1分鐘實(shí)現(xiàn)“延遲消息”功能的思路,實(shí)現(xiàn)了一個(gè)簡(jiǎn)易版的異步隊(duì)列。
高效延時(shí)消息,包含兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu):
1.環(huán)形隊(duì)列,例如可以創(chuàng)建一個(gè)包含3600個(gè)slot的環(huán)形隊(duì)列(本質(zhì)是個(gè)數(shù)組)
2.任務(wù)集合,環(huán)上每一個(gè)slot是一個(gè)Set
同時(shí),啟動(dòng)一個(gè)timer,這個(gè)timer每隔1s,在上述環(huán)形隊(duì)列中移動(dòng)一格,有一個(gè)Current Index指針來(lái)標(biāo)識(shí)正在檢測(cè)的slot。
Task結(jié)構(gòu)中有兩個(gè)很重要的屬性:
(1)Cycle-Num:當(dāng)Current Index第幾圈掃描到這個(gè)Slot時(shí),執(zhí)行任務(wù)
(2)Task-Function:需要執(zhí)行的任務(wù)指針
下邊是代碼(代碼不止100行,但是在200行內(nèi),也算100行了。)
#! -*- coding: utf-8 -*- try: import cPickle as pickle except ImportError: import pickle try: import simplejson as json except ImportError: import json import os import errno import Queue import random import logging from functools import wraps from threading import Timer, RLock, Thread from time import sleep, time from base64 import b64encode, b64decode # json 的數(shù)據(jù)結(jié)構(gòu) # tasks = { # index: { # cycle_num: [(func, bargs)] # } # } logging.basicConfig(level=logging.DEBUG, format="(%(asctime)-15s) %(message)s",) tasks_file = "tasks.json" flags = os.O_CREAT | os.O_EXCL | os.O_WRONLY # 為了防止任務(wù)太多需要生成過(guò)多的線程,我們使用Queue 來(lái)限制生成的線程數(shù)量 WORKER_NUMS = 2 q = Queue.Queue(WORKER_NUMS) lock = RLock() def check_file(): try: file_handle = os.open(tasks_file, flags) except OSError as e: if e.errno == errno.EEXIST: # Failed as the file already exists. pass else: raise else: with os.fdopen(file_handle, "w") as file_obj: file_obj.write("{}") def set_delay_task(func_name, *args, **kwargs): # 使用鎖來(lái)保證每次只要一個(gè)線程寫(xiě)入文件,防止數(shù)據(jù)出錯(cuò) with lock: with open(tasks_file, "r+") as json_file: count_down = kwargs.pop("count_down", 0) tasks = json.load(json_file) # 執(zhí)行時(shí)間 exec_time = int(time()) + count_down # 循環(huán)索引 index = str(exec_time % 3600) # 圈數(shù) cycle_num = str(exec_time / 3600 + 1) dargs = pickle.dumps((args, kwargs)) bargs = b64encode(dargs) index_data = tasks.get(index, {}) index_data.setdefault(cycle_num, []).append((func_name, bargs)) tasks[index] = index_data json_file.seek(0) json.dump(tasks, json_file) logging.debug("Received task: %s" % func_name) def get_delay_tasks(): with open(tasks_file, "r+") as json_file: tasks = json.load(json_file) # 執(zhí)行時(shí)間 current_time = int(time()) # 循環(huán)索引 index = str(current_time % 3600) # 圈數(shù) cycle_num = str(current_time / 3600 + 1) current_tasks = tasks.get(index, {}).get(cycle_num, []) tasks = [] for func, bargs in current_tasks: dargs = b64decode(bargs) args, kwargs = pickle.loads(dargs) tasks.append((func, (args, kwargs))) return tasks def get_method_by_name(method_name): possibles = globals().copy() possibles.update(locals()) method = possibles.get(method_name) return method def create_task(task_class, func, task_name=None, **kwargs): def execute(self): args, kwargs = self.data or ((), {}) return func(*args, **kwargs) attrs = { "execute": execute, "func_name": func.__name__, "__module__": func.__module__, "__doc__": func.__doc__ } attrs.update(kwargs) klass = type( task_name or func.__name__, (task_class,), attrs ) return klass class Hu(object): def __init__(self, func_name=None): self.func_name = func_name check_file() def task(self): def deco(func): self.func_name = func.__name__ klass = create_task(Hu, func, self.func_name) func.delay = klass(func_name=klass.func_name).delay @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return deco def delay(self, *args, **kwargs): _args = [self.func_name] _args.extend(args) Timer(0, set_delay_task, _args, kwargs).start() return True def boss(): while True: current_tasks = get_delay_tasks() for func, params in current_tasks: # Task accepted: auth.tasks.send_msg logging.debug("Task accepted: %s" % func) q.put((func, params)) sleep(1) def worker(): while True: func, params = q.get() print "get task: %s " % func method = get_method_by_name(func) args, kwargs = params # Task auth.tasks.send_msgsucceeded in start_time = time() method(*args, **kwargs) end_time = time() logging.debug("Task %s succeeded in %s" % (str(func), end_time - start_time)) q.task_done() def main(): check_file() print("starting at:", time()) for target in (boss, worker): t = Thread(target=target) t.start() print("all DONE at:", time()) hu = Hu() # 使用方式如下: @hu.task() def test(num): sleep(2) print "test: %s" % num if __name__ == "__main__": for i in range(10): test.delay(i, count_down=random.randint(1, 10)) main() # output (2017-03-21 15:59:20,394) Received task: test (2017-03-21 15:59:20,396) Received task: test (2017-03-21 15:59:20,397) Received task: test (2017-03-21 15:59:20,398) Received task: test (2017-03-21 15:59:20,400) Received task: test (2017-03-21 15:59:20,401) Received task: test (2017-03-21 15:59:20,403) Received task: test (2017-03-21 15:59:20,404) Received task: test (2017-03-21 15:59:20,406) Received task: test (2017-03-21 15:59:20,408) Received task: test get task: test (2017-03-21 15:59:21,395) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test (2017-03-21 15:59:22,397) Task accepted: test test: 2 get task: test (2017-03-21 15:59:23,399) Task test succeeded in 2.0037419796 (2017-03-21 15:59:24,404) Task accepted: test test: 1 get task: test
按照1分鐘實(shí)現(xiàn)“延遲消息”功能的思路。隊(duì)列的數(shù)據(jù)結(jié)構(gòu)為
{ index: { cycle_num: [(func, bargs)] } }
index的值為 1-3600。每小時(shí)一個(gè)循環(huán)。
cycle_num 則是 由 (時(shí)間戳 / 3600 + 1) 計(jì)算得到的值,是圈數(shù)。
每當(dāng)有任務(wù)加入,我們計(jì)算出index和cycle_num 將參數(shù)和方法名寫(xiě)入json文件。
讀取任務(wù)時(shí),計(jì)算當(dāng)前 index和cycle_num, 取出需要執(zhí)行的任務(wù),使用多線程的形式執(zhí)行。
為了防止任務(wù)太多需要生成過(guò)多的線程,我們使用Queue 來(lái)限制生成的線程數(shù)量。
加鎖的主要作用是防止多線程同時(shí)操作文件讀寫(xiě),影響數(shù)據(jù)一致性。
當(dāng)然,也可以使用redis 存儲(chǔ)隊(duì)列,因?yàn)?redis 是單線程操作,可以防止多線程操作影響數(shù)據(jù)一致性的問(wèn)題。
這一部分有需要的可以自己實(shí)現(xiàn)。
參考:
python線程筆記
1分鐘實(shí)現(xiàn)“延遲消息”功能
>歡迎關(guān)注 | >請(qǐng)我喝芬達(dá) |
---|---|
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/38539.html
摘要:文章轉(zhuǎn)自視頻教程優(yōu)雅的應(yīng)用調(diào)試工具新擴(kuò)展是由和開(kāi)源的應(yīng)用的調(diào)試工具。計(jì)劃任務(wù)列出已運(yùn)行的計(jì)劃任務(wù)。該封閉函數(shù)會(huì)被序列化為一個(gè)長(zhǎng)字符串,加上他的哈希與簽名如出一轍該功能將記錄所有異常,并可查看具體異常情況。事件顯示所有事件的列表。 文章轉(zhuǎn)自:https://laravel-china.org/topics/19013視頻教程:047. 優(yōu)雅的應(yīng)用調(diào)試工具--laravel/telesco...
摘要:所以回來(lái)后就想著補(bǔ)一篇文章針對(duì)時(shí)間切片展開(kāi)詳細(xì)的討論。所以時(shí)間切片的目的是不阻塞主線程,而實(shí)現(xiàn)目的的技術(shù)手段是將一個(gè)長(zhǎng)任務(wù)拆分成很多個(gè)不超過(guò)的小任務(wù)分散在宏任務(wù)隊(duì)列中執(zhí)行。上周我在FDConf的分享《讓你的網(wǎng)頁(yè)更絲滑》中提到了時(shí)間切片,由于時(shí)間關(guān)系當(dāng)時(shí)并沒(méi)有對(duì)時(shí)間切片展開(kāi)更細(xì)致的討論。所以回來(lái)后就想著補(bǔ)一篇文章針對(duì)時(shí)間切片展開(kāi)詳細(xì)的討論。 從用戶(hù)的輸入,再到顯示器在視覺(jué)上給用戶(hù)的輸出,這一過(guò)...
摘要:我的這篇文章沒(méi)有任何高大上的術(shù)語(yǔ),就是行代碼,實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的區(qū)塊鏈原型。檢查該區(qū)塊鏈?zhǔn)欠裼行А6ㄟ^(guò)在循環(huán)里不斷嘗試最終得到一個(gè)合法的哈希值的這一過(guò)程,就是區(qū)塊鏈圈內(nèi)俗稱(chēng)的挖礦。 不知從什么時(shí)候起,區(qū)塊鏈在網(wǎng)上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門(mén)弄斧了,網(wǎng)上...
摘要:我的這篇文章沒(méi)有任何高大上的術(shù)語(yǔ),就是行代碼,實(shí)現(xiàn)一個(gè)最簡(jiǎn)單的區(qū)塊鏈原型。檢查該區(qū)塊鏈?zhǔn)欠裼行?。而通過(guò)在循環(huán)里不斷嘗試最終得到一個(gè)合法的哈希值的這一過(guò)程,就是區(qū)塊鏈圈內(nèi)俗稱(chēng)的挖礦。 不知從什么時(shí)候起,區(qū)塊鏈在網(wǎng)上一下子就火了。 showImg(https://segmentfault.com/img/remote/1460000014744826); 這里Jerry就不班門(mén)弄斧了,網(wǎng)上...
閱讀 711·2021-11-18 10:02
閱讀 2249·2021-11-15 18:13
閱讀 3176·2021-11-15 11:38
閱讀 2963·2021-09-22 15:55
閱讀 3684·2021-08-09 13:43
閱讀 2454·2021-07-25 14:19
閱讀 2462·2019-08-30 14:15
閱讀 3458·2019-08-30 14:15