Create a pipeline with Outflow, part 2: Tasks and commands

The main feature of the Outflow framework is to manage tasks and their execution. In this chapter, we will see how to create tasks, how to execute them in a given order, and how to pipe data between them.

Getting started with tasks

To help you get started, the plugin generated by the command you ran in the previous chapter contains two example tasks and a command.

First, go to the tasks.py file in the plugin directory. You will find two tasks: GenData that outputs a value, and PrintData that prints this value using the outflow logger.

Let’s look at one of the example tasks, GenData:

from outflow.core.tasks import as_task

@as_task
def GenData():
    raw_data = 42
    return {"raw_data": raw_data}

As you can see, you can easily create Outflow tasks from a regular Python function that you decorate with the @as_task decorator. The decorator turns this function into a Task subclass called GenData, which is why its name is written in CamelCase.

Tasks must return a dictionary whose keys match the inputs of the next tasks. Here the key is raw_data, which matches the input of the PrintData task.

Outflow is meant to help you build robust pipelines. For this reason, tasks need to define input targets and output targets. Before running your workflow, Outflow uses these targets to check that the inputs and outputs of your tasks match each other.

Outflow aims to be robust, but not at the cost of the user experience. That is why, in simple cases like this one, Outflow tries to guess the inputs and outputs of your task. Inputs and outputs of tasks are covered in more detail later in this chapter.
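The idea of checking targets before anything runs can be sketched in plain Python. This is not Outflow code: the check_chain helper and the (name, inputs, outputs) tuples are hypothetical and only illustrate how the output names of one task can be compared with the input names of the next one ahead of execution.

```python
# Hypothetical illustration, not Outflow's implementation.
# Each step is described as (task name, input names, output names).
def check_chain(steps):
    """Return a list of mismatches found between consecutive steps."""
    problems = []
    for (up_name, _, up_outputs), (down_name, down_inputs, _) in zip(steps, steps[1:]):
        # Inputs of the downstream step that the upstream step does not produce.
        missing = set(down_inputs) - set(up_outputs)
        if missing:
            problems.append(
                f"{down_name} expects {sorted(missing)} but {up_name} does not provide it"
            )
    return problems


good = [("GenData", [], ["raw_data"]), ("PrintData", ["raw_data"], [])]
bad = [("GenData", [], ["data"]), ("PrintData", ["raw_data"], [])]

print(check_chain(good))  # []
print(check_chain(bad))   # one message naming the missing "raw_data" input
```

Because the check only looks at names, it can run before any task body executes, which is the point of declaring targets up front.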

Commands

In Outflow, commands are entry points to your workflows.

Let’s go to commands.py and look at the example command. This command is called ComputeData and executes the workflow:

Figure: Schema of the workflow executed by this command

from outflow.core.commands import Command, RootCommand
from .tasks import GenData, PrintData

@RootCommand.subcommand(db_untracked=True)
class ComputeData(Command):
    def setup_tasks(self):

        # setup the workflow
        GenData() | PrintData()

As you can see, to set up your workflow, you use the | operator between the tasks that you want to pipe. This operator is overloaded for the Task class: it creates the dependency between the tasks and, at runtime, sends the result of the task on the left to the task on the right.

Warning

You should remember that inside the setup_tasks() method, nothing is executed yet. In this method, you build the dependency graph between your tasks. It is only after the execution of this setup method that Outflow will run each task in the right order.
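To make the distinction between building the graph and running it concrete, here is a toy model written in plain Python. ToyTask is hypothetical and far simpler than Outflow's Task class. It only shows how an overloaded | operator can record a dependency without executing anything, and how execution can happen later in dependency order.

```python
# Toy model of operator overloading and deferred execution.
# ToyTask is a hypothetical class, not part of Outflow.
class ToyTask:
    def __init__(self, func):
        self.func = func
        self.parent = None  # upstream task, set by the | operator

    def __or__(self, other):
        # "left | right": record that right depends on left; run nothing.
        other.parent = self
        return other  # returning the right-hand task allows chaining

    __rshift__ = __or__  # make ">>" behave like "|"

    def run(self):
        # Run the upstream task first, then pass its output dict as keyword arguments.
        inputs = self.parent.run() if self.parent else {}
        return self.func(**inputs) or {}


calls = []  # records the order in which the functions actually run


def gen_data():
    calls.append("gen_data")
    return {"raw_data": 42}


def print_data(raw_data):
    calls.append("print_data")
    print(f"value: {raw_data}")


last = ToyTask(gen_data) | ToyTask(print_data)
print(calls)  # [] : building the graph executed nothing
last.run()    # now both functions run, upstream first
print(calls)  # ['gen_data', 'print_data']
```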

You should be able to run this command with:

$ python manage.py compute_data

By default, every registered command is callable with the snake case version of its class name. To override the CLI name of a command, use the name parameter of the @RootCommand.subcommand() decorator.
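As an illustration of the naming convention, the conversion from a CamelCase class name to a snake case command name can be written in a few lines. The to_snake_case helper below is hypothetical. It shows the convention and is not taken from Outflow's source.

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a CamelCase name to snake_case (hypothetical helper)."""
    # Insert an underscore before every capital letter except the first one,
    # then lowercase the whole string.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


print(to_snake_case("ComputeData"))  # compute_data
```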

Modify the workflow

We will edit the workflow by adding two new tasks between GenData and PrintData:

Figure: Schema of the new workflow

Add new tasks

Start by adding two tasks to the tasks.py file:

import time
from outflow.core.tasks import as_task


@as_task
def Transform(raw_data):
    logger.info(f"Transforming data {raw_data=}")
    # Simulate a big computation
    time.sleep(2)
    result = raw_data * 2

    # return the result for the next task
    return {"transformed_data": result}


@as_task
def Compute(transformed_data):
    logger.info(f"Running computation for {transformed_data=}")
    time.sleep(2)
    result = transformed_data * 2

    return {"computation_result": result}

Then edit the argument of the PrintData task to account for the new output name:

@as_task
def PrintData(computation_result):
    logger.info(f"Result of the computation: {computation_result}")

Edit a command

Then edit the command by inserting our new tasks in the workflow definition:

from .tasks import GenData, PrintData, Transform, Compute

@RootCommand.subcommand(db_untracked=True)
class ComputeData(Command):
    def setup_tasks(self):

        # setup the workflow
        GenData() | Transform() | Compute() | PrintData()

Note

If you don’t feel comfortable with the pipe operator, you can also use the bitwise right shift operator (>>). The two operators are equivalent for tasks.

You can now run the command again and see that the output is different:

$ python manage.py compute_data
2020-11-17 18:19:58,369 - tuto.data_reduction.tasks - tasks.py:15 - INFO - Result of the computation: 168

Task targets

In Outflow, Targets are simple objects that have a name and a type. The type can be either a native type like int or a type from the typing module. Outflow uses these target objects to represent the inputs and outputs of each task.

You will most likely not need to manipulate these Targets directly, as Outflow performs automatic target detection:

  • For input targets, automatic target detection is done by looking at the arguments of the decorated function.

  • For output targets, this is done by looking at the dictionary after the return statement. Try replacing return {"raw_data": raw_data} with return raw_data: if you then run this workflow, it will raise an error like this: Could not automatically determine outputs of task gen_data.
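The two detection rules above can be sketched with the standard ast module. This is only an illustration of the principle, under the assumption that detection works on the function's source. The detect_targets helper is hypothetical and is not how Outflow is necessarily implemented.

```python
import ast

# Source of two example tasks, kept as a string so it can be parsed.
SOURCE = '''
def GenData():
    raw_data = 42
    return {"raw_data": raw_data}

def PrintData(raw_data):
    print(raw_data)
'''


def detect_targets(source):
    """Map each function name to (input names, output names)."""
    targets = {}
    for node in ast.parse(source).body:
        if not isinstance(node, ast.FunctionDef):
            continue
        # Inputs: the names of the function arguments.
        inputs = [arg.arg for arg in node.args.args]
        # Outputs: string keys of a literal dictionary that follows "return".
        outputs = []
        for sub in ast.walk(node):
            if isinstance(sub, ast.Return) and isinstance(sub.value, ast.Dict):
                outputs += [k.value for k in sub.value.keys if isinstance(k, ast.Constant)]
        targets[node.name] = (inputs, outputs)
    return targets


print(detect_targets(SOURCE))
# {'GenData': ([], ['raw_data']), 'PrintData': (['raw_data'], [])}
```

A function that ends with return raw_data has no literal dictionary to inspect, which is why output names cannot be guessed in that case.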

Specify targets type

At runtime, Outflow not only checks that target names match each other, but also checks that the type of each target matches the type of the corresponding object in the dictionary returned by the task. If the user does not specify a type, Outflow considers the target to be of type typing.Any, meaning the check always passes.

However, it is strongly recommended to type your task inputs and outputs. If you make a mistake, or a library behaves unexpectedly and you do not return the right object, your command could fail in the middle of a task or silently produce a wrong result. To see this, inside the task GenData, change the value of raw_data from the int 42 to the string "42". Your command will run but will not print the expected value.

To type your input targets, use the usual Python type hinting with the colon character. Let’s annotate the task Compute:

from outflow.core.tasks import as_task

@as_task  #                 ↓ add this type hint annotation
def Compute(transformed_data: int):
    ...

Now, your task Compute will only run if transformed_data is an int. Try running the command again and it will now fail with an error like: type of argument "transformed_data" must be int; got str instead

To type task outputs, put a typed dictionary in the return annotation of the function. Let’s annotate the task GenData:

@as_task  #   ↓ add this return type annotation
def GenData() -> {"raw_data": int}:
    raw_data = "42" # still keep the wrong type here
    return {"raw_data": raw_data}

Now if you call the command once again, the execution will fail even earlier, when GenData returns: type of dict item "raw_data" for return_value must be int; got str instead. You can now put the right value back in raw_data: raw_data = 42.
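The principle behind these checks can be reproduced with a small decorator that reads the function annotations. The checked decorator below is a hypothetical sketch: it only supports plain classes such as int as type hints, its error messages are invented for the example, and it does not reflect how Outflow performs its checks internally.

```python
import functools
import inspect


def checked(func):
    """Check annotated inputs and a dict-annotated output (hypothetical helper)."""
    hints = dict(func.__annotations__)
    expected_outputs = hints.pop("return", {})  # e.g. {"raw_data": int}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check every annotated argument before running the function.
        bound = inspect.signature(func).bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            expected = hints.get(name)
            if expected is not None and not isinstance(value, expected):
                raise TypeError(f"argument {name!r} must be {expected.__name__}")
        result = func(*args, **kwargs)
        # Check every declared output key of the returned dictionary.
        for key, expected in expected_outputs.items():
            if not isinstance(result[key], expected):
                raise TypeError(f"output {key!r} must be {expected.__name__}")
        return result

    return wrapper


@checked
def gen_data() -> {"raw_data": int}:
    return {"raw_data": "42"}  # wrong type on purpose


try:
    gen_data()
except TypeError as error:
    print(error)  # output 'raw_data' must be int
```

Checking the output as soon as the function returns is what moves the failure closer to the actual mistake.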

This runtime type checking reports a mistake at the task where it happens rather than several tasks later, which will save you debugging time in the long run.

You have seen how to define input and output targets along with their types. Using the as_task decorator and annotations is the recommended way to define tasks and their targets.

If you do not feel comfortable with this syntax, or if your task needs special behaviour, it is always possible to subclass Task manually and use the setup_targets method. Check the commented tasks at the bottom of the tasks.py file for an example of tasks using the explicit syntax. The tasks page of the user documentation also covers this.

Tasks with only one output

For a simple task like GenData that returns a dictionary with only one key, Outflow allows you to return the object directly instead of a dictionary, like so: return raw_data.

@as_task
def GenData() -> {"raw_data": int}:
    raw_data = 42
    return raw_data  # return the object directly
    # return {"raw_data": raw_data}

But remember, this shortcut is possible only if you correctly specified a typed dictionary with only one key in the return annotation of your task.
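The reason for this restriction is easy to see in a sketch: a bare value can only be given a name when exactly one name is declared. The normalize_output helper below is hypothetical and only illustrates that reasoning.

```python
def normalize_output(result, return_annotation):
    """Wrap a bare return value in a dict when exactly one output is declared."""
    if isinstance(result, dict):
        return result  # already in the expected {name: value} form
    if len(return_annotation) != 1:
        # With zero or several declared outputs, the value cannot be named.
        raise ValueError("a bare return value needs exactly one declared output")
    (name,) = return_annotation  # the single declared key
    return {name: result}


print(normalize_output(42, {"raw_data": int}))  # {'raw_data': 42}
```

This simplified version treats any returned dictionary as already being in the {name: value} form, so it would not handle a task whose single output is itself a dictionary.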

Adding task parameters

This command is not very interesting because it will output the same value every time. To make it a little more interesting, you can add a task parameter that will decide by how much the task multiplies our generated value.

A task parameter is a special kind of task input. As opposed to regular task inputs, which come from the upstream tasks, a parameter comes from the pipeline configuration file.

Edit the Compute task and add a parameter:

from outflow.core.types import Parameter

@as_task
def Compute(transformed_data: int, multiplier: Parameter(int)) -> {"computation_result": int}:
    time.sleep(4)
    # Compute the product of "transformed_data" and the multiplier
    result = transformed_data * multiplier
    return result

You define a parameter by using the custom type outflow.core.types.Parameter in the type hint annotation of your task input.

Then go to the config.yml file of your pipeline to add a value for multiplier. In this file, you will find a commented section called parameters. This is the section where Outflow looks for parameters defined as above.

Uncomment the section. Then replace my_task with compute (here we use the automatic lowercase name of the task). Finally, replace my_param1: param_value1 with multiplier: 3.

...

parameters:
  compute:
    multiplier: 3

...
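To illustrate how a value from this section can end up as a task argument, here is a plain Python sketch. The config dictionary stands in for the parsed content of config.yml, and bind_parameters is a hypothetical helper. Outflow's own lookup may work differently.

```python
import functools

# Stand-in for the parsed content of config.yml (hypothetical layout mirror).
config = {"parameters": {"compute": {"multiplier": 3}}}


def compute(transformed_data, multiplier):
    return {"computation_result": transformed_data * multiplier}


def bind_parameters(func, task_name, config):
    """Pre-fill a function's parameters from the 'parameters' config section."""
    params = config.get("parameters", {}).get(task_name, {})
    return functools.partial(func, **params)


task = bind_parameters(compute, "compute", config)
# Only the regular input is passed here; multiplier comes from the config.
print(task(transformed_data=10))  # {'computation_result': 30}
```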

Finally, let’s run the command again, now with a task that has a parameter.

$ python manage.py compute_data
tuto.data_reduction.tasks - tasks.py:15 - INFO - Result of the computation: 252

Parameters are useful to keep track of values that can change between pipeline executions, as they are recorded in the Outflow internal database (not yet set up at this point of the tutorial). This lets you check which parameters a given command was run with in the past.

This is also useful for the workflow cache that we will see in the next section.

Note: class Task has a method add_parameter() that you can use in the setup_targets() method if you want to use the explicit syntax.

Conclusion

Congratulations, you have finished part 2 of the Outflow tutorial! In the next chapter, we will see how to use workflows.