A framework for rapidly developing scalable data pipelines following a simple design pattern

Project description

A framework for rapid development of robust data pipelines following a simple design pattern

pipeline comic

from https://xkcd.com

SmartPipeline gives you the tools to design and formalize simple data pipelines, in which tasks are sequentially encapsulated in pipeline stages.

Pipelines are straightforward to implement, yet deeply customizable: stages can run concurrently and scale on heavy tasks, they can process batches of items at once, and executions and errors can be monitored easily.

It is a framework for engineering sequences of data operations and making them concurrent. It is well suited to fast, clean data analysis prototypes (small and medium projects, proofs of concept), but also to production code, as an alternative to plain scripts. Consider it a solution for problems where big task queues and workflow frameworks are overkill. No dependencies are required.

Install

Install from PyPI; no additional dependencies will be installed:

pip install smartpipeline

Writing your pipeline

SmartPipeline is designed to help developers follow best practices; its design is based on industrial experience with data products.

SmartPipeline focuses on simplicity and efficiency in handling data locally, i.e. serialization and copies of the data are minimized.

Main features:

  • Define a pipeline object as a sequence of stateful stage objects, and optionally set a source on which the pipeline iterates.

  • A pipeline can run indefinitely on the source or it can be used to process single items.

  • Stages can run concurrently on the source or on single asynchronously processed items.

  • A stage can be designed for processing batches, i.e. sequences of consecutive items, at once.

  • Custom error handling can be set for logging and monitoring at stage level.
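To make the batch idea concrete, here is a minimal sketch in plain Python of how consecutive items can be grouped and handed to a batch-processing function. It uses only the standard library and is not SmartPipeline's implementation; the names `batches`, `run_batched` and `round_numbers` are hypothetical and exist only for this illustration.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def batches(items: Iterable, size: int) -> Iterator[List]:
    """Yield lists of up to `size` consecutive items from `items`."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def run_batched(
    items: Iterable[dict], size: int, process_batch: Callable[[List[dict]], List[dict]]
) -> Iterator[dict]:
    """Apply `process_batch` to each group of consecutive items, then flatten."""
    for batch in batches(items, size):
        yield from process_batch(batch)


def round_numbers(batch: List[dict]) -> List[dict]:
    """A batch operation: round the "number" field of every item in the batch."""
    for item in batch:
        item["number"] = round(item["number"], 1)
    return batch


# A tiny finite source of dictionary items (hypothetical data).
source = ({"number": n / 7} for n in range(5))
results = list(run_batched(source, size=2, process_batch=round_numbers))
```

The last batch may be smaller than `size` when the source is exhausted, which is why `batches` yields whatever remains instead of dropping it.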

An example of a trivial pipeline definition and run:

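import random

# NOTE: the module paths below are an assumption about the package layout;
# check the documentation for the exact import locations.
from smartpipeline.item import Item
from smartpipeline.pipeline import Pipeline
from smartpipeline.stage import BatchStage, Source, Stage


class RandomGenerator(Source):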
    def pop(self):
        item = Item()
        item.data["number"] = random.random()
        return item


class Adder(Stage):
    def __init__(self, value):
        self.value = value

    def process(self, item):
        item.data["number"] += self.value
        return item


class Rounder(BatchStage):
    def process_batch(self, items):
        for item in items:
            item.data["number"] = round(item.data["number"], 1)
        return items


pipeline = (
    Pipeline()
    .set_source(RandomGenerator())
    .append("adder", Adder(1), concurrency=2)
    .append("rounder", Rounder(size=100))  # process 100 items at a time
    .build()
)


for item in pipeline.run():
    print(item)
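As a rough mental model of what a setting like `concurrency=2` asks for, the sketch below runs a single stage function on two worker threads using only the standard library. It is an illustration under stated assumptions, not how SmartPipeline implements concurrency; `add_one` and `run_concurrently` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator


def add_one(item: dict) -> dict:
    """A stage-like function: add 1 to the item's "number" field."""
    item["number"] += 1
    return item


def run_concurrently(
    items: Iterable[dict], stage: Callable[[dict], dict], workers: int
) -> Iterator[dict]:
    """Process items with `stage` on a pool of `workers` threads."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Executor.map yields results in input order,
        # even if individual tasks finish out of order.
        yield from executor.map(stage, items)


items = [{"number": n} for n in range(4)]
processed = list(run_concurrently(items, add_one, workers=2))
```

Threads suit stages that spend most of their time waiting on I/O; CPU-heavy stages usually need separate processes to run truly in parallel in CPython.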

Read the documentation for an exhaustive guide.

The examples folder contains full working sample pipelines.

Future improvements:

  • Stages can be memory profiled.

  • Processed items can be cached at stage level.

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

SmartPipeline-0.7.1.tar.gz (30.3 kB)

Uploaded Source
