Metaflow library for SAP AI Core

The sap-ai-core-metaflow package is a Metaflow plugin that generates Argo Workflows.

It adds Argo Workflows capabilities using Metaflow's @kubernetes decorator.

Plugin Installation

pip install sap-ai-core-metaflow

Install and configure Argo Workflows

With the Metaflow-Argo plugin you can execute flows in a K8S cluster.

Note: you can also run a local K8S cluster using Docker Desktop (Mac, Windows) or K3D/minikube (Linux).

Configure Metaflow

For a quick-start follow the steps below.
(Find complete documentation here: Configuring Metaflow.)

The Metaflow config File

An S3 bucket can be configured to store training data and Metaflow code packages.

  • Create the config file: ~/.metaflowconfig/config.json containing your S3 bucket and prefix:
{
    "METAFLOW_DEFAULT_DATASTORE": "s3",
    "METAFLOW_DATASTORE_SYSROOT_S3": "s3://<bucket>/<prefix>"
}

In addition, your local AWS CLI has to be configured for the same bucket.
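
For example, credentials and a default region can be set interactively with the AWS CLI (assuming it is installed):

    aws configure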

Create the K8S Secret

This secret allows an Argo Workflow to access the S3 bucket when running in the cluster:

    kubectl create secret generic default-object-store-secret --from-literal=AWS_ACCESS_KEY_ID=XXX --from-literal=AWS_SECRET_ACCESS_KEY=YYY

Run the Argo WorkflowTemplate

Here we use helloworld.py from the Metaflow Tutorials.
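
For reference, a minimal flow of this kind looks roughly as follows (a sketch of the tutorial flow, not a verbatim copy):

from metaflow import FlowSpec, step

class HelloFlow(FlowSpec):

    @step
    def start(self):
        # entry point of the flow
        print("HelloFlow is starting.")
        self.next(self.hello)

    @step
    def hello(self):
        print("Metaflow says: Hi!")
        self.next(self.end)

    @step
    def end(self):
        # every flow finishes with an 'end' step
        print("HelloFlow is all done.")

if __name__ == "__main__":
    HelloFlow()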

Deploy the Argo WorkflowTemplate

    python helloworld.py --with kubernetes:secrets=default-object-store-secret argo create

You should see the following output:

    Metaflow 2.5.0 executing HelloFlow for user:<xyz>
    Validating your flow...
        The graph looks good!
    Running pylint...
        Pylint is happy!
    Deploying helloflow to Argo Workflow Templates...
    WorkflowTemplate helloflow is pushed to Argo Workflows successfully.

Trigger the Execution of the WorkflowTemplate

    python helloworld.py argo trigger

You should see a run-id after a successful start of a workflow:

    Metaflow 2.5.0 executing HelloFlow for user:<xyz>
    Validating your flow...
        The graph looks good!
    Running pylint...
        Pylint is happy!
    Workflow helloflow is triggered on Argo Workflows (run-id helloflow-92vbn).

Check the Execution Status

    python helloworld.py argo list-runs

The flow will display the state Running for some time and finally show Succeeded:

    Metaflow 2.5.0 executing HelloFlow for user:<xyz>
    Validating your flow...
        The graph looks good!
    Running pylint...
        Pylint is happy!
    helloflow-92vbn startedAt:'2022-06-15T14:00:27Z' Succeeded

Metaflow-Argo Decorators

In addition to the general Metaflow decorators (e.g. @kubernetes, @resources, @environment), the Metaflow-Argo plugin supports additional Argo-specific customization for steps: input_artifacts, output_artifacts, labels and shared_memory.

The @argo Decorator on Step Level

A Metaflow step can override the default values specified on the flow level or add extra configurations.

Labels

Labels are attached as metadata to the pod executing a step:

    ...
    @argo(labels={'plan': 'basic'})
    @step
    def training(self):
        ...

The step labels override the flow labels.
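
For example, a flow-level label can be set at deployment time via the --label option (see Command-line Parameters below); a step decorated with @argo(labels={'plan': 'basic'}) keeps plan: basic even if a different value is given on the flow level:

    python helloworld.py argo create --label="plan:enterprise"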

Input Artifacts

The artifacts configuration is inserted into the generated WorkflowTemplate as-is.

@argo(input_artifacts=[{
        'name': 'input_dataset',
        'path': '/tmp/train.csv',
        's3': {
            'endpoint': 's3.amazonaws.com',
            'bucket': 'my-bucket-name',
            'key': 'path/in/bucket',
            'accessKeySecret': {
                'name': 'my-s3-credentials',
                'key': 'accessKey'
            },
            'secretKeySecret': {
                'name': 'my-s3-credentials',
                'key': 'secretKey'
            }
        }}])
@step
def start(self):
    ds = pd.read_csv('/tmp/train.csv')

If the Default Artifact Repository is configured, the credentials can be omitted:

@argo(input_artifacts=[{
        'name': 'input_dataset',
        'path': '/tmp/spam.csv',
        's3': {
            'key': 'path/in/bucket'
          }
        }])

Output Artifacts

Prerequisite: The Default Artifact Repository is configured

If the configuration below is added to a step, Argo will upload the output artifact to the S3 bucket of the Default Artifact Repository.

@argo(output_artifacts=[{
    'name': 'model',
    'globalName': 'model',
    'path': '/tmp/model',
    'archive': {
        'none': {}
    }}])
@step
def train_model(self):
    ...
    with open('/tmp/model', 'wb') as f:
        pickle.dump(self.model, f)

Configure additional Disk Space using Persistent Volume Claim (PVC)

If the pod's disk space is not sufficient, one can mount extra storage using a Persistent Volume Claim.

The example below creates a PVC in the WorkflowTemplate with a default size of 50 GiB:

@argo(mount_pvc="/mnt/data")
@step
def start(self):
    # use /mnt/data for huge files
    ...

To set the size of the PVC, pass a value in MiB for the command-line parameter --volume-claim:

    python helloflow.py argo create --volume-claim=100000

Shared Memory Allocation

To allocate additional shared memory, specify the size in MiB with the following configuration:

@argo(shared_memory=100)
@step
def train_model(self):
    ...

Command-line Parameters and Environment Variables

The Metaflow-Argo plugin allows overriding default flow values. This is done through environment variables or command-line parameters. Command-line parameters naturally take precedence over envvars.

Command-line Parameters

List Command-line Parameters

Usage: helloworld.py argo create [OPTIONS]

  Deploy a new version of this workflow to Argo Workflow Templates.

Options:
  --image-pull-secret TEXT    Docker image pull secret.
  --label TEXT                Label to attach to the workflow template.
  --annotation TEXT           Annotation to attach to the workflow template.
  --volume-claim INTEGER      Size of the volumeClaim in MiB
  --k8s-namespace TEXT        Deploy into the specified kubernetes namespace.
  --embedded                  Don't download code package into step
                              containers. Docker images should have a flow
                              and all dependencies embedded.

  --max-workers INTEGER       Maximum number of parallel pods.  [default: 100]
  --workflow-timeout INTEGER  Workflow timeout in seconds.
  --only-json                 Only print out JSON sent to Argo. Do not deploy
                              anything.

  --help                      Show this message and exit.    

Set the Flow Name

The Argo WorkflowTemplate name is taken by default from the flow's class name in the Metaflow script. If you want to change the flow name, use:

    python helloworld.py argo --name=hello create

@kubernetes Decorator Command Line Parameter

The Metaflow command-line parameter --with=kubernetes allows setting configurations for running on a Kubernetes cluster:

  • secrets to access the object store; multiple secrets can be passed, separated by ','
    python helloworld.py --with=kubernetes:secrets=default-object-store-secret,s3-secret argo create
  • image to specify a docker image
    python helloworld.py --with=kubernetes:secrets=default-object-store-secret,image=demisto/sklearn:1.0.0.29944 argo create
  • disk is used to reserve ephemeral storage in MiB for the node.
    python helloworld.py --with=kubernetes:secrets=default-object-store-secret,disk=1024 argo create

Embedded code package

The flag --embedded generates the steps' commands without a code-package download; it expects the code package to be already embedded into the docker image.
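
For example (the image name is illustrative and must already contain the flow code):

    python helloworld.py --with=kubernetes:secrets=default-object-store-secret,image=docker.io/<my-repo>/helloworld argo create --embedded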

Environment Variables

Prefix for the Flow Name

To add a prefix to the flow name, set the environment variable ARGO_WORKFLOW_PREFIX.
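
For example (the prefix value is illustrative):

    export ARGO_WORKFLOW_PREFIX=dev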

Default K8S Namespace

The Kubernetes namespace used to store templates and execute workflows can be set with the envvar METAFLOW_KUBERNETES_NAMESPACE.
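
For example (the namespace name is illustrative):

    export METAFLOW_KUBERNETES_NAMESPACE=my-namespace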

Default Environment Variables for Command-line Parameters

Metaflow provides envvars for the corresponding command-line parameters by using the prefix METAFLOW_ARGO_CREATE_, e.g.:

    export METAFLOW_ARGO_CREATE_MAX_WORKERS=4

Other Supported Features

The Metaflow-Argo plugin also supports many other Metaflow features.

@kubernetes Decorator

The Metaflow-Argo plugin makes use of the specifications defined in the @kubernetes decorator to configure container resources.

    @kubernetes(image='tensorflow/tensorflow:2.6.0', cpu=8, memory=60000)
    @step
    def training(self):
        ...

@resources Decorator

Requesting resources with the @resources decorator is fully supported. If similar resources are specified in both @kubernetes and @resources, the maximum is taken.

    @kubernetes(image='tensorflow/tensorflow:2.4.2-gpu',
                node_selector='gpu=nvidia-tesla-k80')
    @resources(memory=60000, cpu=2)
    @step
    def training(self):
        ...

@environment Decorator

In addition to env and envFrom, envvars can be specified in the decorator:

@environment(vars={'var':os.getenv('var_1')})
@step
def start(self):
    print(os.getenv('var'))

Display Logs for a Step

Metaflow stores stdout and stderr of every step/task and provides a convenient way to access them, passing run-id and step-name:

    python 00-helloworld/helloworld.py logs helloflow-bdmmw/start

Optionally specify a task-id, which is useful when there are many instances of the same step, e.g. when using foreach:

    python 00-helloworld/helloworld.py logs helloflow-bdmmw/hello/helloflow-bdmmw-3938807173

Resume flows

See How to debug failed flows for details.

    python 00-helloworld/helloworld.py resume --origin-run-id=helloflow-bdmmw

Schedule flows

Metaflow's @schedule decorator is fully supported. E.g. a workflow template with @schedule(hourly=True) will automatically trigger workflow execution at the start of every hour.
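
A minimal sketch of such a scheduled flow (flow and step names are illustrative):

from metaflow import FlowSpec, schedule, step

@schedule(hourly=True)
class HourlyFlow(FlowSpec):

    @step
    def start(self):
        self.next(self.end)

    @step
    def end(self):
        print("Scheduled run finished.")

if __name__ == "__main__":
    HourlyFlow()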

Automatically rerun failed steps

The @retry decorator helps to deal with transient failures. See Retrying Tasks with the retry Decorator.
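
For example (step name and retry count are illustrative):

@retry(times=3)
@step
def fetch_data(self):
    # transient failures in this step are retried up to 3 times
    ...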

Handle failures programmatically

By default, a failing step stops the whole flow. If that is too limiting, the @catch decorator makes it possible to handle a step's failure in a subsequent step. See Catching Exceptions with the catch Decorator.
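
A sketch of this pattern (variable and step names are illustrative):

@catch(var='train_failed')
@step
def train_model(self):
    ...

@step
def evaluate(self):
    # train_failed holds the exception if train_model failed, else None
    if self.train_failed:
        print('Training failed:', self.train_failed)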

Limit the Step Execution Time

Use the @timeout decorator to guard against accidental resource consumption. Read Timing out with the timeout Decorator for more details.
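
For example, to fail the step if it runs longer than one hour (the limit is illustrative):

@timeout(hours=1)
@step
def train_model(self):
    ...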

Use with SAP AI Core

SAP AI Core does not allow access to its K8s cluster or argo-server. WorkflowTemplates are generated in JSON/YAML format and synced to K8s via the registered Git repo. The workflows can then be triggered via the SAP AI API.

The mandatory information for SAP AI Core is provided through labels and annotations.

    python helloworld.py --with=kubernetes:secrets=default-object-store-secret,image=docker.io/<my-repo>/helloworld argo create \
      --image-pull-secret=<docker-secret> \
      --label="scenarios.ai.sap.com/id:ml_usecase_name" \
      --label="ai.sap.com/version:0.1.0" \
      --annotation="scenarios.ai.sap.com/name:Hello-World" \
      --annotation="executables.ai.sap.com/name:helloworld" \
      --only-json > hello-world.json

Here you can find a step-by-step example for SAP AI Core.
