
Python library for dataflow programming with Amazon SWF


simpleflow

https://badge.fury.io/py/simpleflow.png https://travis-ci.org/botify-labs/simpleflow.png?branch=master https://pypip.in/d/simpleflow/badge.png

Simple Flow is a Python library that provides abstractions to write programs in the distributed dataflow paradigm. It relies on futures to describe the dependencies between tasks. It coordinates the execution of distributed tasks with Amazon SWF.

A Future object models the asynchronous execution of a computation that may end.

It tries to mimic the interface of the Python concurrent.futures library.

Features

  • Provides a Future abstraction to define dependencies between tasks.

  • Defines asynchronous tasks from callables.

  • Handles workflows with Amazon SWF.

  • Implements replay behavior like the Amazon Flow framework.

  • Handles retries of failed tasks.

  • Automatically registers decorated tasks.

  • Encodes/decodes large fields to S3 objects transparently (aka “jumbo fields”).

  • Handles the completion of a decision with more than 100 tasks.

  • Provides a local executor to check a workflow without Amazon SWF (see the simpleflow --local command).

  • Provides decider and activity worker processes for execution with Amazon SWF.

  • Ships with the simpleflow command; run simpleflow --help for more information about the commands it supports.

Quickstart

Let’s take a simple example that computes the result of (x + 1) * 2. You will find this example in examples/basic.py.

We need to declare the functions as activities to make them available:

import time

from simpleflow import (
    activity,
    Workflow,
    futures,
)

@activity.with_attributes(task_list='quickstart', version='example')
def increment(x):
    return x + 1

@activity.with_attributes(task_list='quickstart', version='example')
def double(x):
    return x * 2

@activity.with_attributes(task_list='quickstart', version='example')
def delay(t, x):
    time.sleep(t)
    return x

And then define the workflow itself in an example.py file:

class BasicWorkflow(Workflow):
    name = 'basic'
    version = 'example'
    task_list = 'example'

    def run(self, x, t=30):
        y = self.submit(increment, x)
        yy = self.submit(delay, t, y)
        z = self.submit(double, y)

        print('({x} + 1) * 2 = {result}'.format(
            x=x,
            result=z.result))
        futures.wait(yy, z)
        return z.result

Now check that the workflow works locally with an integer “x” and a wait value “t”:

$ simpleflow workflow.start --local examples.basic.BasicWorkflow --input '[1, 5]'
(1 + 1) * 2 = 4

The --input value is encoded in JSON format and can contain either the list of positional arguments, such as '[1, 1]', or a dict with the args and kwargs keys, such as '{"args": [1], "kwargs": {}}', '{"kwargs": {"x": 1}}', or '{"args": [1], "kwargs": {"t": 5}}'.
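
For example, the same local run can pass the wait value as a keyword argument using the dict form:

$ simpleflow workflow.start --local examples.basic.BasicWorkflow --input '{"args": [1], "kwargs": {"t": 5}}'
(1 + 1) * 2 = 4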

Now that you are confident that the workflow should work, you can run it on Amazon SWF with the standalone command:

$ simpleflow standalone --domain TestDomain examples.basic.BasicWorkflow --input '[1, 5]'

The standalone command sets a unique task list and manages all the processes that are needed to execute the workflow: decider, activity worker, and a client that starts the workflow. It is very convenient for testing a workflow by executing it against SWF during development or in integration tests.

Let’s take a closer look at the workflow definition.

It is a class that inherits from simpleflow.Workflow:

class BasicWorkflow(Workflow):

It defines 3 class attributes:

  • name, the name of the SWF workflow type.

  • version, the version of the SWF workflow type. It is currently provided only for labeling a workflow.

  • task_list, the default task list (think of it as a dynamically created queue) where decision tasks for this workflow will be sent. Any decider that listens on this task list can handle this workflow. This value can be overridden by the simpleflow commands and objects.

It also implements the run method that takes two arguments: x and t=30 (i.e. t is optional and has the default value 30). These arguments are passed with the --input option. The run method describes the workflow and how its tasks should execute.

Each time a decider takes a decision task, it executes the run method again from the start. When the workflow execution starts, it evaluates y = self.submit(increment, x) for the first time. y holds a future in state PENDING. The execution continues with the line yy = self.submit(delay, t, y). yy holds another future in state PENDING. This state means the task has not been scheduled yet. Execution then continues in the run method with the line z = self.submit(double, y). Here it needs the value of the y future to evaluate the double activity. As the execution cannot continue, the decider schedules the increment task. yy is not a dependency of any task, so it is not scheduled.

Once the decider has scheduled the task for y, it sleeps and waits for an event to wake it up. This happens when the increment task completes: SWF schedules a decision task, a decider takes it and executes the BasicWorkflow.run method again from the start. It evaluates the line y = self.submit(increment, x). The task associated with the y future has completed, hence y is in state FINISHED and y.result contains the value 2. The execution continues until it blocks. It passes yy = self.submit(delay, t, y), which stays the same, then reaches z = self.submit(double, y). It gets the value of y.result, and z now holds a future in state PENDING. Execution reaches the line with the print. It blocks here because z.result is not available yet. The decider schedules the task backed by the z future: double(y). The workflow execution continues in this way, evaluating BasicWorkflow.run again from the start, until it finishes.
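
As an illustration of this replay behavior, here is a variant of the run method (a sketch for this documentation, not part of examples/basic.py) that waits on the futures explicitly before reading any result, so the print only happens in the final replay:

class BasicWaitFirstWorkflow(Workflow):
    name = 'basic-wait-first'  # hypothetical workflow, for illustration only
    version = 'example'
    task_list = 'example'

    def run(self, x, t=30):
        y = self.submit(increment, x)
        yy = self.submit(delay, t, y)
        z = self.submit(double, y)

        # Wait for both tasks to complete before reading z.result,
        # so the print statement only runs once everything is FINISHED.
        futures.wait(yy, z)
        print('({x} + 1) * 2 = {result}'.format(x=x, result=z.result))
        return z.result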

Jumbo Fields

For some use cases, you want to be able to have fields larger than the standard SWF limits allow (at most 32K bytes for the largest ones, input and result, and lower for some others).

Simpleflow can transparently translate such fields into objects stored on AWS S3. The format is then the following:

simpleflow+s3://jumbo-bucket/with/optional/prefix/5d7191af-3962-4c67-997a-cdd39a31ba61 5242880

The first word is a pseudo-S3 address. Its “simpleflow+s3://” prefix exists for implementation purposes and may be extended later with other backends such as simpleflow+ssh or simpleflow+gs.

The second word is the length of the object in bytes, so a client parsing the SWF history can decide whether it is worth pulling and decoding the object.
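
For illustration, a client reading the history could parse such a signature roughly like this (a minimal sketch; parse_jumbo_signature is a hypothetical helper, not part of simpleflow's API):

def parse_jumbo_signature(field):
    """Split a jumbo field signature into its pseudo-S3 address and size in bytes."""
    address, size = field.split()
    if not address.startswith('simpleflow+s3://'):
        raise ValueError('not a jumbo field signature: {!r}'.format(field))
    return address, int(size)

address, size = parse_jumbo_signature(
    'simpleflow+s3://jumbo-bucket/with/optional/prefix/'
    '5d7191af-3962-4c67-997a-cdd39a31ba61 5242880')
if size <= 1024 ** 2:  # e.g. only pull objects up to 1MB
    pass  # fetch the S3 object and decode the field here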

For now, jumbo fields are limited to 5MB in size. Simpleflow performs disk caching for this feature to avoid issuing too many queries to S3, which would slow down the deciders in particular. The disk cache is located at /tmp/simpleflow-cache and is limited to 1GB with an LRU eviction strategy; it is implemented with the DiskCache library.
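
The caching described above can be reproduced with DiskCache roughly as follows (a sketch of the settings involved, not simpleflow's actual code):

import diskcache

# A 1GB disk cache at /tmp/simpleflow-cache with least-recently-used eviction.
cache = diskcache.Cache(
    '/tmp/simpleflow-cache',
    size_limit=1024 ** 3,
    eviction_policy='least-recently-used',
)

# Cache decoded jumbo objects keyed by their pseudo-S3 address.
cache.set('simpleflow+s3://jumbo-bucket/some-key', b'large payload')
print(cache.get('simpleflow+s3://jumbo-bucket/some-key'))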

You have to configure an environment variable to tell simpleflow where to store things (setting it also implicitly enables the feature):

SIMPLEFLOW_JUMBO_FIELDS_BUCKET=jumbo-bucket/with/optional/prefix

And ensure your deciders and activity workers have access to this S3 bucket (s3:GetObject and s3:PutObject should be enough, but please test it first).

The overhead of the signature format is at most 91 characters (fixed protocol and UUID width, plus the size part, at most 5M = 5242880). So make sure your bucket + prefix is no longer than 256 - 91 = 165 characters, otherwise you may not be able to produce a working jumbo field signature even for tiny fields. Stripping the signature would only break things down the road in unpredictable and hard-to-debug ways, so simpleflow raises an error in that case.
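
A quick sanity check against this limit could look like the following (a hypothetical snippet for your own tooling, not something simpleflow ships):

import os

MAX_SIGNATURE_OVERHEAD = 91  # fixed protocol + UUID + size part
MAX_PREFIX_LENGTH = 256 - MAX_SIGNATURE_OVERHEAD  # = 165

prefix = os.environ['SIMPLEFLOW_JUMBO_FIELDS_BUCKET']
if len(prefix) > MAX_PREFIX_LENGTH:
    raise ValueError('SIMPLEFLOW_JUMBO_FIELDS_BUCKET is {} chars long, '
                     'maximum is {}'.format(len(prefix), MAX_PREFIX_LENGTH))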

This feature is still in beta mode, and any feedback is appreciated.

Commands

Overview

Please read, and even run, the demo script to get a quick overview of the simpleflow commands. To run the demo you will need to start decider and activity worker processes.

Start a decider with:

$ simpleflow decider.start --domain TestDomain --task-list test examples.basic.BasicWorkflow

Start an activity worker with:

$ simpleflow worker.start --domain TestDomain --task-list quickstart

Then execute ./extras/demo.

Controlling SWF access

The SWF region is controlled by the environment variable AWS_DEFAULT_REGION. This variable comes from the legacy “simple-workflow” project. The region might be exposed through a --region option in the future (if you want that, please open an issue).

The SWF domain is controlled by the --domain option on most simpleflow commands. It can also be set via the SWF_DOMAIN environment variable. If both are supplied, the command-line value takes precedence over the environment variable.

Note that some simpleflow commands expect the domain to be passed as a positional argument. In that case the environment variable has no effect for now.

The number of retries for accessing SWF can be controlled via SWF_CONNECTION_RETRIES (defaults to 5).

The identity of SWF activity workers and deciders can be controlled via SIMPLEFLOW_IDENTITY, a JSON-serialized string of { "key": "value" } pairs that add to (or override) the basic identity provided by simpleflow. If a value is null in this JSON map, the key is removed from the final SWF identity.
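
For instance, a decider or worker environment could be configured like this (the region, retry count, and identity values below are only illustrative):

$ export AWS_DEFAULT_REGION=us-east-1
$ export SWF_DOMAIN=TestDomain
$ export SWF_CONNECTION_RETRIES=3
$ export SIMPLEFLOW_IDENTITY='{"datacenter": "eu-west-1", "hostname": null}'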

List Workflow Executions

$ simpleflow workflow.list TestDomain
basic-example-1438722273  basic  OPEN

Workflow Execution Status

$ simpleflow --header workflow.info TestDomain basic-example-1438722273
domain      workflow_type.name  workflow_type.version  task_list  workflow_id               run_id                                          tag_list  execution_time  input
TestDomain  basic               example                           basic-example-1438722273  22QFVi362TnCh6BdoFgkQFlocunh24zEOemo1L12Yl5Go=            1.70            {u'args': [1], u'kwargs': {}}

Tasks Status

You can check the status of the workflow execution with:

$ simpleflow --header workflow.tasks DOMAIN WORKFLOW_ID [RUN_ID] --nb-tasks 3
$ simpleflow --header workflow.tasks TestDomain basic-example-1438722273
Tasks                     Last State    Last State Time             Scheduled Time
examples.basic.increment  scheduled     2015-08-04 23:04:34.510000  2015-08-04 23:04:34.510000
$ simpleflow --header workflow.tasks TestDomain basic-example-1438722273
Tasks                     Last State    Last State Time             Scheduled Time
examples.basic.double     completed     2015-08-04 23:06:19.200000  2015-08-04 23:06:17.738000
examples.basic.delay      completed     2015-08-04 23:08:18.402000  2015-08-04 23:06:17.738000
examples.basic.increment  completed     2015-08-04 23:06:17.503000  2015-08-04 23:04:34.510000

Profiling

You can profile the execution of the workflow with:

$ simpleflow --header workflow.profile TestDomain basic-example-1438722273
Task                                 Last State    Scheduled           Time Scheduled  Start               Time Running  End                 Percentage of total time
activity-examples.basic.double-1     completed     2015-08-04 23:06              0.07  2015-08-04 23:06            1.39  2015-08-04 23:06                        1.15
activity-examples.basic.increment-1  completed     2015-08-04 23:04            102.20  2015-08-04 23:06            0.79  2015-08-04 23:06                        0.65

Controlling log verbosity

You can control log verbosity via the LOG_LEVEL environment variable. Default is INFO. For instance, the following command will start a decider with DEBUG logs:

$ LOG_LEVEL=DEBUG simpleflow decider.start --domain TestDomain --task-list test examples.basic.BasicWorkflow

Documentation

Full documentation (work-in-progress) is available at https://simpleflow.readthedocs.org/.

Requirements

Development

A Dockerfile is provided to help development on non-Linux machines.

You can build a simpleflow image with:

./script/docker-build

And use it with:

./script/docker-run

It will then mount your current directory inside the container and pass the most relevant variables (your AWS_* credentials for instance).

Running tests

You can run tests with:

./script/test

Any parameter passed to this script is propagated to the underlying call to py.test. This wrapper script sets some environment variables which control the behavior of simpleflow during tests:

  • SIMPLEFLOW_CLEANUP_PROCESSES: set to "yes" in tests, so tests clean up child processes after each test case. Set it to an empty string ("") or omit it outside script/test if you want to debug things and take care of cleanup yourself (see the example after this list).

  • SIMPLEFLOW_ENV: set to "test" in tests, which changes some constants to ease or speed up tests.

  • SWF_CONNECTION_RETRIES: set to "1" in tests, which avoids having too many retries on the SWF API calls (5 by default in production).

  • SIMPLEFLOW_VCR_RECORD_MODE: set to "none" in tests, which avoids running requests against the real SWF endpoints. If you need to update cassettes, see tests/integration/README.md.
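
For example, to run a subset of the tests verbosely while keeping child processes around for inspection (the test selection below is illustrative):

$ SIMPLEFLOW_CLEANUP_PROCESSES="" ./script/test tests/ -x -v -k "workflow"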

Release

In order to release a new version, you’ll need credentials on pypi.python.org for this package, as well as write access to this repository. Ask via an issue if needed. Rough process:

git checkout master
git pull --rebase
v=0.10.0
vi simpleflow/__init__.py
git add . && git commit -m "Bump version to $v"
git tag $v
git push --tags
python setup.py sdist upload -r pypi

License

MIT licensed. See the bundled LICENSE file for more details.

Python Simple Workflow

(This README was included with python-simple-workflow, now merged here as workflow.swf.)

https://travis-ci.org/botify-labs/python-simple-workflow.png?branch=develop

python-simple-workflow is a wrapper for the Amazon Simple Workflow service. It builds on the Boto library's SWF API implementation to provide abstractions over the web service concepts.

It aims to provide:

  • Modeling: SWF entities and concepts are manipulated through Models and QuerySets (any resemblance to the Django API is not a coincidence).

  • High-level Events, History: a higher level of abstraction over SWF events and history. Events are implemented as stateful objects aware of their own state and possible transitions. History enhances the event flow description and can be compiled to check its integrity and the activities' status transitions.

  • Decisions: stateful abstractions over the SWF decision-making system.

  • Actors: base implementations of SWF actors, such as a Decider or an activity task processing Worker, from which the user can easily inherit to implement their own decision/processing model.

It provides querysets and model objects over commonly used concepts: domains, workflow types, activity types, and so on.

It is under the MIT license, and ideas, feature requests, patches, and pull requests to improve it are of course welcome.

Installation

pip install simple-workflow

Usage and all the rest

Please refer to the Documentation.

What’s left?

Amazon interface models implementation:

✔ Domain @done (13-04-02 10:01)
✔ Workflow Type @done (13-04-02 10:01)
✔ Workflow Execution @done (13-04-05 10:13)
☐ Activity Type
☐ Decider

Amazon interface querysets implementation:

✔ DomainQuery @done (13-04-02 10:02)
✔ WorkflowTypeQuery @done (13-04-02 10:03)
✔ Workflow Execution @done (13-04-05 10:13)
☐ Activity Type
☐ Decider

General:

☐ Add sphinx doc
☐ Document real world example
☐ TESTS TESTS TESTS!

Changelog

0.18.0

  • Implement “jumbo” fields (#265)

0.17.0

  • Enhancement/272/implement workflow cancelation (#273)

  • Bugfix: 270: signals improvements (#271)

  • Enhancement: timer: get_event_details (#269)

  • Append “/” to get_step_path_prefix (#268)

  • Enhancement/misc (#266)

  • Repair reruns successful child workflows (#191)

0.16.0

  • Feature: timers (#258)

0.15.7

  • Kill worker on UnknownResourceFault’s during a heartbeat (#88) (#263)

  • Sort keys by default in json_dumps (#264)

0.15.6

  • Fix step attribute propagation (#261)

  • Enhancement: get_event_details (#235)

0.15.5

  • Enhancement: distinguish raises_on_failure between tasks and groups (#255)

  • Add time constants

  • Relax activity.with_attributes timeouts types

0.15.4

  • Enhancement: add canvas option break_on_failure (#253)

  • Compute task_id from ActivityTask if has get_task_id method (#237)

  • Another case of wrong task list (#234)

0.15.3

  • make raises_on_failure=True on step activities (#249)

  • SWF: support for non-Python tasks (#219)

  • Fix get_step_path_prefix

  • Make MarkerTask’s idempotents

0.15.2

  • mark when a step is scheduled before it’s executed (#243)

0.15.1

  • Enhancement: better activity type reason (#238)

  • Fix workers not catching errors during dispatch() step (#246)

  • Fix canvas.Chain send_result regression (#247)

0.15.0

  • Feature: steps (#221)

  • Make activity task result optional (#225)

  • Use details in addition to name to find markers (#227)

  • Logging: add exception information (#163)

  • swf/actors: support ‘Message’ key (#224)

  • Implement markers (#216) (#217)

  • Add retry on swf.process.Poller.poll and fail (#208)

0.14.2

  • propagate_attribute: skip signal objects (#215)

  • Local executor: check add_activity_task (#215)

0.14.1

  • Don’t send exception up if raises_on_failure is false (#213)

  • Fix UnicodeDecodeError on windows machine (#211)

  • Try to use less memory (#209)

  • Standalone mode: use created task list for children activities (#207)

0.14.0

  • Fix workers not stopping in case they start during a shutdown (#205)

  • Add support for SWF signals (#188)

  • Improvements on canvas.Group (#204)

0.13.4

  • Implement metrology on SWF and local workflows (#186)

0.13.3

  • Try..except pass for NoSuchProcess (#182)

0.13.2

  • Add optional canvas (#193)

  • Reorganize tests/ directory (#198)

  • Relax DeciderPoller task list check (#201)

  • Implement priorities on SWF tasks (#199)

0.13.1

  • Fix SWF executor not accepting ActivityTask’s in submit() method (#196)

0.13.0

  • Implement child workflow (#74)

  • Don’t schedule idempotent tasks multiple times (#107)

  • Child workflow ID: use parent’s id to generate

0.12.7

  • Control SWF processes identity via environment (#184)

0.12.6

  • Replace execution object with a more flexible get_execution_method() (#177)

  • Fix README_SWF.rst format (#175)

  • Fix CONTRIBUTING.rst format

  • docs/conf.py: remove relative import

0.12.5

  • Executor: expose workflow execution (#172)

0.12.4

  • Avoid returning too big responses to RespondDecisionTaskCompleted endpoint (#166)

  • Worker: remove useless monitor_child (#168)

0.12.3

  • Add max_parallel option in Group (#164)

0.12.2

  • Make the dynamic dispatcher more flexible (#161)

  • Fix README.rst format (#160)

  • Tiny command-line usability fixes (#158)

0.12.1

  • Don’t override passed “default” in json_dumps() (#155)

  • Expose activity context (#156)

0.12.0

  • Improve process management (#142)

0.11.17

  • Don’t reap children in the back of multiprocessing (#141)

  • Don’t force to pass a workflow to activity workers (#133)

  • Don’t override the task list if not standalone (#139)

  • Split FuncGroup submit (#146)

  • CI: Test on python 3 (#144)

  • Decider: use workflow’s task list if unset (#148)

0.11.16

  • Refactor: cleanups and many python 3 compatibility issues fixed (#135)

  • Introduce AggregationException to inspect exceptions inside canvas.Group/Chain (#92)

  • Improve heartbeating, now enabled by default on activity workers (#136)

0.11.15

  • Fix tag_list declaration in case no tag is associated with the workflow

  • Fix listing workflow tasks not handling “scheduled” (not started) tasks correctly

  • Fix CSV formatter outputing an extra “None” at the end of the output

  • Fix ‘simpleflow activity.rerun’ resolving the bad function name if not the last event

0.11.14

  • Various little fixes around process management, heartbeat, logging (#110)

0.11.13

  • Add ability to provide a ‘run ID’ with ‘simpleflow standalone –repair’

0.11.12

  • Fix –tags argument for simpleflow standalone (#114)

  • Improve tests and add integration tests (#116)

  • Add ‘simpleflow activity.rerun’ command (#117)

0.11.11

  • Fix a circular import on simpleflow.swf.executor

0.11.10

  • Fix previous_history initialization (#106)

  • Improve WorkflowExecutionQueryset default date values (#111)

0.11.9

  • Add a –repair option to simpleflow standalone (#100)

0.11.8

  • Retry boto.swf connection to avoid frequent errors when using IAM roles (#99)

0.11.7

Same as 0.11.6 but the 0.11.6 on pypi is broken (pushed something similar to 0.11.5 by mistake)

0.11.6

  • Add issubclass_ method (#96)

  • Avoid duplicate logs if root logger has an handler (#97)

  • Allow passing SWF domain via the SWF_DOMAIN environment variable (#98)

0.11.5

  • Don’t mask activity cancel exception (#84)

  • Propagate all decision response attributes up to Executor.replay() (#76, #94)

0.11.4

  • ISO dates in workflow history #91

  • Fix potential infinite retry loop #90

0.11.3

  • Fix replay hooks introduced in 0.11.2 (#86)

  • Remove python3 compatibility from README (which was not working for a long time)

0.11.2

  • Add new workflow hooks (#79)

0.11.1

  • Fix logging when an exception occurs

0.11.0

  • Merge the swf package into simpleflow for easier maintenance.

0.10.4 and below

Sorry, changes were not documented for simpleflow <= 0.10.x.
