A job pool for distributed compute clusters inspired by Python's multiprocessing.Pool.
A Python package for job pools (as in multiprocessing.Pool()) which makes use of distributed compute clusters.
pypoolparty provides a Pool() with a map() function which aims to be a drop-in replacement for the builtin map() and for multiprocessing.Pool()'s map(). The idea is to allow the user to always fall back to these builtin pools and map-functions in case a distributed compute cluster is not available.
This package respects the concept of 'fair share', which is commonly found in scientific environments but not in commercial ones. Here, fair share means that compute resources are only requested when they are needed. Compute resources are not requested just to idle and wait for the user to submit jobs.
A consequence of this fair sharing is that this package expects your jobs to randomly die in conflicts over resources with jobs submitted by other users, such as conflicts over limited disk space on temporary drives. If your jobs run into error states, they are resubmitted until a predefined limit is reached.
Installing
pip install pypoolparty
Basic Usage
import pypoolparty as ppp
pool = ppp.slurm.Pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5]])
Currently, there is ppp.slurm.Pool() and ppp.sun_grid_engine.Pool().
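Because map() aims to be a drop-in replacement, you can fall back to the builtin pools when no cluster is around. A minimal sketch; the sbatch check is just an illustrative heuristic and not part of pypoolparty:

import shutil
import multiprocessing
import pypoolparty as ppp

def make_pool():
    # Use the cluster pool when slurm's sbatch is on the PATH, otherwise
    # fall back to the builtin multiprocessing pool.
    if shutil.which("sbatch"):
        return ppp.slurm.Pool()
    return multiprocessing.Pool()

pool = make_pool()
results = pool.map(sum, [[1, 2], [2, 3], [4, 5]])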
Alternatives
When you do not share resources with other users, do not need to respect fair share, and have some administrative power, you might want to use one of these:
Queue Flavors
SLURM, version 22.05.6
Sun Grid Engine (SGE), version 8.1.9
Inner Workings
map() creates a work_dir because the mapping and reducing take place in the filesystem. You can set work_dir manually to make sure both the worker nodes and the process node can reach it.
map() serializes your tasks using pickle into separate files in work_dir/{ichunk:09d}.pkl (see the sketch after this list).
map() reads all environment variables in its process.
map() creates the worker-node script in work_dir/worker_node_script.py. It contains and exports the process’ environment variables into the batch job’s context. It reads the chunk of tasks in work_dir/{ichunk:09d}.pkl, imports and runs your func(task), and finally writes the result back to work_dir/{ichunk:09d}.pkl.out.
map() submits the queue jobs. The stdout and stderr of the tasks are written to work_dir/{ichunk:09d}.pkl.o and work_dir/{ichunk:09d}.pkl.e, respectively. By default, shutil.which("python") is used to run the worker-node script.
When all queue jobs are submitted, map() monitors their progress. If a queue job runs into an error state, it is deleted and resubmitted until a maximum number of resubmissions is reached.
When no more queue jobs are running or pending, map() reduces the results from work_dir/{ichunk:09d}.pkl.out.
In case of non-zero stderr in any task, a missing result, or on the user's request, the work_dir is kept for inspection. Otherwise it is removed.
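The mapping and reducing steps can be pictured with a short sketch. The helper names write_chunks() and reduce_results() are hypothetical; they only illustrate the work_dir/{ichunk:09d}.pkl file layout described above, not the package's internal code:

import os
import pickle

def write_chunks(work_dir, tasks, chunk_size):
    # Split the tasks into chunks and pickle each chunk into
    # work_dir/{ichunk:09d}.pkl, following the layout described above.
    os.makedirs(work_dir, exist_ok=True)
    chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]
    for ichunk, chunk in enumerate(chunks):
        with open(os.path.join(work_dir, "{:09d}.pkl".format(ichunk)), "wb") as f:
            pickle.dump(chunk, f)
    return len(chunks)

def reduce_results(work_dir, num_chunks):
    # Gather the per-chunk outputs from work_dir/{ichunk:09d}.pkl.out
    # back into a single list of results.
    results = []
    for ichunk in range(num_chunks):
        with open(os.path.join(work_dir, "{:09d}.pkl.out".format(ichunk)), "rb") as f:
            results.extend(pickle.load(f))
    return results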
Environment Variables
All of the user's environment variables in the process where map() is called are exported into the queue job's context.
The worker-node script explicitly sets the environment variables. This package does not rely on the batch system’s ability (slurm/sge) to do so.
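A minimal sketch of this idea, assuming the worker-node script is generated as Python source; the helper name render_environ_lines() is hypothetical:

import os

def render_environ_lines(environ=None):
    # Render one explicit os.environ assignment per variable. These lines
    # are meant to be written into the generated worker-node script so the
    # environment does not depend on slurm/sge forwarding it.
    if environ is None:
        environ = dict(os.environ)
    lines = ["import os"]
    for key, value in environ.items():
        lines.append("os.environ[{k!r}] = {v!r}".format(k=key, v=value))
    return "\n".join(lines)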
Wording
task is a valid input to func. The tasks are the actual payload to be processed.
iterable is an iterable (list) of tasks. The naming is adopted from multiprocessing.Pool.map.
itask is the index of a task in iterable.
chunk is a chunk of tasks which is processed on a worker-node in serial.
ichunk is the index of a chunk. It is used to create the chunk's filenames such as work_dir/{ichunk:09d}.pkl.
queue-job is what we submit into the queue. Each queue-job processes the tasks in a single chunk in series.
jobname or job["name"] is assigned to a queue job by our map(). It is composed of our map()'s session-id (a "%Y-%m-%dT%H:%M:%S" timestamp) and ichunk, e.g. "q%Y-%m-%dT%H:%M:%S#{ichunk:09d}".
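For illustration, composing such a jobname could look like this sketch; the exact separators follow the pattern above, but this is not the package's internal code:

import datetime

# The session-id is a timestamp taken when map() starts.
session_id = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
ichunk = 7
jobname = "q{:s}#{:09d}".format(session_id, ichunk)
# e.g. "q2024-05-01T10:30:00#000000007"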
Testing
pytest -s .
Dummy Queue
To test our map(), we provide a dummy qsub, qstat, and qdel for the sun-grid-engine. These are individual Python scripts which all act on a common state file in tests/resources/dummy_queue_state.json in order to fake the sun-grid-engine's queue.
dummy_qsub.py only appends queue jobs to the list of pending jobs in the state file.
dummy_qdel.py only removes queue jobs from the state file.
dummy_qstat.py moves queue jobs from the pending to the running list and triggers the actual processing of the jobs. Each time dummy_qstat.py is called, it performs a single action on the state file, so it must be called multiple times to process all jobs. It can intentionally bring jobs into the error state when this is set in the state file.
Before running the dummy queue, its state file must be initialized:
from pypoolparty import sun_grid_engine
sun_grid_engine.testing.init_queue_state(
    path="tests/resources/dummy_queue_state.json"
)
When testing our map(), set its arguments qsub_path, qdel_path, and qstat_path to point to the dummy queue, as shown in the sketch below.
See tests/test_full_chain_with_dummy_qsub.py.
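A sketch of such a test setup. Passing the paths to Pool() and the locations of the dummy scripts under tests/resources/ are assumptions for illustration:

from pypoolparty import sun_grid_engine

# Script locations are assumed for illustration.
pool = sun_grid_engine.Pool(
    qsub_path="tests/resources/dummy_qsub.py",
    qstat_path="tests/resources/dummy_qstat.py",
    qdel_path="tests/resources/dummy_qdel.py",
)
results = pool.map(sum, [[1, 2], [3, 4]])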
Because of the global state file, only one instance of dummy_queue must run at a time.