tornado.tcpserver 源代码

#!/usr/bin/env python
# coding: utf-8
#
# Copyright 2011 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

u"""一个非阻塞, 单线程 TCP 服务."""
from __future__ import absolute_import, division, print_function, with_statement

import errno
import os
import socket

from tornado.log import app_log
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream, SSLIOStream
from tornado.netutil import bind_sockets, add_accept_handler, ssl_wrap_socket
from tornado import process
from tornado.util import errno_from_exception

try:
    import ssl
except ImportError:
    # ssl is not available on Google App Engine.
    ssl = None


[文档]class TCPServer(object): r"""一个非阻塞, 单线程的 TCP 服务. 想要使用 `TCPServer`, 只需要定义一个子类, 复写 `handle_stream` 方法即可. 例如, 一个简单的 echo server 可以做如下定义:: from tornado.tcpserver import TCPServer from tornado.iostream import StreamClosedError from tornado import gen class EchoServer(TCPServer): @gen.coroutine def handle_stream(self, stream, address): while True: try: data = yield stream.read_until(b"\n") yield stream.write(data) except StreamClosedError: break 为了使该服务提供 SSL 传输, 通过一个名为``ssl_options`` 的关键字参数 传递进去 `ssl.SSLContext` 对象即可. 为了兼容旧版本的 Python, ``ssl_options`` 也可以是一个字典, 作为`ssl.wrap_socket` 方法的关键字 参数.:: ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_ctx.load_cert_chain(os.path.join(data_dir, "mydomain.crt"), os.path.join(data_dir, "mydomain.key")) TCPServer(ssl_options=ssl_ctx) `TCPServer` 初始化可以是以下三种模式之一: 1. `listen`: 简单的单进程模式:: server = TCPServer() server.listen(8888) IOLoop.current().start() 2. `bind`/`start`: 简单的多进程模式:: server = TCPServer() server.bind(8888) server.start(0) # Forks multiple sub-processes IOLoop.current().start() 当使用这个接口, `.IOLoop` 一定 *不能* 被传递给 `TCPServer` 构造器. `start` 总是会在默认单一的 `.IOLoop` 上启动服务. 3. `add_sockets`: 高级多进程模式:: sockets = bind_sockets(8888) tornado.process.fork_processes(0) server = TCPServer() server.add_sockets(sockets) IOLoop.current().start() `add_sockets` 接口更加复杂, 但是它可以和 `tornado.process.fork_processes` 一起被使用, 当 fork 发生的时候给你更多灵活性. `add_sockets` 也可以被用于 单进程服务中, 如果你想要使用 `~tornado.netutil.bind_sockets` 以外的方式 创建你监听的 socket. .. versionadded:: 3.1 ``max_buffer_size`` 参数. """ def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None, read_chunk_size=None): self.io_loop = io_loop self.ssl_options = ssl_options self._sockets = {} # fd -> socket object self._pending_sockets = [] self._started = False self.max_buffer_size = max_buffer_size self.read_chunk_size = read_chunk_size # Verify the SSL options. Otherwise we don't get errors until clients # connect. This doesn't verify that the keys are legitimate, but # the SSL module doesn't do that until there is a connected socket # which seems like too much work if self.ssl_options is not None and isinstance(self.ssl_options, dict): # Only certfile is required: it can contain both keys if 'certfile' not in self.ssl_options: raise KeyError('missing key "certfile" in ssl_options') if not os.path.exists(self.ssl_options['certfile']): raise ValueError('certfile "%s" does not exist' % self.ssl_options['certfile']) if ('keyfile' in self.ssl_options and not os.path.exists(self.ssl_options['keyfile'])): raise ValueError('keyfile "%s" does not exist' % self.ssl_options['keyfile'])
[文档] def listen(self, port, address=""): u"""开始在给定的端口接收连接. 这个方法可能不只被调用一次, 可能会在多个端口上被调用多次. `listen` 方法将立即生效, 所以它没必要在 `TCPServer.start` 之后调用. 然而, 必须要启动 `.IOLoop` 才可以. """ sockets = bind_sockets(port, address=address) self.add_sockets(sockets)
[文档] def add_sockets(self, sockets): u"""使服务开始接收给定端口的连接. ``sockets`` 参数是一个 socket 对象的列表, 例如那些被 `~tornado.netutil.bind_sockets` 所返回的对象. `add_sockets` 通常和 `tornado.process.fork_processes` 相结合使用, 以便于在一个多进程服务初始化时提供更多控制. """ if self.io_loop is None: self.io_loop = IOLoop.current() for sock in sockets: self._sockets[sock.fileno()] = sock add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop)
[文档] def add_socket(self, socket): u"""单数版本的 `add_sockets`. 接受一个单一的 socket 对象.""" self.add_sockets([socket])
[文档] def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128): u"""绑定该服务到指定的地址的指定端口上. 要启动该服务, 调用 `start`. 如果你想要在一个单进程上运行该服务, 你可以调用 `listen` 作为顺序调用 `bind` 和 `start` 的一个快捷方式. address 参数可以是 IP 地址或者主机名. 如果它是主机名, 该服务将监听在和该名称有关的所有 IP 地址上. 地址也可以是空字符串或者 None, 服务将监听所有可用的接口. family 可以被设置为 `socket.AF_INET` 或 `socket.AF_INET6` 用来限定是 IPv4 或 IPv6 地址, 否则如果可用的话, 两者 都将被使用. ``backlog`` 参数和 `socket.listen <socket.socket.listen>` 是相同含义. 这个方法可能在 `start` 之前被调用多次来监听在多个端口或接口上. """ sockets = bind_sockets(port, address=address, family=family, backlog=backlog) if self._started: self.add_sockets(sockets) else: self._pending_sockets.extend(sockets)
[文档] def start(self, num_processes=1): u"""在 `.IOLoop` 中启动该服务. 默认情况下, 我们在该进程中运行服务, 并且不会 fork 出任何额外 的子进程. 如果 num_processes 为 ``None`` 或 <= 0, 我们检测这台机器上可用的 核心数并 fork 相同数量的子进程. 如果给定了 num_processes 并且 > 1, 我们 fork 指定数量的子进程. 因为我们使用进程而不是线程, 在任何服务代码之间没有共享内存. 注意多进程模式和 autoreload 模块不兼容(或者是当 ``debug=True`` 时 `tornado.web.Application` 的 ``autoreload=True`` 选项默认为 True). 当使用多进程模式时, 直到 ``TCPServer.start(n)`` 调用后, 才能创建或者 引用 IOLoops . """ assert not self._started self._started = True if num_processes != 1: process.fork_processes(num_processes) sockets = self._pending_sockets self._pending_sockets = [] self.add_sockets(sockets)
[文档] def stop(self): u"""停止对新连接的监听. 正在进行的请求可能仍然会继续在服务停止之后. """ for fd, sock in self._sockets.items(): self.io_loop.remove_handler(fd) sock.close()
[文档] def handle_stream(self, stream, address): u"""通过复写这个方法以处理一个来自传入连接的新 `.IOStream` . 这个方法可能是一个协程; 如果是这样, 异步引发的任何异常都将被记录. 接受传入连接不会被该协程阻塞. 如果这个 `TCPServer` 被配置为 SSL, ``handle_stream`` 将在 SSL 握手完成前被调用. 如果你需要验证客户端的证书或使用 NPN/ALPN 请使用 `.SSLIOStream.wait_for_handshake` . .. versionchanged:: 4.2 给这个方法添加了选项, 可以为协程. """ raise NotImplementedError()
def _handle_connection(self, connection, address): if self.ssl_options is not None: assert ssl, "Python 2.6+ and OpenSSL required for SSL" try: connection = ssl_wrap_socket(connection, self.ssl_options, server_side=True, do_handshake_on_connect=False) except ssl.SSLError as err: if err.args[0] == ssl.SSL_ERROR_EOF: return connection.close() else: raise except socket.error as err: # If the connection is closed immediately after it is created # (as in a port scan), we can get one of several errors. # wrap_socket makes an internal call to getpeername, # which may return either EINVAL (Mac OS X) or ENOTCONN # (Linux). If it returns ENOTCONN, this error is # silently swallowed by the ssl module, so we need to # catch another error later on (AttributeError in # SSLIOStream._do_ssl_handshake). # To test this behavior, try nmap with the -sT flag. # https://github.com/tornadoweb/tornado/pull/750 if errno_from_exception(err) in (errno.ECONNABORTED, errno.EINVAL): return connection.close() else: raise try: if self.ssl_options is not None: stream = SSLIOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size, read_chunk_size=self.read_chunk_size) else: stream = IOStream(connection, io_loop=self.io_loop, max_buffer_size=self.max_buffer_size, read_chunk_size=self.read_chunk_size) future = self.handle_stream(stream, address) if future is not None: self.io_loop.add_future(future, lambda f: f.result()) except Exception: app_log.error("Error in connection callback", exc_info=True)