A RabbitMQ client for the SSEC.

QuickMQ

An easy-to-use AMQP/RabbitMQ publisher created for use at the SSEC.

Description

QuickMQ provides a high-level API that simplifies common publishing patterns, so event processors at SDS can focus on processing events instead of handling network problems. These patterns include:

  • Automatically reconnecting and handling errors.
  • Publishing to multiple servers at once.
  • Guaranteeing message delivery.
  • Publishing messages to a RabbitMQ cluster.

To see the requirements of this project, check out the reqs and specs doc.

Please note that QuickMQ cannot currently do the following:

  • Declare AMQP 0-9-1 topology (exchanges, queues).
  • Consume messages from a server.

Installation

Requirements

  • Python >= 3.6

Note: QuickMQ has only been tested on OSX and RHEL systems.

To install the latest stable version, use pip.

pip install --upgrade quickmq

In addition to being on PyPI, QuickMQ is also deployed to the SSEC's GitLab package registry.

pip install --upgrade --index-url https://gitlab.ssec.wisc.edu/api/v4/projects/2625/packages/pypi/simple quickmq

To install the latest (possibly unstable) developer version, use git.

git clone https://gitlab.ssec.wisc.edu/mdrexler/ssec_amqp.git
cd ssec_amqp
pip install .
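
To confirm which version ended up in the current environment, something like the following sketch can help. It assumes the distribution is registered under the name quickmq (the name used with pip above) and relies on importlib.metadata, which is only in the standard library from Python 3.8 onward, so it will not work on the oldest supported interpreters. The helper name installed_version is made up for this example.

```python
from importlib import metadata  # standard library from Python 3.8 onward


def installed_version(dist_name="quickmq"):
    """Return the installed version string of dist_name, or None if it is not installed."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# Prints the installed version, or None when the package is missing.
print(installed_version())
```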

Usage

API

The easiest way to use QuickMQ is with the API at ssec_amqp.api.

import ssec_amqp.api as mq

# Connect to multiple servers at once
mq.connect('server1', 'server2', user='username', password='password', exchange='satellite')

# Additionally, connect to a RabbitMQ cluster
mq.connect('cluster1', 'cluster2', 'cluster3', user='user', password='pass', exchange='satellite', cluster=True)

QuickMQ will now manage the connections to 'server1', 'server2', and the cluster servers until mq.disconnect() is called or the program exits. This means that if a connection to a server is interrupted, QuickMQ will automatically retry connecting to that server without interrupting connections to other servers or blocking the calling script.

# ...continued from above
# Now when we publish messages they'll get delivered to 'server1', 'server2', and one of the cluster servers.
import logging

try:
    while True:
        deliv_status = mq.publish({'msg': 'Hi from QuickMQ!'}, route_key='my.custom.topic')
        for con, stat in deliv_status.items():
            if stat != "DELIVERED":
                logging.warning("Couldn't deliver message to %s!", con)
except KeyboardInterrupt:
    logging.info("goodbye!")

mq.publish returns a dictionary that maps each server to the delivery status of the message for that server.

  • "DELIVERED" means that the message was successfully delivered and acknowledged by the server.
  • "DROPPED" means that the message wasn't sent to the server (because the connection is actively reconnecting).
  • "REJECTED" means that the message was nacked by the server.
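
The statuses above make it easy to filter a publish result down to the servers that need attention. The following is a minimal sketch that works on any dictionary of that shape; the helper name undelivered and the example dictionary are made up for illustration, and the real keys may be connection objects rather than plain strings.

```python
def undelivered(deliv_status):
    """Return only the entries whose status is not "DELIVERED"."""
    return {con: stat for con, stat in deliv_status.items() if stat != "DELIVERED"}


# Hypothetical publish result, shaped like the dictionary described above.
example = {"server1": "DELIVERED", "server2": "DROPPED", "cluster": "REJECTED"}

problems = undelivered(example)
for con, stat in problems.items():
    print(f"{con}: {stat}")
```

An empty result means every server acknowledged the message.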

Note: QuickMQ will JSON-serialize messages passed to mq.publish, so make sure to pass the Python object itself and not a JSON string.
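
To see why passing a JSON string is a problem, the sketch below uses only the standard json module. It assumes QuickMQ's serialization behaves like json.dumps, which the note above suggests but does not spell out.

```python
import json

payload = {"msg": "Hi from QuickMQ!"}

# Serialized once: what happens when the Python object itself is passed in.
once = json.dumps(payload)

# Serialized twice: what happens when an already-serialized string is passed in.
twice = json.dumps(once)

decoded_once = json.loads(once)    # a dict, as the consumer expects
decoded_twice = json.loads(twice)  # a str that still has to be decoded again

print(type(decoded_once).__name__, type(decoded_twice).__name__)
```

A consumer that decodes the double-encoded message once gets a string back instead of the original object.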

It's possible to see the current connection status for each server using mq.status.

# ...continued from above

con_status = mq.status()
for con, status in con_status.items():
    if status != "CONNECTED":
        logging.warning("Not currently connected to %s!", con)

mq.status returns a dictionary that contains the connection status of each server. The possible values are "CONNECTED", "RECONNECTING", and "DISCONNECTED". Keep in mind that mq.status doesn't actively check the connection, so a server reported as "CONNECTED" could still fail on the next publish and only then be shown as "RECONNECTING".
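
Because connections are re-established in the background, a script may want to wait briefly for every server to come up before it starts publishing. Here is a rough sketch of such a wait loop. The helper wait_until_connected is not part of QuickMQ; it accepts any callable that returns a status dictionary, and it only reflects the last known state, for the reason given above.

```python
import time


def wait_until_connected(get_status, timeout=10.0, poll_interval=0.5):
    """Poll get_status() until every value is "CONNECTED" or the timeout elapses.

    Returns the last status dictionary seen, so the caller can inspect
    which servers (if any) are still not connected.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if all(stat == "CONNECTED" for stat in status.values()):
            return status
        if time.monotonic() >= deadline:
            return status
        time.sleep(poll_interval)
```

With the API above, this could be called as wait_until_connected(mq.status, timeout=30). Note that an empty dictionary counts as "all connected", so the function returns immediately when no connections exist.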

Classes

If the API doesn't provide enough flexibility, it's also possible to use the classes that drive it directly.

# Recreate the code blocks above without using the API.

import logging

from ssec_amqp import AmqpClient, AmqpConnection, ClusteredConnection

client = AmqpClient(name='my_client') # Optionally give it a name for logging.

servers = ['server1', 'server2']

for server in servers:
    client.connect(AmqpConnection(server, user='user', password='pass', exchange='satellite'))

cluster_servers = ['cluster1', 'cluster2', 'cluster3']

client.connect(ClusteredConnection([AmqpConnection(server, user='user', password='pass') for server in cluster_servers]))

try:
    while True:
        deliv_status = client.publish({'msg': 'Hi from QuickMQ!'}, route_key='my.custom.topic')
        for con, stat in deliv_status.items():
            if stat != "DELIVERED":
                logging.warning("Couldn't deliver message to %s!", con)
except KeyboardInterrupt:
    pass

con_status = client.connections
for con, status in con_status.items():
    if status != "CONNECTED":
        logging.warning("Not connected to %s!", con)

# Connections won't automatically disconnect when using a custom client
client.disconnect()

For more information, see the source code for the client and the connections.

Author

Created/Maintained by Max Drexler.

License

MIT License. See LICENSE for more information.
