Skip to main content

A small set of utilities to help with writing synchronous code flows in a collaborative multitasking context. It has been designed around the feature set of gevent (http://www.gevent.org)

Project description

============
gevent_async
============

A small set of utilities to help with writing synchronous code flows in a collaborative multitasking context.
It has been designed around the feature set of gevent (http://www.gevent.org)

------------------------------------------------------------------------------------------------------------

--------------
deferred calls
--------------

``async.DeferredCallHandler`` is a wrapper for asynchronously handled function calls.
This allows to control in which context the execution of those functions are done, which is essential
in collaborative multitasking.

There are 2 available types of calls:
- ``sync`` (synchronous): this type of call awaits for the deferred call handle to process the call
to return. for a user's perspective, it behaves like a regular function call.
- ``oneway`` (one way): this type of call returns instantly. Due to its nature, there is no way to know
whether, once it has been processed, it has succeeded or failed.

Example
=======

For instance, imagining we have a Manager entity that must handle some resources in an atomic manner::

from async import DeferredCallHandler
class Manager(DeferredCallHandler):
def manage(self):
# do things with resources
# with the assurance that the resources won't
# be modified during process

self.process() # process pending calls

# do more things

def access_resources(self):
#returns the resources the manager has properly managed.

def update_resource(self, data):
#updates a resource info

def run(self):
while True:
self.manage()

We can startup the manager and call functions on it from multiple greenlets::

manager = Manager()
gevent.spawn(manager.run)
# At that point, the manager entity is will be doing resource management

resources = ... # we have an array of resources

def monitor(target):
for event in target.events():
# we could apply some transformation to the event, and then
# forward it to the manager.
manager.oneway.update_resource(event)

for resource in resources:
gevent.spawn(monitor, resource)


def consumer():
while True:
resources = manager.access_resources()
# at that point, we have the guarantee that the resources
# are properly managed and will not become stale or corrupted during process.

consumer()

DeferredCallHandler API documentation
=====================================

* ``def process(forever=False, whitelist=None)``:

Processes all the the pending deferred calls.

If ``forever`` is set to ``True``, process will remain waiting for new calls until
a call to ``stop_processing()`` is performed.

If ``whitelist`` is set as a list of string, only functions which names match the elements
in the white list will be executed.

* ``def stop_processing()``:

Interrupts the iteration through incoming calls of a DeferredCallHandler's call to
``process(forever=True)``.

Exceptions
==========

sync calls will forward exceptions just like regular functions::

from async import DeferredCallHandler
class Lemming(DeferredCallHandler):
def kaboom(self):
raise Exception("#high pitched# oh no!")

lemming = Lemming()

spawn(lemming.process, forever=True)

try:
lemming.sync.kaboom()
except Exception:
pass # We should hit that

# This should trigger the exception but produce an exception log entry.
lemming.oneway.kaboom()

Regular function calls
======================

``DeferredCallHandler`` objects don't prevent direct function calls. Use at your own risk::

from async import DeferredCallHandler
class Manager(DeferredCallHandler):
def manage(self):
# do things with resources
# with the assurance that the resources won't
# be modified during process

self.process() # process pending calls

# do more things

def access_resources(self):
#returns the resources the manager has properly managed.

def update_resource(self, data):
#updates a resource info

def run(self):
while True:
self.manage()

manager = Manager()
gevent.spawn(manager.run)

resources = manager.access_resources()
# !!! The resources may be in the middle of a management process and their state
# may be incoherent

resources = manager.sync.access_resources()
# In that case, we're guaranteed the management process is not running.

Timeouts
========

``sync`` calls can be specified with an optional timeout, to ensure actions are performed
within a given time frame::

from async import DeferredCallHandler
class ABitSlow(DeferredCallHandler):
def taking_my_time(self):
gevent.sleep(10)

slow = ABitSlow()

spawn(slow.process, forever=True)

try:
slow.sync(timeout=1).taking_my_time()
except gevent.Timeout:
pass # We should hit that

------------------------------------------------------------------------------------------------------------

------------------------
multitask state handling
------------------------

Partially inspired by the mechanism of tail recursion, we provide a way to contain and handle code
to manage the behaviour of state machines within greenlets.

The ``@state`` decorator transforms a function method into a state greenlet. When another state function
is invoked, it create a new state greenlet that replaces the current state greenlet, effectively replicating
the behaviour of tail recursion.

For instance::

@state(transitions_to="growing")
def sprouting()
# germination process here
growing() # the sprouting greenlet terminates and leaves way to the growing one

@state(transitions_to="flowering")
def growing()
# transform CO2 and sunlight to biomass
flowering() # the growing greenlet terminates and leaves way to the flowering one

@state(transitions_to=["dead", "withering"])
def flowering()
# Grow flowers
if is_eaten:
# parameters can be given to state changes.
dead(is_eaten=True) # the flowering greenlet terminates and leaves way to the dead one
else:
withering() # the flowering greenlet terminates and leaves way to the withering one

@state(transitions_to="dead")
def withering()
# Dry up
dead() # the withering greenlet terminates and leaves way to the dead one

@state # terminal state, no transitions
def dead(is_eaten=False)
if not is_eaten:
# clean up phase


sprouting() # spawns the initial state

The ``@state`` decorator can also be used for methods::

class Flower()
@state(transitions_to="growing")
def sprouting(self)
# germination process here
growing() # the sprouting greenlet terminates and leaves way to the growing one

# ...

Correct transitions must be specified by the ``transitions_to`` parameter or any incorrect transition
will raise the ``ValidationError`` exception.

Callbacks
=========

Callbacks can be defined on transition. By setting the on_start parameter to a state, a given callback will
be activated whenever a state is started.

.. attention:: The callback code is executed on the greenlet issuing the state transition, not the greenlet of the new state.

The expected callback signature is ``def on_start(state, *args, **kwargs)``, where ``state`` is the
(at that point, still not started) ``async.state.State`` state greenlet which will handle the execution of the state and
``*args`` and ``**kwargs`` are the parameters given to the state call.

For instance::

def on_transition(new_state, target, *args, **kwargs):
if "store" in kwargs and kwargs["store"]:
target.state = new_state

class Object(object):
def __init__(self):
self.state = None

@state(on_start=on_transition)
def a_state(self, store=False):
pass

obj = Object()
obj.a_state(store=True)
sleep()

obj.state # => is now storing the current state object.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gevent_async-0.8.tar.gz (7.6 kB view hashes)

Uploaded source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page