Parallel task graph framework.

===============
About TaskGraph
===============

``TaskGraph`` is a library that was developed to help manage complicated
computational software pipelines consisting of long running individual tasks.
Many of these tasks could be executed in parallel, almost all of them wrote
results to disk, and many times results could be reused from part of the
pipeline. TaskGraph manages all of this for you. With it you can schedule
tasks with dependencies, avoid recomputing results that have already been
computed, and allot multiple CPU cores to execute tasks in parallel if
desired.

TaskGraph Dependencies
----------------------

TaskGraph is written in pure Python, but if the ``psutil`` package is
installed the distributed multiprocessing processes will be ``nice``\d.

Example Use
-----------

Install ``TaskGraph`` with:

``pip install taskgraph``

Then define tasks and add them to a graph:

.. code-block:: python

    import os
    import pickle
    import tempfile

    import taskgraph


    def _create_list_on_disk(value, length, target_path):
        """Pickle a list of `length` copies of `value` to `target_path`."""
        target_list = [value] * length
        with open(target_path, 'wb') as target_file:
            pickle.dump(target_list, target_file)


    def _sum_lists_from_disk(list_a_path, list_b_path, target_path):
        """Read two lists, add them elementwise, and save the result."""
        with open(list_a_path, 'rb') as list_a_file:
            list_a = pickle.load(list_a_file)
        with open(list_b_path, 'rb') as list_b_file:
            list_b = pickle.load(list_b_file)
        target_list = [a + b for a, b in zip(list_a, list_b)]
        with open(target_path, 'wb') as target_file:
            pickle.dump(target_list, target_file)


    # any writable directory works; a temporary one is used here
    workspace_dir = tempfile.mkdtemp()

    # create a taskgraph that uses 4 multiprocessing subprocesses when possible
    task_graph = taskgraph.TaskGraph(workspace_dir, 4)
    target_a_path = os.path.join(workspace_dir, 'a.dat')
    target_b_path = os.path.join(workspace_dir, 'b.dat')
    result_path = os.path.join(workspace_dir, 'result.dat')
    value_a = 5
    value_b = 10
    list_len = 10
    # the callable is passed as `func` (renamed from `target` in 0.2.0)
    task_a = task_graph.add_task(
        func=_create_list_on_disk,
        args=(value_a, list_len, target_a_path),
        target_path_list=[target_a_path])
    task_b = task_graph.add_task(
        func=_create_list_on_disk,
        args=(value_b, list_len, target_b_path),
        target_path_list=[target_b_path])
    # sum_task runs only after task_a and task_b have completed
    sum_task = task_graph.add_task(
        func=_sum_lists_from_disk,
        args=(target_a_path, target_b_path, result_path),
        target_path_list=[result_path],
        dependent_task_list=[task_a, task_b])

    task_graph.close()
    task_graph.join()

    # expect that result is a list `list_len` long with `value_a+value_b` in it
    with open(result_path, 'rb') as result_file:
        result = pickle.load(result_file)

Running Tests
-------------

TaskGraph includes a ``tox`` configuration for automating builds across
Python versions 2.7 and 3.6, each with and without ``psutil`` installed. To
execute all tests on all configurations, run::

    $ tox

Alternatively, if you're only trying to run tests on a single configuration
(say, Python 3.6 without ``psutil``), you'd run::

    $ tox -e py36

Or if you'd like to run the tests for the combination of Python 2.7 with
``psutil``, you'd run::

    $ tox -e py27-psutil


.. :changelog:

=========================
TaskGraph Release History
=========================

Unreleased Changes
------------------
* TaskGraph now stores all task completion information in a single SQLite
  database in its cache directory. In previous versions TaskGraph would write
  a small text file for each task in a highly branching directory tree. This
  structure made removal of those directory trees computationally expensive.
* Fixed an issue that would cause TaskGraph to reexecute a task if the target
  path was included in the argument list and that path was not normalized to
  the operating system's path style.
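
As a rough illustration of the two changes above, the sketch below keeps one
completion record per task in a single SQLite database and normalizes paths
before comparing them. The table layout, the ``task_hash`` key, and the helper
names are assumptions made for this example; they are not taken from
TaskGraph's source.

.. code-block:: python

    import os
    import sqlite3


    def open_task_db(db_path):
        """Open (or create) a database holding one row per completed task."""
        connection = sqlite3.connect(db_path)
        connection.execute(
            'CREATE TABLE IF NOT EXISTS completed_tasks ('
            'task_hash TEXT PRIMARY KEY, target_path TEXT NOT NULL)')
        return connection


    def record_completion(connection, task_hash, target_path):
        """Store a completed task, normalizing the path to the OS style."""
        connection.execute(
            'INSERT OR REPLACE INTO completed_tasks VALUES (?, ?)',
            (task_hash, os.path.normpath(target_path)))
        connection.commit()


    def is_complete(connection, task_hash, target_path):
        """Return True if the task was recorded with an equivalent path."""
        row = connection.execute(
            'SELECT target_path FROM completed_tasks WHERE task_hash = ?',
            (task_hash,)).fetchone()
        return row is not None and row[0] == os.path.normpath(target_path)


    # hypothetical usage; a real cache would use a file path, not ':memory:'
    connection = open_task_db(':memory:')
    record_completion(connection, 'abc123', 'workspace//a.dat')

Because both the stored and the queried path pass through
``os.path.normpath``, ``'workspace//a.dat'`` and ``'workspace/a.dat'`` are
treated as the same target.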

0.7.0 (2018-10-22)
------------------
* Fixed an issue where very long strings might be interpreted as paths,
  causing a crash on Windows because the path was too long.
* Fixed a deadlock issue where a Task might raise an unhandled exception as a
  new task was added to the TaskGraph.
* Fixed the occasional ``BrokenPipeError`` that could occur when a Task
  encountered an unhandled exception.
* Added an ``n_retries`` parameter to ``add_task`` that lets TaskGraph attempt
  to reexecute a failing Task up to ``n_retries`` times before terminating
  the TaskGraph.
* Removed the ``delayed_start`` option.
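
The retry behavior described above could look roughly like the following.
This is a generic sketch, not TaskGraph's implementation:
``run_with_retries`` and ``flaky`` are hypothetical names, and only the
``n_retries`` parameter name comes from the changelog.

.. code-block:: python

    def run_with_retries(func, args=(), n_retries=0):
        """Call ``func(*args)``, retrying up to ``n_retries`` times."""
        attempts_left = n_retries + 1  # the first attempt plus the retries
        while True:
            attempts_left -= 1
            try:
                return func(*args)
            except Exception:
                if attempts_left == 0:
                    raise  # out of retries: let the failure propagate


    calls = []


    def flaky():
        """Fail on the first two calls, then succeed."""
        calls.append(1)
        if len(calls) < 3:
            raise RuntimeError('transient failure')
        return 'done'


    result = run_with_retries(flaky, n_retries=2)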

0.6.1 (2018-08-14)
------------------
* Resolving an issue with duplicate logging being printed to stdout when
  ``n_workers > 0``. Logging is now only handled in the process that contains
  the TaskGraph instance.
* Updated main logging message to indicate which tasks, by task name, are
  currently active and how many tasks are ready to execute but can't because
  there is not an open worker.
* Attempted to fix an issue where processes in the process pool were not
  terminating on a Linux system by aggressively joining all threads and
  processes when possible.
* Fixed an issue that would cause tasks that had been previously calculated to
  prematurely trigger children tasks even if the parent tasks of the current
  task needed to be reexecuted.

0.6.0 (2018-07-24)
------------------
* Added a ``delayed_start`` flag to TaskGraph to allow for delayed execution
  of taskgraph tasks. If enabled on threaded or multiprocess mode, calls to
  ``add_task`` will not execute tasks until the ``join`` method is invoked on
  ``taskgraph``. This allows for finer control over execution order when tasks
  are passed non-equivalent ``priority`` levels.
* Fixing an issue where a non-JSON serializable object would cause
  ``add_task`` to crash. Now TaskGraph is more tolerant of non-JSON
  serializable objects and will log warnings when parameters cannot be
  serialized.
* TaskGraph constructor has an option to report an ongoing logging message
  at a set interval. The message reports how many tasks have been committed
  and completed.
* Fixed a bug that would cause TaskGraph to needlessly reexecute a task if
  the only change was the order of the ``target_path_list`` or
  ``dependent_task_list`` variables.
* Fixed a bug that would cause a task to reexecute between runs if an input
  argument was a file that would be generated by a task that had not yet
  executed.
* Made a code change that makes it very likely that tasks will be executed in
  priority order if added to a TaskGraph in delayed execution mode.
* Refactored internal TaskGraph scheduling to fix a design error that made it
  likely tasks would be needlessly reexecuted. This also simplified TaskGraph
  flow control and caused slight performance improvements.
* Fixed an issue discovered when a ``scipy.sparse`` matrix was passed as an
  argument and ``add_task`` crashed on infinite recursion. Type checking of
  arguments has been simplified and now iteration only occurs on the Python
  ``set``, ``dict``, ``list``, and ``tuple`` types.
* Fixed an issue where the ``TaskGraph`` was not ``join``\ing the worker
  process pool on a closed/joined TaskGraph, or when the ``TaskGraph`` object
  was being destroyed. This would occasionally cause a race condition
  where the TaskGraph may still have a cache ``.json`` file open. Discovered
  through a flaky build test.
* Added functionality to the ``TaskGraph`` object to propagate log messages
  from workers back to the parent process. This only applies for cases where
  a ``TaskGraph`` instance is started with ``n_workers > 0``.
* Fixed an issue where a function that was passed as an argument would cause
  a reexecution on a separate run because the ``__repr__`` of a function
  includes its pointer address.
* Adjusted logging levels so that detailed task information is shown on DEBUG
  but basic status updates are shown in INFO.
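
The function-argument fix above follows from how Python prints functions. The
sketch below shows why ``repr`` is a poor cache key and one possible stable
substitute; ``stable_func_key`` is a hypothetical helper, not part of
TaskGraph.

.. code-block:: python

    import re


    def double(value):
        return 2 * value


    def stable_func_key(func):
        """Identify a function by module and qualified name, not address."""
        name = getattr(func, '__qualname__', func.__name__)
        return '%s.%s' % (func.__module__, name)


    # In CPython, repr() of a function embeds its memory address, which can
    # differ from run to run, e.g. "<function double at 0x7f3c2a1b0d30>".
    has_address = re.search(r' at 0x[0-9a-fA-F]+>$', repr(double)) is not None
    key = stable_func_key(double)

A key built from the module and name stays the same across runs, so it does
not by itself trigger a reexecution.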

0.5.2 (2018-06-20)
------------------
* Fixing an issue where a Task would hang on a ``join`` if the number of
  workers in TaskGraph was -1, a call to ``add_task`` had a non-``None``
  value passed to ``target_path_list``, and the resulting task was
  ``.join``\ed after a second run of the same program.

0.5.1 (2018-06-20)
------------------
* Fixing an issue where TaskGraph would hang on a ``join`` if the number of
  workers was -1 and a call to ``add_task`` had ``None`` passed to
  ``target_path_list``.

0.5.0 (2018-05-04)
------------------
* TaskGraph now supports Python versions 2 and 3 (tested with Python 2.7,
  3.6).
* Fixed an issue with ``taskgraph.TaskGraph`` that prevented a multiprocessed
  graph from executing on POSIX systems when ``psutil`` was installed.
* Adding matrix-based test automation (Python 2.7, Python 3.6, with/without
  ``psutil``) via ``tox``.
* Updating repository path to ``https://bitbucket.org/natcap/taskgraph``.

0.4.0 (2018-04-18)
------------------
* Auto-versioning now happens via ``setuptools_scm``, replacing previous calls
  to ``natcap.versioner``.
* Added an option to ``TaskGraph`` constructor to allow negative values in the
  ``n_workers`` argument to indicate that the entire object should run in the
  main thread. A value of 0 will indicate that no multiprocessing will be used
  but concurrency will be allowed for non-blocking ``add_task``.
* Added an abstract class ``task.EncapsulatedTaskOp`` that can be used to
  instance a class that needs scope in order to be used as an operation passed
  to a process. The advantage of using ``EncapsulatedTaskOp`` is that the
  ``__name__`` hash used by ``TaskGraph`` to determine if a task is unique is
  calculated in the superclass and the subclass need only worry about
  implementation of ``__call__``.
* Added a ``priority`` optional scalar argument to ``TaskGraph.add_task`` to
  indicate the priority preference of the task to be executed. A higher
  priority task whose dependencies are satisfied will be executed before one
  with a lower priority.
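
One common way to get the priority ordering described in the last item is a
heap of ready tasks. The sketch below is only an illustration of that idea;
``push_ready``, ``pop_next``, and the tie-breaking counter are assumptions,
not TaskGraph internals.

.. code-block:: python

    import heapq
    import itertools

    ready_queue = []
    insertion_order = itertools.count()  # keeps equal priorities first-in


    def push_ready(task_name, priority=0):
        """Queue a task whose dependencies are already satisfied."""
        # heapq pops the smallest item first, so the priority is negated
        heapq.heappush(
            ready_queue, (-priority, next(insertion_order), task_name))


    def pop_next():
        """Return the ready task with the highest priority."""
        return heapq.heappop(ready_queue)[2]


    push_ready('low', priority=1)
    push_ready('high', priority=10)
    push_ready('default')
    execution_sequence = [pop_next() for _ in range(3)]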

0.3.0 (2017-11-17)
------------------
* Refactored the core scheduler. The old scheduler used asynchronicity to
  attempt to test if a Task was complete, occasionally testing all Tasks in
  the potential work queue per task completion. The scheduler now uses
  bookkeeping to keep track of all dependencies and submits tasks for work
  only when all dependencies are satisfied.
* TaskGraph and Task ``.join`` methods now have a timeout parameter.
  Additionally ``join`` now also returns False if ``join`` terminates because
  of a timeout.
* More robust error reporting and shutdown of TaskGraph if any tasks fail
  during execution using pure threading or multiprocessing.
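
The bookkeeping approach described in the first item can be illustrated with
a small dependency counter. This is a simplified, single-threaded sketch under
assumed names (``execution_order``, ``dependencies``); a real scheduler also
has to handle worker processes and failures.

.. code-block:: python

    from collections import deque


    def execution_order(dependencies):
        """Return task names in an order that respects ``dependencies``.

        ``dependencies`` maps each task name to the names it depends on.
        """
        remaining = {task: len(deps) for task, deps in dependencies.items()}
        dependents = {task: [] for task in dependencies}
        for task, deps in dependencies.items():
            for dep in deps:
                dependents[dep].append(task)

        # tasks with no unmet dependencies can be submitted immediately
        ready = deque(sorted(t for t, n in remaining.items() if n == 0))
        order = []
        while ready:
            task = ready.popleft()
            order.append(task)  # stands in for "submit the task and wait"
            for child in dependents[task]:
                remaining[child] -= 1
                if remaining[child] == 0:
                    ready.append(child)
        if len(order) != len(dependencies):
            raise ValueError('dependency cycle detected')
        return order


    order = execution_order({
        'task_a': [],
        'task_b': [],
        'sum_task': ['task_a', 'task_b'],
    })

Each completion only touches the counters of the finished task's direct
dependents, so no task is polled repeatedly to see whether it can run.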


0.2.7 (2017-11-09)
------------------
* Fixed a critical error from the last hotfix that prevented ``taskgraph``
  from avoiding recomputation of already completed tasks.

0.2.6 (2017-11-07)
------------------
* Fixed an issue from the previous hotfix that could cause ``taskgraph`` to
  exceed the number of available threads if enough tasks were added with long
  running dependencies.
* Additional error checking and flow control ensures that a TaskGraph will
  catastrophically fail and report useful exception logging if a task fails
  during runtime.
* Fixed a deadlock issue where a failure on a subtask would occasionally cause
  a TaskGraph to hang.
* ``Task.is_complete`` raises a RuntimeError if the task is complete but
  failed.
* More efficient handling of topological progression of task execution to
  attempt to maximize total possible CPU load.
* Fixing an issue from the last release that caused the test cases to fail.
  (Don't use 0.2.5 at all).

0.2.5 (2017-10-11)
------------------
* Fixed a bug where tasks with satisfied dependencies or no dependencies were
  blocked on dependent tasks added to the task graph earlier in the main
  thread execution.
* Indicating that ``psutil`` is an optional dependency through the ``setup``
  function.

0.2.4 (2017-09-19)
------------------
* Empty release. Possible bug with PyPI release, so re-releasing with a
  bumped up version.

0.2.3 (2017-09-18)
------------------
* More robust testing on a chain of tasks that might fail because an ancestor
  failed.

0.2.2 (2017-08-15)
------------------
* Changed how TaskGraph determines if work is complete. It now records target
  paths in a file token with modified time and file size. When checking if
  work is complete, the token is loaded and the target file stats are compared
  for each file.
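
A minimal sketch of this kind of check, assuming a JSON token file and
hypothetical helper names (``write_token``, ``work_is_complete``); the actual
token format used by TaskGraph may differ.

.. code-block:: python

    import json
    import os
    import tempfile


    def write_token(token_path, target_path_list):
        """Record the modified time and size of each target file."""
        stats = {
            path: [os.path.getmtime(path), os.path.getsize(path)]
            for path in target_path_list}
        with open(token_path, 'w') as token_file:
            json.dump(stats, token_file)


    def work_is_complete(token_path):
        """Return True if every recorded target still matches its stats."""
        if not os.path.exists(token_path):
            return False
        with open(token_path) as token_file:
            stats = json.load(token_file)
        for path, (mtime, size) in stats.items():
            if not os.path.exists(path):
                return False
            current = [os.path.getmtime(path), os.path.getsize(path)]
            if current != [mtime, size]:
                return False
        return True


    workspace_dir = tempfile.mkdtemp()
    target_path = os.path.join(workspace_dir, 'a.dat')
    token_path = os.path.join(workspace_dir, 'a.token')
    with open(target_path, 'w') as target_file:
        target_file.write('12345')
    write_token(token_path, [target_path])
    complete_before = work_is_complete(token_path)
    with open(target_path, 'a') as target_file:
        target_file.write('678')  # the size changes, so the token is stale
    complete_after = work_is_complete(token_path)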

0.2.1 (2017-08-11)
------------------
* Handling cases where a function might be an object or something else that
  can't import source code.
* Using ``natcap.versioner`` for versioning.

0.2.0 (2017-07-31)
------------------
* Fixing an issue where ``types.StringType`` is not the same as
  ``types.StringTypes``.
* Redefined ``target`` in ``add_task`` to ``func`` to avoid naming collision
  with ``target_path_list`` in the same function.

0.1.1 (2017-07-31)
------------------
* Fixing a typo in the ``__version__`` number scheme.
* Importing ``psutil`` if it exists.

0.1.0 (2017-07-29)
------------------
* Initial release.

