Skip to main content

A small set of utilities to help with writing synchronouscode flows in a collaborative multitasking context.

Project description

A small set of utilities to help with writing synchronous code flows in a collaborative multitasking context. It has been designed around the feature set of gevent (http://www.gevent.org)

deferred calls

async.DeferredCallHandler is a wrapper for asynchronously handled function calls. This allows to control in which context the execution of those functions are done, which is essential in collaborative multitasking.

There are 2 available types of calls:
  • sync (synchronous): this type of call awaits for the deferred call handle to process the call to return. for a user’s perspective, it behaves like a regular function call.

  • oneway (one way): this type of call returns instantly. Due to its nature, there is no way to know whether, once it has been processed, it has succeeded or failed.

Example

For instance, imagining we have a Manager entity that must handle some resources in an atomic manner:

from async import DeferredCallHandler
class Manager(DeferredCallHandler):
    def manage(self):
        # do things with resources
        # with the assurance that the resources won't
        # be modified during process

        self.process() # process pending calls

        # do more things

    def access_resources(self):
        #returns the resources the manager has properly managed.

    def update_resource(self, data):
        #updates a resource info

    def run(self):
        while True:
            self.manage()

We can startup the manager and call functions on it from multiple greenlets:

manager = Manager()
gevent.spawn(manager.run)
# At that point, the manager entity is will be doing resource management

resources = ... # we have an array of resources

def monitor(target):
    for event in target.events():
        # we could apply some transformation to the event, and then
        # forward it to the manager.
        manager.oneway.update_resource(event)

for resource in resources:
    gevent.spawn(monitor, resource)


def consumer():
    while True:
        resources = manager.access_resources()
        # at that point, we have the guarantee that the resources
        # are properly managed and will not become stale or corrupted during process.

consumer()

DeferredCallHandler API documentation

  • def process(forever=False, whitelist=None):

    Processes all the the pending deferred calls.

    If forever is set to True, process will remain waiting for new calls until a call to stop_processing() is performed.

    If whitelist is set as a list of string, only functions which names match the elements in the white list will be executed.

  • def stop_processing():

    Interrupts the iteration through incoming calls of a DeferredCallHandler’s call to process(forever=True).

Exceptions

sync calls will forward exceptions just like regular functions:

from async import DeferredCallHandler
class Lemming(DeferredCallHandler):
    def kaboom(self):
        raise Exception("#high pitched# oh no!")

lemming = Lemming()

spawn(lemming.process, forever=True)

try:
    lemming.sync.kaboom()
except Exception:
    pass # We should hit that

# This should trigger the exception but produce an exception log entry.
lemming.oneway.kaboom()

Regular function calls

DeferredCallHandler objects don’t prevent direct function calls. Use at your own risk:

from async import DeferredCallHandler
class Manager(DeferredCallHandler):
    def manage(self):
        # do things with resources
        # with the assurance that the resources won't
        # be modified during process

        self.process() # process pending calls

        # do more things

    def access_resources(self):
        #returns the resources the manager has properly managed.

    def update_resource(self, data):
        #updates a resource info

    def run(self):
        while True:
            self.manage()

manager = Manager()
gevent.spawn(manager.run)

resources = manager.access_resources()
# !!! The resources may be in the middle of a management process and their state
# may be incoherent

resources = manager.sync.access_resources()
# In that case, we're guaranteed the management process is not running.

Timeouts

sync calls can be specified with an optional timeout, to ensure actions are performed within a given time frame:

from async import DeferredCallHandler
class ABitSlow(DeferredCallHandler):
    def taking_my_time(self):
        gevent.sleep(10)

slow = ABitSlow()

spawn(slow.process, forever=True)

try:
    slow.sync(timeout=1).taking_my_time()
except gevent.Timeout:
    pass # We should hit that

multitask state handling

Partially inspired by the mechanism of tail recursion, we provide a way to contain and handle code to manage the behaviour of state machines within greenlets.

The @state decorator transforms a function method into a state greenlet. When another state function is invoked, it create a new state greenlet that replaces the current state greenlet, effectively replicating the behaviour of tail recursion.

For instance:

@state(transitions_to="growing")
def sprouting()
    # germination process here
    growing() # the sprouting greenlet terminates and leaves way to the growing one

@state(transitions_to="flowering")
def growing()
    # transform CO2 and sunlight to biomass
    flowering() # the growing greenlet terminates and leaves way to the flowering one

@state(transitions_to=["dead", "withering"])
def flowering()
    # Grow flowers
    if is_eaten:
        # parameters can be given to state changes.
        dead(is_eaten=True) # the flowering greenlet terminates and leaves way to the dead one
    else:
        withering() # the flowering greenlet terminates and leaves way to the withering one

@state(transitions_to="dead")
def withering()
    # Dry up
    dead() # the withering greenlet terminates and leaves way to the dead one

@state # terminal state, no transitions
def dead(is_eaten=False)
    if not is_eaten:
        # clean up phase


sprouting() # spawns the initial state

The @state decorator can also be used for methods:

class Flower(object):
    @state(transitions_to="growing")
    def sprouting(self)
        # germination process here
        growing() # the sprouting greenlet terminates and leaves way to the growing one

    # ...

Correct transitions must be specified by the transitions_to parameter or any incorrect transition will raise the ValidationError exception.

Callbacks

Callbacks can be defined on transition. By setting the on_start parameter to a state, a given callback will be activated whenever a state is started.

The expected callback signature is def on_start(state, *args, **kwargs), where state is the (at that point, still not started) async.state.State state greenlet which will handle the execution of the state and *args and **kwargs are the parameters given to the state call.

For instance:

def on_transition(new_state, target, *args, **kwargs):
    if "store" in kwargs and kwargs["store"]:
        target.state = new_state

class Object(object):
    def __init__(self):
        self.state = None

    @state(on_start=on_transition)
    def a_state(self, store=False):
        pass

obj = Object()
obj.a_state(store=True)
sleep()

obj.state # => is now storing the current state object.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

gevent_async-0.8.5.tar.gz (7.4 kB view hashes)

Uploaded source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page