MapWorkflow

MapWorkflow implements the map-reduce pattern in Outflow. Its purpose is to split an input sequence and execute its inner workflow as many times as there are items in the sequence. Each workflow execution receives one of these items.

There are three implementations of the MapWorkflow, one for each backend. It is recommended to use the generic outflow.library.tasks.MapWorkflow and let the backend choose the right one. Keyword arguments specific to Slurm are ignored by the other MapWorkflow implementations, so you can, for example, pass kwargs specific to a SlurmMapWorkflow to the MapWorkflow, and they will simply be ignored when run with the default backend, as sketched below.
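
For example (a minimal sketch; MyTask is a placeholder task, and partition is a Slurm-specific keyword that the default backend ignores):

from outflow.library.tasks import MapWorkflow

@MapWorkflow.as_workflow(partition="short")  # Slurm-only kwarg, ignored by the default backend
def my_mapped_workflow():
    MyTask()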

Usage

import os
from typing import Any, List

from outflow.core.tasks import as_task

@as_task
def ListFiles() -> {"file_path_list": List[str]}:
    # data_directory is assumed to be defined elsewhere (e.g. read from the configuration)
    paths = os.listdir(data_directory)
    return {"file_path_list": paths}

@as_task
def Read(file_path: str) -> {"data": Any}:
    # read the file contents
    with open(file_path) as f:
        data = f.read()
    return {"data": data}

from outflow.core.commands import Command
from outflow.library.tasks import MapWorkflow

@MapWorkflow.as_workflow
def process():
    # Transform and Save are illustrative tasks, defined like Read above
    Read() | Transform() | Save()

class Process(Command):
    def setup_tasks(self):
        ListFiles() | process()

SlurmMapWorkflow

You can configure how your SlurmMapWorkflow will be distributed by passing sbatch arguments to it.

@SlurmMapWorkflow.as_workflow(partition="short",
                              mem="10G")
def process():
    Read() | Transform() | Save()

See the simple-slurm GitHub repository for the syntax, and the sbatch documentation for the available arguments.

There is only one (optional) argument specific to Outflow: simultaneous_tasks, which specifies the value after the % sign of the Slurm job array. This tells Slurm how many jobs of the array (i.e. how many mapped workflows) will be executed at the same time. This can be useful if there are limits on the number of CPUs per user on your Slurm cluster.

The argument cpus_per_task is useful if you have tasks that can take advantage of multiprocessing.
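
For example, a minimal sketch combining both arguments (the values are illustrative):

@SlurmMapWorkflow.as_workflow(cpus_per_task=4,
                              simultaneous_tasks=10)
def process():
    Read() | Transform() | Save()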

Use config.yml to specify SlurmMapWorkflow sbatch directives

You can easily store your SlurmMapWorkflow configurations in the config.yml file:

In config.yml:

my_map_config:
  partition: batch
  cpus_per_task: 2
  simultaneous_tasks: 10

Then, in your pipeline code:

from outflow.core.pipeline import config
from outflow.library.tasks import SlurmMapWorkflow

with SlurmMapWorkflow(**config["my_map_config"]) as map_task:
    MyComputation()
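
The same stored configuration should also work with the decorator form (a sketch, assuming as_workflow accepts the same keyword arguments):

@SlurmMapWorkflow.as_workflow(**config["my_map_config"])
def my_mapped_workflow():
    MyComputation()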

LoopWorkflow

The LoopWorkflow allows you to repeat a workflow, either for a given number of iterations or indefinitely.

Usage

This will repeat the workflow TaskA | TaskB 3 times:

from outflow.library.tasks import LoopWorkflow

@LoopWorkflow.as_workflow(iterations=3)
def looped_workflow():
    TaskA() | TaskB()

Setting infinite=True will repeat the workflow TaskA | TaskB until an exception is raised, the process is killed, Ctrl-C is pressed, or exit_pipeline() is called.

@LoopWorkflow.as_workflow(infinite=True)
def looped_workflow():
    TaskA() | TaskB()
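
Since any exception stops the loop, a task inside the looped workflow can end it explicitly. A minimal sketch, where should_stop() is a hypothetical condition, not part of Outflow:

@as_task
def StopWhenDone():
    if should_stop():  # hypothetical condition, not part of Outflow
        # raising any exception ends an infinite LoopWorkflow
        raise RuntimeError("no more work, stopping the loop")

@LoopWorkflow.as_workflow(infinite=True)
def looped_until_done():
    TaskA() | TaskB() | StopWhenDone()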

Combining the LoopWorkflow with a Sleep task is useful for automatic processing:

import time

from outflow.core.logging import logger

@as_task
def Sleep(seconds=0):
    logger.info(f"Sleeping for {seconds} seconds")
    time.sleep(seconds)

@LoopWorkflow.as_workflow(infinite=True)
def infinite_processing():
    TaskA() | TaskB() | Sleep(seconds=60)