Skip to main content

asyncronous pubsub in django using redis

Project description

https://travis-ci.org/andrewyoung1991/django-redis-pubsub.svg?branch=master https://coveralls.io/repos/github/andrewyoung1991/django-redis-pubsub/badge.svg?branch=master Code Climate

asyncronous subscription distrobution for django (with websocket support!!!!).

PublishableModel

You’ll first need to create some publishable models.

# models.py

class Correspondence(PublishableModel):
    PUBLISH_ON_CREATE = False
    PUBLISH_ON_UPDATE = False

    participants = models.ManyToManyField(settings.AUTH_USER_MODEL, related_name="correspondences")

    def save(self, *args, **kwargs):
        super(Correspondence, self).save(*args, **kwargs)
        # add subscribe all the users to the channel
        channel = self.channel
        for subscriber in self.participants:
            channel.subscribe(subscriber)


class Message(PublishableModel):
    PUBLISH_ON_CREATE = True
    PUBLISH_ON_UPDATE = False

    correspondence = models.ForeignKey("Correspondence", related_name="messages")
    author = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="sent_messages")
    body = models.TextField()

    def save(self, *args, **kwargs):
        if not hasattr(self, "channel"):
            self.channel = self.correspondence.channel
        super(Message, self).save(*args, **kwargs)

# views.py

def send_message(request, correspondence, *args, **kwargs):
    message = Message.objects.create(
        correspondence_id=correspondence,
        author=request.user,
        body=request.POST["body"]
    return render_to_response(request, "messages.html", {"message": message})

# websockets.py

@websocket_pubsub(authenticate=True)
def read_messages(ws, params, user, manager):
    subscription = user.subscriptions.get(channel__name="something:unique".format(user.username))
    reader = subscription.get_reader(manager=manager)

    @reader.callback
    def send_message_alert(channel_name, model):
        alert = json.dumps({"message": "new message from {0}".format(model.author.get_full_name()))
        ws.send_str(alert)
        return True

    listener = yield from reader.listen()
    yield from listener

In the above example, a client who has established a websocket connection to the handler in websockets.py will receive alerts as long as the websocket connection remains open. When another client sends a POST request to the send_message view in views.py the message will be published and received by the read_messages.send_message_alert callback where further processing/serialization can occur.

Websockets

If you choose to use redis_pubsub.contrib.websockets there are additional packages that you will need to install:

$ pip install aiohttp aiohttp_wsgi

Websocket handlers belong in module in your application by the name of websockets.py. This module should export a handlerconf, which is a list of the names of the handlers in the module

@websocket("/")  # this handler will be at http://yourapp.com/
def myhandler(ws, params, **kwargs):
    ...

handlerconf = ["myhandler", ]

Websocket requests are handled with the excellent aiohttp package which takes care of the encoding/decoding, handshake, and cleanup of a websocket session. Handlers for websocket requests are coroutines decorated with either the redis_pubsub.contrib.websockets.websocket or redis_pubsub.contrib.websockets.websocket_pubsub wrappers. These wrappers handle converting your handler to a coroutine and passing arguments to your handler. A simple handler that echo’s a message back to the client would look like this

@websocket("/echo")
def echo(ws, params, **kwargs):
    message = yield from ws.receive()
    ws.send_str(message.data)

The former example shows a websocket handler that waits for a message from a connected client, echo’s the message back to the client and closes the connection.

Websocket Authentication

If you choose to use authenticated websockets you will need to either install djangorestframework and use the rest_framework.authtoken.models.Token object as your authentication method or simply use rest_framework_jwt to distribute and challenge JTW’s provided by your client. to configure authentication with one of these methods (or your own token authentication method) add the module path to the REDIS_PUBSUB config:

REDIS_PUBSUB = {
    "tokenauth_method": "redis_pubsub.auth.authjwt_method",  # defaults to "redis_pubsub.auth.authtoken_method"
}

If you do decide to roll your own tokenauth_method, this method must accept a single argument (the token string) and return either None if the token is not valid or an instance of AUTH_USER_MODEL if the token is valid.

Websocket Pubsub

You can access the Pubsub methods provided by redis_pubsub in your websocket handlers by decorating your handler with the redis.pubsub.contrib.websockets.websocket_pubsub wrapper. This wrapper provides an additional argument manager to your handler. The manager can be used to keep track of subscription channels and stop them if necessary

# websockets.py

@websocket_pubsub("/messages", authenticate=True)
def message_pusher(ws, params, manager, user, **kwargs):
    subscription = user.subscriptions.get(channel__name="messages")
    reader = subscription.get_reader(manager=manager)

    @reader.callback
    def callback(channel_name, message):
        to_client = {
            channel_name: {
                "author": message.author.username,
                "body": message.body
            }
        }
        ws.send_str(json.dumps())
        return True

    listener = yield from reader.listen()
    yield from listener

This example shows the main purpose of the redis_pubsub package, which is to listen for updates on a redis channel and push the publication to a client. Lets break it down line by line

  1. retreive the users subscription

  2. create a managed ChannelReader object for this subscription

  3. register a callback to be executed whenever a new publication is received

  4. begin listening for changes

  5. listen until the channel is closed

The most fruitful method offerd by a SubscriptionManager is listen_to_all_subscriptions which takes two arguments, a subscriber and a callback, and publishes subscriptions as they arrive

# websockets.py

@websocket_pubsub("/subscriptions", authenticate=True)
def subscriptions(ws, params, manager, user, **kwargs):

    def callback(channel_name, message):
        ws.send_str(message.serialize())
        return True

    manager.listen_to_all_subscriptions(user, callback)

    while True:
        message = yield from ws.receive()
        if message.tp not in (MsgType.error, MsgType.close):
            message = json.loads(message)
            if message["action"] == "unsubscribe":
                subscription = user.subscriptions.get(channel__name=message["channe"])
                subscription.active = False
                subscription.save()
                reader = manager.readers[message["channel"]]
                yield from manager.remove(reader)
            elif message["action"] == "subscribe":
                channel = Channel.objects.get(name=message["channel"])
                reader = channel.subscribe(user).get_reader(manager=manager)
                reader.callback(callback)
                yield from reader.listen()
        else:
            break

The callback in this example will keep all subscription channels open and push messages to a client until the websocket has closed. This code provides a simple means of managing users with a multitude of subscriptions. The while loop here also handles unsubscribing and subscribing to new channels

Deploying

when deploying an application with websockets/aiohttp you will not be able to use the normal django deployment proceedures. Since your django application will be a component of an AioHttp application object, you will have to use Gunicorn as an application server. Using utilities from the redis_pubsub.contrib.websockets module you can create a deployment file simply

# deployment.py

import asyncio

from django.core.wsgi import get_wsgi_application

from aiohttp_wsgi import WSGIHandler

from redis_pubsub.contrib.websockets import setup


wsgi_app = get_wsgi_application()  # django.setup() is called here
wsgi_handler = WSGIHandler(wsgi_app)

loop = asyncio.get_event_loop()
application = setup(loop=loop)
# any url patterns not matched by the Websocket app go to the django app for handling
application.router.add_route("*", "/{path_info:.*}", wsgi_handler.handle_request)

you can then start gunicorn by running:

$ gunicorn deployment:application --bind localhost:8080 --worker-class aiohttp.worker.GunicornWebWorker

Project details


Release history Release notifications | RSS feed

This version

1.0

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

django-redis-pubsub-1.0.tar.gz (13.0 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page