Skip to main content

Asyncio-based, layered networking library providing Request-reply channels, RPC, and multi-agent systems.

Project description

aiomas – A library for multi-agent systems and RPC based on asyncio

aiomas is an easy-to-use library for request-reply channels, remote procedure calls (RPC) and multi-agent systems (MAS). It’s written in pure Python on top of asyncio.

Here are three simple examples that show the different layers of aiomas and what they add on top of each other:

The request-reply channel has the lowest level of abstraction (but already offers more then vanilla asyncio):

>>> import aiomas
>>>
>>>
>>> async def handle_client(channel):
...     """Handle a client connection."""
...     req = await channel.recv()
...     print(req.content)
...     await req.reply('cya')
...     channel.close()
>>>
>>>
>>> async def client():
...     """Client coroutine: Send a greeting to the server and wait for a
...     reply."""
...     channel = await aiomas.channel.open_connection(('localhost', 5555))
...     rep = await channel.send('ohai')
...     print(rep)
...     channel.close()
>>>
>>>
>>> server = aiomas.run(aiomas.channel.start_server(('localhost', 5555), handle_client))
>>> aiomas.run(client())
ohai
cya
>>> server.close()
>>> aiomas.run(server.wait_closed())

The RPC layer adds remote procedure calls on top of it:

>>> import aiomas
>>>
>>>
>>> class MathServer:
...     router = aiomas.rpc.Service()
...
...     @router.expose
...     def add(self, a, b):
...         return a + b
...
>>>
>>> async def client():
...     """Client coroutine: Call the server's "add()" method."""
...     rpc_con = await aiomas.rpc.open_connection(('localhost', 5555))
...     rep = await rpc_con.remote.add(3, 4)
...     print('What’s 3 + 4?', rep)
...     rpc_con.close()
>>>
>>> server = aiomas.run(aiomas.rpc.start_server(('localhost', 5555), MathServer().router))
>>> aiomas.run(client())
Whats 3 + 4? 7
>>> server.close()
>>> aiomas.run(server.wait_closed())

Finally, the agent layer hides some of the boilerplate code required to setup the sockets and allows agent instances to easily talk with each other:

>>> import aiomas
>>>
>>> class TestAgent(aiomas.Agent):
...     def __init__(self, container):
...         super().__init__(container)
...         print('Ohai, I am %s' % self)
...
...     async def run(self, addr):
...         remote_agent = await self.container.connect(addr)
...         ret = await remote_agent.service(42)
...         print('%s got %s from %s' % (self, ret, remote_agent))
...
...     @aiomas.expose
...     def service(self, value):
...         return value
>>>
>>> c = aiomas.Container.create(('localhost', 5555))
>>> agents = [TestAgent(c) for i in range(2)]
Ohai, I am TestAgent('tcp://localhost:5555/0')
Ohai, I am TestAgent('tcp://localhost:5555/1')
>>> aiomas.run(until=agents[0].run(agents[1].addr))
TestAgent('tcp://localhost:5555/0') got 42 from TestAgentProxy('tcp://localhost:5555/1')
>>> c.shutdown()

aiomas is released under the MIT license. It requires Python 3.4 and above and runs on Linux, OS X, and Windows.

Installation

aiomas requires Python >= 3.4. It uses the JSON codec by default and only has pure Python dependencies.

Install aiomas via pip by running:

$ pip install aiomas

You can enable the optional MsgPack codec or its Blosc compressed version by installing the corresponding features (note, that you need a C compiler to install them):

$ pip install aiomas[mp]   # Enables the MsgPack codec
$ pip install aiomas[mpb]  # Enables the MsgPack and MsgPackBlosc codecs

Features

aiomas just puts three layers of abstraction around raw TCP / unix domain sockets provided by asyncio:

Agents and agent containers:

The top-layer provides a simple base class for your own agents. All agents live in a container.

Containers take care of creating agent instances and performing the communication between them.

The container provides a clock for the agents. This clock can either be synchronized with the real (wall-clock) time or be set by an external process (e.g., other simulators).

RPC:

The rpc layer implements remote procedure calls which let you call methods on remote objects nearly as if they were normal objects:

