mongoqueue 0.6.0

A queue using mongo as backend storage.

Latest Version: 0.7.2


  • Isolation

    Do not let different consumers process the same message.

  • Reliablity

    Do not let a failed consumer disappear an item.

  • Atomic

    Operations on the queue are atomic.


A queue can be instantiated with a mongo collection and a consumer identifier. The consumer identifier helps distinguish multiple queue consumers that are taking jobs from the queue.

>> from pymongo import Connection >> from mongoqueue import MongoQueue >> queue = MongoQueue( … Connection(TEST_DB).doctest_queue, … consumer_id=”consumer-1”, … timeout=300, … max_attempts=3)

The MongoQueue class timeout parameters specifies how long in a seconds a how long a job may be held by a consumer before its considered failed.

A job which timeouts or errors more than the max_attempts parameter is considered permanently failed, and will no longer be processed.

New jobs/items can be placed in the queue by passing a dictionary.

>> queue.put({“foobar”: 1})

A job priority key and integer value can be specified in the dictionary which will cause the job to be processed before lower priority items.

>> queue.put({“foobar”: 0, “priority”: 1})

An item can be fetched out by calling the next method on a queue. This returns a Job object.

>> job = >> job.payload {“foobar”: 1}

The job class exposes some control methods on the job, for marking progress, completion, errors, or releasing the job back into the queue.

  • complete Marks a job as complete and removes it from the queue.

  • error Optionally specified with a message, releases the job back to the

    queue, and increments its attempts, and stores the error message on the job.

  • progress Optionally takes a progress count integer, notes progress on the job

    and resets the lock timeout.

  • release Release a job back to the pool. The attempts counter is not modified.

As a convience the job supports the context manager protocol.

>> with job as data: … print data[‘payload’]

{“foobar: 0}

If the context closure is exited without the job is marked complete, if there’s an exception the error is stored on the job.

Running Tests

Unit tests can be run with

$ python nosetests


0.6.0 - Feb 4th, 2013 - Isolate passed in data from metadata in Job. 0.5.2 - Dec 9th, 2012 - Fix for regression in sort parameters from pymongo 2.4 0.5.1 - Dec 2nd, 2012 - Packaging fix for readme data file.


Kapil Thangavelu, author & maintainer Dustin Laurence, sort fix for pymongo 2.4 Jonathan Sackett, Job data isolation.

