Design Patterns

This section is a collection of useful patterns that can be implemented using the features of the framework.

Outflow is still young, so if you find an interesting way to use it, do not hesitate to share it with the community by opening a merge request that adds it to this section, or by contacting us on the Discord server.

Use config or CLI arguments inside the workflow definition

When Outflow runs the setup_tasks() method of your commands, it has already parsed both the configuration file and the command-line arguments. That means you can access their values to make your workflow configurable or conditional.

Example 1: use the CLI arguments to decide whether a task is executed

Let’s say you have a task called Debug that sets up a debug environment for your pipeline.

from outflow.core.pipeline import context
from outflow.core.commands import Command, RootCommand
from my_project.my_plugin.tasks import Debug, First, Second

@RootCommand.subcommand()
class MyCommand(Command):
    def add_arguments(self):
        self.add_argument("--debug", action="store_true", help="Set up debug environment")

    def setup_tasks(self):
        first_task = First()
        second_task = Second()

        if context.args.debug:
            debug_task = Debug() # Remember that any instantiated task is executed
            debug_task >> first_task

        first_task >> second_task
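
With this setup, the Debug task is only part of the workflow when the flag is given on the command line, for example by running python manage.py my_command --debug (assuming the subcommand is exposed as my_command, following the same naming convention as the manage.py example at the end of this page).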

Example 2: use the config.yml file to configure the resources of a MapTask

You can define the sbatch directives inside the configuration file and pass them to the MapTask:

Inside the config.yml file:

my_map_resources:
  cpus_per_task: 5
  memory: 10GB
  partition: short

Inside your command:

from outflow.core.pipeline import config
from outflow.core.commands import Command, RootCommand
from outflow.library.tasks import MapTask

@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        with MapTask(**config["my_map_resources"]) as my_map:
            ...
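
Keeping the sbatch directives (cpus_per_task, memory, partition) in config.yml means the resources allocated to the mapped tasks can be tuned for each deployment without touching the workflow code.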

Define workflows outside a command

Reminder: when executing the setup_tasks() method of your command, any instantiated task is automatically added to the root workflow that will be executed by the command.

Knowing this, it is easy to define a workflow that can be reused in multiple commands. Simply define your workflow in a function, and call this function inside the setup_tasks() method of your different commands:

from outflow.core.commands import Command, RootCommand

def useful_workflow():
    FirstTask() >> SecondTask() >> ThirdTask()

@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_tasks(self):
        useful_workflow()
        SomeTask() >> AnotherTask()

@RootCommand.subcommand()
class AnotherCommand(Command):
    def setup_tasks(self):
        useful_workflow()
        SomeOtherTask() >> YetAnotherTask()

This way, the task graph described in useful_workflow can be reused in multiple commands.

Create dependencies with a workflow defined outside a command

The way we set up the commands above does not state any dependency between useful_workflow and the other tasks of the command. The workflow might be executed before or after them, and we cannot send any input to FirstTask or get the results of ThirdTask.

There is no dedicated Outflow feature for this (yet), but here is a workaround: return the first and last tasks of useful_workflow as a tuple, and use them as the input and output of the workflow:

def useful_workflow():
    first_task = FirstTask()
    third_task = ThirdTask()

    first_task >> SecondTask() >> third_task

    return first_task, third_task

@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_tasks(self):
        first_task, third_task = useful_workflow()

        GenSomeData() >> first_task

        third_task >> SomeTask() >> AnotherTask()

This way, we can both send input to and get the output from the workflow defined in useful_workflow.


NOTE

There is work in progress to implement a real “workflow” object to help with the use case above; it would look like this:


@Workflow
def useful_workflow():
    FirstTask() >> SecondTask() >> ThirdTask()

@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_tasks(self):
        GenSomeData() >> useful_workflow() >> SomeTask() >> AnotherTask()

Define workflows and commands outside plugins

The code structure presented in the tutorial might not fit your pipeline design. If you prefer, it is possible to define workflows and commands inside the pipeline directory itself.

If your workflows use a combination of tasks from different plugins, it might be more fitting to define them in the pipeline directory rather than inside one of the plugins. This also helps avoid dependencies between plugins.

Implementation

Simply create a my_commands.py file inside your pipeline directory containing the definition of your command.
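
As an illustration, such a file could combine tasks coming from two different plugins (the plugin, task and command names below are hypothetical):

# my_commands.py, at the root of the pipeline directory
from outflow.core.commands import Command, RootCommand

# hypothetical tasks provided by two different plugins
from my_project.plugin_a.tasks import ExtractData
from my_project.plugin_b.tasks import PublishReport

@RootCommand.subcommand()
class CrossPluginCommand(Command):
    def setup_tasks(self):
        # combining the tasks here avoids a dependency between the two plugins
        ExtractData() >> PublishReport()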

Then, you need to import this command module from the manage.py file, so Outflow is able to find and register it (this step is done automatically for plugins).

# inside the manage.py file
import importlib
import pathlib
import sys

from outflow.core.pipeline import Pipeline

if __name__ == "__main__":
    pipeline_root_directory = Pipeline.get_parent_directory_posix_path(__file__)
    # add plugins to the python path
    # note: for cython like plugins, the compilation step is required and
    # plugin installation via pip is strongly encouraged
    plugins_dir = pathlib.Path(__file__).parent / "plugins"
    for plugin_path in plugins_dir.glob("*"):
        sys.path.append(plugin_path.resolve().as_posix())

    ###
    # add either one of the two lines below; if you use flake8, it will flag
    # the first one as "imported but unused", so you might prefer the second
    import my_commands
    importlib.import_module("my_commands")
    ###

    with Pipeline(root_directory=pipeline_root_directory) as pipeline:
        result = pipeline.run()

That’s all! You can now call this new command as usual with python manage.py some_command.