Instead of ret = obj.meth(arg) you write ret = await obj.meth(arg).

Request-reply channel:

The channel layer is the basis for the rpc layer. It sends JSON or MsgPack encoded byte strings over TCP or unix domain sockets. It also maps replies (of success or failure) to their corresponding request.

Other features:

  • TLS support for authorization and encrypted communication.

  • Interchangeable and extensible codecs: JSON and MsgPack (the latter optionally compressed with Blosc) are built-in. You can add custom codecs or write (de)serializers for your own objects to extend a codec.

  • Deterministic, emulated sockets: A LocalQueue transport lets you send and receive message in a deterministic and reproducible order within a single process. This helps testing and debugging distributed algorithms.

Planned features

Some ideas for future releases:

  • Optional automatic re-connect after connection loss

  • Helper for binding a socket to a random free port

Contribute

Set-up a development environment with:

$ virtualenv -p `which python3` aiomas
$ pip install -r requirements.txt

Run the tests with:

$ py.test
$ # or
$ tox

Support

License

The project is licensed under the MIT license.

License

The project is licensed under the MIT license.

Changelog

0.6.1 – 2015-10-21

  • [CHANGE] Agent now also accepts subclasses of Container (issue #17).

  • [FIX] issue #16: Container API docs no correctly refer to the “create()” method.

0.6.0 – 2015-09-18

  • [CHANGE] Asserted Python 3.5 compatibility and converted all examples to use the new async and await keywords.

  • [CHANGE] Container.__init__() no longer contains an asynchronous task. Instead, you now need to call the factory function Container.create().

  • [CHANGE] Removed Container.spawn(). You can now directly instantiate agent instances but you still need to pass a reference to the agent’s container to Agent.__init__().

  • [NEW] AiomasError is the new base class for all errors in aiomas (issue #15).

  • [NEW] Documentation tests now have their own tox environment (tox -e docs).

  • [NEW] Added support and docs for TLS encryption.

  • [NEW] Added some documentation about the channel layer.

0.5.0 – 2015-06-27

  • [CHANGE] Agent addresses now start with tcp:// or ipc:// (for Unix domain sockets) instead of just agent://.

  • [CHANGE] Using dictionaries as routers is now easier (issue #13).

  • [CHANGE] Renamed the rpc attribute for routers to router.

  • [CHANGE] Renamed Agent.name to Agent.addr and improved agent’s str representation.

  • [CHANGE] Updated and improved str and repr for agents, proxies and agent proxies.

  • [CHANGE] Codec.add_serializer() now raises an exception if there is already a serializer for a given type (issue #9).

  • [NEW] Added aiomas.util.run() (and an aiomas.run() alias) which are shortcuts for loop = asyncio.get_event_loop(); loop_run_{until_complete|forever}().

  • [NEW] Added a @serializable decorator to aiomas.codecs which simplifies making a type serializable.

  • [NEW] Documentation: Overview, Agents, Codecs, Clocks (draft), Testing (draft).

  • [NEW] Container.connect() checks if an agent exists in the remote container.

  • [NEW] Proxies are now cached with weakrefs.

  • [FIX] issue #12: Router.path reversed the order of path components.

  • [FIX] Fixed a bug where concurrent calls to Container.connect() would lead to multiple connections to the same address.

0.4.0 – 2015-04-15

  • [CHANGE] Channel and Container no longer take codec instances but classes. They also accept a list of factories for extra serializers.

  • [CHANGE] The rpc.open_connection() and rpc.start_server() methods no longer accept the add_to parameter. rpc.start_server() accept a client_connected_cb instead, which should be a function with one argument, the RpcClient for each new connection. rpc.open_connection() already returns the RpcClient().

  • [CHANGE] Renamed the package extras from MsgPack to mp and from MsgPackBlosc to mpb to work around a bug in pip/setuptools. They are also shorter now. ;-)

  • [NEW] RpcClient no has a channel and a service attribute.

  • [NEW] Improved error message for LookupError.

  • [FIX] issue #8: Every channel instance created by channel.start_server() now has a separate codec instance to avoid problems with some serializers.

0.3.0 – 2015-03-11

  • [CHANGE] Removed LocalProxies and everything related to it because they caused several problems. That means that agents within a single container now also communicate via TCP sockets. Maybe something similar but more robust will be reintroduced in a later release.

  • [CHANGE] Channel.send() is no longer a coroutine. It returns a Future instead.

  • [CHANGE] Removed Container.get_url_for() which didn’t (and couldn’t) work as I originally assumed.

  • [CHANGE] JSON is now the default codec. msgpack and blosc don’t get installed by default. This way, we only have pure Python dependencies for the default installation which is very handy if you are on Windows. You can enable the other codecs via pip install -U aiomas[MsgPack] or pip install -U aiomas[MsgPackBlosc].

  • [NEW] Support for Python 3.4.0 and 3.4.1 (yes, Python 3.3 with asyncio works, too, but I’ll drop support for it as soon as it becomes a burden) (Resolves issue #6).

  • [NEW] ExternalClock accepts a date string or an Arrow object to set the inital date and time.

  • [NEW] aiomas.util.async() which is like asyncio.async() but registers a callback that instantly captures and raises exceptions, instead of delaying them until the task gets garbage collected.

  • [NEW] The agent container adds a serializer for Arrow dates.

  • [NEW] Proxy implements __eq__() and __hash__(). Two different proxy objects sharing the same channel and pointing to the same remote function will no appear to be equal. This makes it less error prone to use Proxy instances as keys in dictionaries.

  • [NEW] Updated and improved flow-control for Channel and its protocol.

  • [NEW] Improved error handling if the future returned by Channel.send() is triggered or cancelled by an external party (e.g., by going out of scope). If asyncio’s DEBUG mode is enabled, you will even get more detailed error messages.

  • [NEW] MessagePackBlosc codec. It uses msgpack to serialize messages and blosc to compress them. It can massively reduce the message size and consumes very little CPU time.

  • [NEW] A Contract Net example (https://bitbucket.org/ssc/aiomas/src/tip/examples/contractnet.py)

  • [NEW] __str__() representations for agents, containers and codecs (fixes issue #5).

  • [FIX] issue #7: Improved error handling and messages if the (de)serialization raises an exception.

  • [FIX] Containers now work with unix domain sockets.

  • [FIX] Various minor bug-fixes

0.2.0 - 2015-01-23

  • [CHANGE] The MsgPack codec is now the default. Thus, msgpack-python is now a mandatory dependency.

  • [CHANGE] Renamed RpcClient.call to RpcClient.remote.

  • [NEW] aiomas.agent module with an Agent base class and a Container for agents. Agents within a container communicate via direct method calls. Agents in different containers use RPC.

  • [NEW] aiomas.clock module which offers various clocks for a MAS:

    • AsyncioClock is a real-time clock and wraps asyncio’s time(), sleep(), call_later() and call_at() functions.

    • ExternalClock can be synchronized with external simulation environments. This allows you to stop the time or let it pass faster/slower than the wall-clock time.

  • [NEW] Support for unix domain sockets in aiomas.channel and aiomas.rpc.

  • [NEW] “rpc_service()” tasks created by an RPC server can now be collected so that you can wait for their completion before you shutdown your program.

  • [NEW] Added contents to the README and created a Sphinx project. Only the API reference is done yet. A tutorial and topical guides will follow.

  • [FIX] aiomas with the JSON codec is now compatible to simpy.io

0.1.0 – 2014-12-18

Initial release with the following features:

  • A request-reply channel via TCP that allows to send multiple messages and to asynconously wait for results (or an exception).

  • Messages can be serialized with JSON or msgpack.

  • The underlying communication protocol should be compatible with simpy.io (if you use JSON and no custom serializers).

  • Remote procedure calls (RPCs) supporting nested handlers and bidirectional calls (callees can make calls to the caller before returning the actual result).

Authors

The original author of aiomas is Stefan Scherfke.

The development is kindly supported by OFFIS.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiomas-0.6.1.tar.gz (205.1 kB view hashes)

Uploaded Source

Built Distribution

aiomas-0.6.1-py2.py3-none-any.whl (34.9 kB view hashes)

Uploaded Python 2 Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page