PostgreSQL backed job queues

pgjobq

A job queue built on top of Postgres.

Features

  • Best-effort at-most-once delivery (each job is delivered to only one worker at a time)
  • Automatic redelivery of failed jobs
  • Low-latency delivery (near real time, using PostgreSQL's NOTIFY feature)
  • Completion tracking (using NOTIFY)
  • Bulk sending and receiving
  • Fully typed async Python client (using asyncpg)
  • Persistent scheduled jobs (scheduled in the database, not the client application)
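
The delivery guarantees listed above can be illustrated with a small in-memory model. This is only a sketch of the semantics (one worker holds a job at a time; a failed job becomes available again). ToyQueue and its methods are hypothetical names, not part of pgjobq, and nothing here reflects how pgjobq stores jobs in Postgres.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple


class ToyQueue:
    """Hypothetical in-memory stand-in for the delivery semantics only."""

    def __init__(self) -> None:
        self._ready: Deque[Tuple[int, bytes]] = deque()
        self._in_flight: Dict[int, bytes] = {}
        self._next_id = 0

    def send(self, body: bytes) -> int:
        job_id = self._next_id
        self._next_id += 1
        self._ready.append((job_id, body))
        return job_id

    def receive(self) -> Optional[Tuple[int, bytes]]:
        # A job handed to one worker is hidden from every other worker.
        if not self._ready:
            return None
        job_id, body = self._ready.popleft()
        self._in_flight[job_id] = body
        return job_id, body

    def ack(self, job_id: int) -> None:
        # Success: the job is removed for good.
        del self._in_flight[job_id]

    def nack(self, job_id: int) -> None:
        # Failure: the job goes back to the ready list for redelivery.
        self._ready.append((job_id, self._in_flight.pop(job_id)))


queue = ToyQueue()
job = queue.send(b'{"foo":"bar"}')
first = queue.receive()   # worker A gets the job
second = queue.receive()  # worker B gets nothing while A holds it
queue.nack(job)           # worker A fails
third = queue.receive()   # the same job is delivered again
queue.ack(job)            # this time it succeeds
fourth = queue.receive()  # nothing left to deliver
```

In this model a job that is never acked or nacked stays in flight forever; a real queue needs some way to detect a worker that disappeared, which the sketch leaves out.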

Possible features:

  • Exponential backoffs
  • Waiting for any job to complete, even if it was sent from somewhere else
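
Exponential backoff is listed only as a possible feature, so the following is a generic sketch of the idea rather than anything pgjobq provides. The function name backoff_delay and its parameters (base, cap, jitter) are assumptions made for illustration: the delay doubles with each failed attempt, is capped, and can optionally be randomized.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    jitter: Optional[random.Random] = None,
) -> float:
    """Seconds to wait before redelivery attempt number `attempt` (1-based)."""
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    # Double the delay on every attempt, but never exceed the cap.
    delay = min(cap, base * 2 ** (attempt - 1))
    if jitter is not None:
        # Optional jitter: pick uniformly in [0, delay] so retries spread out.
        delay = jitter.uniform(0.0, delay)
    return delay


delays = [backoff_delay(n) for n in range(1, 9)]
# delays == [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

Passing a seeded random.Random as jitter keeps the randomized delays reproducible in tests.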

Unplanned features:

  • Sending back response data (currently it needs to be sent out of band)
  • Supporting "subscriptions" (this is a simple queue, not a message broker)

Examples

from contextlib import AsyncExitStack

import anyio
import asyncpg  # type: ignore
from pgjobq import create_queue, connect_to_queue, migrate_to_latest_version

async def main() -> None:

    async with AsyncExitStack() as stack:
        pool: asyncpg.Pool = await stack.enter_async_context(
            asyncpg.create_pool(  # type: ignore
                "postgres://postgres:postgres@localhost/postgres"
            )
        )
        # apply pgjobq's schema migrations, then create the queue
        await migrate_to_latest_version(pool)
        await create_queue("myq", pool)
        queue = await stack.enter_async_context(
            connect_to_queue("myq", pool)
        )
        async with anyio.create_task_group() as tg:

            async def worker() -> None:
                async with queue.receive() as msg_handle_rcv_stream:
                    # receive a single message
                    async with (await msg_handle_rcv_stream.receive()).acquire():
                        print("received")
                        # do some work
                        await anyio.sleep(1)
                        print("done processing")
                        print("acked")

            # start two workers; the single job sent below goes to only one of them
            tg.start_soon(worker)
            tg.start_soon(worker)

            async with queue.send(b'{"foo":"bar"}') as completion_handle:
                print("sent")
                # wait until a worker has completed the job
                await completion_handle()
                print("completed")
                # stop the worker that is still waiting for a job
                tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(main)
    # prints:
    # "sent"
    # "received"
    # "done processing"
    # "acked"
    # "completed"
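
The example above sends the job body as raw bytes (b'{"foo":"bar"}'). A minimal sketch of building and parsing such a body with the standard json module follows. encode_job and decode_job are hypothetical helpers, not part of pgjobq, and how a received message exposes its bytes is not shown in the example, so decode_job simply takes bytes.

```python
import json
from typing import Any


def encode_job(payload: Any) -> bytes:
    """Serialize a JSON-compatible payload into compact UTF-8 bytes."""
    # Compact separators and sorted keys give a stable, whitespace-free body.
    return json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8")


def decode_job(body: bytes) -> Any:
    """Parse a job body produced by encode_job back into Python objects."""
    return json.loads(body.decode("utf-8"))


body = encode_job({"foo": "bar"})
# body == b'{"foo":"bar"}', the same bytes passed to queue.send() above
payload = decode_job(body)
```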

Development

  1. Clone the repo
  2. Start a disposable PostgreSQL instance (e.g., docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres)
  3. Run make test

See this release on GitHub: v0.4.0
