成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

tornado 源碼之 iostream.py

JeOam / 2911人閱讀

摘要:對進行包裝,采用注冊回調(diào)方式實現(xiàn)非阻塞。通過接口注冊各個事件回調(diào)中事件發(fā)生后,調(diào)用方法,對事件進行分發(fā)。

iostream.py
A utility class to write to and read from a non-blocking socket.

IOStream 對 socket 進行包裝,采用注冊回調(diào)方式實現(xiàn)非阻塞。
通過接口注冊各個事件回調(diào)

_read_callback

_write_callback

_close_callback

_connect_callback

ioloop 中 socket 事件發(fā)生后,調(diào)用 IOStream._handle_events 方法,對事件進行分發(fā)。
對應(yīng)的事件處理過程中,如果滿足注冊的回調(diào)條件,則調(diào)用回調(diào)函數(shù)
回調(diào)函數(shù)在 IOStream._handle_events 中被調(diào)用

contents

iostream.py

contents

example

head

IOStream.__init__

IOStream.connect

IOStream.read_until

IOStream.read_bytes

IOStream.write

IOStream.close

IOStream._handle_events

IOStream._run_callback

IOStream._run_callback

IOStream._read_from_socket

IOStream._read_to_buffer

IOStream._read_from_buffer

IOStream._handle_connect

IOStream._handle_write

IOStream._consume

IOStream._add_io_state

IOStream._read_buffer_size

copyright

example

一個簡單的 IOStream 客戶端示例
由此可見, IOStream 是一個異步回調(diào)鏈

創(chuàng)建 socket

創(chuàng)建 IOStream 對象

連接到主機,傳入連接成功后回調(diào)函數(shù) send_request

socket 輸出數(shù)據(jù)請求頁面,讀取 head,傳入讀取 head 成功后回調(diào)函數(shù) on_headers

繼續(xù)讀取 body,傳入讀取 body 成功后回調(diào)函數(shù) on_body

關(guān)閉 stream,關(guān)閉 ioloop

from tornado import ioloop
from tornado import iostream
import socket


