producer/consumer with exception handling

Proconex is a module that simplifies implementing the producer/consumer idiom. Unlike simple implementations based on Python’s Queue.Queue, proconex also handles exceptions raised while producing or consuming items and ensures that all work shuts down cleanly without leaving zombie threads.

Example Usage

Here is a simple producer that reads lines from a file:

>>> import proconex
>>> class LineProducer(proconex.Producer):
...     def __init__(self, fileToReadPath):
...         super(LineProducer, self).__init__()
...         self._fileToReadPath = fileToReadPath
...     def items(self):
...         with open(self._fileToReadPath, 'rb') as fileToRead:
...             for lineNumber, line in enumerate(fileToRead, start=1):
...                 yield (lineNumber, line.rstrip('\n\r'))

The constructor can take any parameters you need to set up the producer. In this case, all we need is the path to the file to read, fileToReadPath. The constructor simply stores the value in an attribute for later reference.

The method items() is typically implemented as a generator that yields the produced items one after another until there are no more items to produce. In this case, we yield the file line by line, each as a tuple of line number and line contents without trailing newlines.
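
To see the shape of the produced items without involving proconex or threads, the same generator logic can be tried on its own. The following is a minimal sketch: numberedLines is a hypothetical helper name, not part of proconex, and it takes any iterable of lines instead of a file path.

```python
def numberedLines(lines):
    """Yield (lineNumber, line) tuples without trailing newline characters.

    Hypothetical stand-alone helper that mirrors the body of
    LineProducer.items(); it is not part of proconex.
    """
    for lineNumber, line in enumerate(lines, start=1):
        yield (lineNumber, line.rstrip('\n\r'))


# Nothing is produced until the generator is iterated.
items = list(numberedLines(["first\n", "second\r\n", "third"]))
```

Because the items are produced lazily, a consumer can start working on the first line before the last one has been read.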

Next, we need a consumer. Here is a simple one that processes the lines read by the producer above and prints the number and text of each line that contains "self":

>>> class LineConsumer(proconex.Consumer):
...     def consume(self, item):
...         lineNumber, line = item
...         if "self" in line:
...             print "line %d: %s" % (lineNumber, line)

With classes for producer and consumer defined, we can create a producer and a list of consumers:

>>> producer = LineProducer(__file__)
>>> consumers = [LineConsumer("consumer#%d" % consumerId)
...         for consumerId in xrange(3)]

To actually start the production process, we need a worker to control the producer and consumers:

>>> with proconex.Worker(producer, consumers) as lineWorker:
...     lineWorker.work()

The with statement makes sure that all threads are terminated once the worker has finished or failed. Alternatively, you can use try ... except ... finally to handle errors and clean up:

>>> lineWorker = proconex.Worker(producer, consumers)
>>> try:
...     lineWorker.work()
... except Exception as error:
...     print error
... finally:
...     lineWorker.close()

Limitations

When using proconex, there are a few things you should be aware of:

  • Due to Python’s Global Interpreter Lock (GIL), the producer, the consumers, or both should be I/O bound in order to allow thread switches.

  • The code contains a few polling loops because Queue does not support canceling get() and put(). However, the polling does not hog the CPU because it uses a timeout when waiting for events to happen.

  • The only way to recover from errors during production is to restart the whole process from the beginning.
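
The polling with a timeout described above can be pictured roughly as follows. This is a sketch under stated assumptions, not proconex's actual code: pollItems, itemQueue and stopEvent are hypothetical names, and the 0.05 second timeout is an arbitrary choice. The import fallback lets the same code run with Python 2's Queue module and Python 3's queue module.

```python
import threading
import time

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


def pollItems(itemQueue, stopEvent, timeout=0.05):
    """Yield items from itemQueue until stopEvent is set.

    Illustrative only: a blocking get() cannot be canceled from another
    thread, so each call waits for at most `timeout` seconds and the loop
    then checks stopEvent again.
    """
    while not stopEvent.is_set():
        try:
            item = itemQueue.get(timeout=timeout)
        except queue.Empty:
            # Nothing arrived within the timeout; check stopEvent again.
            continue
        yield item


itemQueue = queue.Queue()
stopEvent = threading.Event()
consumed = []


def consumeAll():
    for item in pollItems(itemQueue, stopEvent):
        consumed.append(item)


consumerThread = threading.Thread(target=consumeAll)
consumerThread.start()
for number in range(3):
    itemQueue.put(number)
# Wait until the consumer has taken every item, then ask it to stop.
while not itemQueue.empty():
    time.sleep(0.01)
stopEvent.set()
consumerThread.join()
```

While the queue is empty the thread sleeps inside get() instead of spinning, so the cost of polling is one wake-up per timeout interval. The trade-off is that a stop request can take up to one timeout interval to be noticed.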

If you need more flexibility and control than proconex offers, try celery.

Version history

Version 0.1, 2012-02-03

  • Initial public release.
