tornado.queues 源代码

#!/usr/bin/env python
# coding: utf-8
#
# Copyright 2015 The Tornado Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function, with_statement

__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']

import collections
import heapq

from tornado import gen, ioloop
from tornado.concurrent import Future
from tornado.locks import Event


class QueueEmpty(Exception):
    """Raised by `.Queue.get_nowait` when the queue has no items."""
class QueueFull(Exception):
    """Raised by `.Queue.put_nowait` when a queue is at its maximum size."""
def _set_timeout(future, timeout): if timeout: def on_timeout(): future.set_exception(gen.TimeoutError()) io_loop = ioloop.IOLoop.current() timeout_handle = io_loop.add_timeout(timeout, on_timeout) future.add_done_callback( lambda _: io_loop.remove_timeout(timeout_handle)) class _QueueIterator(object): def __init__(self, q): self.q = q def __anext__(self): return self.q.get()
class Queue(object):
    """Coordinate producer and consumer coroutines.

    If maxsize is 0 (the default) the queue size is unbounded.

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.queues import Queue

        q = Queue(maxsize=2)

        @gen.coroutine
        def consumer():
            while True:
                item = yield q.get()
                try:
                    print('Doing work on %s' % item)
                    yield gen.sleep(0.01)
                finally:
                    q.task_done()

        @gen.coroutine
        def producer():
            for item in range(5):
                yield q.put(item)
                print('Put %s' % item)

        @gen.coroutine
        def main():
            # Start consumer without waiting (since it never finishes).
            IOLoop.current().spawn_callback(consumer)
            yield producer()     # Wait for producer to put all tasks.
            yield q.join()       # Wait for consumer to finish all tasks.
            print('Done')

        IOLoop.current().run_sync(main)

    .. testoutput::

        Put 0
        Put 1
        Doing work on 0
        Put 2
        Doing work on 1
        Put 3
        Doing work on 2
        Put 4
        Doing work on 3
        Doing work on 4
        Done

    In Python 3.5, `Queue` implements the async iterator protocol, so
    ``consumer()`` could be rewritten as::

        async def consumer():
            async for item in q:
                try:
                    print('Doing work on %s' % item)
                    await gen.sleep(0.01)
                finally:
                    q.task_done()

    .. versionchanged:: 4.3
       Added ``async for`` support in Python 3.5.
    """

    def __init__(self, maxsize=0):
        if maxsize is None:
            raise TypeError("maxsize can't be None")

        if maxsize < 0:
            raise ValueError("maxsize can't be negative")

        self._maxsize = maxsize
        self._init()
        self._getters = collections.deque()  # Futures.
        self._putters = collections.deque()  # Pairs of (item, Future).
        self._unfinished_tasks = 0
        # Set while there are no unfinished tasks; `join` waits on it.
        self._finished = Event()
        self._finished.set()

    @property
    def maxsize(self):
        """Number of items allowed in the queue."""
        return self._maxsize

    def qsize(self):
        """Number of items in the queue."""
        return len(self._queue)

    def empty(self):
        """Return True if the queue currently holds no items."""
        return not self._queue

    def full(self):
        """Return True if the queue holds ``maxsize`` items.

        A queue with ``maxsize == 0`` is unbounded and therefore never full.
        """
        if self.maxsize == 0:
            return False
        return self.qsize() >= self.maxsize

    def put(self, item, timeout=None):
        """Put an item into the queue, perhaps waiting until there is room.

        Returns a Future, which raises `tornado.gen.TimeoutError` after a
        timeout.
        """
        try:
            self.put_nowait(item)
        except QueueFull:
            # No room: park the item until a getter frees a slot.
            future = Future()
            self._putters.append((item, future))
            _set_timeout(future, timeout)
            return future
        else:
            return gen._null_future

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        If no free slot is immediately available, raise `QueueFull`.
        """
        self._consume_expired()
        if self._getters:
            assert self.empty(), "queue non-empty, why are getters waiting?"
            getter = self._getters.popleft()
            # Route the item through the queue so subclass ordering and
            # unfinished-task accounting are applied before handing it over.
            self.__put_internal(item)
            getter.set_result(self._get())
        elif self.full():
            raise QueueFull
        else:
            self.__put_internal(item)

    def get(self, timeout=None):
        """Remove and return an item from the queue.

        Returns a Future which resolves once an item is available, or raises
        `tornado.gen.TimeoutError` after a timeout.
        """
        future = Future()
        try:
            future.set_result(self.get_nowait())
        except QueueEmpty:
            # Nothing available: wait for a future `put`.
            self._getters.append(future)
            _set_timeout(future, timeout)
        return future

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Return an item if one is immediately available, else raise
        `QueueEmpty`.
        """
        self._consume_expired()
        if self._putters:
            assert self.full(), "queue not full, why are putters waiting?"
            # Admit the oldest waiting putter's item, then hand out an item.
            item, putter = self._putters.popleft()
            self.__put_internal(item)
            putter.set_result(None)
            return self._get()
        elif self.qsize():
            return self._get()
        else:
            raise QueueEmpty

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each `.get` used to fetch a task, a
        subsequent call to `.task_done` tells the queue that the processing
        on the task is complete.

        If a `.join` is blocking, it resumes when all items have been
        processed; that is, when every `.put` is matched by a `.task_done`.

        Raises `ValueError` if called more times than `.put`.
        """
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    def join(self, timeout=None):
        """Block until all items in the queue are processed.

        Returns a Future, which raises `tornado.gen.TimeoutError` after a
        timeout.
        """
        return self._finished.wait(timeout)

    def __aiter__(self):
        # Must return the asynchronous iterator itself, not an awaitable:
        # ``async for`` requires the returned object to implement __anext__.
        return _QueueIterator(self)

    # These three are overridable in subclasses.
    def _init(self):
        self._queue = collections.deque()

    def _get(self):
        return self._queue.popleft()

    def _put(self, item):
        self._queue.append(item)
    # End of the overridable methods.

    def __put_internal(self, item):
        # Every stored item counts as an unfinished task until task_done().
        self._unfinished_tasks += 1
        self._finished.clear()
        self._put(item)

    def _consume_expired(self):
        # Remove timed-out waiters.
        while self._putters and self._putters[0][1].done():
            self._putters.popleft()

        while self._getters and self._getters[0].done():
            self._getters.popleft()

    def __repr__(self):
        return '<%s at %s %s>' % (
            type(self).__name__, hex(id(self)), self._format())

    def __str__(self):
        return '<%s %s>' % (type(self).__name__, self._format())

    def _format(self):
        # getattr: _format may run before _init() has created _queue.
        result = 'maxsize=%r' % (self.maxsize, )
        if getattr(self, '_queue', None):
            result += ' queue=%r' % self._queue
        if self._getters:
            result += ' getters[%s]' % len(self._getters)
        if self._putters:
            result += ' putters[%s]' % len(self._putters)
        if self._unfinished_tasks:
            result += ' tasks=%s' % self._unfinished_tasks
        return result
class PriorityQueue(Queue):
    """A `.Queue` that retrieves entries in priority order, lowest first.

    Entries are typically tuples like ``(priority number, data)``.

    .. testcode::

        from tornado.queues import PriorityQueue

        q = PriorityQueue()
        q.put((1, 'medium-priority item'))
        q.put((0, 'high-priority item'))
        q.put((10, 'low-priority item'))

        print(q.get_nowait())
        print(q.get_nowait())
        print(q.get_nowait())

    .. testoutput::

        (0, 'high-priority item')
        (1, 'medium-priority item')
        (10, 'low-priority item')
    """

    def _init(self):
        # Backing store is a plain list maintained as a binary heap.
        self._queue = list()

    def _get(self):
        heap = self._queue
        return heapq.heappop(heap)

    def _put(self, item):
        heap = self._queue
        heapq.heappush(heap, item)
class LifoQueue(Queue):
    """A `.Queue` that retrieves the most recently put items first.

    .. testcode::

        from tornado.queues import LifoQueue

        q = LifoQueue()
        q.put(3)
        q.put(2)
        q.put(1)

        print(q.get_nowait())
        print(q.get_nowait())
        print(q.get_nowait())

    .. testoutput::

        1
        2
        3
    """

    def _init(self):
        # Backing store is a plain list used as a stack.
        self._queue = list()

    def _get(self):
        stack = self._queue
        return stack.pop()

    def _put(self, item):
        stack = self._queue
        stack.append(item)