
django-task 0.1.5

A Django app to run new background tasks from either admin or cron, and inspect task history from admin

Latest Version: 0.1.6



  • TODO

The full documentation is at


Install Django Task:

pip install django-task

Add it to your INSTALLED_APPS:


Add Django Task’s URL patterns:

urlpatterns = [
    url(r'^django_task/', include('django_task.urls', namespace='django_task')),
]



  • create async tasks programmatically
  • create and monitor async tasks from admin
  • log all tasks in the database for later inspection


  1. each specific job is described by a Model derived from models.Task, which is responsible for:

    • selecting the name for the consumer queue among available queues
    • collecting and saving all parameters required by the associated job
    • running the specific job asynchronously
  2. a new job can be run either:

    • creating a Task from the Django admin
    • creating a Task from code, then calling
    • directly scheduling the execution of a job, e.g.: count_beans.delay(100)

    In the last case, a new Task is automatically created by the job for logging purposes.

  3. job responsibilities (optional):

    • Before execution, each job will either recover or create a corresponding Task
    • During execution, the job can notify the state and progress to the app by calling Task.set_state() and Task.set_progress()
    • See for an example
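
The job responsibilities listed above can be sketched in plain Python. This is an illustration only: StubTask, its state names and the task_id argument are hypothetical stand-ins, not django-task's actual classes. Only the method names set_state() and set_progress() are taken from the text above.

```python
# Illustrative stand-in only: StubTask is NOT django_task.models.Task.
class StubTask:
    """Minimal in-memory object mimicking the two notification methods."""

    registry = {}  # hypothetical replacement for the database table

    def __init__(self, task_id, **params):
        self.task_id = task_id
        self.params = params
        self.state = "PENDING"  # state names here are assumptions
        self.progress = 0
        StubTask.registry[task_id] = self

    def set_state(self, state):
        self.state = state

    def set_progress(self, value):
        self.progress = value


def count_beans(num_beans, task_id=None):
    """Hypothetical job: recover the task if it exists, else create one."""
    task = StubTask.registry.get(task_id)
    if task is None:
        if task_id is None:
            task_id = len(StubTask.registry) + 1
        task = StubTask(task_id, num_beans=num_beans)
    task.set_state("STARTED")
    for counted in range(1, num_beans + 1):
        # Report progress as an integer percentage.
        task.set_progress(counted * 100 // num_beans)
    task.set_state("SUCCESS")
    return task
```

Calling count_beans(10) without a task_id creates a new stub task for logging, while passing the id of an existing one reuses it, mirroring the "recover or create" behaviour described above.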


Run consumer:

python runserver

Run worker(s):

python rqworker low high default
python rqworker low high default

Howto separate jobs for different instances on the same machine

To separate jobs for different instances on the same machine (or, more precisely, for the same Redis connection), override the queue names for each instance;

for example:

# file ""

# RQ config

SESSION_COOKIE_NAME = 'primary_sid'

REDIS_URL = 'redis://localhost:6379/0'
RQ_PREFIX = "primary_"

        'URL': REDIS_URL,
        'DEFAULT_TIMEOUT': 360,


then run worker as follows:

python rqworker primary_default
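
One way to derive per-instance queue names from the prefix is sketched below. RQ_QUEUES with 'URL' and 'DEFAULT_TIMEOUT' keys is the django-rq setting already used above; the QUEUE_NAMES tuple and the dict comprehension are assumptions made for illustration, not necessarily how the project's own settings are written.

```python
# Sketch of a settings fragment; QUEUE_NAMES is a hypothetical helper name.
REDIS_URL = 'redis://localhost:6379/0'
RQ_PREFIX = "primary_"

QUEUE_NAMES = ('default', 'low', 'high')  # assumed set of queues

# Every queue gets the instance prefix, so instances sharing one Redis
# connection do not consume each other's jobs.
RQ_QUEUES = {
    RQ_PREFIX + name: {
        'URL': REDIS_URL,
        'DEFAULT_TIMEOUT': 360,
    }
    for name in QUEUE_NAMES
}
```

With these values the queues are named primary_default, primary_low and primary_high; a second instance would only need a different RQ_PREFIX.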

Howto run jobs programmatically

python shell


from .jobs import count_beans


or, for finer control:

import django_rq
from .jobs import count_beans

queue = django_rq.get_queue('high')
queue.enqueue(count_beans, num_beans=1000)

Howto schedule jobs with cron

Call management command ‘count_beans’, which in turn executes the required job.

For example:


0 * * * *  {{username}}    timeout 55m {{django.pythonpath}}/python {{django.website_home}}/ count_beans 1000 >> {{django.logto}}/cron.log 2>&1

A base class TaskCommand is provided to simplify the creation of task-related management commands;

a derived management command is only responsible for:

  • defining suitable command-line parameters
  • selecting the specific Task class and job function

for example:

from django_task.task_command import TaskCommand

class Command(TaskCommand):

    def add_arguments(self, parser):
        super(Command, self).add_arguments(parser)
        parser.add_argument('num_beans', type=int)

    def handle(self, *args, **options):
        from tasks.models import CountBeansTask
        self.run_task(CountBeansTask, **options)

        # or:
        # from import count_beans
        # self.run_job(CountBeansTask, count_beans, **options)
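
Django hands an argparse parser to add_arguments() and passes the parsed values to handle() as keyword options. The standalone sketch below reproduces only that parsing step with the standard library; it does not use TaskCommand, and fake_run_task is a hypothetical stand-in for the run_task() call.

```python
import argparse


def fake_run_task(**options):
    """Hypothetical stand-in: just returns the parameters it would save."""
    return options


# Same argument definition as in add_arguments() above.
parser = argparse.ArgumentParser(prog="count_beans")
parser.add_argument('num_beans', type=int)

# "count_beans 1000" on the command line becomes {'num_beans': 1000}.
options = vars(parser.parse_args(['1000']))
result = fake_run_task(**options)
```

In a real command the options dict also carries Django's built-in options (verbosity and so on), which is why the example command forwards **options as a whole.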


Running Tests

  • TODO

Does the code actually work?

source <YOURVIRTUALENV>/bin/activate
(myenv) $ pip install tox
(myenv) $ tox


  • fixes for Django 1.10
  • send_email management command example added


  • Fix OneToOneRel import for Django < 1.9


  • Polymorphic behaviour of Task.get_child() restored


  • TaskCommand.run_task() renamed as TaskCommand.run_job()
  • New TaskCommand.run_task() creates a Task, then runs it; this guarantees that something is traced even if the background job fails

File: django_task-0.1.5-py2.py3-none-any.whl (md5)
Type: Python Wheel
Py Version: py2.py3
Uploaded on: 2017-10-12
Size: 25KB