Skip to main content

Remote AMQP mock

Project description

AMQP Mock

Codecov PyPI PyPI - Downloads Python Version

Installation

pip3 install amqp-mock

Overview

Test Publishing

from amqp_mock import create_amqp_mock

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Publish message via "system under test"
    publish_message([1, 2, 3], "exchange")

    # 3. Test message has been published
    messages = await mock.client.get_exchange_messages("exchange")
    assert messages[0].value == [1, 2, 3]

The full code is available at ./examples/publish_example.py

Test Consuming

from amqp_mock import create_amqp_mock, Message, MessageStatus

# 1. Start AMQP mock server
async with create_amqp_mock() as mock:
    # 2. Mock next message
    await mock.client.publish_message("queue", Message([1, 2, 3]))

    # 3. Consume message via "system under test"
    consume_message("queue")

    # 4. Test message has been consumed
    history = await mock.client.get_queue_message_history("queue")
    assert history[0].status == MessageStatus.ACKED

The full code is available at ./examples/consume_example.py

Mock Server

Start server

import asyncio

from amqp_mock import AmqpServer, HttpServer, Storage, create_amqp_mock


async def run() -> None:
    storage = Storage()
    http_server = HttpServer(storage, port=8080)
    amqp_server = AmqpServer(storage, port=5672)
    async with create_amqp_mock(http_server, amqp_server):
        await asyncio.Future()

asyncio.run(run())

Or start the server via Docker:

docker run -p 8080:80 -p 5672:5672 tsv1/amqp-mock

Publish message

POST /queues/{queue}/messages

{
    "id": "9e342ac1-eef6-40b1-9eaf-053ee7887968",
    "value": [1, 2, 3],
    "exchange": "",
    "routing_key": "",
    "properties": null
}
HTTP

$ http POST localhost/queues/test_queue/messages \
    value:='[1, 2, 3]' \
    exchange=test_exchange

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient, Message

mock_client = AmqpMockClient()
message = Message([1, 2, 3], exchange="test_exchange")
await mock_client.publish_message("test_queue", message)

Get queue message history

GET /queues/{queue}/messages/history

HTTP

$ http GET localhost/queues/test_queue/messages/history

HTTP/1.1 200 OK
Content-Length: 190
Content-Type: application/json; charset=utf-8

[
    {
        "message": {
            "exchange": "test_exchange",
            "id": "94459a41-9119-479a-98c9-80bc9dabb719",
            "properties": null,
            "routing_key": "",
            "value": [1, 2, 3]
        },
        "queue": "test_queue",
        "status": "ACKED"
    }
]

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.get_queue_message_history("test_queue")
# [
#   <QueuedMessage message=<Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>,
#                  queue='test_queue',
#                  status=MessageStatus.ACKED>
# ]

Get exchange messages

GET /exchanges/{exchange}/messages

HTTP

$ http GET localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 423
Content-Type: application/json; charset=utf-8

[
    {
        "exchange": "test_exchange",
        "id": "63fd1646-bdc1-4baa-9780-e337a9ab109c",
        "properties": {
            "app_id": "",
            "cluster_id": "",
            "content_encoding": "",
            "content_type": "",
            "correlation_id": "",
            "delivery_mode": 1,
            "expiration": "",
            "headers": null,
            "message_id": "5ec9024c74eca2e419fd7e29f7be846c",
            "message_type": "",
            "priority": null,
            "reply_to": "",
            "timestamp": null,
            "user_id": ""
        },
        "routing_key": "",
        "value": [1, 2, 3]
    }
]

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
messages = await mock_client.get_exchange_messages("test_exchange")
# [
#   <Message value=[1, 2, 3], exchange='test_exchange', routing_key=''>
# ]

Delete exchange messages

DELETE /exchanges/{exchange}/messages

HTTP

$ http DELETE localhost/exchanges/test_exchange/messages

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.delete_exchange_messages("test_exchange")

Reset

DELETE /

HTTP

$ http DELETE localhost/

HTTP/1.1 200 OK
Content-Length: 0
Content-Type: application/json

Python

from amqp_mock import AmqpMockClient

mock_client = AmqpMockClient()
await mock_client.reset()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

amqp-mock-0.6.1.tar.gz (21.3 kB view hashes)

Uploaded Source

Built Distribution

amqp_mock-0.6.1-py3-none-any.whl (21.7 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page