======================================================================
Create a pipeline with Outflow, part 4: Parallelize your workflows
======================================================================

It is now time to dive into the most interesting feature of Outflow: parallelizing workflows across multiple cores and distributing them on a slurm computing cluster.

.. note::

    If you do not have access to slurm, you can follow this section up to the last part. You can also check the developer documentation to start a Docker container that simulates a slurm cluster.

The MapWorkflow
===============

We will modify our workflow so that GenData returns a list of integers. Using the MapWorkflow, we will then execute compute_workflow multiple times, once for each element of the list.

There are many types of parallel workflows, and Outflow currently supports map-reduce and branching. In this tutorial, we will use the map-reduce construction to create the following parallel workflow:

.. TODO update diagram

.. figure:: /home/kalterkrieg/Documents/lam/concerto/code/outflow/docs/sections/images/tutorial04_diagram_1.png
    :align: center

    Schema of the parallel workflow

We will configure an instance of ``outflow.library.workflows.MapWorkflow`` and add it to our command.

.. note::

    The class ``outflow.library.workflows.MapWorkflow`` is a generic class that instantiates the right MapWorkflow depending on the backend. This means you can execute a workflow containing a MapWorkflow with any supported backend (the default, parallel, or slurm backend).

Edit the tasks
==============

First, let's modify our task *GenData* so that it returns a list instead of a single integer:

.. code-block:: python

    @as_task
    def GenData() -> {"raw_data_array": list}:  # <-- edit the return annotation
        raw_data_array = [42, 43, 44, 45]
        return {"raw_data_array": raw_data_array}

Add MapWorkflow to the command workflow
========================================

To write a *map* with Outflow, we will use an instance of ``outflow.library.workflows.MapWorkflow``.
Outflow comes with a library of common tasks and workflows that you do not need to write yourself; MapWorkflow is one of them.

Import *MapWorkflow* in *commands.py* and make ComputeWorkflow a MapWorkflow:

.. code-block:: python

    from outflow.library.workflows import MapWorkflow

    # Change the decorator and disable the cache for now
    @MapWorkflow.as_workflow(cache=False)
    def process_data():
        Transform() | Compute()

We will reactivate the cache at the end of this section.

Let's try this new command just like before:

.. code-block:: console

    $ python manage.py compute_data

You should see that Outflow runs 4 instances of our task Compute sequentially:

.. code-block:: text

    tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
    tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
    tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
    tuto.data_reduction.tasks - tasks.py:51 - INFO - Result not found in database, computing result and inserting
    tuto.data_reduction.tasks - tasks.py:27 - INFO - Result of the mapped computation: [[{'computation_result': 126}], [{'computation_result': 129}], [{'computation_result': 132}], [{'computation_result': 135}]]

Since GenData has only one output and Transform has only one input, MapWorkflow knows that it has to split the output of GenData and pass each item as the input of one executed workflow.

If GenData had more than one output, or Transform more than one input, we would have to specify how to split the inputs of the MapWorkflow using the ``map_on`` argument of the decorator. It takes a dictionary: each key is the name of an input sequence, and each value is the name of the input of the first task of the workflow that will receive the items of that sequence.

The parallel backend
====================

With this configuration, Outflow executes each iteration of the MapWorkflow sequentially.
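
As a rough mental model, sequential mapping behaves like a plain loop over the input list. The sketch below is ordinary Python, not Outflow's implementation: ``transform``, ``compute`` and ``run_mapped`` are hypothetical stand-ins for the tasks and for the MapWorkflow, and the pass-through transform and the multiplier of 3 are assumptions chosen only so that the result has the same shape as the log shown earlier.

```python
def transform(raw_data):
    # Hypothetical stand-in for the Transform task: passes the value through.
    return {"transformed_data": raw_data}


def compute(transformed_data, multiplier=3):
    # Hypothetical stand-in for the Compute task.
    return {"computation_result": transformed_data * multiplier}


def run_mapped(sequence):
    # Run the two-task "workflow" once per item, one item after another.
    results = []
    for item in sequence:
        workflow_output = compute(**transform(item))
        # Each mapped run contributes its own list of task outputs.
        results.append([workflow_output])
    return results


if __name__ == "__main__":
    print(run_mapped([42, 43, 44, 45]))
```

Each item of the sequence produces one entry in the outer list, which is why the logged result is a list of lists. The loop handles the items one after another, as the default backend does.
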
Sequential execution requires no special configuration and can be useful for simple pipelines.

If you have bigger datasets, you can run these workflows in parallel on your local machine, using the ``parallel`` backend.

Let's edit our :code:`GenData` task to generate more data and see that the execution time decreases roughly linearly with the number of available CPUs:

.. code-block:: python

    @as_task
    def GenData() -> {"raw_data_array": list}:
        raw_data_array = list(range(6))
        return {"raw_data_array": raw_data_array}

Run the usual command with the parallel backend:

.. code-block:: console

    $ python manage.py compute_data --backend parallel

This command runs 6 workflows in parallel, each lasting 4 seconds. The total execution time depends on the number of CPUs on your machine, but in any case it should be shorter than with the sequential backend.

Distribute workflows on a slurm cluster
========================================

.. note::

    A SQLite database cannot keep up with the concurrent accesses from the different nodes running Outflow tasks. It is strongly recommended to connect to a PostgreSQL database before continuing. See section `database `_ for how to connect to a PostgreSQL database.

If you have even bigger datasets and access to a slurm computing cluster, Outflow can run its workflows using the slurm backend. In that case, simply run the command with the argument :code:`--backend slurm`.

Allocate multiple cpus for multiprocessing computations for each workflow
================================================================================

Let's say that, in addition to parallelizing our pipeline over the data, each computation can itself be shared between multiple CPUs. We can simulate this by dividing the :code:`time.sleep()` duration in our *Compute* task by the number of CPUs available:

.. code-block:: python

    # in task Compute
    ...
    except NoResultFound:
        # Result not in the database: compute the value like before
        logger.info("Result not found in database, computing result and inserting")

        # add this import to get the number of available CPUs
        from multiprocessing import cpu_count

        # simulate a multiprocess computation
        computation_time = 5 / cpu_count()
        logger.info(f"{cpu_count()} CPUs available, computation will last {computation_time} seconds")
        time.sleep(computation_time)
        computation_result = raw_data * multiplier

Then, edit the MapWorkflow parameters to allow multiple CPUs per mapped workflow:

.. code-block:: python

    # in commands.py, on the mapped workflow

    #                        ↓ add this parameter
    @MapWorkflow.as_workflow(cpus_per_task=5, output_name="computation_result")
    def process_data():
        Transform() | Compute()

You are now ready to run the usual command (with another multiplier again) and see that it executes faster than the time it takes to say "Outflow is awesome".

.. code-block:: console

    $ python manage.py compute_data --multiplier 15

When using slurm, you can pass sbatch directives directly as MapWorkflow arguments. See :ref:`mapworkflow ` for details on MapWorkflow usage.

Congratulations, you have reached the end of the Outflow tutorial! You should now know enough about the framework to create your own pipeline.