# Design Patterns

This section is a collection of useful patterns that can be implemented using the features of the framework. Outflow is still young, so if you find an interesting way to use it, do not hesitate to share it with the community by opening a merge request with your addition to this section, or by contacting us on the Discord server.

## Use config or CLI arguments inside the workflow definition

When Outflow reads the code in the `setup_tasks` method of your commands, it has already parsed both the configuration file and the command line arguments. That means you can access them and use their values to make your workflow configurable or conditional.

### Example 1: use the CLI arguments to make a task execute or not

Let's say you have a task called `Debug` that sets up a debug environment for your pipeline.

```python
from outflow.core.pipeline import context
from outflow.core.commands import Command, RootCommand

from my_project.my_plugin.tasks import Debug, First, Second


@RootCommand.subcommand()
class MyCommand(Command):
    def add_arguments(self):
        self.add_argument(
            "--debug", action="store_true", help="Set up debug environment"
        )

    def setup_tasks(self):
        first_task = First()
        second_task = Second()

        if context.args.debug:
            # remember that any instantiated task is executed
            debug_task = Debug()
            debug_task | first_task

        first_task | second_task
```

### Example 2: use the config.yml file to configure the resources of a MapWorkflow

You can define the sbatch directives inside the configuration and pass them to the MapWorkflow.

Inside the config.yml file:

```yaml
my_map_resources:
  cpus_per_task: 5
  mem: 10GB
  partition: short
```

In your workflow definition:

```python
from outflow.core.pipeline import config


@SlurmMapWorkflow.as_workflow(**config.my_map_resources)
def my_map():
    ...
```

This works because Outflow is responsible for importing your plugin code, and this import happens after the configuration file has been loaded.

## Define workflows and commands outside plugins

The code structure presented in the tutorial might not fit your pipeline design. If you prefer, it is possible to define workflows and commands inside the pipeline directory itself. If your workflows use a combination of tasks from different plugins, it might be more fitting to define them in the pipeline directory rather than inside one of the plugins. This also helps avoid dependencies between plugins.

### Implementation

Simply create a `my_commands.py` file inside your pipeline directory containing the definition of your command (see the sketch after the next code block). Then, you need to import this command module from the `manage.py` file, so Outflow is able to find and register it (this step is done automatically for plugins).

```python
# inside the manage.py file
import importlib
import pathlib
import sys

from outflow.core.pipeline import Pipeline

if __name__ == "__main__":
    pipeline_root_directory = Pipeline.get_parent_directory_posix_path(__file__)

    # add plugins to the python path
    # note: for cython like plugins, the compilation step is required and
    # plugin installation via pip is strongly encouraged
    plugins_dir = pathlib.Path(__file__).parent / "plugins"

    for plugin_path in plugins_dir.glob("*"):
        sys.path.append(plugin_path.resolve().as_posix())

    ###
    # add either one of these two lines: if you use flake8, it will complain
    # about the first one with "imported but unused", so you might want to
    # use the second one
    import my_commands

    importlib.import_module("my_commands")
    ###

    with Pipeline(root_directory=pipeline_root_directory) as pipeline:
        result = pipeline.run()
```
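For reference, here is a minimal sketch of what `my_commands.py` could contain. It defines a command exactly like a plugin would; the command name (`CombineCommand`) and the plugin tasks it imports (`Extract`, `Report`) are hypothetical placeholders:

```python
# my_commands.py, at the root of the pipeline directory
from outflow.core.commands import Command, RootCommand

# hypothetical tasks provided by two different plugins
from plugin_a.tasks import Extract
from plugin_b.tasks import Report


@RootCommand.subcommand()
class CombineCommand(Command):
    def setup_tasks(self):
        # chain tasks from both plugins without introducing a
        # dependency between the plugins themselves
        Extract() | Report()
```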
That's all! You can now call this new command as usual with `python manage.py some_command`.

## Nesting MapWorkflow and IterativeWorkflow

Case: you have a `MapWorkflow` inside an `IterativeWorkflow`, and one of the tasks inside the `MapWorkflow` needs the output of the iterative workflow. In the example below, the `Process` task needs the output of the `IterativeWorkflow`, i.e. the output of `ReduceProcessed`. For the example, this value is an `int`.

In this case, you need to manually add an input to the `MapWorkflow`, because there is one level of workflow between the iterative workflow and the tasks that need the reinjected value. During execution, all blocks defined one level inside the `IterativeWorkflow` have access to the reinjected value, but the `MapWorkflow` will only retrieve the value if the input is set manually.

```python
from typing import Optional

from outflow.core.commands import Command


@as_task
def Process(reduced_data: Optional[int]):
    # do something with reduced_data
    pass


@as_task
def ReduceProcessed() -> {"reduced_data": int}:
    # return some int (for the example) that represents the combination
    # of all processed files
    pass


@MapWorkflow.as_workflow
def inner_map_workflow():
    Read() | Process()


def break_func(reduced_data: int):
    # stop iterating as soon as the reduced value exceeds the threshold
    return reduced_data > threshold


@IterativeWorkflow.as_workflow(max_iterations=5, break_func=break_func)
def my_iterative_workflow():
    inner_map = inner_map_workflow()
    inner_map | ReduceProcessed()

    # mandatory: manually add the input to the map workflow
    inner_map.add_input("reduced_data", type=Optional[int])


class MyCommand(Command):
    def setup_tasks(self):
        ListFiles() | my_iterative_workflow()
```
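A note on the type annotations above: `Process` declares `reduced_data` as `Optional[int]` because no reinjected value exists yet on the first iteration, so the task body should be prepared to handle the `None` case. `break_func`, on the other hand, takes a plain `int`, since it is only evaluated once an iteration has produced a reduced value.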