Plugboard is an event-driven modelling and orchestration framework in Python for simulating and driving complex processes with many interconnected stateful components.
You can use it to define models in Python and connect them together easily so that data automatically moves between them. After running your model on a laptop, you can then scale out on multiple processors, or go to a compute cluster in the cloud.
Some examples of what you can build with Plugboard include:
- Digital twin models of complex processes:
  - It can easily handle common problems in industrial process simulation like material recirculation;
  - Models can be composed from different underlying components, e.g. physics-based simulations, machine-learning, AI models;
- AI integrations:
  - You can feed data to/from different LLMs using Plugboard components;
  - Easily reconfigure and swap model providers for optimal performance.
🖋️ Key Features
- Reusable classes containing the core framework, which you can extend to define your own model logic;
- Support for different simulation paradigms: discrete-time and event-based;
- YAML model specification format for saving model definitions, allowing you to run the same model locally or in cloud infrastructure;
- A command line interface for executing models;
- Built to handle the data-intensive simulation requirements of industrial process applications;
- Modern implementation with Python 3.12 and above based around asyncio with complete type annotation coverage;
- Built-in integrations for loading/saving data from cloud storage and SQL databases.
🔌 Installation
Plugboard requires Python >= 3.12. Install the package with pip inside a virtual environment, as shown below.
python -m pip install plugboard
Optional integrations for different cloud providers can be installed using plugboard[aws], plugboard[azure] or plugboard[gcp].
Support for parallelisation can be installed using plugboard[ray].
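As a quick sanity check after installing, the following minimal sketch reports whether the interpreter meets the Python >= 3.12 requirement and whether a plugboard distribution is visible to it. It uses only the standard library; the helper name check_environment and the shape of its return value are illustrative choices, not part of Plugboard.

```python
import sys
from importlib import metadata

MIN_PYTHON = (3, 12)  # minimum version stated in the installation notes


def check_environment() -> dict[str, object]:
    """Report interpreter suitability and the installed plugboard version, if any.

    Hypothetical helper for illustration; Plugboard does not ship this function.
    """
    try:
        installed = metadata.version("plugboard")
    except metadata.PackageNotFoundError:
        installed = None  # package is not installed in this environment
    return {
        "python_ok": sys.version_info >= MIN_PYTHON,
        "plugboard_version": installed,
    }


if __name__ == "__main__":
    print(check_environment())
```

If plugboard_version comes back as None, the package is most likely installed in a different environment from the one running the check.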
🚀 Usage
Plugboard is built to help you with two things: defining process models, and executing those models. There are two main ways to interact with Plugboard: via the Python API, or via the CLI using model definitions saved in YAML or JSON format.
Building models with the Python API
A model is made up of one or more components, though Plugboard really shines when you have many! First we start by defining the Components within our model. Components can have only inputs, only outputs, or both. To keep it simple we just have two components here, showing the most basic functionality. Each component has several methods which are called at different stages during model execution: init for optional initialisation actions; step to take a single step forward through time; run to execute all steps; and destroy for optional teardown actions.
import typing as _t

from plugboard.component import Component, IOController as IO
from plugboard.schemas import ComponentArgsDict


class A(Component):
    # Output-only component: emits the integers 0..iters-1, one per step.
    io = IO(outputs=["out_1"])

    def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._iters = iters

    async def init(self) -> None:
        self._seq = iter(range(self._iters))

    async def step(self) -> None:
        try:
            self.out_1 = next(self._seq)
        except StopIteration:
            # Sequence exhausted: close the IO to signal termination.
            await self.io.close()


class B(Component):
    # Input-only component: writes double each received value to a file.
    io = IO(inputs=["in_1"])

    def __init__(self, path: str, **kwargs: _t.Unpack[ComponentArgsDict]) -> None:
        super().__init__(**kwargs)
        self._path = path

    async def init(self) -> None:
        self._f = open(self._path, "w")

    async def step(self) -> None:
        out = 2 * self.in_1
        self._f.write(f"{out}\n")

    async def destroy(self) -> None:
        self._f.close()
Now we take these components, connect them up as a Process, and fire off the model. Using the Process as a context manager takes care of calling init at the beginning and destroy at the end for all Components. Calling Process.run triggers all the components to start iterating through all their inputs until a termination condition is reached. Simulations proceed in an event-driven manner: when inputs arrive, the components are triggered to step forward in time. The framework handles the details of the inter-component communication; you just need to specify the logic of your components and the connections between them.
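import asyncio

from plugboard.connector import AsyncioConnector
from plugboard.process import LocalProcess
from plugboard.schemas import ConnectorSpec

process = LocalProcess(
    components=[A(name="a", iters=5), B(name="b", path="b.txt")],
    connectors=[
        AsyncioConnector(
            spec=ConnectorSpec(source="a.out_1", target="b.in_1"),
        )
    ],
)


async def main() -> None:
    # Entering the context calls init on every component; leaving it calls destroy.
    async with process:
        await process.run()


# In a notebook or asyncio REPL you can await main() directly instead.
asyncio.run(main())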
Visually, we've created the model below, with Plugboard automatically handling the flow of data between the two components.
graph LR;
A(Component A)-->|data|B(Component B);
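To make the event-driven idea concrete, here is a minimal sketch of the same two-component flow written in plain asyncio. It does not use Plugboard and is not a description of how Plugboard is implemented; it only illustrates the pattern of a downstream component stepping whenever an input arrives. The names producer, consumer, run_model and the _CLOSED sentinel are all hypothetical.

```python
import asyncio

# Sentinel marking a closed channel (illustrative; not a Plugboard name).
_CLOSED = object()


async def producer(channel: asyncio.Queue, iters: int) -> None:
    """Plays the role of component A: emit 0..iters-1, then close the channel."""
    for value in range(iters):
        await channel.put(value)
    await channel.put(_CLOSED)


async def consumer(channel: asyncio.Queue, results: list[int]) -> None:
    """Plays the role of component B: take one step each time an input arrives."""
    while True:
        value = await channel.get()
        if value is _CLOSED:
            break
        results.append(2 * value)


async def run_model(iters: int) -> list[int]:
    """Run both coroutines concurrently until the producer closes the channel."""
    channel: asyncio.Queue = asyncio.Queue(maxsize=1)
    results: list[int] = []
    await asyncio.gather(producer(channel, iters), consumer(channel, results))
    return results


if __name__ == "__main__":
    print(asyncio.run(run_model(5)))  # [0, 2, 4, 6, 8]
```

In this sketch the wiring between the two coroutines is written by hand. In the Plugboard example above, that wiring is what the connector and process objects take care of, so the component classes contain only their own logic.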
Executing pre-defined models on the CLI
In many cases, we want to define components once, with suitable parameters, and then use them repeatedly in different simulations. Plugboard enables this workflow with model specification files in YAML or JSON format. Once the components have been defined, the simple model above can be represented with a YAML file like so.
# my-model.yaml
plugboard:
  process:
    args:
      components:
      - type: hello_world.A
        args:
          name: "a"
          iters: 10
      - type: hello_world.B
        args:
          name: "b"
          path: "./b.txt"
      connectors:
      - source: "a.out_1"
        target: "b.in_1"
We can now run this model using the plugboard CLI with the command:
plugboard process run my-model.yaml
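Because specification files can also be saved as JSON, the same structure can be generated programmatically, which is handy when sweeping over parameters. The sketch below builds the model as a dictionary and serialises it with the standard library. It assumes a JSON specification mirrors the YAML layout shown above key for key; the helper name build_spec is hypothetical.

```python
import json


def build_spec(iters: int, path: str) -> dict:
    """Return the two-component model as a plain dictionary.

    Assumption: the JSON form uses the same nesting as the YAML example.
    """
    return {
        "plugboard": {
            "process": {
                "args": {
                    "components": [
                        {"type": "hello_world.A", "args": {"name": "a", "iters": iters}},
                        {"type": "hello_world.B", "args": {"name": "b", "path": path}},
                    ],
                    "connectors": [{"source": "a.out_1", "target": "b.in_1"}],
                }
            }
        }
    }


if __name__ == "__main__":
    # Print the specification; redirect to a file to save it.
    print(json.dumps(build_spec(iters=10, path="./b.txt"), indent=2))
```

Generating one specification per parameter set keeps the component code unchanged while the files record exactly what was run.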
📖 Documentation
For more information, including a detailed API reference and step-by-step usage examples, refer to the documentation site. We recommend starting with the tutorials for a step-by-step guide to getting started.
🐾 Roadmap
Plugboard is under active development, with new features in the works:
- Detailed logging of component inputs, outputs and state for monitoring, process mining or surrogate modelling use cases.
- Support for strongly typed data messages and validation based on pydantic.
- Support for different parallelisation patterns such as: single-threaded with coroutines, single-host multi-process, or distributed with Ray in Kubernetes.
- Data exchange between components with popular messaging technologies like RabbitMQ and Google Pub/Sub.
- Support for different message exchange patterns such as: one-to-one, one-to-many, many-to-one etc. via a broker; or peer-to-peer with HTTP requests.
👋 Contributions
Contributions are welcomed and warmly received! For bug fixes and smaller feature requests, feel free to open an issue on this repo. For any larger changes, please get in touch with us to discuss first. More information for developers can be found in the contributing section of the docs.
⚖️ Licence
Plugboard is offered under the Apache 2.0 Licence, so it's free for personal or commercial use within those terms.