def send_request():
    stream.write("GET / HTTP/1.0
Host: baidu.com

")
    stream.read_until("

", on_headers)


def on_headers(data):
    headers = {}
    for line in data.split("
"):
        parts = line.split(":")
        if len(parts) == 2:
            headers[parts[0].strip()] = parts[1].strip()
    stream.read_bytes(int(headers["Content-Length"]), on_body)


def on_body(data):
    print data
    stream.close()
    ioloop.IOLoop.instance().stop()


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = iostream.IOStream(s)
stream.connect(("baidu.com", 80), send_request)
ioloop.IOLoop.instance().start()


# html>
# 
# 
head
from __future__ import with_statement

import collections
import errno
import logging
import socket
import sys

from tornado import ioloop
from tornado import stack_context

try:
    import ssl # Python 2.6+
except ImportError:
    ssl = None
IOStream.__init__

包裝 socket 類
關(guān)鍵語句 self.io_loop.add_handler( self.socket.fileno(), self._handle_events, self._state) 將自身的_handle_events 加入到全局 ioloop poll 事件回調(diào)
此時只注冊了 ERROR 類型事件

_read_buffer: 讀緩沖

class IOStream(object):

    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._connect_callback = None
        self._connecting = False
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            self.io_loop.add_handler(
                self.socket.fileno(), self._handle_events, self._state)
IOStream.connect

連接 socket 到遠程地址,非阻塞模式

連接 socket

注冊連接完成回調(diào)

poll 增加 socket 寫事件

    def connect(self, address, callback=None):
        """Connects the socket to a remote address without blocking.

        May only be called if the socket passed to the constructor was
        not previously connected.  The address parameter is in the
        same format as for socket.connect, i.e. a (host, port) tuple.
        If callback is specified, it will be called when the
        connection is completed.

        Note that it is safe to call IOStream.write while the
        connection is pending, in which case the data will be written
        as soon as the connection is ready.  Calling IOStream read
        methods before the socket is connected works on some platforms
        but is non-portable.
        """
        self._connecting = True
        try:
            self.socket.connect(address)
        except socket.error, e:
            # In non-blocking mode connect() always raises an exception
            if e.args[0] not in (errno.EINPROGRESS, errno.EWOULDBLOCK):
                raise
        self._connect_callback = stack_context.wrap(callback)
        self._add_io_state(self.io_loop.WRITE)
IOStream.read_until

注冊讀完成回調(diào)

嘗試從緩沖中讀

從 socket 中讀到緩沖區(qū)

重復(fù) 2,3,沒有數(shù)據(jù)則退出

將 socket 讀事件加入 poll

如果緩存中數(shù)據(jù)滿足條件,則直接執(zhí)行 callback 并返回,
否則,保存 callback 函數(shù)下次 read 事件發(fā)生時,_handle_events 處理讀事件時,再進行檢測及調(diào)用

    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        self._read_delimiter = delimiter
        self._read_callback = stack_context.wrap(callback)
        while True:
            # See if we"ve already got the data from a previous read
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)
IOStream.read_bytes

參考 read_until,讀限定字節(jié)

    def read_bytes(self, num_bytes, callback):
        """Call callback when we read the given number of bytes."""
        assert not self._read_callback, "Already reading"
        if num_bytes == 0:
            callback("")
            return
        self._read_bytes = num_bytes
        self._read_callback = stack_context.wrap(callback)
        while True:
            if self._read_from_buffer():
                return
            self._check_closed()
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)
IOStream.write
    def write(self, data, callback=None):
        """Write the given data to this stream.

        If callback is given, we call it when all of the buffered write
        data has been successfully written to the stream. If there was
        previously buffered write data and an old write callback, that
        callback is simply overwritten with this new callback.
        """
        self._check_closed()
        self._write_buffer.append(data)
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = stack_context.wrap(callback)

    def set_close_callback(self, callback):
        """Call the given callback when the stream is closed."""
        self._close_callback = stack_context.wrap(callback)
IOStream.close

從 ioloop 移除 socket 事件

關(guān)閉 socket

調(diào)用關(guān)閉回調(diào)

    def close(self):
        """Close this stream."""
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())
            self.socket.close()
            self.socket = None
            if self._close_callback:
                self._run_callback(self._close_callback)

    def reading(self):
        """Returns true if we are currently reading from the stream."""
        return self._read_callback is not None

    def writing(self):
        """Returns true if we are currently writing to the stream."""
        return bool(self._write_buffer)

    def closed(self):
        return self.socket is None
IOStream._handle_events

核心回調(diào)
任何類型的 socket 事件觸發(fā) ioloop 回調(diào)_handle_events,然后在_handle_events 再進行分發(fā)
值得注意的是,IOStream 不處理連接請求的 read 事件
注意
作為服務(wù)端,默認代理的是已經(jīng)建立連接的 socket

# HTTPServer.\_handle_events
# connection 為已經(jīng)accept的連接
stream = iostream.IOStream(connection, io_loop=self.io_loop)

作為客戶端,需要手動調(diào)用 IOStream.connect,連接成功后,成功回調(diào)在 write 事件中處理

這個實現(xiàn)比較別扭

    def _handle_events(self, fd, events):
        if not self.socket:
            logging.warning("Got events for closed stream %d", fd)
            return
        try:
            # 處理讀事件,調(diào)用已注冊回調(diào)
            if events & self.io_loop.READ:
                self._handle_read()
            if not self.socket:
                return
            # 處理寫事件,如果是剛建立連接,調(diào)用連接建立回調(diào)
            if events & self.io_loop.WRITE:
                if self._connecting:
                    self._handle_connect()
                self._handle_write()
            if not self.socket:
                return
            # 錯誤事件,關(guān)閉 socket
            if events & self.io_loop.ERROR:
                self.close()
                return
            state = self.io_loop.ERROR
            if self.reading():
                state |= self.io_loop.READ
            if self.writing():
                state |= self.io_loop.WRITE
            if state != self._state:
                self._state = state
                self.io_loop.update_handler(self.socket.fileno(), self._state)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            self.close()
            raise
IOStream._run_callback

執(zhí)行回調(diào)

    def _run_callback(self, callback, *args, **kwargs):
        try:
            # Use a NullContext to ensure that all StackContexts are run
            # inside our blanket exception handler rather than outside.
            with stack_context.NullContext():
                callback(*args, **kwargs)
        except:
            logging.error("Uncaught exception, closing connection.",
                          exc_info=True)
            # Close the socket on an uncaught exception from a user callback
            # (It would eventually get closed when the socket object is
            # gc"d, but we don"t want to rely on gc happening before we
            # run out of file descriptors)
            self.close()
            # Re-raise the exception so that IOLoop.handle_callback_exception
            # can see it and log the error
            raise
