
chkpt

A tiny pipeline builder

What

chkpt is a zero-dependency, 100-line library that makes it easy to define and execute checkpointed pipelines.

It features...

  • Fluent pipeline construction
  • Transparent caching of expensive operations
  • JSON serialization

How

Defining a Stage

Stages are the atomic units of work in chkpt and correspond to single Python functions. An existing function needs only the @chkpt.Stage.wrap() decorator to become a Stage:

@chkpt.Stage.wrap()
def stage1():
  return "123"

# stage1 is now a Stage instance
assert isinstance(stage1, chkpt.Stage)

# but the original function is still accessible
assert stage1.func() == "123"
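The wrapping pattern itself is ordinary Python. As a rough illustration (a simplified sketch, not chkpt's actual source), a decorator factory can return a class instance that delegates calls and keeps the original function on a `.func` attribute:

```python
import functools


class MiniStage:
    """Simplified stand-in for chkpt.Stage: wraps a function, keeps it on .func."""

    def __init__(self, func):
        functools.update_wrapper(self, func)  # preserve name/docstring
        self.func = func

    def __call__(self, *args, **kwargs):
        # A real Stage would add checkpointing here; this sketch just delegates.
        return self.func(*args, **kwargs)


def wrap():
    def decorator(func):
        return MiniStage(func)
    return decorator


@wrap()
def stage1():
    return "123"

assert isinstance(stage1, MiniStage)
assert stage1.func() == "123"
```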

Stages can also accept parameters to be provided by other Stages in the final Pipeline:

@chkpt.Stage.wrap()
def stage2(stage1_input):
  return [stage1_input, "456"]

Defining a Pipeline

Pipelines define the execution graph of Stages to be run. Stages are combined with the shift operators (<< and >>) to direct the dataflow:

# Each defines a pipeline calculating `stage1` and passing its output to `stage2`.
pipeline = stage1 >> stage2
pipeline = stage2 << stage1
pipeline = stage2 << (stage1,)
pipeline = (stage1,) >> stage2
pipeline = () >> stage1 >> stage2 
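The operator syntax above is standard Python operator overloading. A minimal sketch of the assumed mechanics (illustrative only, not chkpt's implementation) shows how `>>`, `<<`, and the tuple forms can all build the same dependency edge, with the tuple cases handled by the reflected operator:

```python
class Node:
    """Minimal stage-like node supporting >> and << chaining."""

    def __init__(self, name, inputs=()):
        self.name = name
        self.inputs = tuple(inputs)

    def __rshift__(self, other):   # self >> other: self feeds other
        return Node(other.name, (self,))

    def __lshift__(self, other):   # self << other: other (or a tuple) feeds self
        upstream = other if isinstance(other, tuple) else (other,)
        return Node(self.name, upstream)

    def __rrshift__(self, other):  # (a, b) >> self: tuples lack __rshift__,
        upstream = other if isinstance(other, tuple) else (other,)
        return Node(self.name, upstream)  # so Python falls back to this


s1, s2 = Node("stage1"), Node("stage2")

# Each of these yields a stage2 node fed by stage1:
assert (s1 >> s2).inputs[0].name == "stage1"
assert (s2 << s1).inputs[0].name == "stage1"
assert ((s1,) >> s2).inputs[0].name == "stage1"
```

Because tuples define no `__rshift__` for custom objects, `(stage1,) >> stage2` dispatches to the right-hand operand's reflected method, which is what makes the multi-input forms work.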

More complex pipelines are best built up from their leaf Stages:

result1 = (stage1, stage2) >> stage3
result2 = (result1, stage1) >> stage4
pipeline = result2 >> stage5

Executing a Pipeline

Pipelines can be executed directly, which uses the default Config settings:

result = pipeline()

The defaults can be configured by passing a Config instance:

# Will store all stage results and attempt to load already-stored results, if present.
result = pipeline(chkpt.Config(store=True, load=True, dir='/tmp'))
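Under the hood, store/load checkpointing reduces to a check-then-compute-then-save cycle against JSON files. A hedged sketch of that pattern (illustrative only; chkpt's actual file layout and naming are assumptions here):

```python
import json
import os
import tempfile


def run_checkpointed(func, name, dir, store=True, load=True):
    """Run func, loading a prior JSON result if present and storing a new one."""
    path = os.path.join(dir, f"{name}.json")
    if load and os.path.exists(path):
        with open(path) as f:
            return json.load(f)          # cache hit: skip the computation
    result = func()
    if store:
        with open(path, "w") as f:
            json.dump(result, f)         # cache the JSON-serializable result
    return result


calls = []

def expensive():
    calls.append(1)
    return [1, 2, 3]

with tempfile.TemporaryDirectory() as d:
    first = run_checkpointed(expensive, "expensive", d)
    second = run_checkpointed(expensive, "expensive", d)

assert first == second == [1, 2, 3]
assert len(calls) == 1  # the second run loaded from disk
```

JSON serialization is what keeps the cached results portable and human-inspectable, at the cost of restricting stage outputs to JSON-serializable values.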

Examples

For detailed usage, see the examples/ directory.

The following is a brief example pipeline:

import chkpt


@chkpt.Stage.wrap()
def make_dataset1():
  ...

@chkpt.Stage.wrap()
def big_download2():
  ...

@chkpt.Stage.wrap()
def work_in_progress_analysis(dataset1, dataset2):
  ...

pipeline = (make_dataset1, big_download2) >> work_in_progress_analysis
# Work-intensive inputs only run once, caching on reruns.
result = pipeline(chkpt.Config(load=[make_dataset1, big_download2]))

