Skip to main content

Wrapper of threading module providing Actor interface.

Project description

Wrapper of threading module providing Actor, Future interface.

This module provides decorator to make function and method run in background thread, and thread pool class to pool worker threads. The caller thread can retrieve the return value or the unhandled exception in the background thread using the future object. The finished threads are joined by garbage collection.

Requirements

  • Python 2.7 or 3.3 or 3.4.

  • Unix or Linux platforms which support python thread module.

Setup

  • Install from pip

    $ sudo pip install thread_utils
  • Install from git.

    $ git clone https://github.com/wbcchsyn/python-thread_utils.git
    $ cd python-thread_utils
    $ sudo python setup.py install

Usage

This module defines the following functions and classes.

thread_utils.actor(daemon=True)

Decorator to create a worker thread and to invoke the callable there.

The decorated callable returns a Future object immediately and invoked callable starts to run in worker thread. If argument `daemon' is True, the worker thread will be daemonic; otherwise not. Python program exits when only daemon threads are left. i.e, the program never ends before all non daemonic threads are finished.

In the following example, function sleep_sort print positive numbers in asending order. The main thread will terminate soon, however workers display numbers after that.

"""
Print numbers in asending order using non daemonic workers.
The main thread will terminate soon and after that workers do each task.
"""

import thread_utils
import time

@thread_utils.actor(daemon=False)
def _sleep_print(n):
    time.sleep(n)
    print n

def sleep_sort(un_sorted_list):
    """
    Print positive numbers in asending order.
    """

    for i in un_sorted_list:
        _sleep_print(i)

sleep_sort([3,1,4,2]) # Numbers are displayed in asending this order.

The decorated callable returns a Future object immediately; it monitors invoked callable progress and stores the result. The foreground thread can retrieve the result of invoked callable through the future object like as follows.

import thread_utils
import time

@thread_utils.actor(daemon=True)
def add(m, n):
    time.sleep(m)
    return m + n

future = add(3, 5)
print "Task started"
print future.receive() # Blocks for 3 seconds and display "8".

See Future Objects for more information abaout it.

This decorator doesn’t affect to thread safty, so it depends only on the invoked callable whether the decorated will be thread safe or not.

thread_utils.async(daemon=True)

Alias to thread_utils.actor

thread_utils.synchronized

Decorator to restrict from simultaneous access from 2 or more than 2 threads.

Decorated callable can be accessible from only one thread. If 2 or more than 2 threads try calling at the same time, only the 1st thread starts to run and the others are blocked. It is after the 1st thread finishes when 2nd threads starts to run.

import thread_utils
import time

@thread_utils.synchronized
def foo():
    time.sleep(1)

@thread_utils.async(daemon=False)
def create_worker():
    print "Worker is started."
    foo()
    print "Worker is finished."


# Text "Worker is started." will be printed 10 times at once.
# On the other hand "Worker is finished." will be printed every second.
for i in xrange(10):
    create_worker()

Future Objects

This class monitors associated callable progress and stores its return value or unhandled exception. Future.is_finished() returns whether the invoked callable is finished or not. Future.receive(timeout=None) blocks until timeout or invoked callable is finished and returns what the callable returns or raises its unhandled exception.

If the future object is generated by thread_utils.Pool.send method, and if the invoked task is canceled before the task is started, this method raises CancelError.

The instance will be created by thread_utils.Pool.send method or callable decorated by thread_utils.async.

Future.is_finished()

Return True if invoked callable is finished. Otherwise, return False.

Future.receive(timeout=None)

Block until timeout or invoked callable is finished and returns what the callable returned or raises its unhandled exception.

When argument `timeout' is present and is not None, it shoule be int or floating number. This method raises TimeoutError if task won’t be finished before timeout.

Pool Objects

This class pools worker threads and do tasks parallel using them.

`send' method queues specified callable with the arguments and returns a Future object immediately. The returned future object monitors the invoked callable progress and stores the result.

The workers are reused for many times, so after using this object, kill method must be called to join workers except for used in with statement.

All public methods of this class are thread safe.

class thread_utils.Pool(worker_size=1, loop_count=sys.maxint, daemon=True)

All arguments are optional. Argument `worker_size' specifies the number of the worker thread. The object can do this number of tasks at the same time parallel. Each worker will invoke callable `loop_count' times. After that, the worker kill itself and a new worker is created.

