
Project description

A catalog queue provides a queue for catalog indexing. The basic idea is to queue catalog operations so that:

  • Operations can be batched for greater efficiency

  • Application requests don’t have to wait for indexing to be done

The benefits of queueing are especially significant when text indexes are used.
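The two benefits above (batching, and not making requests wait) come from deferring the indexing work. Below is a generic, in-memory sketch of that pattern using Python's standard `queue` and `threading` modules. It is not how zc.catalogqueue works: its queue is a persistent object, and work is done only when something calls `process`. `handle_request`, `indexing_worker` and `indexed_batches` are hypothetical names.

```python
import queue
import threading

indexed_batches = []  # records what the hypothetical indexer was asked to do

def handle_request(pending, docid):
    """Note that docid changed and return immediately, without indexing."""
    pending.put(docid)
    return "ok"

def indexing_worker(pending, batch_size):
    """Drain `pending` in batches; a None item tells the worker to stop."""
    while True:
        batch = [pending.get()]  # block until there is at least one item
        while len(batch) < batch_size:
            try:
                batch.append(pending.get_nowait())
            except queue.Empty:
                break
        # Coalesce duplicates so each docid is indexed once per batch.
        docids = sorted({d for d in batch if d is not None})
        if docids:
            indexed_batches.append(docids)
        if None in batch:
            return

pending = queue.Queue()
for docid in [3, 1, 3, 2, 1]:  # five "requests" touching three documents
    handle_request(pending, docid)
pending.put(None)

worker = threading.Thread(target=indexing_worker, args=(pending, 10))
worker.start()
worker.join()
print(indexed_batches)  # [[1, 2, 3]]
```

Five requests returned immediately, and the worker indexed each of the three documents once.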

Detailed Documentation

Using Queues

A queue is created by instantiating a zc.catalogqueue.queue.CatalogQueue object:

>>> import zc.catalogqueue.queue
>>> queue = zc.catalogqueue.queue.CatalogQueue()

We can also pass a queue size, which should be a prime number. The default is 1009, which is a bit large for our purposes here.

>>> queue = zc.catalogqueue.queue.CatalogQueue(11)
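The text does not say why the size should be prime. One common reason, assuming the size is the number of internal buckets that ids are spread across by taking `id % size` (an assumption, not confirmed here), is that a prime modulus avoids clustering when ids share a common stride. A small, hypothetical illustration:

```python
def buckets_used(ids, size):
    """Return how many distinct buckets the ids land in under id % size."""
    return len({i % size for i in ids})

# Hypothetical ids that all happen to be multiples of 4.
ids = range(0, 400, 4)

print(buckets_used(ids, 12))  # 3: only buckets 0, 4 and 8 are ever used
print(buckets_used(ids, 11))  # 11: a prime size spreads them over every bucket
```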

Queues provide the zc.catalogqueue.interfaces.ICatalogQueue interface and are typically registered as utilities providing it:

>>> import zope.interface, pprint
>>> pprint.pprint(sorted(zope.interface.providedBy(queue)), width=1)
[<InterfaceClass zc.catalogqueue.interfaces.ICatalogQueue>,
 <InterfaceClass persistent.interfaces.IPersistent>]

The queue maintains some information about its own processing state: the time it was last processed and the total number of cataloging events processed. Since this queue hasn’t been processed yet, these have their initial values:

>>> print queue.lastProcessedTime
None
>>> queue.totalProcessed
0

The length of the queue is the number of objects with pending cataloging events:

>>> len(queue)
0

Queues are used in two ways. As content is modified, we call the queue’s add, update, and remove methods:

>>> queue.add(1)
>>> queue.update(1)
>>> queue.remove(1)
>>> queue.update(2)
>>> queue.update(2)
>>> queue.add(3)
>>> queue.update(3)
>>> queue.add(3)
Traceback (most recent call last):
...
TypeError: Attempt to add an object that is already in the catalog
>>> queue.update(4)
>>> queue.update(4)
>>> queue.update(4)
>>> queue.remove(5)
>>> queue.update(5)
Traceback (most recent call last):
...
TypeError: Attempt to change an object that has been removed
>>> queue.update(0)
>>> queue.update(0)

At this point, we’ve queued several events but haven’t processed the queue, so we expect lastProcessedTime and totalProcessed to be unchanged, while the queue length reflects the pending tasks (one per object, 0 through 5):

>>> print queue.lastProcessedTime
None
>>> queue.totalProcessed
0
>>> len(queue)
6
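The length is 6, not 13, because repeated events on the same object collapse into one pending event. The sketch below is a simplified, hypothetical model of that collapsing. The event codes and the `EventModel` class are illustrative only and are not zc.catalogqueue's implementation, although the rules reproduce the doctest above, including both `TypeError` cases.

```python
# Event codes used only by this sketch.
ADDED, CHANGED, REMOVED = "added", "changed", "removed"

class EventModel:
    """Hypothetical model: collapse repeated events on an id into one."""

    def __init__(self):
        self.pending = {}  # docid -> last effective event

    def add(self, docid):
        if self.pending.get(docid) in (ADDED, CHANGED):
            raise TypeError(
                "Attempt to add an object that is already in the catalog")
        self.pending[docid] = ADDED

    def update(self, docid):
        current = self.pending.get(docid)
        if current == REMOVED:
            raise TypeError("Attempt to change an object that has been removed")
        # An update after an add still just means "index it".
        self.pending[docid] = ADDED if current == ADDED else CHANGED

    def remove(self, docid):
        # Whatever was queued before, the object must end up unindexed.
        self.pending[docid] = REMOVED

    def __len__(self):
        return len(self.pending)

q = EventModel()
calls = [("add", 1), ("update", 1), ("remove", 1), ("update", 2),
         ("update", 2), ("add", 3), ("update", 3), ("update", 4),
         ("update", 4), ("update", 4), ("remove", 5),
         ("update", 0), ("update", 0)]
for method, docid in calls:
    getattr(q, method)(docid)

print(len(q))  # 6
print(q.pending[1], q.pending[3])  # removed added
```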

Periodically, we call process on the queue. We pass an ids object, a collection of injection (catalog) objects, and a limit on the number of objects to process:

>>> class Ids:
...     def queryObject(self, id, default=None):
...         if not id:
...             return default
...         return "object %s" % id
>>> class Injection:
...     def __init__(self, name):
...         self.name = name
...     def index_doc(self, docid, value):
...         print self.name, 'indexing', docid, value
...     def unindex_doc(self, docid):
...         print self.name, 'unindexing', docid
>>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
cat1 unindexing 1
cat2 unindexing 1
cat1 indexing 2 object 2
cat2 indexing 2 object 2
cat1 indexing 3 object 3
cat2 indexing 3 object 3
cat1 indexing 4 object 4
cat2 indexing 4 object 4
cat1 unindexing 5
cat2 unindexing 5
6

There are a number of things to note about this example:

  • Each object is processed only once.

  • What happens to an object depends on its last event. For example, object 1 was added, updated, and then removed, so it is only unindexed.

  • Object 0 wasn’t indexed because queryObject returned None. We ignore events for objects that have been removed from the intid utility.

  • The number of objects processed is returned. It includes object 0, even though nothing was indexed for it.
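The rules in the list above can be sketched as a plain function. This is a hypothetical illustration, not the package's `process` method: it takes a dict of `docid -> last event`, handles ids in sorted order for determinism, unindexes removed objects without looking them up, and skips (but still counts) objects that `queryObject` cannot find. The logger name and the `Recorder` helper are assumptions for the example.

```python
import logging

log = logging.getLogger("process_sketch")  # hypothetical logger name

REMOVED = "removed"  # sketch-only event code; anything else means "index"

def process(pending, ids, injections, limit):
    """Apply up to `limit` pending events to every injection (catalog)."""
    count = 0
    for docid in sorted(pending)[:limit]:
        event = pending.pop(docid)  # each object is handled once
        count += 1
        if event == REMOVED:
            for catalog in injections:
                catalog.unindex_doc(docid)
            continue
        obj = ids.queryObject(docid)
        if obj is None:
            # The object is gone from the ids utility: skip it, but count it.
            log.warning("Couldn't find object for %s", docid)
            continue
        for catalog in injections:
            catalog.index_doc(docid, obj)
    return count

class Ids:
    def queryObject(self, docid, default=None):
        return default if not docid else "object %s" % docid

class Recorder:
    """Records calls instead of indexing anything."""
    def __init__(self):
        self.calls = []
    def index_doc(self, docid, value):
        self.calls.append(("index", docid, value))
    def unindex_doc(self, docid):
        self.calls.append(("unindex", docid))

pending = {0: "changed", 1: REMOVED, 2: "changed", 3: "added",
           4: "changed", 5: REMOVED}
cat1 = Recorder()
count = process(pending, Ids(), [cat1], 10)
print(count)  # 6
print(cat1.calls)
```

As in the doctest, six objects are counted but only five produce catalog calls.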

The processing information has been updated on the queue:

>>> queue.lastProcessedTime  # doctest: +ELLIPSIS
datetime.datetime(... tzinfo=<UTC>)
>>> queue.totalProcessed
6
>>> previous_time = queue.lastProcessedTime

The length of the queue now indicates that no further events are pending:

>>> len(queue)
0

If we process the queue without additional events, we’ll just get 0 back:

>>> queue.process(Ids(), [Injection('cat1'), Injection('cat2')], 10)
0

The processing information is updated again: the timestamp advances, while the total stays the same:

>>> queue.lastProcessedTime  # doctest: +ELLIPSIS
datetime.datetime(... tzinfo=<UTC>)
>>> queue.lastProcessedTime > previous_time
True
>>> queue.totalProcessed
6
>>> len(queue)
0
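The bookkeeping shown above (the timestamp advances on every call, and the total grows by the number processed) can be sketched as follows. `ProcessingStats` and `record` are hypothetical names; the sketch uses the standard library's `timezone.utc`, whereas the doctest output shows a `<UTC>` tzinfo object.

```python
from datetime import datetime, timezone

class ProcessingStats:
    """Hypothetical bookkeeping mirroring lastProcessedTime/totalProcessed."""

    def __init__(self):
        self.lastProcessedTime = None  # never processed yet
        self.totalProcessed = 0

    def record(self, processed):
        # The timestamp advances on every call, even when nothing was processed.
        self.lastProcessedTime = datetime.now(timezone.utc)
        self.totalProcessed += processed
        return processed

stats = ProcessingStats()
stats.record(6)
first = stats.lastProcessedTime
stats.record(0)
print(stats.totalProcessed)  # 6
print(stats.lastProcessedTime >= first)  # True
```

The sketch compares with `>=` because two consecutive clock readings can be equal on a coarse clock.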

Of course, the limit argument caps how many objects are processed in a single call:

>>> for i in range(10):
...     queue.update(i)
>>> len(queue)
10
>>> queue.process(Ids(), [Injection('cat1')], 5)
cat1 indexing 1 object 1
cat1 indexing 2 object 2
cat1 indexing 3 object 3
cat1 indexing 4 object 4
5
>>> queue.totalProcessed
11
>>> len(queue)
5
>>> queue.process(Ids(), [Injection('cat1')], 5)
cat1 indexing 5 object 5
cat1 indexing 6 object 6
cat1 indexing 7 object 7
cat1 indexing 8 object 8
cat1 indexing 9 object 9
5
>>> queue.totalProcessed
16
>>> len(queue)
0

(Remember that 0 produces no output because it can’t be found, although it still counts toward the limit and the total.)
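The batching above can be modeled in a few lines. This hypothetical `drain` function is only a model of the behavior shown in the doctest: pending ids are taken `limit` at a time, and an id that can't be found still uses up a slot, which is why the first call printed four lines but returned 5.

```python
def drain(pending_ids, found, limit):
    """Process pending ids in batches of at most `limit`.

    Returns (per-call counts, ids actually indexed). Ids missing from
    `found` are skipped but still count toward the batch.
    """
    counts, indexed = [], []
    while pending_ids:
        batch, pending_ids = pending_ids[:limit], pending_ids[limit:]
        indexed.extend(i for i in batch if i in found)
        counts.append(len(batch))
    return counts, indexed

counts, indexed = drain(list(range(10)), found=set(range(1, 10)), limit=5)
print(counts)   # [5, 5]
print(indexed)  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```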

When an object can’t be found, a warning is logged:

>>> import zope.testing.loggingsupport
>>> handler = zope.testing.loggingsupport.InstalledHandler('zc')
>>> queue.update(0)
>>> queue.process(Ids(), [Injection('cat1')], 5)
1
>>> print handler
zc.catalogqueue.queue WARNING
  Couldn't find object for 0
>>> handler.uninstall()
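The doctest uses zope.testing's `InstalledHandler`. Outside tests, the same warnings can be captured with the standard `logging` module: because the output names the logger `zc.catalogqueue.queue`, a handler on the `zc` logger receives its records through propagation. In this sketch the warning is emitted by hand as a stand-in for the queue, and `ListHandler` is a hypothetical helper.

```python
import logging

class ListHandler(logging.Handler):
    """Collect (logger name, level, message) tuples in a list."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append((record.name, record.levelname, record.getMessage()))

handler = ListHandler()
parent = logging.getLogger("zc")
parent.addHandler(handler)
try:
    # Stand-in for what the queue logs when queryObject returns None.
    logging.getLogger("zc.catalogqueue.queue").warning(
        "Couldn't find object for %s", 0)
finally:
    parent.removeHandler(handler)

print(handler.messages)
# [('zc.catalogqueue.queue', 'WARNING', "Couldn't find object for 0")]
```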

Edge case

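If an “old” state has two ADDED events, the committed state has processed the queue, and the “new” state modifies one of the objects marked for addition, the code marks the other for removal. In the result, each entry maps an object id to a pair whose second element is the event code (here, REMOVED is 0 and ADDED is 1):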

>>> from zc.catalogqueue.CatalogEventQueue import (
...     CatalogEventQueue, ADDED, REMOVED, CHANGED, CHANGED_ADDED)
>>> cq = CatalogEventQueue()
>>> def resolve(old, committed, new):
...     return sorted(cq._p_resolveConflict(
...        {'_conflict_policy': 0, '_data': old},
...        {'_conflict_policy': 0, '_data': committed},
...        {'_conflict_policy': 0, '_data': new}
...        )['_data'].items())
>>> resolve({1: (1, ADDED), 2: (1, ADDED)}, {},
...         {1: (1, ADDED), 2: (3, REMOVED), 3: (1, ADDED)})
[(2, (3, 0)), (3, (1, 1))]
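For intuition, here is a simplified three-way merge that happens to produce the same result for this case. It is a hypothetical sketch, not the package's `_p_resolveConflict`: an entry changed on only one side (relative to `old`) takes that side's value; if both sides changed it, the higher `(generation, event)` pair wins; and a missing key means the entry was processed and dropped from the queue. The values `REMOVED = 0` and `ADDED = 1` are read off the doctest output above.

```python
def resolve_sketch(old, committed, new):
    """Simplified three-way merge of {docid: (generation, event)} dicts."""
    result = {}
    for docid in set(old) | set(committed) | set(new):
        base = old.get(docid)
        theirs, mine = committed.get(docid), new.get(docid)
        if mine == base:
            winner = theirs  # new state didn't touch it: committed state wins
        elif theirs == base:
            winner = mine    # committed state didn't touch it: new change wins
        else:
            # Both sides changed it; prefer the later generation.
            candidates = [v for v in (theirs, mine) if v is not None]
            winner = max(candidates) if candidates else None
        if winner is not None:
            result[docid] = winner
    return result

REMOVED, ADDED = 0, 1  # values implied by the doctest output above
old = {1: (1, ADDED), 2: (1, ADDED)}
committed = {}  # the committed transaction processed the queue
new = {1: (1, ADDED), 2: (3, REMOVED), 3: (1, ADDED)}
print(sorted(resolve_sketch(old, committed, new).items()))
# [(2, (3, 0)), (3, (1, 1))]
```

Object 1 drops out because only the committed side changed it (by processing it), while the new state's events for objects 2 and 3 survive.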


Download files


Source Distribution

zc.catalogqueue-0.3.1.tar.gz (12.0 kB)
