Create a pipeline with Outflow, part 4: Parallelize your workflows

Now it is time to dive into the most interesting feature of Outflow: the parallelization of workflows across multiple cores, and their distribution on a slurm computing cluster.

Note

You can follow this section up to the last part even if you do not have access to slurm. You can also check the developer documentation to start a Docker container that simulates a slurm cluster.

The MapTask

We will modify our workflow so that GenData returns a list of integers, and use the MapTask to execute the task Compute multiple times, once for each element of the list.

There are many types of parallel workflows, and Outflow currently supports map-reduce and branching. In this tutorial, we will use the map-reduce construction to create the following parallel workflow:

[Figure: schema of the parallel workflow (tutorial04_diagram_1.png)]

We will configure and add an instance of outflow.library.tasks.MapTask to our workflow. We will also need to edit the tasks before and after the MapTask so that they conform to the default generator and reduce_func. You can write your own if you want to customize these steps, but for now the default behaviour is enough; a conceptual sketch of what the defaults do is shown below.
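
To give an idea of what these defaults do, here is a plain-Python sketch of the map-reduce construction. This is only a conceptual illustration, not Outflow's actual generator/reduce_func API:

# Conceptual illustration only: this is not Outflow's actual API, just the
# idea behind the default generator and reduce function of a MapTask.

def default_generator(upstream_output):
    # Split the mapped input: yield one set of task inputs per element.
    for element in upstream_output["some_data_array"]:
        yield {"some_data": element}

def default_reduce_func(iteration_outputs):
    # Gather the output of every iteration into a single list
    # (published under the default output name "map_output").
    return {"map_output": list(iteration_outputs)}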

Note

The class outflow.library.tasks.MapTask is a generic class that will instantiate the right MapTask implementation depending on the backend. This means you can execute a workflow containing a MapTask with any supported backend (default, parallel, or slurm).

Edit the tasks

First, let’s modify our task GenData so it returns a list instead of one integer:

@Task.as_task
def GenData() -> {"some_data_array": list}:  # <-- edit the return annotation
    some_data_array = [42, 43, 44, 45]
    return {"some_data_array": some_data_array}

We want Outflow to split the list some_data_array and pass each element to the parameter some_data. The default generator reads the type annotations of your task to determine which target to map. In our case, edit the annotation of the parameter some_data from : int to : IterateOn("some_data_array", int).

from outflow.core.types import IterateOn

@Task.as_task  #       ↓ add this custom type
def Compute(some_data: IterateOn("some_data_array", int))\
        -> {"computation_result": int}:
    # get the session of the default database
    session = context.session

    multiplier = context.args.multiplier

    ...

Using custom types like this is useful because the previous task may also return values that should not be mapped, and instead be passed as-is to the first task of the mapped sub-workflow. For those inputs you do not have to do anything, since inputs are not mapped by default.
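
For example, a hypothetical task inside the mapped sub-workflow could combine a mapped input with an unmapped one (ComputeWithOffset and its offset parameter are made up for illustration; only IterateOn comes from the tutorial):

from outflow.core.types import IterateOn

@Task.as_task
def ComputeWithOffset(
    some_data: IterateOn("some_data_array", int),  # mapped: one element per iteration
    offset: int,  # not mapped: the same value is passed to every iteration
) -> {"computation_result": int}:
    return {"computation_result": some_data + offset}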

Then, edit the task PrintData to account for the name of the output of the MapTask (the default name is just “map_output” but we will change it to “map_computation_result”), as well as the change in type (MapTask returns a list):

@Task.as_task
def PrintData(map_computation_result: list):
    logger.info(f"Result of the mapped computation: {map_computation_result}")

Add MapTask to the command workflow

To write a map with Outflow, we will use an instance of outflow.library.tasks.MapTask. Outflow already bundles a number of common tasks that you do not need to write yourself; MapTask is one of them. Like any other task, we will import it in our command and pipe it to other tasks.

Note

Unlike the tasks we are writing in this tutorial, MapTask was not created by decorating a function but by subclassing the Task class. This is why MapTask takes parameters at instantiation in the setup_tasks method of our commands. You too can subclass Task to create tasks with customized behaviour.

Import MapTask in commands.py and add it to the workflow:

from outflow.library.tasks import MapTask


@RootCommand.subcommand()
class ComputeData(Command):
    ...


    def setup_tasks(self):
        # instantiate tasks
        gen_data = GenData()
        print_data = PrintData()

        # create and parametrize a MapTask
        with MapTask(output_name="map_computation_result") as mapped_computation:
            compute = Compute()


        # replace compute with mapped_computation in the workflow declaration
        gen_data >> mapped_computation >> print_data

        # return the terminating task(s)
        return print_data

As you can see, MapTask is a context manager. The context it creates is actually a sub-workflow: any task instantiated inside the with block will belong to this sub-workflow.
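
If the mapped work needs more than one task, you can build a small workflow inside the with block. A minimal sketch, assuming the placeholder tasks TaskA and TaskB exist and can be chained with >> like in the main workflow:

with MapTask(output_name="map_computation_result") as mapped_computation:
    # Both tasks belong to the mapped sub-workflow and run once per element
    # produced by the generator.
    task_a = TaskA()
    task_b = TaskB()
    task_a >> task_b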

Let’s try this new command just like before:

$ python manage.py compute_data --multiplier 3

You should see that Outflow is running 4 instances of our task Compute sequentially:

tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:27 - INFO - Result of the mapped computation: [[{'computation_result': 126}], [{'computation_result': 129}], [{'computation_result': 132}], [{'computation_result': 135}]]

The parallel backend

With this configuration, Outflow executes each iteration of the MapTask sequentially. This requires no special configuration and can be useful for simple pipelines.

If you have bigger datasets, you can run these workflows in parallel on your local machine, using the parallel backend.

Let’s edit our GenData task to generate more data, and see that the execution gets faster roughly in proportion to the number of available CPUs:

@Task.as_task
def GenData() -> {"some_data_array": list}:
    some_data_array = [i for i in range(6)]
    return {"some_data_array": some_data_array}

Call our usual command, but with a new multiplier and the parallel backend:

$ python manage.py compute_data --multiplier 10 --backend parallel

This command runs 6 mapped workflows in parallel, each lasting 4 seconds. Depending on the number of CPUs on your machine, the total execution time will vary, but in any case it should be shorter than with the sequential default backend.
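
As a rough, idealized estimate (ignoring scheduling overhead), the wall time is the 4-second duration of one mapped workflow multiplied by the number of batches needed to fit the 6 workflows on your CPUs:

import math

n_workflows = 6   # elements produced by GenData
duration = 4      # seconds per mapped workflow

for n_cpus in (1, 2, 4, 8):
    estimate = math.ceil(n_workflows / n_cpus) * duration
    print(f"{n_cpus} CPU(s): ~{estimate} s")
# 1 CPU(s): ~24 s, 2 CPU(s): ~12 s, 4 CPU(s): ~8 s, 8 CPU(s): ~4 s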

Distribute workflows on a slurm cluster

Note

An SQLite database will most likely not keep up with all the concurrent accesses from the different nodes running Outflow tasks. It is strongly recommended to connect to a PostgreSQL database before continuing. See the database section for how to connect to a PostgreSQL database.

If you have even bigger datasets, and access to a slurm computing cluster, Outflow can distribute the mapped workflows using the slurm backend.

If you have access to a slurm cluster, simply run the command with the argument --backend slurm.
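
For example, reusing the command from the previous section (pick a new multiplier value so that the results are not already cached in the database):

$ python manage.py compute_data --multiplier 12 --backend slurm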

Allocate multiple CPUs for the multiprocessing computations of each workflow

Let’s say that not only can we parallelize our pipeline over the data, but each computation can itself be shared between multiple CPUs. We can simulate this by dividing the time.sleep() duration in our Compute task by the number of available CPUs:

# in task Compute
...
except NoResultFound:
    # Result not in the database: compute the value like before
    logger.info("Result not found in database, computing result and inserting")

    # add this import to get the number of available CPUs
    from multiprocessing import cpu_count

    # simulate a multiprocess computation
    computation_time = 5/cpu_count()
    logger.info(f"{cpu_count()} CPUs available, computation will last {computation_time} seconds")
    time.sleep(computation_time)

    computation_result = some_data * multiplier

Then, edit the MapTask parameters to allocate multiple CPUs per mapped workflow:

# in setup_tasks() of the ComputeData command

           # ↓ add this parameter
with MapTask(cpus_per_task=5, output_name="map_computation_result") as mapped_computation:
    compute = Compute()

You are now ready to run the usual command with the slurm backend (and another multiplier again) and see that it executes faster than the time it takes to say “Outflow is awesome”.

$ python manage.py compute_data --multiplier 15 --backend slurm

When using slurm, you can pass sbatch directives directly as MapTask arguments, as sketched below. See the maptask documentation for details on MapTask usage.
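
A minimal sketch of what this could look like, assuming MapTask accepts additional sbatch-style keyword arguments (only cpus_per_task is confirmed by this tutorial; the commented-out parameters are assumptions, so check the MapTask documentation for the exact names):

with MapTask(
    cpus_per_task=5,
    # mem="4GB",          # assumed sbatch-style memory directive
    # partition="batch",  # assumed slurm partition to submit to
    output_name="map_computation_result",
) as mapped_computation:
    compute = Compute()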

Congratulations, you have reached the end of the Outflow tutorial! You should now know enough about the framework to create your own pipeline.