Design Patterns

This section is a collection of useful patterns that can be implemented using the features of the framework.

Outflow is still young, so if you find an interesting way to use it, do not hesitate to share it with the community by opening a merge request with your addition to this section, or by contacting us on the Discord server.

Use config or CLI arguments inside the workflow definition

When Outflow reads the code in the setup_tasks method of your commands, it has already parsed both the configuration file and the command line arguments. This means you can access their values and use them to make your workflow configurable or conditional.

Example 1: use the CLI arguments to make a task execute or not

Let’s say you have a task called Debug that sets up a debug environment for your pipeline.

from outflow.core.pipeline import context
from outflow.core.commands import Command, RootCommand
from my_project.my_plugin.tasks import Debug, First, Second

@RootCommand.subcommand()
class MyCommand(Command):
    def add_arguments(self):
        self.add_argument("--debug", action="store_true", help="Set up debug environment")

    def setup_tasks(self):
        first_task = First()
        second_task = Second()

        if context.args.debug:
            debug_task = Debug() # Remember that any instantiated task is executed
            debug_task | first_task

        first_task | second_task
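With this in place, the Debug task is only instantiated, and therefore only executed, when the flag is passed on the command line (assuming the subcommand is registered under the name my_command):

python manage.py my_command --debug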

Example 2: use the config.yml file to configure the resources of a MapWorkflow

You can define the sbatch directives inside the configuration and pass them to the MapWorkflow:

Inside the config.yml file:

my_map_resources:
  cpus_per_task: 5
  mem: 10GB
  partition: short

In your workflow definition:

from outflow.core.pipeline import config
# adjust the import path below to match your Outflow version
from outflow.library.workflows import SlurmMapWorkflow

@SlurmMapWorkflow.as_workflow(**config.my_map_resources)
def my_map():
    ...
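Because the dictionary is simply unpacked as keyword arguments, the config.yml values above make the decorator call equivalent to writing:

@SlurmMapWorkflow.as_workflow(cpus_per_task=5, mem="10GB", partition="short")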

This works because Outflow is responsible for importing your plugin code, and it does so after loading the configuration file, so config is already populated when the decorator runs.

Define workflows and commands outside plugins

The code structure presented in the tutorial might not fit your pipeline design. If you prefer, it is possible to define workflows and commands inside the pipeline directory itself.

If your workflows use a combination of tasks from different plugins, it might be more fitting to define them in the pipeline directory rather than inside one of the plugins. This will also help with avoiding dependencies between plugins.

Implementation

Simply create a my_commands.py file inside your pipeline directory containing the definition of your command.
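For reference, a minimal my_commands.py could look like the following sketch, which reuses the command pattern from Example 1 (the task import is an assumption; point it at tasks from any of your plugins):

# my_commands.py, at the root of the pipeline directory
from outflow.core.commands import Command, RootCommand
from my_project.my_plugin.tasks import First, Second  # hypothetical tasks

@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_tasks(self):
        First() | Second()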

Then, you need to import this command module from the manage.py file, so Outflow is able to find and register it (this step is done automatically for plugins).

# inside the manage.py file
import pathlib
import sys

from outflow.core.pipeline import Pipeline

if __name__ == "__main__":
    pipeline_root_directory = Pipeline.get_parent_directory_posix_path(__file__)
    # add plugins to the python path
    # note: for cython-like plugins, the compilation step is required and
    # plugin installation via pip is strongly encouraged
    plugins_dir = pathlib.Path(__file__).parent / "plugins"
    for plugin_path in plugins_dir.glob("*"):
        sys.path.append(plugin_path.resolve().as_posix())

    # import the command module so Outflow is able to find and register it
    # (this step is done automatically for plugins); use either one of the
    # two lines below, knowing that flake8 will complain about the first
    # one with "imported but unused"
    import my_commands
    # import importlib; importlib.import_module("my_commands")

    with Pipeline(root_directory=pipeline_root_directory) as pipeline:
        result = pipeline.run()

That’s all! You can now call this new command as usual with python manage.py some_command.

Nesting MapWorkflow and IterativeWorkflow

Case: you have a MapWorkflow inside an IterativeWorkflow, and one of the tasks inside the MapWorkflow needs the output of the iterative workflow. In the example below, the Process task needs the output of the IterativeWorkflow, i.e. the output of ReduceProcessed (an int, for the example). In this case, you need to manually add an input to the MapWorkflow, because there is one level of workflow between the iterative workflow and the task that needs the reinjected value. During execution, all blocks defined one level inside the IterativeWorkflow have access to the reinjected value, but the MapWorkflow will only forward it to its inner tasks if the input is set manually.


from typing import Optional

from outflow.core.commands import Command, RootCommand
from outflow.core.tasks import as_task
from outflow.library.workflows import IterativeWorkflow, MapWorkflow
from my_project.my_plugin.tasks import ListFiles, Read  # hypothetical tasks

threshold = 100  # example value for the break condition

@as_task
def Process(reduced_data: Optional[int]):
    # do something with reduced_data (None on the first iteration)
    pass

@as_task
def ReduceProcessed() -> {"reduced_data": int}:
    # return some int (for the example) that represents the combination
    # of all processed files
    pass

@MapWorkflow.as_workflow
def inner_map_workflow():
    Read() | Process()

def break_func(reduced_data: int):
    # stop iterating once the reduced value exceeds the threshold
    return reduced_data > threshold

@IterativeWorkflow.as_workflow(max_iterations=5, break_func=break_func)
def my_iterative_workflow():
    inner_map = inner_map_workflow()
    inner_map | ReduceProcessed()

    # mandatory: manually add the input to the map workflow
    inner_map.add_input("reduced_data", type=Optional[int])

@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        ListFiles() | my_iterative_workflow()
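At runtime, ListFiles feeds the iterative workflow, each iteration runs Read | Process over the mapped inputs, ReduceProcessed combines the results into a single value, and break_func stops the loop once that value exceeds the threshold (or after max_iterations=5 at the latest). On the first iteration there is no reinjected value yet, hence the Optional[int] annotation on the reduced_data input of Process.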