A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.
Project description
A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.
Latest release 20230331:
- Task: subclass BaseTask instead of (FSM, RunStateMixin).
- BaseTask.init: use @uses_runstate to ensure we've got a RunState.
Class BaseTask(cs.fsm.FSM, cs.gvutils.DOTNodeMixin, cs.resources.RunStateMixin)
A base class subclassing cs.fsm.FSM
with a RunStateMixin
.
Note that this class and the FSM
base class does not provide
a FSM_DEFAULT_STATE
attribute; a default state
value of
None
will leave .fsm_state
unset.
This behaviour is is chosen mostly to support subclasses
with unusual behaviour, particularly Django's Model
class
whose refresh_from_db
method seems to not refresh fields
which already exist, and setting .fsm_state
from a
FSM_DEFAULT_STATE
class attribute thus breaks this method.
Subclasses of this class and Model
should not provide a
FSM_DEFAULT_STATE
attribute, instead relying on the field
definition to provide this default in the usual way.
Class BlockedError(TaskError, cs.fsm.FSMError, builtins.Exception, builtins.BaseException)
Raised by a blocked Task
if attempted.
Function main(argv)
Dummy main programme to exercise something.
Function make(*tasks, fail_fast=False, queue=None)
Generator which completes all the supplied tasks
by dispatching them
once they are no longer blocked.
Yield each task from tasks
as it completes (or becomes cancelled).
Parameters:
tasks
:Task
s as positional parametersfail_fast
: defaultFalse
; if true, cease evaluation as soon as a task completes in a state with is notDONE
queue
: optional callable to submit a task for execution later via some queue such asLater
or celery
The following rules are applied by this function:
- if a task is being prepared, raise an
FSMError
- if a task is already running or queued, wait for its completion
- if a task is pending:
- if any prerequisite has failed, fail this task
- if any prerequisite is cancelled, cancel this task
- if any prerequisite is pending, make it first
- if any prerequisite is not done, fail this task
- otherwise dispatch this task and then yield it
- if
fail_fast
and the task is not done, return
Examples:
>>> t1 = Task('t1', lambda: print('doing t1'), track=True)
>>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
>>> list(make(t2)) # doctest: +ELLIPSIS
t1 PENDING->dispatch->RUNNING
doing t1
t1 RUNNING->done->DONE
t2 PENDING->dispatch->RUNNING
doing t2
t2 RUNNING->done->DONE
[Task('t2',<function <lambda> at ...>,state='DONE')]
Function make_later(L, *tasks, fail_fast=False)
Dispatch the tasks
via L:Later
for asynchronous execution
if it is not already completed.
The caller can wait on t.result
for completion.
This calls make_now()
in a thread and uses L.defer
to
queue the task and its prerequisites for execution.
Function make_now(*tasks, fail_fast=False, queue=None)
Run the generator make(*tasks)
to completion and return the
list of completed tasks.
Class Task(BaseTask, cs.fsm.FSM, cs.gvutils.DOTNodeMixin, cs.resources.RunStateMixin, cs.threads.HasThreadState, cs.context.ContextManagerMixin)
A task which may require the completion of other tasks.
The model here may not be quite as expected; it is aimed at
tasks which can be repaired and rerun.
As such, if self.run(func,...)
raises an exception from
func
then this Task
will still block dependent Task
s.
Dually, a Task
which completes without an exception is
considered complete and does not block dependent Task
s.
Keyword parameters:
cancel_on_exception
: if true, cancel thisTask
if.run
raises an exception; the default isFalse
, allowing repair and retrycancel_on_result
: optional callable to test theTask.result
after.run
; if the callable returnsTrue
theTask
is marked as cancelled, allowing repair and retryfunc
: the function to call to complete theTask
; it will be called asfunc(*func_args,**func_kwargs)
func_args
: optional positional arguments, default()
func_kwargs
: optional keyword arguments, default{}
lock
: optional lock, default anRLock
state
: initial state, default fromself._state.initial_state
, which is initally 'PENDING
'track
: defaultFalse
; ifTrue
then apply a callback for all states to print task transitions; otherwise it should be a callback function suitable forFSM.fsm_callback
Other arguments are passed to theResult
initialiser.
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 10)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)
Users wanting more immediate semantics can supply
cancel_on_exception
and/or cancel_on_result
to control
these behaviours.
Example:
t1 = Task(name="task1")
t1.bg(time.sleep, 2)
t2 = Task(name="task2")
# prevent t2 from running until t1 completes
t2.require(t1)
# try to run sleep(5) for t2 immediately after t1 completes
t1.notify(t2.call, sleep, 5)
Class TaskError(cs.fsm.FSMError, builtins.Exception, builtins.BaseException)
Raised by Task
related errors.
Class TaskQueue
A task queue for managing and running a set of related tasks.
Unlike make
and Task.make
, this is aimed at a "dispatch" worker
which dispatches individual tasks as required.
Example 1, put 2 dependent tasks in a queue and run:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, t2)
>>> for _ in q.run(): pass
...
t1
t2
Example 2, put 1 task in a queue and run. The queue only runs the specified tasks:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1)
>>> for _ in q.run(): pass
...
t1
Example 2, put 1 task in a queue with run_dependent_tasks=True
and run.
The queue pulls in the dependencies of completed tasks and also runs those:
>>> t1 = Task("t1", lambda: print("t1"))
>>> t2 = t1.then("t2", lambda: print("t2"))
>>> q = TaskQueue(t1, run_dependent_tasks=True)
>>> for _ in q.run(): pass
...
t1
t2
Method TaskQueue.__init__(self, *tasks, run_dependent_tasks=False)
:
Initialise the queue with the supplied tasks
.
Release Log
Release 20230331:
- Task: subclass BaseTask instead of (FSM, RunStateMixin).
- BaseTask.init: use @uses_runstate to ensure we've got a RunState.
Release 20230217: Task: subclass HasThreadState, drop .current_task() class method.
Release 20221207:
- Pull out core stuff from Task into BaseTask, aids subclassing.
- BaseTask: explainatory docustring about unusual FSM_DEFAULT_STATE design choice.
- BaseTask.tasks_as_dot: express the edges using the node ids instead of their labels.
- BaseTask: new tasks_as_svg() method like tasks_as_dot() but returning SVG.
Release 20220805: Initial PyPI release.
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
Hashes for cs.taskqueue-20230331-py3-none-any.whl
Algorithm | Hash digest | |
---|---|---|
SHA256 | 3e872824d5472e8446b18b10b67e4d0eaca6ea811bcaba984bfcba66f8b75a3c |
|
MD5 | 64177f7497a52df1585c9a66bcce9300 |
|
BLAKE2b-256 | 14e88ebf82d4067afd12fe039fa0bc30a2ee759b216c128ead53e82fe89f5dcd |