IOStream._run_callback

讀回調(diào)

從 socket 讀取數(shù)據(jù)到緩存

無數(shù)據(jù),socket 關(guān)閉

檢測是否滿足 read_until read_bytes

滿足則執(zhí)行對應(yīng)回調(diào)

    def _handle_read(self):
        while True:
            try:
                # Read from the socket until we get EWOULDBLOCK or equivalent.
                # SSL sockets do some internal buffering, and if the data is
                # sitting in the SSL object"s buffer select() and friends
                # can"t see it; the only way to find out if it"s there is to
                # try to read it.
                result = self._read_to_buffer()
            except Exception:
                self.close()
                return
            if result == 0:
                break
            else:
                if self._read_from_buffer():
                    return
IOStream._read_from_socket

從 socket 讀取數(shù)據(jù)

    def _read_from_socket(self):
        """Attempts to read from the socket.

        Returns the data read or None if there is nothing to read.
        May be overridden in subclasses.
        """
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk
IOStream._read_to_buffer

從 socket 讀取數(shù)據(jù)存入緩存

    def _read_to_buffer(self):
        """Reads from the socket and appends the result to the read buffer.

        Returns the number of bytes read.  Returns 0 if there is nothing
        to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
        error closes the socket and raises an exception.
        """
        try:
            chunk = self._read_from_socket()
        except socket.error, e:
            # ssl.SSLError is a subclass of socket.error
            logging.warning("Read error on %d: %s",
                            self.socket.fileno(), e)
            self.close()
            raise
        if chunk is None:
            return 0
        self._read_buffer.append(chunk)
        if self._read_buffer_size() >= self.max_buffer_size:
            logging.error("Reached maximum read buffer size")
            self.close()
            raise IOError("Reached maximum read buffer size")
        return len(chunk)
IOStream._read_from_buffer

從緩沖中過濾數(shù)據(jù)
檢測是否滿足結(jié)束條件(read_until/read_bytes),滿足則調(diào)用之前注冊的回調(diào)
采用的是查詢方式

    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        if self._read_bytes:
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        elif self._read_delimiter:
            _merge_prefix(self._read_buffer, sys.maxint)
            loc = self._read_buffer[0].find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                self._run_callback(callback,
                                   self._consume(loc + delimiter_len))
                return True
        return False
IOStream._handle_connect

調(diào)用連接建立回調(diào),并清除連接中標志

    def _handle_connect(self):
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        self._connecting = False
IOStream._handle_write

寫事件

從緩沖區(qū)獲取限定范圍內(nèi)數(shù)據(jù)

調(diào)用 socket.send 輸出數(shù)據(jù)

如果數(shù)據(jù)發(fā)送我且已注冊回調(diào),調(diào)用發(fā)送完成回調(diào)

    def _handle_write(self):
        while self._write_buffer:
            try:
                if not self._write_buffer_frozen:
                    # On windows, socket.send blows up if given a
                    # write buffer that"s too large, instead of just
                    # returning the number of bytes it was able to
                    # process.  Therefore we must not call socket.send
                    # with more than 128KB at a time.
                    _merge_prefix(self._write_buffer, 128 * 1024)
                num_bytes = self.socket.send(self._write_buffer[0])
                self._write_buffer_frozen = False
                _merge_prefix(self._write_buffer, num_bytes)
                self._write_buffer.popleft()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    # With OpenSSL, after send returns EWOULDBLOCK,
                    # the very same string object must be used on the
                    # next call to send.  Therefore we suppress
                    # merging the write buffer after an EWOULDBLOCK.
                    # A cleaner solution would be to set
                    # SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER, but this is
                    # not yet accessible from python
                    # (http://bugs.python.org/issue8240)
                    self._write_buffer_frozen = True
                    break
                else:
                    logging.warning("Write error on %d: %s",
                                    self.socket.fileno(), e)
                    self.close()
                    return
        if not self._write_buffer and self._write_callback:
            callback = self._write_callback
            self._write_callback = None
            self._run_callback(callback)
