Asynchronous Tornado Redis-based reliable queue package
Project description
Torrelque
Torrelque is a Python 3 package that provides reliable Redis-backed queues and runs on the Tornado IO loop.
Put simply, the package is an implementation of the queue described in this blog post, with some required changes and improvements.
There’s also a related, synchronous queue package called saferedisqueue.
Install
pip install Torrelque
Use
#!/usr/bin/env python3
import random
import logging
import tornadoredis
from tornado import gen, ioloop
from torrelque import Torrelque
# Module-level logger named after this module; the coroutines in this example
# log through it.
logger = logging.getLogger(__name__)
@gen.coroutine
def produce():
    """Endlessly enqueue batches of random tasks.

    Each round enqueues five tasks of the form ``{'value': <int 0..99>}``,
    logging each one at DEBUG level, and then pauses for 10 seconds via
    ``gen.sleep`` before starting the next round. The coroutine never
    returns on its own.
    """
    client = tornadoredis.Client()
    loop = ioloop.IOLoop.current()
    queue = Torrelque(client, loop)

    batch_size = 5
    while True:
        produced = 0
        while produced < batch_size:
            task = {'value': random.randint(0, 99)}
            logger.debug('Produced task %s', task)
            yield queue.enqueue(task)
            produced += 1
        # Pause between batches so the consumers can drain the queue.
        yield gen.sleep(10)
@gen.coroutine
def process(task_data):
    """Handle a single dequeued task.

    This example implementation only logs the task at DEBUG level and then
    sleeps for one second via ``gen.sleep`` to simulate work.

    :param task_data: the task payload passed in by the caller.
    """
    # Fixed the 'Consmed' typo so the message can be found when searching logs.
    logger.debug('Consumed task %s', task_data)
    yield gen.sleep(1)
@gen.coroutine
def consume():
    """Endlessly dequeue tasks and run them through ``process``.

    For every dequeued ``(task_id, task_data)`` pair with a truthy id, the
    task is processed and then released. If processing or releasing raises
    an ``Exception``, the failure is logged with its traceback and the task
    is requeued with ``delay=30`` (presumably seconds; confirm against the
    Torrelque documentation). A falsy task id is skipped and the loop simply
    dequeues again.
    """
    queue = Torrelque(tornadoredis.Client(), ioloop.IOLoop.current())

    while True:
        job_id, payload = yield queue.dequeue()
        if job_id:
            try:
                yield process(payload)
                yield queue.release(job_id)
            except Exception:
                logger.exception('Job processing has failed')
                yield queue.requeue(job_id, delay=30)
@gen.coroutine
def main():
    """Start four consumer coroutines, then run the producer.

    The consumers are scheduled on the current IO loop with
    ``spawn_callback``; the coroutine then yields on ``produce()``.
    """
    loop = ioloop.IOLoop.current()
    consumer_count = 4
    for _ in range(consumer_count):
        loop.spawn_callback(consume)

    yield produce()
if __name__ == '__main__':
    # Script entry point: emit DEBUG-level, timestamped log lines and run
    # ``main`` on the IO loop until it finishes.
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(message)s')
    # Use IOLoop.current() like the rest of this example; IOLoop.instance()
    # is a deprecated alias for it in current Tornado releases.
    ioloop.IOLoop.current().run_sync(main)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Torrelque-0.1.2.tar.gz
(6.0 kB
view hashes)