Skip to main content

Map and reduce for batch-jobs in distributed computing.

Project description

TravisBuildStatus PyPIStatus BlackStyle

Queues for batch-jobs distribute your compute-tasks over multiple machines in parallel. This pool maps your tasks onto a queue and reduces the results.

import queue_map_reduce as qmr

pool = qmr.Pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5], ])

A drop-in-replacement for builtins’ map(), and multiprocessing.Pool()’s map().

Requirements

  • Programs qsub, qstat, and qdel are required to submit, monitor, and delete queue-jobs.

  • Your func(task) must be part of an importable python module.

  • Your tasks and their results must be able to serialize using pickle.

  • Both worker-nodes and process-node can read and write from and to a common path in the file-system.

Queue flavor

Tested flavors are:

  • Sun Grid Engine (SGE) 8.1.9

Features

  • Respects fair-share, i.e. slots are only occupied when the compute is done.

  • No spawning of additional threads. Neither on the process-node, nor on the worker-nodes.

  • No need for databases or web-servers.

  • Queue-jobs with error-state 'E' can be deleted, and resubmitted until your predefined upper limit is reached.

  • Can bundle tasks on worker-nodes to avoid start-up-overhead with many small tasks.

Alternatives

When you do not share resources with other users, and when you have some administrative power you might want to use one of these:

  • Dask has a job_queue which also supports other flavors such as PBS, SLURM.

  • pyABC.sge has a sge.map() very much like our one.

  • ipyparallel

Inner workings

  • map() makes a work_dir because the mapping and reducing takes place in the file-system. You can set work_dir manually to make sure both worker-nodes and process-node can reach it.

  • map() serializes your tasks using pickle into separate files in work_dir/{ichunk:09d}.pkl.

  • map() reads all environment-variables in its process.

  • map() creates the worker-node-script in work_dir/worker_node_script.py. It contains and exports the process’ environment-variables into the batch-job’s context. It reads the chunk of tasks in work_dir/{ichunk:09d}.pkl, imports and runs your func(task), and finally writes the result back to work_dir/{ichunk:09d}.pkl.out.

  • map() submits queue-jobs. The stdout and stderr of the tasks are written to work_dir/{ichunk:09d}.pkl.o and work_dir/{ichunk:09d}.pkl.e respectively. By default, shutil.which("python") is used to process the worker-node-script.

  • When all queue-jobs are submitted, map() monitors their progress. In case a queue-job runs into an error-state ('E' by default) the job wFill be deleted and resubmitted until a maximum number of resubmissions is reached.

  • When no more queue-jobs are running or pending, map() will reduce the results from work_dir/{ichunk:09d}.pkl.out.

  • In case of non zero stderr in any task, a missing result, or on the user’s request, the work_dir will be kept for inspection. Otherwise its removed.

Wording

  • task is a valid input to func. The tasks are the actual payload to be processed.

  • iterable is an iterable (list) of tasks. It is the naming adopted from multiprocessing.Pool.map.

  • itask is the index of a task in iterable.

  • chunk is a chunk of tasks which is processed on a worker-node in serial.

  • ichunk is the index of a chunk. It is used to create the chunks’s filenames such as work_dir/{ichunk:09d}.pkl.

  • queue-job is what we submitt into the queue. Each queue-job processes the tasks in a single chunk in series.

  • JB_job_number is assigned to a queue-job by the queue-system for its own book-keeping.

  • JB_name is assigned to a queue-job by our map(). It is composed of our map()’s session-id, and ichunk. E.g. "q"%Y-%m-%dT%H:%M:%S"#{ichunk:09d}"

Environment Variables

All the user’s environment-variables in the process where map() is called will be exported in the queue-job’s context.

The worker-node-script sets the environment-variables. We do not use qsub’s argument -V because on some clusters this will not set all variables. Apparently some administrators fear security issues when using qsub -V to set LD_LIBRARY_PATH.

Testing

py.test -s .

dummy queue

To test our map() we provide a dummy qsub, qstat, and qdel. These are individual python-scripts which all act on a common state-file in tests/resources/dummy_queue_state.json in order to fake the sun-grid-engine’s queue.

  • dummy_qsub.py only appends queue-jobs to the list of pending jobs in the state-file.

  • dummy_qdel.py only removes queue-jobs from the state-file.

  • dummy_qstat.py does move the queue-jobs from the pending to the running list, and does trigger the actual processing of the jobs. Each time dummy_qstat.py is called it performs a single action on the state-file. So it must be called multiple times to process all jobs. It can intentionally bring jobs into the error-state when this is set in the state-file.

Before running the dummy-queue, its state-file must be initialized:

from queue_map_reduce import dummy_queue

dummy_queue.init_queue_state(
    path="tests/resources/dummy_queue_state.json"
)

When testing our map() you set its arguments qsub_path, qdel_path, and qstat_path to point to the dummy-queue.

See tests/test_full_chain_with_dummy_qsub.py.

Because of the global state-file, only one instance of dummy_queue must run at a time.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

Built Distribution

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page