IOStream._consume

從讀緩存消費 loc 長度的數(shù)據(jù)

    def _consume(self, loc):
        _merge_prefix(self._read_buffer, loc)
        return self._read_buffer.popleft()

    def _check_closed(self):
        if not self.socket:
            raise IOError("Stream is closed")
IOStream._add_io_state

增加 socket 事件狀態(tài)

    def _add_io_state(self, state):
        if self.socket is None:
            # connection has been closed, so there can be no future events
            return
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)
IOStream._read_buffer_size

獲取讀緩存中已有數(shù)據(jù)長度

    def _read_buffer_size(self):
        return sum(len(chunk) for chunk in self._read_buffer)


class SSLIOStream(IOStream):
    """A utility class to write to and read from a non-blocking socket.

    If the socket passed to the constructor is already connected,
    it should be wrapped with
        ssl.wrap_socket(sock, do_handshake_on_connect=False, **kwargs)
    before constructing the SSLIOStream.  Unconnected sockets will be
    wrapped when IOStream.connect is finished.
    """
    def __init__(self, *args, **kwargs):
        """Creates an SSLIOStream.

        If a dictionary is provided as keyword argument ssl_options,
        it will be used as additional keyword arguments to ssl.wrap_socket.
        """
        self._ssl_options = kwargs.pop("ssl_options", {})
        super(SSLIOStream, self).__init__(*args, **kwargs)
        self._ssl_accepting = True
        self._handshake_reading = False
        self._handshake_writing = False

    def reading(self):
        return self._handshake_reading or super(SSLIOStream, self).reading()

    def writing(self):
        return self._handshake_writing or super(SSLIOStream, self).writing()

    def _do_ssl_handshake(self):
        # Based on code from test_ssl.py in the python stdlib
        try:
            self._handshake_reading = False
            self._handshake_writing = False
            self.socket.do_handshake()
        except ssl.SSLError, err:
            if err.args[0] == ssl.SSL_ERROR_WANT_READ:
                self._handshake_reading = True
                return
            elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self._handshake_writing = True
                return
            elif err.args[0] in (ssl.SSL_ERROR_EOF,
                                 ssl.SSL_ERROR_ZERO_RETURN):
                return self.close()
            elif err.args[0] == ssl.SSL_ERROR_SSL:
                logging.warning("SSL Error on %d: %s", self.socket.fileno(), err)
                return self.close()
            raise
        except socket.error, err:
            if err.args[0] == errno.ECONNABORTED:
                return self.close()
        else:
            self._ssl_accepting = False
            super(SSLIOStream, self)._handle_connect()

    def _handle_read(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_read()

    def _handle_write(self):
        if self._ssl_accepting:
            self._do_ssl_handshake()
            return
        super(SSLIOStream, self)._handle_write()

    def _handle_connect(self):
        self.socket = ssl.wrap_socket(self.socket,
                                      do_handshake_on_connect=False,
                                      **self._ssl_options)
        # Don"t call the superclass"s _handle_connect (which is responsible
        # for telling the application that the connection is complete)
        # until we"ve completed the SSL handshake (so certificates are
        # available, etc).


    def _read_from_socket(self):
        try:
            # SSLSocket objects have both a read() and recv() method,
            # while regular sockets only have recv().
            # The recv() method blocks (at least in python 2.6) if it is
            # called when there is nothing to read, so we have to use
            # read() instead.
            chunk = self.socket.read(self.read_chunk_size)
        except ssl.SSLError, e:
            # SSLError is a subclass of socket.error, so this except
            # block must come first.
            if e.args[0] == ssl.SSL_ERROR_WANT_READ:
                return None
            else:
                raise
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return None
            else:
                raise
        if not chunk:
            self.close()
            return None
        return chunk

def _merge_prefix(deque, size):
    """Replace the first entries in a deque of strings with a single
    string of up to size bytes.

    >>> d = collections.deque(["abc", "de", "fghi", "j"])
    >>> _merge_prefix(d, 5); print d
    deque(["abcde", "fghi", "j"])

    Strings will be split as necessary to reach the desired size.
    >>> _merge_prefix(d, 7); print d
    deque(["abcdefg", "hi", "j"])

    >>> _merge_prefix(d, 3); print d
    deque(["abc", "defg", "hi", "j"])

    >>> _merge_prefix(d, 100); print d
    deque(["abcdefghij"])
    """
    prefix = []
    remaining = size
    while deque and remaining > 0:
        chunk = deque.popleft()
        if len(chunk) > remaining:
            deque.appendleft(chunk[remaining:])
            chunk = chunk[:remaining]
        prefix.append(chunk)
        remaining -= len(chunk)
    deque.appendleft("".join(prefix))

def doctests():
    import doctest
    return doctest.DocTestSuite()
copyright

author:bigfish
copyright: 許可協(xié)議 知識共享署名-非商業(yè)性使用 4.0 國際許可協(xié)議

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/42832.html

相關(guān)文章

  • tornado源碼解析IOLoop

    摘要:最大的特點就是其支持異步,所以它有著優(yōu)異的性能。的代碼結(jié)構(gòu)可以在其官網(wǎng)了解,本文著重分析的實現(xiàn)。事件驅(qū)動模型的大致思路的方法用于啟動事件循環(huán)。行文比較草率,如有錯誤和不足之處,敬請指正。 0. 簡介 tornado是一個用Python語言寫成的Web服務(wù)器兼Web應(yīng)用框架,由FriendFeed公司在自己的網(wǎng)站FriendFeed中使用,被Facebook收購以后框架以開源軟件形式開放...

    Lsnsh 評論0 收藏0
  • Tornado Demo chatdemo 不完全解讀

    摘要:清楚了以上流程,我們直接來看函數(shù)主要用作初始化應(yīng)用監(jiān)聽端口以及啟動。其中就是保存聊天室所有聊天消息的結(jié)構(gòu)。關(guān)于的解讀我會放到閱讀源碼時講。然后把消息加到緩存里,如果緩存大于限制則取最新的條消息。 tornado 源碼自帶了豐富的 demo ,這篇文章主要分析 demo 中的聊天室應(yīng)用: chatdemo 首先看 chatdemo 的目錄結(jié)構(gòu): ├── chatdemo.py ├── ...

    TesterHome 評論0 收藏0
  • [零基礎(chǔ)學python]python開發(fā)框架

    摘要:軟件開發(fā)者通常依據(jù)特定的框架實現(xiàn)更為復(fù)雜的商業(yè)運用和業(yè)務(wù)邏輯。所有,做開發(fā),要用一個框架。的性能是相當優(yōu)異的,因為它師徒解決一個被稱之為問題,就是處理大于或等于一萬的并發(fā)。 One does not live by bread alone,but by every word that comes from the mouth of God --(MATTHEW4:4) 不...

    lucas 評論0 收藏0
  • tornado 源碼 coroutine 分析

    摘要:源碼之分析的協(xié)程原理分析版本為支持異步,實現(xiàn)了一個協(xié)程庫。提供了回調(diào)函數(shù)注冊當異步事件完成后,調(diào)用注冊的回調(diào)中間結(jié)果保存結(jié)束結(jié)果返回等功能注冊回調(diào)函數(shù),當被解決時,改回調(diào)函數(shù)被調(diào)用。相當于喚醒已經(jīng)處于狀態(tài)的父協(xié)程,通過回調(diào)函數(shù),再執(zhí)行。 tornado 源碼之 coroutine 分析 tornado 的協(xié)程原理分析 版本:4.3.0 為支持異步,tornado 實現(xiàn)了一個協(xié)程庫。 ...

    NicolasHe 評論0 收藏0
  • Tornado 簡單入門教程(一)——Demo1

    摘要:也就是說用于設(shè)定與處理類間的映射關(guān)系。在中,默認使用和函數(shù)分別處理兩種請求。因為表單仍提交到當前頁面,所以還是由處理。載入時間相關(guān)的的一個類,獲取當前時間戳。獲取數(shù)據(jù)庫中的名為的。 前面的話 Demo1是一個簡單的博客系統(tǒng)(=。=什么網(wǎng)站都叫系統(tǒng))。我們從這個簡單的系統(tǒng)入手,去了解P+T+M網(wǎng)站的內(nèi)部邏輯,并記住一些規(guī)則,方便我們進一步自己開發(fā)。 規(guī)則這個詞特意打上了雙引號,目的是...

    solocoder 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<