This tool aims to streamline Kafka data processing by standardizing streaming data from multiple Kafka clusters. With a pub-sub approach, multiple functions can subscribe to incoming messages, serialization can be specified per topic, and data is automatically processed by data sink functions.

Snapstream

Note: this project is a work in progress.

Usage

The Conf() singleton object contains default configurations:

from snapstream import Cache, Conf, Topic, snap, stream

Conf({'group.id': '$Default', 'bootstrap.servers': 'localhost:29091'})

cache = Cache('state/db')  # stores data in rocksdb

topic = Topic('flights', {
    'sasl.username': 'AUEOZPERNVZW2',
    'sasl.password': '*******',
})

The topic and the cache can be passed to the snap decorator:

@snap(topic, sink=[cache])
def cache_flight(msg):
    """Process incoming flight and cache in rocksdb."""
    val = msg.value()
    key = val['flight_id']

    flight = {
        'id': val['flight_id'],
        'origin': val['inbound_flight_location'],
        'destination': val['outbound_flight_location'],
    }
    return key, flight

The cache_flight function handles every incoming message on the topic, and the value it returns is stored in the cache.
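To see what a handler like this produces without a running Kafka cluster, the transformation can be exercised on its own. The sketch below is an illustration only: FakeMessage is a hypothetical stand-in for a Kafka message that exposes value(), to_flight repeats the handler body without the decorator, and a plain dict stands in for the rocksdb-backed cache. How snapstream itself writes the returned pair to the cache is assumed here, not confirmed.

```python
from typing import Any, Dict, Tuple


class FakeMessage:
    """Hypothetical stand-in for a Kafka message exposing value()."""

    def __init__(self, value: Dict[str, Any]) -> None:
        self._value = value

    def value(self) -> Dict[str, Any]:
        return self._value


def to_flight(msg: FakeMessage) -> Tuple[Any, Dict[str, Any]]:
    """Same body as the handler above, without the decorator."""
    val = msg.value()
    key = val['flight_id']
    flight = {
        'id': val['flight_id'],
        'origin': val['inbound_flight_location'],
        'destination': val['outbound_flight_location'],
    }
    return key, flight


# A plain dict plays the role of the cache (assumption: key/value storage).
store: Dict[Any, Dict[str, Any]] = {}

key, flight = to_flight(FakeMessage({
    'flight_id': 'KL123',
    'inbound_flight_location': 'AMS',
    'outbound_flight_location': 'JFK',
}))
store[key] = flight
```

Keeping the transformation in a plain function like this makes it easy to unit test before wiring it to a real topic.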

We can set up a mock events stream by creating a generator function:

def mock_events():
    for i in range(10):
        yield i, 'MockEvent'


@snap(mock_events(), sink=[print])
def join_streams(msg):
    """Use id to find flight in cache."""
    id, event = msg
    flight = cache[id]
    result = {
        **flight,
        'event': event,
    }
    return id, result


if __name__ == "__main__":
    stream()

The join_streams function handles incoming mock messages, and uses the id to find the associated flight from the cache.
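A lookup such as cache[id] presumably fails when no flight with that id has been cached yet. The sketch below shows one way to make the join tolerant of missing keys. It is a sketch under assumptions: a plain dict named flights stands in for the cache, join_event is a hypothetical helper, and whether the snapstream Cache offers a dict-like get() is not confirmed by this README.

```python
from typing import Any, Dict, Optional, Tuple

# Plain dict standing in for the cache (assumption, not the real Cache).
flights: Dict[int, Dict[str, Any]] = {
    0: {'id': 0, 'origin': 'AMS', 'destination': 'JFK'},
}


def join_event(msg: Tuple[int, str]) -> Optional[Tuple[int, Dict[str, Any]]]:
    """Join an (id, event) pair with the cached flight, if there is one."""
    flight_id, event = msg
    flight = flights.get(flight_id)
    if flight is None:
        # No flight cached for this id yet: skip instead of raising KeyError.
        return None
    return flight_id, {**flight, 'event': event}


joined = join_event((0, 'MockEvent'))
missing = join_event((5, 'MockEvent'))
```

Here joined holds the merged record and missing is None. What a sink should do with a None result is left open.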

When all streams are "snapped" to the handler functions, we call stream() to start all data streams.
