arpc 0.5

A small library for using AMQP message queues for RPC-like communication

This is a small library for RPC (Remote Procedure Call) communication using AMQP. It uses amqplib (http://code.google.com/p/py-amqplib/), so it works with blocking sockets and needs no threads or other asynchronous libraries. There is, however, support for timeouts and "lazy responses". It is not meant to be a framework like celery/kombu, only a library that makes it easy to set up an AMQP broker for RPC. It adds little overhead of its own and introduces few abstractions or "magic"; the raw AMQP objects can be accessed directly if needed.

A typical usage scenario is a client that is a webapp and a server that consists of a pool of "workers", possibly spread over many nodes (physical servers). The webapp calls a service, e.g. "send_email", and one of the workers from the pool processes the request. By default, messages (service arguments and responses) are JSON documents, although the format is not hardcoded.
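
Because the message format is not hardcoded, the encoding step can be pictured as a pair of functions: one turning a Python object into a message body and one turning it back. The JsonCodec class below is a hypothetical illustration of that idea, not part of arpc's documented API; it only uses the standard json module.

```python
import json


class JsonCodec(object):
    """Hypothetical codec: converts Python objects to and from JSON message bodies."""

    content_type = 'application/json'

    def encode(self, obj):
        # Serialize a request or response to a UTF-8 encoded JSON body.
        return json.dumps(obj).encode('utf-8')

    def decode(self, body):
        # Parse a message body back into dicts, lists, numbers and strings.
        return json.loads(body.decode('utf-8'))


codec = JsonCodec()
body = codec.encode({'c': 'add', 'x': 4, 'y': 8})
request = codec.decode(body)  # {'c': 'add', 'x': 4, 'y': 8}
```

Under this assumption, using another format would only need another object with the same encode and decode methods.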

Example: a service for adding numbers

Let's call the service "example". The class implementing the service is as follows:

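class ExampleService(object):
    # Exposed remotely as "add" because of the c_ prefix.
    def c_add(self, req):
        # req is the decoded request document, e.g. {'c': 'add', 'x': 4, 'y': 8}
        return {'res': req['x'] + req['y']}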

Methods prefixed with c_ are exposed for remote calling. The req argument is the request sent by the client; in this example it is a JSON document with x and y attributes. The method returns a dictionary with the result, which will be converted to a JSON document. This is how the processing loop in a worker process is run:

from arpc import mq

# Uses default connection parameters (localhost)
amqp = mq.AMQPOperations(useack=True, persistent=True)
server = mq.make_json_server(amqp, 'example', ExampleService())
server.loop()

The AMQPOperations class performs low-level AMQP operations and stores the connection parameters. The second argument of make_json_server is the name of the AMQP routing key on which the process will receive requests. You can scale the service by running many worker processes with the same routing key.
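
Scaling works because all workers consume from the same queue and the broker hands each message to only one of them. RabbitMQ, for example, dispatches messages from a queue to its consumers in round-robin order by default (acknowledgement and prefetch settings can change the exact split). The toy simulation below only illustrates that behaviour in plain Python; it does not talk to a broker, and the distribute function is hypothetical.

```python
from itertools import cycle


def distribute(messages, worker_ids):
    """Hypothetical model of round-robin delivery from one shared queue.

    Each message goes to exactly one worker, and workers are taken in turn.
    No AMQP is involved; this only mimics what the broker does.
    """
    assignment = dict((wid, []) for wid in worker_ids)
    for msg, wid in zip(messages, cycle(worker_ids)):
        assignment[wid].append(msg)
    return assignment


result = distribute(list(range(7)), ['w1', 'w2', 'w3'])
# w1 gets 0, 3, 6; w2 gets 1, 4; w3 gets 2, 5
```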

A client calls the service as follows:

from arpc import mq

sc = mq.ServiceCaller(mq.AMQPOperations(useack=True, persistent=True), 'example', want_replies=True)
res = sc.send_request({'c': 'add', 'x': 4, 'y': 8})

By convention, the request document should be a dictionary with a c attribute holding the name of the called method (without the c_ prefix).
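
Taken together, the c_ prefix and the c attribute suggest a simple dispatch rule on the worker side: look up the method named c_ plus the value of c and call it with the decoded request. The dispatch function below is a hypothetical sketch of that rule, not arpc's actual server code; it repeats the ExampleService class so that the block runs on its own.

```python
import json


class ExampleService(object):
    def c_add(self, req):
        return {'res': req['x'] + req['y']}


def dispatch(service, body):
    """Hypothetical worker-side dispatch: JSON body in, JSON body out."""
    req = json.loads(body)
    # Only methods carrying the c_ prefix are reachable remotely.
    method = getattr(service, 'c_' + req['c'], None)
    if method is None:
        raise AttributeError('unknown remote method: %s' % req['c'])
    return json.dumps(method(req))


reply = dispatch(ExampleService(), '{"c": "add", "x": 4, "y": 8}')
# reply == '{"res": 12}'
```

Restricting lookups to the c_ prefix keeps helper methods on the service class out of reach of remote callers.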

Sending messages from command line

The package includes a script, arpc_msg_sender.py (it should be in $PATH after installation), for sending messages directly from the command line. The first argument is a routing key and the second is the message body. After starting a worker process (a runnable ExampleService worker is available at http://code.google.com/p/python-arpc/source/browse/examples/example_worker.py), we can use it from the command line:

$ arpc_msg_sender.py example '{"c":"add", "x":10, "y":20}'
msg_sender 2011-05-01 17:53:39,454 INFO Message sent, waiting for response
msg_sender 2011-05-01 17:53:39,492 INFO Result: <{u'res': 30}>
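
The script takes just two positional arguments. A minimal sketch of how such a script could parse and validate them with argparse is shown below; parse_args is a hypothetical helper, and the real arpc_msg_sender.py may accept options that are not documented here.

```python
import argparse
import json


def parse_args(argv):
    """Hypothetical parser for '<routing_key> <json_body>'."""
    parser = argparse.ArgumentParser(description='Send an AMQP RPC message.')
    parser.add_argument('routing_key', help='e.g. example or mgt.example')
    parser.add_argument('body', help='JSON message body')
    args = parser.parse_args(argv)
    # Fail early on malformed JSON instead of sending it to the workers.
    message = json.loads(args.body)
    return args.routing_key, message


key, msg = parse_args(['example', '{"c":"add", "x":10, "y":20}'])
# key == 'example', msg == {'c': 'add', 'x': 10, 'y': 20}
```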

Managing worker processes

For every service routing key foo, a mgt.foo routing key is created for processing management commands. We can ping the workers to see which ones are available:

$ arpc_msg_sender.py mgt.example '{"c":"ping"}'
msg_sender 2011-05-01 19:01:34,544 INFO Message sent, waiting for response
msg_sender 2011-05-01 19:01:34,583 INFO Result: <{u'host': u'aserver', u'wid': u'arpc-example-4f741449e0', u'pid': 6868}>
msg_sender 2011-05-01 19:01:34,584 INFO Result: <{u'host': u'aserver', u'wid': u'arpc-example-cd06537047', u'pid': 7177}>
msg_sender 2011-05-01 19:01:34,584 INFO Result: <{u'host': u'aserver', u'wid': u'arpc-example-1bea3c09b4', u'pid': 7178}>

The wid is a "worker id", a unique string identifying this process. We can also get some statistics:

$ arpc_msg_sender.py mgt.example '{"c":"stats"}'
msg_sender 2011-05-01 19:04:22,114 INFO Message sent, waiting for response
msg_sender 2011-05-01 19:04:22,154 INFO Result: <{u'wid': u'arpc-example-1bea3c09b4', u'pid': 7178, u'host':  u'aserver', u'waittimes': [u'0.0049', u'0.0040', u'0.0043', u'0.0046', u'0.0050', u'0.0038', u'0.0039', u'0.0038', u'0.0038', u'0.0044'], u'processed': 3333, u'exceptions': 0}>
msg_sender 2011-05-01 19:04:22,154 INFO Result: <{u'wid': u'arpc-example-4f741449e0', u'pid': 6868, u'host': u'aserver', u'waittimes': [u'0.0039', u'0.0038', u'0.0038', u'0.0053', u'0.0043', u'0.0040', u'0.0040', u'0.0053', u'0.0042', u'0.0038'], u'processed': 13337, u'exceptions': 0}>
msg_sender 2011-05-01 19:04:22,154 INFO Result: <{u'wid': u'arpc-example-cd06537047', u'pid': 7177, u'host': u'aserver', u'waittimes': [u'0.0055', u'0.0041', u'0.0041', u'0.0040', u'0.0056', u'0.0038', u'0.0038', u'0.0039', u'0.0038', u'0.0038'], u'processed': 3333, u'exceptions': 0}>

waittimes is an array of the intervals, in seconds, between the last ten processed requests. processed and exceptions give the number of requests a worker has processed successfully and the number that failed (an unhandled exception was thrown).
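
These counters are easy to maintain inside a worker loop. The WorkerStats class below is a hypothetical sketch, not arpc's implementation: it keeps the last ten intervals in a bounded deque, counts successes and unhandled exceptions separately, and formats wait times as four-decimal strings like the stats output above. It measures the time from the end of one request to the start of the next, which is one possible reading of "elapsed between processing"; the clock parameter exists only so the sketch can be tested deterministically.

```python
import time
from collections import deque


class WorkerStats(object):
    """Hypothetical per-worker statistics resembling the 'stats' reply."""

    def __init__(self, clock=time.time, history=10):
        self.clock = clock
        self.waittimes = deque(maxlen=history)  # older intervals drop off automatically
        self.processed = 0    # requests handled successfully
        self.exceptions = 0   # requests that raised an unhandled exception
        self._last = None     # when the previous request finished

    def handle(self, func, req):
        start = self.clock()
        if self._last is not None:
            # Time elapsed since the previous request was finished.
            self.waittimes.append(start - self._last)
        try:
            result = func(req)
        except Exception:
            self.exceptions += 1
            raise
        else:
            self.processed += 1
            return result
        finally:
            self._last = self.clock()

    def report(self):
        return {'processed': self.processed,
                'exceptions': self.exceptions,
                'waittimes': ['%.4f' % t for t in self.waittimes]}
```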

The other management commands are eval, for evaluating a Python expression, and kill, which makes the workers terminate themselves. Further commands are easy to add.
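
If management commands follow the same c_ convention as services (an assumption; this page does not say how they are registered), adding a command amounts to adding a method. The ManagementService class below is a hypothetical sketch that answers ping with the host, pid and wid fields shown in the output above, plus a made-up uptime command; eval and kill are left out to keep it short.

```python
import os
import socket
import time


class ManagementService(object):
    """Hypothetical handler for commands arriving on mgt.* routing keys."""

    def __init__(self, wid):
        self.wid = wid
        self.started = time.time()

    def c_ping(self, req):
        # Identify this worker process, like the ping replies above.
        return {'host': socket.gethostname(), 'pid': os.getpid(), 'wid': self.wid}

    def c_uptime(self, req):
        # Hypothetical extra command, not part of arpc.
        return {'wid': self.wid, 'uptime': time.time() - self.started}


mgt = ManagementService('arpc-example-4f741449e0')
print(mgt.c_ping({'c': 'ping'}))
```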

To send a message to a specific worker, use the routing key mgt.<worker-id>, e.g. mgt.arpc-example-1bea3c09b4.
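
Unlike the service key, where each request is handled by one worker, a message on mgt.example was answered by all three workers in the ping output above, while mgt.<worker-id> targets a single process. The helpers below summarise this naming scheme; both are hypothetical, and make_wid only imitates the shape of the ids shown above (arpc-, the service name, then ten hex characters). How arpc really generates worker ids is not documented here, so the uuid-based choice is an assumption.

```python
import uuid


def make_wid(service):
    # Assumed format, modelled on ids such as 'arpc-example-4f741449e0'.
    return 'arpc-%s-%s' % (service, uuid.uuid4().hex[:10])


def mgt_keys(service, wid):
    """Hypothetical list of management routing keys for one worker."""
    return ['mgt.' + service,  # reaches every worker of the service
            'mgt.' + wid]      # reaches only this worker


wid = make_wid('example')
keys = mgt_keys('example', wid)
```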

Lazy requests

By default, when you call a service that sends a reply, the call blocks until the reply is available. Calling res = service_caller.send_lazy_request() instead of res = service_caller.send_request() returns without waiting for the reply. This way you can, e.g., send requests to services at the beginning of rendering a web page and collect the results when you actually need them. The actual result is retrieved by calling res.get(). However, if multiple lazy requests are made, the order in which their results arrive is not guaranteed.
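
One way to picture a lazy request is as a handle that remembers a correlation id and looks up the reply only when get() is called. The sketch below models the reply queue with an in-memory dict so that it runs without AMQP; LazyResult, FakeReplyQueue and their methods are hypothetical names, not arpc's classes. In the sketch each handle fetches its own reply by id, so the arrival order of replies does not affect which result a handle returns.

```python
import uuid


class FakeReplyQueue(object):
    """Hypothetical stand-in for a reply queue, keyed by correlation id."""

    def __init__(self):
        self.replies = {}

    def deliver(self, correlation_id, body):
        self.replies[correlation_id] = body

    def wait_for(self, correlation_id):
        # A real client would block on the socket here, with a timeout.
        return self.replies.pop(correlation_id)


class LazyResult(object):
    """Hypothetical handle returned instead of the reply itself."""

    def __init__(self, reply_queue, correlation_id):
        self.reply_queue = reply_queue
        self.correlation_id = correlation_id
        self._done = False
        self._value = None

    def get(self):
        # Fetch the reply on first use; later calls return the cached value.
        if not self._done:
            self._value = self.reply_queue.wait_for(self.correlation_id)
            self._done = True
        return self._value


q = FakeReplyQueue()
# Two lazy "requests", each identified only by a fresh correlation id.
a = LazyResult(q, uuid.uuid4().hex)
b = LazyResult(q, uuid.uuid4().hex)
# Replies may come back in either order; here b's arrives first.
q.deliver(b.correlation_id, {'res': 7})
q.deliver(a.correlation_id, {'res': 3})
# a.get() -> {'res': 3}, b.get() -> {'res': 7}
```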

Implementation notes

  • using an explicit send_request instead of a proxy object (service.add(x, y)) is deliberate: it gives simpler stack traces and makes remote calls easy to find in the code
  • tested with RabbitMQ
  • requires Python 2.7
  • not thread-safe; designed to work with multi-process web servers like http://gunicorn.org
  • a timeout can be specified as an argument to send_request
  • exception information is transferred from the server to the client (no full object serialization, only the exception name and a traceback).
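
The last point can be sketched as follows: the worker catches the exception and sends back only its class name and formatted traceback, and the client turns that data into a local exception. The call_and_capture and raise_if_error helpers, the RemoteError class and the 'error' key are assumptions made for this illustration; this page does not document arpc's wire format.

```python
import traceback


class RemoteError(Exception):
    """Hypothetical client-side exception for an error raised in a worker."""

    def __init__(self, name, tb):
        Exception.__init__(self, '%s raised in worker\n%s' % (name, tb))
        self.name = name
        self.remote_traceback = tb


def call_and_capture(func, req):
    # Worker side: run the handler and turn any exception into plain data.
    try:
        return func(req)
    except Exception as exc:
        return {'error': {'name': type(exc).__name__,
                          'traceback': traceback.format_exc()}}


def raise_if_error(reply):
    # Client side: re-raise a transferred exception, otherwise pass the reply on.
    if isinstance(reply, dict) and 'error' in reply:
        err = reply['error']
        raise RemoteError(err['name'], err['traceback'])
    return reply
```

Sending only the name and traceback text keeps the reply JSON-serializable; a real protocol would need a marker less likely to collide with normal replies than a plain 'error' key.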
 
File             Type    Uploaded on  Size  Downloads
arpc-0.5.tar.gz  Source  2011-05-01   9KB   337