Simple pubsub pattern for asyncio applications

Simple publish-subscribe pattern for asyncio applications.

Why

When building big applications, separation of concerns is a great way to keep things manageable. In messaging systems, the publish-subscribe pattern is often used to decouple data producers and data consumers. We went a step further and designed even the internals of our applications around this pattern.

We explain our thinking and the workings of aiopubsub in detail in our article “Design your app using the pub-sub pattern with aiopubsub”. We recommend reading it before using aiopubsub in your project.

Installation

aiopubsub is only compatible with Python 3.8 and higher. There are no plans to support older versions.

aiopubsub is available on PyPI and you can install it with:

pip install aiopubsub

or

poetry add aiopubsub

How to use it

The following comprehensive example is explained step-by-step in our article “Design your app using the pub-sub pattern with aiopubsub”.

import asyncio
import dataclasses
import decimal

import aiopubsub


@dataclasses.dataclass
class Trade:
        timestamp: float
        quantity: int
        price: decimal.Decimal


async def on_trade(key: aiopubsub.Key, trade: Trade) -> None:
        print(f'Processing trade = {trade}  with key = {key}.')


async def on_nyse_trade(key: aiopubsub.Key, trade: Trade) -> None:
        print(f'Processing trade = {trade}  with key = {key} that happened in NYSE')


async def main():
        # create an aiopubsub hub
        hub = aiopubsub.Hub()

        # create a sample of data to send
        trade = Trade(timestamp = 123.5, quantity = 56, price = decimal.Decimal('1639.43'))

        # subscriber listens on every trade and calls the `on_trade` function
        subscriber = aiopubsub.Subscriber(hub, 'trades')
        subscribe_key = aiopubsub.Key('*', 'trade', '*')
        subscriber.add_async_listener(subscribe_key, on_trade)

        # publisher has a NASDAQ prefix and sends the trade that happened on Google stock
        publisher = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NASDAQ'))
        publish_key = aiopubsub.Key('trade', 'GOOGL')
        publisher.publish(publish_key, trade)

        # sleep so the event loop can process the action
        await asyncio.sleep(0.001)

        # expected output:
        # Processing trade = Trade(timestamp=123.5, quantity=56, price=Decimal('1639.43'))  with key = ('NASDAQ', 'trade', 'GOOGL').

        # sample from another stock exchange
        trade_nyse = Trade(timestamp = 127.45, quantity = 67, price = decimal.Decimal('1639.44'))

        # subscribe only for the NYSE exchange
        subscribe_key_nyse = aiopubsub.Key('NYSE', 'trade', '*')
        subscriber.add_async_listener(subscribe_key_nyse, on_nyse_trade)

        # publish NYSE trade
        publisher_nyse = aiopubsub.Publisher(hub, prefix = aiopubsub.Key('NYSE'))
        publisher_nyse.publish(aiopubsub.Key('trade', 'GOOGL'), trade_nyse)

        # sleep so the event loop can process the action
        await asyncio.sleep(0.001)

        # expected output:
        # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44'))  with key = ('NYSE', 'trade', 'GOOGL').
        # Processing trade = Trade(timestamp=127.45, quantity=67, price=Decimal('1639.44'))  with key = ('NYSE', 'trade', 'GOOGL') that happened in NYSE

        # clean the subscriber before the end of the program
        await subscriber.remove_all_listeners()

if __name__ == '__main__':
        asyncio.run(main())

Aiopubsub will use logwood if it is installed, otherwise it will default to the standard logging module. Note that logwood is required to run tests.
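
The fallback described above is a common optional-dependency pattern. Here is a minimal sketch of that pattern; it makes no claim about how aiopubsub implements it internally, and `pick_logging_backend` is a hypothetical helper, not part of the library:

```python
import importlib
import logging
from types import ModuleType


def pick_logging_backend() -> ModuleType:
    """Return the logwood module if it can be imported, else the standard logging module."""
    try:
        # Hypothetical illustration: prefer the optional dependency when present.
        return importlib.import_module('logwood')
    except ImportError:
        # logwood is not installed, so fall back to the standard library.
        return logging


backend = pick_logging_backend()
print(f'Using logging backend: {backend.__name__}')
```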

Architecture

Hub accepts messages from Publishers and routes them to Subscribers. Each message is routed by its Key, an iterable of strings forming a hierarchical namespace. Subscribers may subscribe to wildcard keys, where any part of the key may be replaced with a * (star).
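
To illustrate the wildcard routing idea, here is a minimal sketch in plain Python. It is not aiopubsub's actual matching code: `key_matches` is a hypothetical helper, and it assumes that a star stands for exactly one part of the key, which is consistent with the example in "How to use it" but is not spelled out in this README.

```python
from typing import Tuple

WILDCARD = '*'


def key_matches(subscribed: Tuple[str, ...], published: Tuple[str, ...]) -> bool:
    """Return True if each part of `subscribed` is a wildcard or equals the same part of `published`."""
    # Assumption: a wildcard replaces exactly one part, so both keys must have the same length.
    if len(subscribed) != len(published):
        return False
    return all(s == WILDCARD or s == p for s, p in zip(subscribed, published))


published = ('NASDAQ', 'trade', 'GOOGL')
print(key_matches(('*', 'trade', '*'), published))     # True
print(key_matches(('NYSE', 'trade', '*'), published))  # False
```

Under this assumption, the listener registered with ('*', 'trade', '*') receives trades from every exchange, while the one registered with ('NYSE', 'trade', '*') receives only NYSE trades.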

addedSubscriber and removedSubscriber messages

When a new subscriber is added, the Hub sends this message:

{
        "key": ("key", "of", "added", "subscriber"),
        "currentSubscriberCount": 2
}

under the key ('Hub', 'addedSubscriber', 'key', 'of', 'added', 'subscriber') (the part after addedSubscriber is made of the subscribed key). Note the currentSubscriberCount field indicating how many subscribers are currently subscribed.

When a subscriber is removed, a message in the same format is sent, but this time under the key ('Hub', 'removedSubscriber', 'key', 'of', 'added', 'subscriber').
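
The shape of these notifications can be sketched in plain Python. `hub_notification` is a hypothetical helper written for illustration only; it mirrors the key and message layout described above and does not call into aiopubsub:

```python
from typing import Any, Dict, Tuple


def hub_notification(
    event: str,
    subscribed_key: Tuple[str, ...],
    subscriber_count: int,
) -> Tuple[Tuple[str, ...], Dict[str, Any]]:
    """Build the routing key and message body for an addedSubscriber or removedSubscriber event."""
    # The routing key is ('Hub', <event>) followed by the parts of the subscribed key.
    routing_key = ('Hub', event) + tuple(subscribed_key)
    message = {
        'key': tuple(subscribed_key),
        'currentSubscriberCount': subscriber_count,
    }
    return routing_key, message


routing_key, message = hub_notification('addedSubscriber', ('NYSE', 'trade', '*'), 2)
print(routing_key)  # ('Hub', 'addedSubscriber', 'NYSE', 'trade', '*')
print(message)      # {'key': ('NYSE', 'trade', '*'), 'currentSubscriberCount': 2}
```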

Contributing

Pull requests are welcome! In particular, we are aware that the documentation could be improved. If anything about aiopubsub is unclear, please feel free to simply open an issue and we will do our best to advise and explain 🙂


aiopubsub was made by Quantlane, a systematic trading firm. We design, build and run our own stock trading platform.
