Skip to main content

High level pub/sub package

Project description

Papfa

https://img.shields.io/pypi/v/papfa.svg Pipeline Status https://codecov.io/gh/kamyab98/papfa/branch/master/graph/badge.svg?token=6O48ISX0KN Documentation Status

Papfa is a high level pub/sub pattern Python library.

Quick Start for Django

Install Papfa:

$ pip install papfa

Add Papfa to your settings:

PAPFA = {
    'BROKER': 'KAFKA',
    'KAFKA_BOOTSTRAP_SERVERS': ...,
    'KAFKA_SASL_PASSWORD': ...,
    'KAFKA_SASL_USERNAME': ...,
    'KAFKA_SASL_MECHANISM': ...,
    'KAFKA_SECURITY_PROTOCOL': ...,
    'GROUP_ID_PREFIX': ...,
    'SCHEMA_REGISTRY_URL': ...,
    'SCHEMA_REGISTRY_BASIC_AUTH': ...,
    'CONSUMER_MIDDLEWARES': [
        ...
    ]

}

Consumer

Add <django_project>.papfa.py

from papfa import Papfa

papfa_app = Papfa()

Import papfa in <django_project>.__init__.py

from <django_project>.papfa import papfa_app

__all__ = ['papfa_app']

Add consumer in <app>/consumers.py:

from papfa.consumers import consumer
from papfa.dtos import Record

@consumer(topic='topic_name')
def my_consumer(messages: List[Record]):
    print(messages)

Use papfa cli to consume

papfa consume -a <django_app> <consumer_name>

Example of project structure:

└── shop
    ├── shop
    │   ├── __init__.py
    │   ├── papfa.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── app1
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── consumers.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   └── views.py
    ├── app2
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── consumers.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   └── views.py
    └── manage.py

Producer

Produce Message:

from dataclasses import dataclass

from dataclasses_avroschema import AvroModel
from papfa.producer import get_message_producer
from papfa.dtos import Record

@dataclass
class User(AvroModel):
    name: str
    age: int

r1 = Record(value=User(name='thom', age=53))
r1 = Record(value=User(name='jonny', age=50))

message_producer = get_message_producer(topic='topic_name', User)

message_producer.produce(messages=[r1, r2])

CLI

Papfa provides a command line interface to consume and monitor consumers.

Commands

Command

Description

list

list of all consumers

consume

consume messages from a known consumer

stats

show stats of a consumer

Middleware

Papfa provides middlewares for both consumers and producers. You can implement your own middleware by extending the papfa.middlewares.consumer.ConsumerMiddleware and papfa.middlwares.producer.ProducerMiddleware class.

Default Middlewares

  • papfa.middlewares.consumer.ConsumedMessageStatsMiddleware - Logs the last message consumed by each topic - consumer group

Serialization

For Now Papfa only support confluent avro serialization with schema registry.

Broker

For Now Papfa only support Apache Kafka.

Features

  • Batch Processing (Commit per batch)

  • Consumed Messages Stats

Todos

  • Handle Idempotency

  • Add Other Brokers & Serializers

  • Handle Multiple Broker Cluster

Credits

This package was created with Cookiecutter and the audreyr/cookiecutter-pypackage project template.

History

0.1.11 (2023-01-23)

  • Fix meta_data bug

0.1.10 (2023-01-23)

  • Add Transactional Producer

  • Add More Logs for Consumers

  • Fix Avro Dependency

0.1.9 (2023-01-03)

  • Add consumer_kwargs in consumer

0.1.8 (2022-12-07)

  • Add kafka_config in consumer

0.1.7 (2022-11-29)

  • Add deserialize key flag

0.1.6 (2022-11-27)

  • Add Python 3.10 Support

0.1.5 (2022-06-08)

  • Fix AppRegistryNotReady error when importing celery

0.1.4 (2022-06-01)

  • Fix group id with prefix in consumer

0.1.3 (2022-05-31)

  • Fix auto commit in batch processing.

0.1.1 (2022-05-28)

  • Fix value error caused by schema registry invalid configuration.

0.1.0 (2022-05-11)

  • First release on PyPI.

Supported by

AWS Cloud computing and Security Sponsor Datadog Monitoring Fastly CDN Google Download Analytics Pingdom Monitoring Sentry Error logging StatusPage Status page