
A job pool for distributed compute clusters inspired by Python's multiprocessing.Pool.


A Python package for job pools (as in multiprocessing.Pool()) that makes use of distributed compute clusters.

pypoolparty provides a Pool() with a map() function that aims to be a drop-in replacement for the builtin map() and for multiprocessing.Pool()'s map(). The idea is to allow the user to always fall back to these builtin pools and map functions in case a distributed compute cluster is not available.

This package respects the concept of 'fair share', which is common in scientific environments but not in commercial ones. Here, fair share means that compute resources are only requested when they are needed. Compute resources are not requested just to idle and wait for the user to submit jobs.

A consequence of this fair sharing is that this package expects your jobs to randomly die in conflicts over resources with jobs submitted by other users, such as conflicts over limited disk space on temporary drives. If your jobs run into error states, they will be resubmitted until a predefined limit is reached.

Installing

pip install pypoolparty

Basic Usage

import pypoolparty as ppp

pool = ppp.slurm.Pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5], ])

Currently, there are ppp.slurm.Pool() and ppp.sun_grid_engine.Pool().
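
As an illustration of the drop-in idea described above, the following sketch falls back to multiprocessing.Pool() when no scheduler is found. The make_pool() helper and the shutil.which("sbatch") check are not part of pypoolparty; they are placeholders for whatever detection suits your site.

import multiprocessing
import shutil

import pypoolparty as ppp

def make_pool():
    # Hypothetical helper, not part of pypoolparty: prefer the cluster
    # pool when a SLURM scheduler looks available, otherwise fall back
    # to multiprocessing. The shutil.which("sbatch") check is only a
    # placeholder heuristic.
    if shutil.which("sbatch"):
        return ppp.slurm.Pool()
    return multiprocessing.Pool()

pool = make_pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5]])
print(results)  # [3, 5, 9]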

Alternatives

When you do not share resources with other users, do not need to respect fair share, and have some administrative power, you might want to use one of these:

  • Dask has jobqueue (dask-jobqueue), which also supports other flavors such as PBS and SLURM.

  • pyABC.sge has a pool.map() very much like the one in this package.

  • ipyparallel

Queue Flavors

  • SLURM, version 22.05.6

  • Sun Grid Engine (SGE), version 8.1.9

Inner Workings

  • map() makes a work_dir because the mapping and reducing take place in the filesystem. You can set work_dir manually to make sure both the worker nodes and the process node can reach it.

  • map() serializes your tasks using pickle into separate files in work_dir/{ichunk:09d}.pkl (a sketch of this file layout follows after this list).

  • map() reads all environment variables in its process.

  • map() creates the worker-node script in work_dir/worker_node_script.py. It contains and exports the process’ environment variables into the batch job’s context. It reads the chunk of tasks in work_dir/{ichunk:09d}.pkl, imports and runs your func(task), and finally writes the result back to work_dir/{ichunk:09d}.pkl.out.

  • map() submits queue jobs. The stdout and stderr of the tasks are written to work_dir/{ichunk:09d}.pkl.o and work_dir/{ichunk:09d}.pkl.e respectively. By default, shutil.which("python") is used to process the worker-node-script.

  • When all queue jobs are submitted, map() monitors their progress. In case a queue-job runs into an error-state, the job will be deleted and resubmitted until a maximum number of resubmissions is reached.

  • When no more queue jobs are running or pending, map() will reduce the results from work_dir/{ichunk:09d}.pkl.out.

  • In case of non-zero stderr in any task, a missing result, or on the user's request, the work_dir will be kept for inspection. Otherwise it is removed.
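
The following is a minimal, self-contained sketch of the file layout and pickle round trip described above. It is an illustration only, not pypoolparty's actual code; the work_dir path, the chunking, and func (here sum) are placeholders.

import os
import pickle

work_dir = "demo_work_dir"          # hypothetical path
os.makedirs(work_dir, exist_ok=True)

tasks = [[1, 2], [2, 3], [4, 5]]
chunks = [tasks[0:2], tasks[2:3]]   # two chunks for illustration

# "map" side: write one pickle per chunk of tasks.
for ichunk, chunk in enumerate(chunks):
    with open(os.path.join(work_dir, "{:09d}.pkl".format(ichunk)), "wb") as f:
        pickle.dump(chunk, f)

# "worker-node" side: read a chunk, run func(task), write results back.
for ichunk in range(len(chunks)):
    path = os.path.join(work_dir, "{:09d}.pkl".format(ichunk))
    with open(path, "rb") as f:
        chunk = pickle.load(f)
    results = [sum(task) for task in chunk]
    with open(path + ".out", "wb") as f:
        pickle.dump(results, f)

# "reduce" side: gather all *.pkl.out files in order of ichunk.
out = []
for ichunk in range(len(chunks)):
    with open(os.path.join(work_dir, "{:09d}.pkl.out".format(ichunk)), "rb") as f:
        out.extend(pickle.load(f))
print(out)  # [3, 5, 9]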

Environment Variables

All the user’s environment variables in the process where map() is called will be exported in the queue job’s context.

The worker-node script explicitly sets the environment variables. This package does not rely on the batch system's (slurm/sge) ability to do so.
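
To illustrate the idea, here is a minimal sketch of how such an explicit export could be generated. It is not the actual worker_node_script.py that pypoolparty writes; the variable names are placeholders.

import os

# Capture the environment of the process that calls map() ...
captured = dict(os.environ)

# ... and emit explicit assignments at the top of the worker-node
# script, so the batch system does not need to forward anything.
lines = ["import os"]
for key, value in captured.items():
    lines.append("os.environ[{!r}] = {!r}".format(key, value))
script_header = "\n".join(lines)
print(script_header[:200])  # first few generated lines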

Wording

  • task is a valid input to func. The tasks are the actual payload to be processed.

  • iterable is an iterable (list) of tasks. It is the naming adopted from multiprocessing.Pool.map.

  • itask is the index of a task in iterable.

  • chunk is a chunk of tasks which is processed on a worker-node in serial.

  • ichunk is the index of a chunk. It is used to create the chunk's filenames such as work_dir/{ichunk:09d}.pkl.

  • queue-job is what we submit into the queue. Each queue-job processes the tasks in a single chunk in series.

  • jobname or job["name"] is assigned to a queue job by our map(). It is composed of our map()'s session-id and ichunk. E.g. "q" + "%Y-%m-%dT%H:%M:%S" + "#" + "{ichunk:09d}".
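
For illustration, a small sketch of how such a jobname could be composed. The exact format used by pypoolparty may differ.

import datetime

# Compose a jobname from a timestamp session-id and ichunk.
session_id = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
ichunk = 123
jobname = "q" + session_id + "#" + "{:09d}".format(ichunk)
print(jobname)  # e.g. q2024-01-01T12:00:00#000000123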

Testing

pytest -s .

dummy queue

To test our map() we provide a dummy qsub, qstat, and qdel for the sun-grid-engine. These are individual Python scripts which all act on a common state file in tests/resources/dummy_queue_state.json in order to fake the sun-grid-engine's queue.

  • dummy_qsub.py only appends queue jobs to the list of pending jobs in the state-file.

  • dummy_qdel.py only removes queue jobs from the state-file.

  • dummy_qstat.py moves queue jobs from the pending to the running list and triggers the actual processing of the jobs. Each time dummy_qstat.py is called, it performs a single action on the state file, so it must be called multiple times to process all jobs. It can intentionally bring jobs into the error state when this is set in the state file.

Before running the dummy queue, its state file must be initialized:

from pypoolparty import sun_grid_engine

sun_grid_engine.testing.init_queue_state(
    path="tests/resources/dummy_queue_state.json"
)

When testing our map() you set its arguments qsub_path, qdel_path, and qstat_path to point to the dummy queue.

See tests/test_full_chain_with_dummy_qsub.py.
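
For illustration, a hedged sketch of pointing the pool at the dummy queue. The script locations shown are hypothetical, and whether these paths are passed to Pool() or to map() may differ; the referenced test is the authoritative example.

import pypoolparty as ppp

# Sketch only: assumes the paths to the dummy scripts can be passed
# as qsub_path, qstat_path, and qdel_path when constructing the pool.
pool = ppp.sun_grid_engine.Pool(
    qsub_path="tests/resources/dummy_qsub.py",
    qstat_path="tests/resources/dummy_qstat.py",
    qdel_path="tests/resources/dummy_qdel.py",
)
results = pool.map(sum, [[1, 2], [2, 3]])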

Because of the global state file, only one instance of the dummy queue may run at a time.
