# MapWorkflow

The MapWorkflow implements the map-reduce pattern in Outflow. Its purpose is to split an input sequence and execute itself as many times as the length of the sequence; each workflow execution gets one item of the sequence.

There are three implementations of the MapWorkflow, one for each backend. It is recommended to use the generic `outflow.library.tasks.MapWorkflow` and let the backend choose the right one. Keyword arguments specific to Slurm are ignored by the other MapWorkflow implementations, so you can, for example, pass kwargs meant for a SlurmMapWorkflow to the MapWorkflow, and they will simply be ignored when run with the default backend.

## Usage

```python
@as_task
def ListFiles() -> {"file_path_list": List[str]}:
    paths = os.listdir(data_directory)
    return {"file_path_list": paths}

@as_task
def Read(file_path: str) -> {"data": Any}:
    # read file
    return {"data": data}

@MapWorkflow.as_workflow
def process():
    Read() | Transform() | Save()

class Process(Command):
    def setup_tasks(self):
        ListFiles() | process()
```

## SlurmMapWorkflow

You can configure how your SlurmMapWorkflow is distributed by passing arguments to the SlurmMapWorkflow:

```python
@SlurmMapWorkflow.as_workflow(partition="short", mem="10G")
def process():
    Read() | Transform() | Save()
```

See the [simple-slurm repository](https://github.com/amq92/simple-slurm/#many-syntaxes-available) for the syntax, and the [sbatch documentation](https://slurm.schedmd.com/sbatch.html) for the available arguments.

There is only one (optional) argument specific to Outflow: `simultaneous_tasks`, which specifies the value after the `%` sign of the Slurm job array. It tells Slurm how many jobs of the array (i.e. how many mapped workflows) may run at the same time. This can be useful if your Slurm cluster limits the number of CPUs per user.

The `cpus_per_task` argument is useful if your tasks can take advantage of multiprocessing.
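The splitting behavior described above can be sketched in plain Python. This is only an illustration of the map pattern, not the Outflow API; `run_mapped` and `inner_workflow` are hypothetical names standing in for what the backend does with the mapped workflow:

```python
from typing import Any, Callable, List

def run_mapped(workflow: Callable[[str], Any], file_path_list: List[str]) -> List[Any]:
    # Hypothetical sketch: a MapWorkflow conceptually runs the inner
    # workflow once per item of the input sequence and gathers the results.
    return [workflow(file_path) for file_path in file_path_list]

def inner_workflow(file_path: str) -> dict:
    # Stand-in for the mapped Read() | Transform() | Save() chain:
    # here we just tag the path to show one execution per item.
    return {"data": f"processed:{file_path}"}

results = run_mapped(inner_workflow, ["a.fits", "b.fits", "c.fits"])
print(results)
```

With a Slurm backend, each of these per-item executions would instead become one job of a Slurm job array.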
### Use config.yml to specify SlurmMapWorkflow sbatch directives

You can easily store your SlurmMapWorkflow configurations in the `config.yml` file:

```yaml
my_map_config:
    partition: batch
    cpus_per_task: 2
    simultaneous_tasks: 10
```

```python
from outflow.core.pipeline import config

with SlurmMapWorkflow(**config["my_map_config"]) as map_task:
    MyComputation()
```

# LoopWorkflow

The LoopWorkflow allows you to repeat a workflow, either for a given number of iterations or indefinitely.

## Usage

This will repeat the workflow `A | B` 3 times:

```python
from outflow.library.tasks import LoopWorkflow

@LoopWorkflow.as_workflow(iterations=3)
def looped_workflow():
    TaskA() | TaskB()
```

With `infinite=True`, the workflow `A | B` is repeated until an exception is raised, the process is killed, ctrl-c is pressed, or `exit_pipeline()` is called:

```python
@LoopWorkflow.as_workflow(infinite=True)
def looped_workflow():
    TaskA() | TaskB()
```

Combining the LoopWorkflow with a `Sleep` task is useful for automatic processing:

```python
@as_task
def Sleep(seconds=0):
    logger.info(f"Sleeping for {seconds} seconds")
    time.sleep(seconds)

@LoopWorkflow.as_workflow(infinite=True)
def infinite_processing():
    TaskA() | TaskB() | Sleep(seconds=60)
```
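The loop semantics described above can be sketched in plain Python. This is a conceptual illustration, not the Outflow API; `run_loop` is a hypothetical helper showing how `iterations` and `infinite` interact:

```python
from typing import Callable, Optional

def run_loop(workflow: Callable[[], None],
             iterations: Optional[int] = None,
             infinite: bool = False) -> int:
    # Hypothetical sketch of LoopWorkflow semantics (not the Outflow API):
    # repeat the inner workflow a fixed number of times, or forever when
    # infinite=True (until an exception or an external stop such as ctrl-c).
    count = 0
    while infinite or count < (iterations or 0):
        workflow()
        count += 1
    return count

calls = []
run_loop(lambda: calls.append("A|B"), iterations=3)
print(calls)  # the inner workflow ran three times
```

In the infinite case, termination relies on something interrupting the loop, which is why pairing it with a `Sleep` task keeps periodic processing from busy-waiting.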