Skip to main content

Async Processes and Pipelines

Project description

Async Processes and Pipelines

docs PyPI CI codecov Downloads

shellous provides a concise API for running subprocesses using asyncio. It is similar to and inspired by sh.

import asyncio
import shellous

sh = shellous.context()

async def main():
    result = await (sh("ls") | sh("grep", "README"))
    print(result)

asyncio.run(main())

Benefits

  • Run programs asychronously in a single line.
  • Easily capture output or redirect stdin, stdout and stderr to files, memory buffers or loggers.
  • Easily construct pipelines and use process substitution.
  • Easily set timeouts and reliably cancel running processes.
  • Run a program with a pseudo-terminal (pty).
  • Runs on Linux, MacOS, FreeBSD and Windows.
  • Monitor processes being started and stopped with audit_callback API.

Requirements

  • Requires Python 3.9 or later.
  • Requires an asyncio event loop.
  • Process substitution requires a Unix system with /dev/fd support.
  • Pseudo-terminals require a Unix system.

Basic Usage

Start the asyncio REPL by typing python3 -m asyncio, and import the shellous module:

>>> import shellous

Before we can do anything else, we need to create a context. Store the context in a short variable name like sh because we'll be typing it a lot.

>>> sh = shellous.context()

Now, we're ready to run our first command. Here's one that runs echo "hello, world".

>>> await sh("echo", "hello, world")
'hello, world\n'

The first argument is the program name. It is followed by zero or more separate arguments.

A command does not run until you await it. Here, we create our own echo command with "-n" to omit the newline. Note, echo("abc") is the same as echo -n "abc".

>>> echo = sh("echo", "-n")
>>> await echo("abc")
'abc'

Results and Exit Codes

If a command exits with a non-zero exit code, it raises a ResultError exception that contains the Result object. The Result object contains the exit code for the command among other details.

>>> await sh("cat", "does_not_exist")
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)

To always return a Result object (and not raise an error for a non-zero exit status), add the .result modifier.

>>> await echo("abc").result
Result(output_bytes=b'abc', exit_code=0, cancelled=False, encoding='utf-8', extra=None)

Redirecting Standard Input

You can change the standard input of a command by using the | operator.

>>> cmd = "abc" | sh("wc", "-c")
>>> await cmd
'       3\n'

To redirect stdin using a file's contents, use a Path object from pathlib.

>>> from pathlib import Path
>>> cmd = Path("LICENSE") | sh("wc", "-l")
>>> await cmd
'     201\n'

Redirecting Standard Output

To redirect standard output, use the | operator.

>>> output_file = Path("/tmp/output_file")
>>> cmd = sh("echo", "abc") | output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\n'

To redirect standard output with append, use the >> operator.

>>> cmd = sh("echo", "def") >> output_file
>>> await cmd
''
>>> output_file.read_bytes()
b'abc\ndef\n'

Redirecting Standard Error

By default, standard error is not captured. To redirect standard error, use the stderr method.

>>> cmd = sh("cat", "does_not_exist").stderr(shellous.STDOUT)
>>> await cmd.set(exit_codes={0,1})
'cat: does_not_exist: No such file or directory\n'

You can redirect standard error to a file or path.

To redirect standard error to the hosting program's sys.stderr, use the INHERIT redirect option.

>>> cmd = sh("cat", "does_not_exist").stderr(shellous.INHERIT)
>>> await cmd
cat: does_not_exist: No such file or directory
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=1, cancelled=False, encoding='utf-8', extra=None)

Pipelines

You can create a pipeline by combining commands using the | operator.

>>> pipe = sh("ls") | sh("grep", "README")
>>> await pipe
'README.md\n'

Process Substitution (Unix Only)

You can pass a shell command as an argument to another.

>>> cmd = sh("grep", "README", sh("ls"))
>>> await cmd
'README.md\n'

Use .writable to write to a command instead.

>>> buf = bytearray()
>>> cmd = sh("ls") | sh("tee", sh("grep", "README").writable | buf) | shellous.DEVNULL
>>> await cmd
''
>>> buf
bytearray(b'README.md\n')

Async With & For

You can loop over a command's output by using the context manager as an iterator.

>>> async with pipe as run:
...   async for line in run:
...     print(line.rstrip())
... 
README.md

⚠️ You can also acquire an async iterator directly from the command or pipeline object. This is discouraged because you will have less control over the final clean up of the command invocation than with a context manager.

>>> async for line in pipe:   # Use caution!
...   print(line.rstrip())
... 
README.md

You can use async with to interact with the process streams directly. You have to be careful; you are responsible for correctly reading and writing multiple streams at the same time.

>>> async with pipe as run:
...   data = await run.stdout.readline()
...   print(data)
... 
b'README.md\n'

Timeouts

You can specify a timeout using the timeout option. If the timeout expires, shellous will raise an asyncio.TimeoutError.

>>> await sh("sleep", 60).set(timeout=0.1)
Traceback (most recent call last):
  ...
asyncio.exceptions.TimeoutError

Timeouts are just a special case of cancellation.

Cancellation

When a command is cancelled, shellous terminates the process and raises a CancelledError.

You can retrieve the partial result by setting incomplete_result to True. Shellous will return a ResultError when the specified command is cancelled (or timed out).

>>> sleep = sh("sleep", 60).set(incomplete_result=True)
>>> t = asyncio.create_task(sleep.coro())
>>> t.cancel()
True
>>> await t
Traceback (most recent call last):
  ...
shellous.result.ResultError: Result(output_bytes=b'', exit_code=-15, cancelled=True, encoding='utf-8', extra=None)

When you use incomplete_result, your code should respect the cancelled attribute in the Result object. Otherwise, your code may swallow the CancelledError.

Pseudo-Terminal Support (Unix Only)

To run a command through a pseudo-terminal, set the pty option to True. Alternatively, you can pass a function to configure the tty mode and size.

>>> ls = sh("ls").set(pty=shellous.cooked(cols=40, rows=10, echo=False))
>>> await ls("README.md", "CHANGELOG.md")
'CHANGELOG.md\tREADME.md\r\n'

Context Objects

You can specify shared command settings in a context object. Context objects are immutable, so you must store the result of your changes in a new variable.

>>> auditor = lambda phase, info: print(phase, info["runner"].name)
>>> sh1 = sh.set(audit_callback=auditor)

Now all commands run with sh1 will log their progress using the audit callback.

>>> await sh1("echo", "goodbye")
start echo
stop echo
'goodbye\n'

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

shellous-0.13.0.tar.gz (38.6 kB view hashes)

Uploaded Source

Built Distribution

shellous-0.13.0-py3-none-any.whl (39.9 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page