Create a pipeline with Outflow, part 4: Parallelize your workflows

Now it is time to dive into the most interesting feature of Outflow: parallelization of workflows across multiple cores, and distribution on a Slurm computing cluster.

Note

You can follow this section up to the last part even if you do not have access to Slurm. You can also check the developer documentation to start a Docker container that simulates a Slurm cluster.

The MapWorkflow

We will modify our workflow so that GenData returns a list of integers, and use the MapWorkflow to execute compute_workflow multiple times, once on each element of the list.

There are many types of parallel workflows, and Outflow currently supports map-reduce and branching. In this tutorial, we will use the map-reduce construction to create the following parallel workflow:


Schema of the parallel workflow
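If you are not familiar with map-reduce, the idea can be sketched in plain Python. This is only an illustration of the concept, not Outflow code: `gen_data` and `compute_workflow` below stand in for our GenData task and our compute workflow.

```python
# Plain-Python sketch of the map-reduce pattern used by MapWorkflow:
# "map" runs the same workflow once per item, "reduce" gathers the results.

def gen_data():
    return [42, 43, 44, 45]

def compute_workflow(raw_data, multiplier=3):
    # stand-in for the real Transform/Compute chain
    return raw_data * multiplier

# map: one workflow execution per item of the input list
mapped = [compute_workflow(item) for item in gen_data()]

# reduce: collect the individual results into a single output
print(mapped)  # [126, 129, 132, 135]
```

Note that these are the same values you will see in the log output further down: each item of the list goes through one independent execution of the workflow.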

We will configure and add an instance of outflow.library.tasks.MapWorkflow to our command.

Note

The class outflow.library.tasks.MapWorkflow is a generic class that will instantiate the right MapWorkflow depending on the backend. This means you can execute a workflow containing a MapWorkflow with any supported backend (default, parallel, or Slurm).

Edit the tasks

First, let’s modify our task GenData so it returns a list instead of one integer:

@as_task
def GenData() -> {"raw_data_array": list}:  # <-- edit the return annotation
    raw_data_array = [42, 43, 44, 45]
    return {"raw_data_array": raw_data_array}

Add MapWorkflow to the command workflow

To write a map with Outflow, we will use an instance of outflow.library.tasks.MapWorkflow. Outflow comes with a library of common tasks and workflows that you don't need to write yourself; MapWorkflow is one of them.

Import MapWorkflow in commands.py and turn the compute workflow into a MapWorkflow:

from outflow.library.tasks import MapWorkflow

# Change the decorator and disable cache for now
@MapWorkflow.as_workflow(cache=False)
def process_data():
    Transform() | Compute()

We will reactivate the cache at the end of this section.

Let’s try this new command just like before:

$ python manage.py compute_data

You should see that Outflow runs 4 instances of our task Compute sequentially:

tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:27 - INFO - Result of the mapped computation: [[{'computation_result': 126}], [{'computation_result': 129}], [{'computation_result': 132}], [{'computation_result': 135}]]

Since GenData has only one output and Transform has only one input, MapWorkflow knows it has to split the output of GenData and feed each item to one executed workflow. If GenData had more than one output, or Transform more than one input, we would have to specify how to split the inputs of the MapWorkflow using the map_on argument of the decorator. It takes a dictionary: the key is the name of the input sequence, and the value is the name of the target input of the first task of the workflow, which will receive each item of the sequence.
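The splitting rule can be illustrated in plain Python. This is only a sketch of the semantics, not Outflow internals, and it assumes that non-mapped outputs are passed unchanged to every execution; the `split_inputs` helper and all names below are hypothetical.

```python
# Hypothetical illustration of what map_on={"raw_data_array": "raw_data"} means:
# each item of the "raw_data_array" sequence becomes the "raw_data" input of
# one execution of the mapped workflow.

def split_inputs(outputs, map_on):
    sequence_name, target_input = next(iter(map_on.items()))
    # assumption: outputs that are not mapped on are copied into every execution
    passthrough = {k: v for k, v in outputs.items() if k != sequence_name}
    return [{**passthrough, target_input: item} for item in outputs[sequence_name]]

outputs = {"raw_data_array": [42, 43], "multiplier": 3}
print(split_inputs(outputs, {"raw_data_array": "raw_data"}))
# [{'multiplier': 3, 'raw_data': 42}, {'multiplier': 3, 'raw_data': 43}]
```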

The parallel backend

With this configuration, Outflow executes each iteration of the MapWorkflow sequentially. This requires no special configuration and can be useful for simple pipelines.

If you have bigger datasets, you can run these workflows in parallel on your local machine, using the parallel backend.

Let's edit our GenData task to generate more data, and see that the workflow runs roughly linearly faster with the number of available CPUs:

@as_task
def GenData() -> {"raw_data_array": list}:
    raw_data_array = list(range(6))
    return {"raw_data_array": raw_data_array}

Call our usual command:

$ python manage.py compute_data --backend parallel

This command runs 6 workflows in parallel, each lasting 4 seconds. Depending on the number of CPUs on your machine, the total execution time will vary, but in any case it should be faster than with the sequential backend.

Distribute workflows on a slurm cluster

Note

An SQLite database cannot keep up with all the accesses from the different nodes running Outflow tasks. It is strongly recommended to connect to a PostgreSQL database before continuing. See the database section for how to connect to a PostgreSQL database.

If you have even bigger datasets, and access to a Slurm computing cluster, Outflow can run your workflows using the Slurm backend.

If you have access to a Slurm cluster, simply run the command with the argument --backend slurm.

Allocate multiple CPUs per workflow for multiprocessing computations

Let's say that not only can we parallelize our pipeline over the data, but each computation can itself be shared between multiple CPUs. We can simulate this by dividing the time.sleep() duration in our Compute task by the number of CPUs available:

# in task Compute
...
except NoResultFound:
    # Result not in the database: compute the value like before
    logger.info("Result not found in database, computing result and inserting")

    # add this import to access the number of available CPUs
    from multiprocessing import cpu_count

    # simulate a multiprocess computation
    computation_time = 5/cpu_count()
    logger.info(f"{cpu_count()} CPUs available, computation will last {computation_time} seconds")
    time.sleep(computation_time)

    computation_result = raw_data * multiplier

Then, edit the MapWorkflow parameters to allocate multiple CPUs per mapped workflow:

# in commands.py

                         # ↓ add this parameter
@MapWorkflow.as_workflow(cpus_per_task=5, cache=False)
def process_data():
    Transform() | Compute()

You are now ready to run the usual command (with another multiplier again) and see that it will execute faster than the time it takes to say “Outflow is awesome”.

$ python manage.py compute_data --multiplier 15

When using Slurm, you can pass sbatch directives directly as MapWorkflow arguments. See the MapWorkflow documentation for details on MapWorkflow usage.

Congratulations, you have reached the end of the Outflow tutorial! You should now know enough about the framework to create your own pipeline.