Streamline your Kafka data processing: this tool aims to standardize streaming data from multiple Kafka clusters. With a pub-sub approach, multiple functions can subscribe to incoming messages, serialization can be specified per topic, and handler output is automatically passed on to data sink functions.
Snapstream
Note: this project is work in progress.
Usage
The `Conf()` singleton object holds the default configuration:
```python
from snapstream import Cache, Conf, Topic, snap, stream

# Default client configuration
Conf({'group.id': '$Default', 'bootstrap.servers': 'localhost:29091'})

cache = Cache('state/db')  # stores data in RocksDB

# Topic-specific settings (here: SASL credentials)
topic = Topic('flights', {
    'sasl.username': 'AUEOZPERNVZW2',
    'sasl.password': '*******',
})
```
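The split between `Conf()` defaults and the settings passed to `Topic()` suggests a layered configuration. As a minimal sketch of that idea, not snapstream's actual implementation, the hypothetical `DefaultConf` singleton below stores the defaults once, and the hypothetical `topic_config` helper merges them with per-topic settings, assuming that topic-level values take precedence:

```python
from typing import Any, Dict, Optional


class DefaultConf:
    """Hypothetical singleton that stores default client settings."""

    _instance: Optional["DefaultConf"] = None
    defaults: Dict[str, Any]

    def __new__(cls, defaults: Optional[Dict[str, Any]] = None) -> "DefaultConf":
        # Create the instance on first use; every later call returns the same one.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.defaults = {}
        if defaults:
            cls._instance.defaults.update(defaults)
        return cls._instance


def topic_config(overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Merge the defaults with per-topic settings; per-topic values win (assumption)."""
    return {**DefaultConf().defaults, **overrides}
```

Because the merge builds a new dict, per-topic settings such as SASL credentials never leak back into the shared defaults.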
The topic and the cache can be passed to the `snap` decorator:
```python
@snap(topic, sink=[cache])
def cache_flight(msg):
    """Process incoming flight and cache in rocksdb."""
    val = msg.value()
    key = val['flight_id']
    flight = {
        'id': val['flight_id'],
        'origin': val['inbound_flight_location'],
        'destination': val['outbound_flight_location'],
    }
    return key, flight
```
The `cache_flight` function handles incoming messages and stores them in the cache.
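To illustrate the pub-sub flow in plain Python, here is a minimal, self-contained sketch. The names `subscribe`, `dispatch`, `run_once` and `REGISTRY` are hypothetical and not part of snapstream. The sketch assumes that a handler returns a `(key, value)` tuple (as `cache_flight` does), that a dict-like sink stores the pair, and that any other sink is called with `key, value`. Each message from a source is fanned out to every function subscribed to it:

```python
from collections.abc import MutableMapping
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

# Hypothetical registry: id(source) -> (source, [(handler, sinks), ...]).
# Grouping by source matters because a generator can only be consumed once.
REGISTRY: Dict[int, Tuple[Iterable[Any], List[Tuple[Callable, List[Any]]]]] = {}


def subscribe(source: Iterable[Any], sink: Optional[List[Any]] = None) -> Callable:
    """Register the decorated function as a subscriber of `source`."""
    def decorator(handler: Callable) -> Callable:
        entry = REGISTRY.setdefault(id(source), (source, []))
        entry[1].append((handler, list(sink or [])))
        return handler  # the function itself stays callable as before
    return decorator


def dispatch(result: Any, sinks: List[Any]) -> None:
    """Forward a handler's (key, value) result to every sink."""
    if result is None:
        return  # the handler chose to drop this message
    key, value = result
    for s in sinks:
        if isinstance(s, MutableMapping):
            s[key] = value  # dict-like sink, e.g. a cache
        else:
            s(key, value)  # callable sink, e.g. print


def run_once() -> None:
    """Read each source once and fan every message out to its subscribers."""
    for source, subscribers in REGISTRY.values():
        for msg in source:
            for handler, sinks in subscribers:
                dispatch(handler(msg), sinks)
```

Under these assumptions, `sink=[print]` would print each key and value on one line, while a dict passed as a sink would accumulate the latest value per key.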
We can set up a mock event stream by writing a generator function:
```python
def mock_events():
    """Yield (key, event) tuples that mimic a second stream."""
    for i in range(10):
        yield i, 'MockEvent'


@snap(mock_events(), sink=[print])
def join_streams(msg):
    """Use the id to find the flight in the cache."""
    key, event = msg  # avoid shadowing the built-in id()
    flight = cache[key]
    result = {
        **flight,
        'event': event,
    }
    return key, result


if __name__ == "__main__":
    stream()
```
The `join_streams` function handles incoming mock messages and uses the id to look up the associated flight in the cache.
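A cache lookup by id fails when an event arrives for an id that has no flight cached yet; with a plain dict it raises `KeyError`. How snapstream's `Cache` behaves on a miss is not stated here, so this sketch uses a plain dict as a stand-in and a hypothetical `join_event` helper that skips unmatched events by returning `None`:

```python
from typing import Any, Dict, Optional, Tuple


def join_event(
    cache: Dict[Any, Dict[str, Any]], key: Any, event: str
) -> Optional[Tuple[Any, Dict[str, Any]]]:
    """Enrich an event with its cached flight, or return None when there is no match."""
    flight = cache.get(key)
    if flight is None:
        return None  # no flight cached (yet) for this id, so skip the event
    return key, {**flight, 'event': event}
```

Returning `None` instead of raising keeps one unmatched event from stopping the whole stream; whether to drop, retry, or buffer such events is a design choice left open here.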
Once all streams are "snapped" to their handler functions, calling `stream()` starts all data streams.
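Since `stream()` starts every snapped stream at once, the sources have to be consumed concurrently. Whether snapstream uses threads, processes, or something else is not stated here; this sketch assumes one thread per source, using a hypothetical `run_streams` helper that starts a thread for each `(source, handler)` pair and waits for all of them:

```python
import threading
from typing import Any, Callable, Iterable, List, Tuple


def _consume(source: Iterable[Any], handler: Callable[[Any], Any]) -> None:
    """Feed every message from one source to its handler."""
    for msg in source:
        handler(msg)


def run_streams(pipelines: List[Tuple[Iterable[Any], Callable[[Any], Any]]]) -> None:
    """Consume each source on its own thread and block until all are exhausted.

    Handlers run on different threads, so any state they share must be thread-safe.
    """
    threads = [
        threading.Thread(target=_consume, args=(source, handler))
        for source, handler in pipelines
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # with endless Kafka sources this would block forever
```

With real Kafka consumers the sources never end, so a call like this would block indefinitely, which is what you would expect from a long-running stream processor.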