Pika bus wrapper with amqp

PikaBus

The PikaBus library is a wrapper around pika that makes it easy to implement the message, event, and command patterns.

Install Or Upgrade

  • pip install --upgrade PikaBus

Prerequisites

  • Python 3.x

Example

import asyncio
import pika
import time
from PikaBus.abstractions.AbstractPikaBus import AbstractPikaBus
from PikaBus.PikaBusSetup import PikaBusSetup

def messageHandlerMethod(**kwargs):
  """
  A message handler method may simply be a method with some **kwargs.
  The **kwargs will be given all incoming pipeline data, the bus and the incoming payload as a dictionary.
  """
  data: dict = kwargs['data']
  bus: AbstractPikaBus = kwargs['bus']
  payload: dict = kwargs['payload']
  print(payload)

# Use Pika connection params to set connection details
credentials = pika.PlainCredentials('amqp', 'amqp')
connParams = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        virtual_host='/',
        credentials=credentials)

# Create a PikaBusSetup instance with a listener queue, and add the message handler method.
pikaBusSetup = PikaBusSetup(connParams, listenerQueue='myQueue')
pikaBusSetup.AddMessageHandler(messageHandlerMethod)

# Start consuming messages from the queue.
consumingTask = pikaBusSetup.StartAsync()

# Create a temporary bus to subscribe on topics and send or publish messages.
bus: AbstractPikaBus = pikaBusSetup.CreateBus()
bus.Subscribe('myTopic')
payload = {'hello': 'world!'}
bus.Send(payload=payload, queue='myQueue')
bus.Publish(payload=payload, topic='myTopic')

# Give it a few seconds for the consumer to receive all sent messages, and then stop all consuming channels
time.sleep(2)
pikaBusSetup.Stop()

# Wait for the consuming task to complete safely.
loop = asyncio.get_event_loop()
loop.run_until_complete(consumingTask)
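
Because a message handler is an ordinary function that reads its inputs from **kwargs, it can be exercised without a running broker. The sketch below is a minimal illustration and not part of PikaBus: make_collecting_handler and received are hypothetical names, and the direct call at the end only mimics the three keyword arguments named in the docstring above (data, bus, payload). The real pipeline may pass more.

```python
from typing import Callable, List


def make_collecting_handler(received: List[dict]) -> Callable[..., None]:
    """Return a handler that appends each incoming payload to `received`.

    Hypothetical helper for illustration; not part of the PikaBus API.
    """
    def handler(**kwargs) -> None:
        # 'payload' is one of the keys the example handler above reads.
        payload: dict = kwargs['payload']
        received.append(payload)
    return handler


received: List[dict] = []
handler = make_collecting_handler(received)

# Simulate one delivery by hand. A real bus would supply these arguments;
# here 'bus' is None because no connection is involved.
handler(data={}, bus=None, payload={'hello': 'world!'})
print(received)
```

A handler built this way should be registrable with pikaBusSetup.AddMessageHandler(handler) in place of messageHandlerMethod, which can help when checking what a consumer actually received.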

Locate more examples here:

  • ./Examples
    • Start the local RabbitMQ instance with Docker and DockerBuildManagement:
      • pip install DockerBuildManagement
      • dbm -swarm -start
    • Then run the example:
      • pip install --upgrade PikaBus
      • python ./Examples/basic_example.py

Development

Dependencies:

  • pip install twine
  • pip install wheel
  • pip install -r requirements.txt

Publish New Version

  1. Set the new version in setup.py.
  2. Package: python setup.py bdist_wheel
  3. Publish: twine upload dist/*
  4. Or with dbm:
    • pip install DockerBuildManagement
    • dbm -build -publish

Run Unit Tests

  • pip install DockerBuildManagement
  • dbm -test