If the argument `daemon' is True, the worker threads will be daemonic, or not. Python program exits when only daemon threads are left.

This constructor is thread safe.

Pool.send(func, *args, **kwargs)

Queue specified callable with the arguments and returns a Future object.

Argument `func ' is a callable object invoked by workers, and *args and **kwargs are arguments to be passed to the callable.

The returned Future object monitors the progress of invoked callable and stores the result. The result can be accessed through the Future instance.

See Future Objects for more detail abaout the return value.

This method raises DeadPoolError if called after kill method is called.

Pool.kill(force=False, block=False)

Set internal flag and make worker threads stop.

If the argument `force' is True, all queued tasks are canceled; the workers will stop after their current task is finished. In this case, tasks not started before this method is called will be left undone. If a Future instance is related to canceled task and the receive method is called, it will raise CancelError. The default value is False.

If the argument `block' is True, it blocks until all workers finished their tasks. Otherwise, it returns immediately. The default is False. If this method is called in a task with argument block is True, dead lock will occur.

If `send' or `set_worker_size' is called after this methos is called, it raises DeadPoolError.

This method can be called many times. If argument force is True, cancel undone tasks then. If argument block is True, it blocks until all workers done tasks.

If this class is used in `with' statement, this method is called when block exited with default arguments, i.e. force=False and block=False. Otherwise, this method must be called after finished using the object, or the worker threads will not end till the program ends. (Or, if the workers are daemonic, dead lock occurs and program will never ends.)

For example, the following program creates pool with worker_size = 3. so display 3 messages every seconds. The Pool will be killed soon, but the worker do all tasks to be sent.

import thread_utils
import time

def message(msg):
    time.sleep(1)
    return msg

pool = thread_utils.Pool(worker_size=3)
futures = []
for i in xrange(7):
    futures.append(pool.send(message, "Message %d." % i))
pool.kill()

# First, sleep one second and "Message 0", "Message 1", "Message 2"
# will be displayed.
# After one second, Message 3 - 5 will be displayed.
# Finally, "Message 6" will be displayed and program will exit.
for f in futures:
    print f.receive()

It is not necessary to call kill method if using with statement.

import thread_utils
import time

def message(msg):
    time.sleep(1)
    return msg

pool = thread_utils.Pool(worker_size=3)
futures = []
with thread_utils.Pool(worker_size=3) as pool:
    for i in xrange(7):
        futures.append(pool.send(message, "Message %d." % i))

for f in futures:
    print f.receive()

Pool.cancel()

Cancel all tasks in the Queue.

Cancel all tasks without killing pool. This method can be called whenever, Even after the pool is killed.

Tasks are dequeued when it is started to do and tasks being done are left unchanged. So this method can be called from task. (Of corse, it can be called from outsidde of the task, too.)

Pool.inspect()

Return tuple which indicate the instance status.

The return value is a tuple of 3 ints. The format is as follows. (worker size, tasks currently being done, queued undone tasks)

The values are only indication. Even the instance itself doesn’t know the accurate values.

Pool.set_worker_size()

Change worker size.

This method set the worker size and return soon. The workers will created soon when increasing,howeve, It could take some time when decreasing because workers can’t stop while doing a task.

This method raises DeadPoolError if called after kill method is called.

Development

Install requirements to developing and set pre-commit hook.

$ git clone https://github.com/wbcchsyn/python-thread_utils.git
$ cd python-thread_utils
$ pip install -r dev_utils/requirements.txt
$ ln -s ../../dev_utils/pre-commit .git/hooks/pre-commit

CHANGELOG

1.0.0 (2015/12/08)

  • Change the behavior when Pool.kill method called twice or more than twice.

  • Add Pool.inspect and Pool.cancel method.

  • Enable to change worker size of Pool instance after created.

  • Performance tuning.

  • Stop to support python2.6, 3.1 and 3.2.

0.1.3 (2015/03/15)

  • Bug Fix: Pool.kill method could raise Queue.Empty error.

  • Performance tuning.

0.1.2 (2014/09/14)

  • Create actor alias to async decorator.

  • Add optional arguments ‘force’ and ‘block’ to Pool.kill method.

  • Future.receive method raise DeadPoolError if the Pool is killed before task is done.

  • Update documents.

0.1.1 (2014/06/13)

  • Delete unused files.

0.1.0 (2014/06/12)

  • First release.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

thread_utils-1.0.1.tar.gz (17.8 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page