摘要:最近打算學(xué)習(xí)的源碼,所以就建立一個(gè)系列主題深入理解。那么就是最底層的實(shí)現(xiàn)。的實(shí)現(xiàn)基于,那么什么是是內(nèi)核為處理大批量文件描述符而作了改進(jìn)的。經(jīng)過分析,我們可以看到,實(shí)際上是對(duì)的封裝,并加入了一些對(duì)上層事件的處理和相關(guān)的底層處理。
最近打算學(xué)習(xí) tornado 的源碼,所以就建立一個(gè)系列主題 “深入理解 tornado”。 在此記錄學(xué)習(xí)經(jīng)歷及個(gè)人見解與大家分享。文中一定會(huì)出現(xiàn)理解不到位或理解錯(cuò)誤的地方,還請(qǐng)大家多多指教
進(jìn)入正題:
tornado 優(yōu)秀的大并發(fā)處理能力得益于它的 web server 從底層開始就自己實(shí)現(xiàn)了一整套基于 epoll 的單線程異步架構(gòu)(其他 python web 框架的自帶 server 基本是基于 wsgi 寫的簡(jiǎn)單服務(wù)器,并沒有自己實(shí)現(xiàn)底層結(jié)構(gòu)。 關(guān)于 wsgi 詳見之前的文章: 自己寫一個(gè) wsgi 服務(wù)器運(yùn)行 Django 、Tornado 應(yīng)用)。 那么 tornado.ioloop 就是 tornado web server 最底層的實(shí)現(xiàn)。
看 ioloop 之前,我們需要了解一些預(yù)備知識(shí),有助于我們理解 ioloop。
epollioloop 的實(shí)現(xiàn)基于 epoll ,那么什么是 epoll? epoll 是Linux內(nèi)核為處理大批量文件描述符而作了改進(jìn)的 poll 。
那么什么又是 poll ? 首先,我們回顧一下, socket 通信時(shí)的服務(wù)端,當(dāng)它接受( accept )一個(gè)連接并建立通信后( connection )就進(jìn)行通信,而此時(shí)我們并不知道連接的客戶端有沒有信息發(fā)完。 這時(shí)候我們有兩種選擇:
一直在這里等著直到收發(fā)數(shù)據(jù)結(jié)束;
每隔一定時(shí)間來看看這里有沒有數(shù)據(jù);
第二種辦法要比第一種好一些,多個(gè)連接可以統(tǒng)一在一定時(shí)間內(nèi)輪流看一遍里面有沒有數(shù)據(jù)要讀寫,看上去我們可以處理多個(gè)連接了,這個(gè)方式就是 poll / select 的解決方案。 看起來似乎解決了問題,但實(shí)際上,隨著連接越來越多,輪詢所花費(fèi)的時(shí)間將越來越長(zhǎng),而服務(wù)器連接的 socket 大多不是活躍的,所以輪詢所花費(fèi)的大部分時(shí)間將是無用的。為了解決這個(gè)問題, epoll 被創(chuàng)造出來,它的概念和 poll 類似,不過每次輪詢時(shí),他只會(huì)把有數(shù)據(jù)活躍的 socket 挑出來輪詢,這樣在有大量連接時(shí)輪詢就節(jié)省了大量時(shí)間。
對(duì)于 epoll 的操作,其實(shí)也很簡(jiǎn)單,只要 4 個(gè) API 就可以完全操作它。
epoll_create用來創(chuàng)建一個(gè) epoll 描述符( 就是創(chuàng)建了一個(gè) epoll )
epoll_ctl操作 epoll 中的 event;可用參數(shù)有:
參數(shù) | 含義 |
---|---|
EPOLL_CTL_ADD | 添加一個(gè)新的epoll事件 |
EPOLL_CTL_DEL | 刪除一個(gè)epoll事件 |
EPOLL_CTL_MOD | 改變一個(gè)事件的監(jiān)聽方式 |
而事件的監(jiān)聽方式有七種,而我們只需要關(guān)心其中的三種:
宏定義 | 含義 |
---|---|
EPOLLIN | 緩沖區(qū)滿,有數(shù)據(jù)可讀 |
EPOLLOUT | 緩沖區(qū)空,可寫數(shù)據(jù) |
EPOLLERR | 發(fā)生錯(cuò)誤 |
就是讓 epoll 開始工作,里面有個(gè)參數(shù) timeout,當(dāng)設(shè)置為非 0 正整數(shù)時(shí),會(huì)監(jiān)聽(阻塞) timeout 秒;設(shè)置為 0 時(shí)立即返回,設(shè)置為 -1 時(shí)一直監(jiān)聽。
在監(jiān)聽時(shí)有數(shù)據(jù)活躍的連接時(shí)其返回活躍的文件句柄列表(此處為 socket 文件句柄)。
close關(guān)閉 epoll
現(xiàn)在了解了 epoll 后,我們就可以來看 ioloop 了 (如果對(duì) epoll 還有疑問可以看這兩篇資料: epoll 的原理是什么、百度百科:epoll)
tornado.ioloop很多初學(xué)者一定好奇 tornado 運(yùn)行服務(wù)器最后那一句 tornado.ioloop.IOLoop.current().start() 到底是干什么的。 我們先不解釋作用,來看看這一句代碼背后到底都在干什么。
先貼 ioloop 代碼:
from __future__ import absolute_import, division, print_function, with_statement import datetime import errno import functools import heapq # 最小堆 import itertools import logging import numbers import os import select import sys import threading import time import traceback import math from tornado.concurrent import TracebackFuture, is_future from tornado.log import app_log, gen_log from tornado.platform.auto import set_close_exec, Waker from tornado import stack_context from tornado.util import PY3, Configurable, errno_from_exception, timedelta_to_seconds try: import signal except ImportError: signal = None if PY3: import _thread as thread else: import thread _POLL_TIMEOUT = 3600.0 class TimeoutError(Exception): pass class IOLoop(Configurable): _EPOLLIN = 0x001 _EPOLLPRI = 0x002 _EPOLLOUT = 0x004 _EPOLLERR = 0x008 _EPOLLHUP = 0x010 _EPOLLRDHUP = 0x2000 _EPOLLONESHOT = (1 << 30) _EPOLLET = (1 << 31) # Our events map exactly to the epoll events NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP # Global lock for creating global IOLoop instance _instance_lock = threading.Lock() _current = threading.local() @staticmethod def instance(): if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check IOLoop._instance = IOLoop() return IOLoop._instance @staticmethod def initialized(): """Returns true if the singleton instance has been created.""" return hasattr(IOLoop, "_instance") def install(self): assert not IOLoop.initialized() IOLoop._instance = self @staticmethod def clear_instance(): """Clear the global `IOLoop` instance. .. versionadded:: 4.0 """ if hasattr(IOLoop, "_instance"): del IOLoop._instance @staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self @staticmethod def clear_current(): IOLoop._current.instance = None @classmethod def configurable_base(cls): return IOLoop @classmethod def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current() def close(self, all_fds=False): raise NotImplementedError() def add_handler(self, fd, handler, events): raise NotImplementedError() def update_handler(self, fd, events): raise NotImplementedError() def remove_handler(self, fd): raise NotImplementedError() def set_blocking_signal_threshold(self, seconds, action): raise NotImplementedError() def set_blocking_log_threshold(self, seconds): self.set_blocking_signal_threshold(seconds, self.log_stack) def log_stack(self, signal, frame): gen_log.warning("IOLoop blocked for %f seconds in %s", self._blocking_signal_threshold, "".join(traceback.format_stack(frame))) def start(self): raise NotImplementedError() def _setup_logging(self): if not any([logging.getLogger().handlers, logging.getLogger("tornado").handlers, logging.getLogger("tornado.application").handlers]): logging.basicConfig() def stop(self): raise NotImplementedError() def run_sync(self, func, timeout=None): future_cell = [None] def run(): try: result = func() if result is not None: from tornado.gen import convert_yielded result = convert_yielded(result) except Exception: future_cell[0] = TracebackFuture() future_cell[0].set_exc_info(sys.exc_info()) else: if is_future(result): future_cell[0] = result else: future_cell[0] = TracebackFuture() future_cell[0].set_result(result) self.add_future(future_cell[0], lambda future: self.stop()) self.add_callback(run) if timeout is not None: timeout_handle = self.add_timeout(self.time() + timeout, self.stop) self.start() if timeout is not None: self.remove_timeout(timeout_handle) if not future_cell[0].done(): raise TimeoutError("Operation timed out after %s seconds" % timeout) return future_cell[0].result() def time(self): return time.time() ...
IOLoop 類首先聲明了 epoll 監(jiān)聽事件的宏定義,當(dāng)然,如前文所說,我們只要關(guān)心其中的 EPOLLIN 、 EPOLLOUT 、 EPOLLERR 就行。
類中的方法有很多,看起來有點(diǎn)暈,但其實(shí)我們只要關(guān)心 IOLoop 核心功能的方法即可,其他的方法在明白核心功能后也就不難理解了。所以接下來我們著重分析核心代碼。
instance 、 initialized、 install、 clear_instance、 current、 make_current、 clear_current 這些方法不用在意細(xì)節(jié),總之現(xiàn)在記住它們都是為了讓 IOLoop 類變成一個(gè)單例,保證從全局上調(diào)用的都是同一個(gè) IOLoop 就好。
你一定疑惑 IOLoop 為何沒有 __init__, 其實(shí)是因?yàn)橐跏蓟蔀閱卫?,IOLoop 的 new 函數(shù)已經(jīng)被改寫了,同時(shí)指定了 initialize 做為它的初始化方法,所以此處沒有 __init__ 。 說到這,ioloop 的代碼里好像沒有看到 new 方法,這又是什么情況? 我們先暫時(shí)記住這里。
接著我們來看這個(gè)初始化方法:
def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is None: raise RuntimeError("current IOLoop already exists") self.make_current() def make_current(self): IOLoop._current.instance = self
what? 里面只是判斷了是否第一次初始化或者調(diào)用 self.make_current() 初始化,而 make_current() 里也僅僅是把實(shí)例指定為自己,那么初始化到底去哪了?
然后再看看 start() 、 run() 、 close() 這些關(guān)鍵的方法都成了返回 NotImplementedError 錯(cuò)誤,全部未定義?!跟網(wǎng)上搜到的源碼分析完全不一樣啊。 這時(shí)候看下 IOLoop 的繼承關(guān)系,原來問題出在這里,之前的 tornado.ioloop 繼承自 object 所以所有的一切都自己實(shí)現(xiàn),而現(xiàn)在版本的 tornado.ioloop 則繼承自 Configurable 看起來現(xiàn)在的 IOLoop 已經(jīng)成為了一個(gè)基類,只定義了接口。 所以接著看 Configurable 代碼:
tornado.util.Configurableclass Configurable(object): __impl_class = None __impl_kwargs = None def __new__(cls, *args, **kwargs): base = cls.configurable_base() init_kwargs = {} if cls is base: impl = cls.configured_class() if base.__impl_kwargs: init_kwargs.update(base.__impl_kwargs) else: impl = cls init_kwargs.update(kwargs) instance = super(Configurable, cls).__new__(impl) # initialize vs __init__ chosen for compatibility with AsyncHTTPClient # singleton magic. If we get rid of that we can switch to __init__ # here too. instance.initialize(*args, **init_kwargs) return instance @classmethod def configurable_base(cls): """Returns the base class of a configurable hierarchy. This will normally return the class in which it is defined. (which is *not* necessarily the same as the cls classmethod parameter). """ raise NotImplementedError() @classmethod def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError() def initialize(self): """Initialize a `Configurable` subclass instance. Configurable classes should use `initialize` instead of ``__init__``. .. versionchanged:: 4.2 Now accepts positional arguments in addition to keyword arguments. """ @classmethod def configure(cls, impl, **kwargs): """Sets the class to use when the base class is instantiated. Keyword arguments will be saved and added to the arguments passed to the constructor. This can be used to set global defaults for some parameters. """ base = cls.configurable_base() if isinstance(impl, (unicode_type, bytes)): impl = import_object(impl) if impl is not None and not issubclass(impl, cls): raise ValueError("Invalid subclass of %s" % cls) base.__impl_class = impl base.__impl_kwargs = kwargs @classmethod def configured_class(cls): """Returns the currently configured class.""" base = cls.configurable_base() if cls.__impl_class is None: base.__impl_class = cls.configurable_default() return base.__impl_class @classmethod def _save_configuration(cls): base = cls.configurable_base() return (base.__impl_class, base.__impl_kwargs) @classmethod def _restore_configuration(cls, saved): base = cls.configurable_base() base.__impl_class = saved[0] base.__impl_kwargs = saved[1]
之前我們尋找的 __new__ 出現(xiàn)了! 注意其中這句: impl = cls.configured_class() impl 在這里就是 epoll ,它的生成函數(shù)是 configured_class(), 而其方法里又有 base.__impl_class = cls.configurable_default() ,調(diào)用了 configurable_default() 。而 Configurable 的 configurable_default():
def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError()
顯然也是個(gè)接口,那么我們?cè)倩仡^看 ioloop 的 configurable_default():
def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
原來這是個(gè)工廠函數(shù),根據(jù)不同的操作系統(tǒng)返回不同的事件池(linux 就是 epoll, mac 返回 kqueue,其他就返回普通的 select。 kqueue 基本等同于 epoll, 只是不同系統(tǒng)對(duì)其的不同實(shí)現(xiàn))
現(xiàn)在線索轉(zhuǎn)移到了 tornado.platform.epoll.EPollIOLoop 上,我們?cè)賮砜纯?EPollIOLoop:
tornado.platform.epoll.EPollIOLoopimport select from tornado.ioloop import PollIOLoop class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
EPollIOLoop 完全繼承自 PollIOLoop (注意這里是 PollIOLoop 不是 IOLoop)并只是在初始化時(shí)指定了 impl 是 epoll,所以看起來我們用 IOLoop 初始化最后初始化的其實(shí)就是這個(gè) PollIOLoop,所以接下來,我們真正需要理解和閱讀的內(nèi)容應(yīng)該都在這里:
tornado.ioloop.PollIOLoopclass PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or `tornado.platform.select.SelectIOLoop` (all platforms). """ def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl if hasattr(self._impl, "fileno"): set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} self._events = {} self._callbacks = [] self._callback_lock = threading.Lock() self._timeouts = [] self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) def set_blocking_signal_threshold(self, seconds, action): if not hasattr(signal, "setitimer"): gen_log.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL) def start(self): ... try: while True: # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. with self._callback_lock: callbacks = self._callbacks self._callbacks = [] # Add any timeouts that have come due to the callback list. # Do not run anything until we have determined which ones # are ready, so timeouts that call add_timeout cannot # schedule anything in this iteration. due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # The timeout was cancelled. Note that the # cancellation check is repeated below for timeouts # that are cancelled by another timeout or callback. heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it"s # more than half cancellations. self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: self._run_callback(callback) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # Closures may be holding on to a lot of memory, so allow # them to be freed before we go into our poll wait. callbacks = callback = due_timeouts = timeout = None if self._callbacks: # If any callbacks or timeouts called add_callback, # we don"t want to wait in poll() before we run them. poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of "now" to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn"t fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, "Interrupted system call") if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) def stop(self): self._running = False self._stopped = True self._waker.wake() def time(self): return self.time_func() def call_at(self, deadline, callback, *args, **kwargs): timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: # If we"re in the IOLoop"s thread, we know it"s not currently # polling. If we"re not, and we added the first callback to an # empty list, we may need to wake it up (it may wake up on its # own, but an occasional extra wake is harmless). Waking # up a polling IOLoop is relatively expensive, so we try to # avoid it when we can. self._waker.wake() def add_callback_from_signal(self, callback, *args, **kwargs): with stack_context.NullContext(): if thread.get_ident() != self._thread_ident: # if the signal is handled on another thread, we can add # it normally (modulo the NullContext) self.add_callback(callback, *args, **kwargs) else: # If we"re on the IOLoop"s thread, we cannot use # the regular add_callback because it may deadlock on # _callback_lock. Blindly insert into self._callbacks. # This is safe because the GIL makes list.append atomic. # One subtlety is that if the signal interrupted the # _callback_lock block in IOLoop.start, we may modify # either the old or new version of self._callbacks, # but either way will work. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs))
果然, PollIOLoop 繼承自 IOLoop 并實(shí)現(xiàn)了它的所有接口,現(xiàn)在我們終于可以進(jìn)入真正的正題了
ioloop 分析首先要看的是關(guān)于 epoll 操作的方法,還記得前文說過的 epoll 只需要四個(gè) api 就能完全操作嘛? 我們來看 PollIOLoop 的實(shí)現(xiàn):
epoll 操作def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
epoll_ctl:這個(gè)三個(gè)方法分別對(duì)應(yīng) epoll_ctl 中的 add 、 modify 、 del 參數(shù)。 所以這三個(gè)方法實(shí)現(xiàn)了 epoll 的 epoll_ctl 。
epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已經(jīng)完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 這個(gè)相當(dāng)于 epoll_create 。
epoll_wait:epoll_wait 操作則在 start() 中:event_pairs = self._impl.poll(poll_timeout)
epoll_close:而 epoll 的 close 則在 PollIOLoop 中的 close 方法內(nèi)調(diào)用: self._impl.close() 完成。
initialize接下來看 PollIOLoop 的初始化方法中作了什么:
def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl # 指定 epoll if hasattr(self._impl, "fileno"): set_close_exec(self._impl.fileno()) # fork 后關(guān)閉無用文件描述符 self.time_func = time_func or time.time # 指定獲取當(dāng)前時(shí)間的函數(shù) self._handlers = {} # handler 的字典,儲(chǔ)存被 epoll 監(jiān)聽的 handler,與打開它的文件描述符 ( file descriptor 簡(jiǎn)稱 fd ) 一一對(duì)應(yīng) self._events = {} # event 的字典,儲(chǔ)存 epoll 返回的活躍的 fd event pairs self._callbacks = [] # 儲(chǔ)存各個(gè) fd 回調(diào)函數(shù)的列表 self._callback_lock = threading.Lock() # 指定進(jìn)程鎖 self._timeouts = [] # 將是一個(gè)最小堆結(jié)構(gòu),按照超時(shí)時(shí)間從小到大排列的 fd 的任務(wù)堆( 通常這個(gè)任務(wù)都會(huì)包含一個(gè) callback ) self._cancellations = 0 # 關(guān)于 timeout 的計(jì)數(shù)器 self._running = False # ioloop 是否在運(yùn)行 self._stopped = False # ioloop 是否停止 self._closing = False # ioloop 是否關(guān)閉 self._thread_ident = None # 當(dāng)前線程堆標(biāo)識(shí)符 ( thread identify ) self._blocking_signal_threshold = None # 系統(tǒng)信號(hào), 主要用來在 epoll_wait 時(shí)判斷是否會(huì)有 signal alarm 打斷 epoll self._timeout_counter = itertools.count() # 超時(shí)計(jì)數(shù)器 ( 暫時(shí)不是很明白具體作用,好像和前面的 _cancellations 有關(guān)系? 請(qǐng)大神講講) self._waker = Waker() # 一個(gè) waker 類,主要是對(duì)于管道 pipe 的操作,因?yàn)?ioloop 屬于底層的數(shù)據(jù)操作,這里 epoll 監(jiān)聽的是 pipe self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) # 將管道加入 epoll 監(jiān)聽,對(duì)于 web server 初始化時(shí)只需要關(guān)心 READ 事件
除了注釋中的解釋,還有幾點(diǎn)補(bǔ)充:
close_exec 的作用: 子進(jìn)程在fork出來的時(shí)候,使用了寫時(shí)復(fù)制(COW,Copy-On-Write)方式獲得父進(jìn)程的數(shù)據(jù)空間、 堆和棧副本,這其中也包括文件描述符。剛剛fork成功時(shí),父子進(jìn)程中相同的文件描述符指向系統(tǒng)文件表中的同一項(xiàng),接著,一般我們會(huì)調(diào)用exec執(zhí)行另一個(gè)程序,此時(shí)會(huì)用全新的程序替換子進(jìn)程的正文,數(shù)據(jù),堆和棧等。此時(shí)保存文件描述符的變量當(dāng)然也不存在了,我們就無法關(guān)閉無用的文件描述符了。所以通常我們會(huì)fork子進(jìn)程后在子進(jìn)程中直接執(zhí)行close關(guān)掉無用的文件描述符,然后再執(zhí)行exec。 所以 close_exec 執(zhí)行的其實(shí)就是 關(guān)閉 + 執(zhí)行的作用。 詳情可以查看: 關(guān)于linux進(jìn)程間的close-on-exec機(jī)制
Waker(): Waker 封裝了對(duì)于管道 pipe 的操作:
def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) def _set_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) class Waker(interface.Waker): def __init__(self): r, w = os.pipe() _set_nonblocking(r) _set_nonblocking(w) set_close_exec(r) set_close_exec(w) self.reader = os.fdopen(r, "rb", 0) self.writer = os.fdopen(w, "wb", 0) def fileno(self): return self.reader.fileno() def write_fileno(self): return self.writer.fileno() def wake(self): try: self.writer.write(b"x") except IOError: pass def consume(self): try: while True: result = self.reader.read() if not result: break except IOError: pass def close(self): self.reader.close() self.writer.close()
可以看到 waker 把 pipe 分為讀、 寫兩個(gè)管道并都設(shè)置了非阻塞和 close_exec。 注意wake(self)方法中:self.writer.write(b"x") 直接向管道中寫入隨意字符從而釋放管道。
startioloop 最核心的部分:
def start(self): if self._running: # 判斷是否已經(jīng)運(yùn)行 raise RuntimeError("IOLoop is already running") self._setup_logging() if self._stopped: self._stopped = False # 設(shè)置停止為假 return old_current = getattr(IOLoop._current, "instance", None) IOLoop._current.instance = self self._thread_ident = thread.get_ident() # 獲得當(dāng)前線程標(biāo)識(shí)符 self._running = True # 設(shè)置運(yùn)行 old_wakeup_fd = None if hasattr(signal, "set_wakeup_fd") and os.name == "posix": try: old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno()) if old_wakeup_fd != -1: signal.set_wakeup_fd(old_wakeup_fd) old_wakeup_fd = None except ValueError: old_wakeup_fd = None try: while True: # 服務(wù)器進(jìn)程正式開始,類似于其他服務(wù)器的 serve_forever with self._callback_lock: # 加鎖,_callbacks 做為臨界區(qū)不加鎖進(jìn)行讀寫會(huì)產(chǎn)生臟數(shù)據(jù) callbacks = self._callbacks # 讀取 _callbacks self._callbacks = []. # 清空 _callbacks due_timeouts = [] # 用于存放這個(gè)周期內(nèi)已過期( 已超時(shí) )的任務(wù) if self._timeouts: # 判斷 _timeouts 里是否有數(shù)據(jù) now = self.time() # 獲取當(dāng)前時(shí)間,用來判斷 _timeouts 里的任務(wù)有沒有超時(shí) while self._timeouts: # _timeouts 有數(shù)據(jù)時(shí)一直循環(huán), _timeouts 是個(gè)最小堆,第一個(gè)數(shù)據(jù)永遠(yuǎn)是最小的, 這里第一個(gè)數(shù)據(jù)永遠(yuǎn)是最接近超時(shí)或已超時(shí)的 if self._timeouts[0].callback is None: # 超時(shí)任務(wù)無回調(diào) heapq.heappop(self._timeouts) # 直接彈出 self._cancellations -= 1 # 超時(shí)計(jì)數(shù)器 -1 elif self._timeouts[0].deadline <= now: # 判斷最小的數(shù)據(jù)是否超時(shí) due_timeouts.append(heapq.heappop(self._timeouts)) # 超時(shí)就加到已超時(shí)列表里。 else: break # 因?yàn)樽钚《?,如果沒超時(shí)就直接退出循環(huán)( 后面的數(shù)據(jù)必定未超時(shí) ) if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # 當(dāng)超時(shí)計(jì)數(shù)器大于 512 并且 大于 _timeouts 長(zhǎng)度一半( >> 為右移運(yùn)算, 相當(dāng)于十進(jìn)制數(shù)據(jù)被除 2 )時(shí),清零計(jì)數(shù)器,并剔除 _timeouts 中無 callbacks 的任務(wù) self._cancellations = 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) # 進(jìn)行 _timeouts 最小堆化 for callback in callbacks: self._run_callback(callback) # 運(yùn)行 callbacks 里所有的 calllback for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # 運(yùn)行所有已過期任務(wù)的 callback callbacks = callback = due_timeouts = timeout = None # 釋放內(nèi)存 if self._callbacks: # _callbacks 里有數(shù)據(jù)時(shí) poll_timeout = 0.0 # 設(shè)置 epoll_wait 時(shí)間為0( 立即返回 ) elif self._timeouts: # _timeouts 里有數(shù)據(jù)時(shí) poll_timeout = self._timeouts[0].deadline - self.time() # 取最小過期時(shí)間當(dāng) epoll_wait 等待時(shí)間,這樣當(dāng)?shù)谝粋€(gè)任務(wù)過期時(shí)立即返回 poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) # 如果最小過期時(shí)間大于默認(rèn)等待時(shí)間 _POLL_TIMEOUT = 3600,則用 3600,如果最小過期時(shí)間小于0 就設(shè)置為0 立即返回。 else: poll_timeout = _POLL_TIMEOUT # 默認(rèn) 3600 s 等待時(shí)間 if not self._running: # 檢查是否有系統(tǒng)信號(hào)中斷運(yùn)行,有則中斷,無則繼續(xù) break if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 開始 epoll_wait 之前確保 signal alarm 都被清空( 這樣在 epoll_wait 過程中不會(huì)被 signal alarm 打斷 ) try: event_pairs = self._impl.poll(poll_timeout) # 獲取返回的活躍事件隊(duì) except Exception as e: if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # epoll_wait 結(jié)束, 再設(shè)置 signal alarm self._events.update(event_pairs) # 將活躍事件加入 _events while self._events: fd, events = self._events.popitem() # 循環(huán)彈出事件 try: fd_obj, handler_func = self._handlers[fd] # 處理事件 handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: self._stopped = False # 確保發(fā)生異常也繼續(xù)運(yùn)行 if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) # 清空 signal alarm IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) # 和 start 開頭部分對(duì)應(yīng),但是不是很清楚作用,求老司機(jī)帶帶路stop
def stop(self): self._running = False self._stopped = True self._waker.wake()
這個(gè)很簡(jiǎn)單,設(shè)置判斷條件,然后調(diào)用 self._waker.wake() 向 pipe 寫入隨意字符喚醒 ioloop 事件循環(huán)。 over!
總結(jié)噗,寫了這么長(zhǎng),終于寫完了。 經(jīng)過分析,我們可以看到, ioloop 實(shí)際上是對(duì) epoll 的封裝,并加入了一些對(duì)上層事件的處理和 server 相關(guān)的底層處理。
最后,感謝大家不辭辛苦看到這,文中理解有誤的地方還請(qǐng)多多指教!
原文地址
作者:rapospectre
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/45479.html
摘要:最大的特點(diǎn)就是其支持異步,所以它有著優(yōu)異的性能。的代碼結(jié)構(gòu)可以在其官網(wǎng)了解,本文著重分析的實(shí)現(xiàn)。事件驅(qū)動(dòng)模型的大致思路的方法用于啟動(dòng)事件循環(huán)。行文比較草率,如有錯(cuò)誤和不足之處,敬請(qǐng)指正。 0. 簡(jiǎn)介 tornado是一個(gè)用Python語言寫成的Web服務(wù)器兼Web應(yīng)用框架,由FriendFeed公司在自己的網(wǎng)站FriendFeed中使用,被Facebook收購以后框架以開源軟件形式開放...
摘要:清楚了以上流程,我們直接來看函數(shù)主要用作初始化應(yīng)用監(jiān)聽端口以及啟動(dòng)。其中就是保存聊天室所有聊天消息的結(jié)構(gòu)。關(guān)于的解讀我會(huì)放到閱讀源碼時(shí)講。然后把消息加到緩存里,如果緩存大于限制則取最新的條消息。 tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應(yīng)用: chatdemo 首先看 chatdemo 的目錄結(jié)構(gòu): ├── chatdemo.py ├── ...
摘要:源碼之分析的協(xié)程原理分析版本為支持異步,實(shí)現(xiàn)了一個(gè)協(xié)程庫。提供了回調(diào)函數(shù)注冊(cè)當(dāng)異步事件完成后,調(diào)用注冊(cè)的回調(diào)中間結(jié)果保存結(jié)束結(jié)果返回等功能注冊(cè)回調(diào)函數(shù),當(dāng)被解決時(shí),改回調(diào)函數(shù)被調(diào)用。相當(dāng)于喚醒已經(jīng)處于狀態(tài)的父協(xié)程,通過回調(diào)函數(shù),再執(zhí)行。 tornado 源碼之 coroutine 分析 tornado 的協(xié)程原理分析 版本:4.3.0 為支持異步,tornado 實(shí)現(xiàn)了一個(gè)協(xié)程庫。 ...
摘要:主要是為了實(shí)現(xiàn)系統(tǒng)之間的雙向解耦而實(shí)現(xiàn)的。問題及優(yōu)化隊(duì)列過長(zhǎng)問題使用上述方案的異步非阻塞可能會(huì)依賴于的任務(wù)隊(duì)列長(zhǎng)度,若隊(duì)列中的任務(wù)過多,則可能導(dǎo)致長(zhǎng)時(shí)間等待,降低效率。 Tornado和Celery介紹 1.Tornado Tornado是一個(gè)用python編寫的一個(gè)強(qiáng)大的、可擴(kuò)展的異步HTTP服務(wù)器,同時(shí)也是一個(gè)web開發(fā)框架。tornado是一個(gè)非阻塞式web服務(wù)器,其速度相當(dāng)快。...
摘要:對(duì)參數(shù)類型進(jìn)行檢驗(yàn),這里為當(dāng)參數(shù)類型不合適是會(huì)拋出一個(gè)異常。將使用的第二個(gè)參數(shù)值作為默認(rèn)值。而請(qǐng)求將從格式中取得指定的文本。這里需要正則表達(dá)式相關(guān)的知識(shí),到了后面的學(xué)習(xí)中,必要時(shí)再去深入學(xué)習(xí)。到目前我們使用了,還支持任何合法的請(qǐng)求。 參考書籍《Introduction to Tornado》1.1 Tornado是什么?Tornado是使用Python編寫的一個(gè)強(qiáng)大的、可擴(kuò)展的Web服...
閱讀 710·2021-11-18 10:02
閱讀 2248·2021-11-15 18:13
閱讀 3173·2021-11-15 11:38
閱讀 2962·2021-09-22 15:55
閱讀 3684·2021-08-09 13:43
閱讀 2453·2021-07-25 14:19
閱讀 2461·2019-08-30 14:15
閱讀 3457·2019-08-30 14:15