Create a pipeline with Outflow, part 2: Tasks and commands

The main feature of the Outflow framework is to manage tasks and their execution. In this chapter, we will see how to create tasks, how to execute them in a given order, and how to pipe data between them.

Getting started with tasks

To help you get started, the plugin generated by the command you ran in the previous chapter contains two example tasks and a command.

First, open the tasks.py file in the plugin directory. You will find two tasks: GenData, which returns a value, and PrintData, which prints this value using the Outflow logger.

from outflow.core.logging import logger
from outflow.core.tasks import Task

@Task.as_task
def PrintData(some_data: int):
    logger.info(f"Result of the computation: {some_data}")

As you can see, Outflow is very simple: turning a Python function into an Outflow task requires nothing more than the @Task.as_task decorator.

You may have noticed the colon followed by a type next to the function argument: some_data: int. This is a function annotation; if you don’t know how annotations work, have a look at the PEP or this Real Python guide, because you will need them! In Outflow, you can (and should) use function annotations to describe the types of the inputs and outputs of your tasks. If you specify types in the annotations, Outflow will automatically type-check everything that goes in and out of your task against the specified types. Check out how this is done for the task GenData:

@Task.as_task
def GenData() -> {"some_data": int}:
    some_data = 42
    return {"some_data": some_data}

Tasks should always return dictionaries, and the keys must match the names of the inputs of the next task. Here, the returned dictionary contains the key “some_data”, which matches the name of the input of the next task, PrintData(some_data: int).

To specify the output of a task, you should put a typed dictionary in the return annotation of the task.
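
For instance, here is a minimal sketch of a task with two outputs feeding a task with two matching inputs (GenPair and SumPair are illustrative names, not part of the generated plugin):

@Task.as_task
def GenPair() -> {"a": int, "b": int}:
    return {"a": 1, "b": 2}

@Task.as_task
def SumPair(a: int, b: int):
    logger.info(f"a + b = {a + b}")

Each key of the dictionary returned by GenPair is delivered to the SumPair argument of the same name.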

Note

For a simple task like this one that returns a dictionary with only one key, Outflow allows you to return the object directly rather than a dictionary, like so: return some_data. This is only possible if you correctly specified a typed dictionary with a single key in the return annotation of your task.
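
For instance, under that assumption, GenData could be shortened to:

@Task.as_task
def GenData() -> {"some_data": int}:
    # single key in the return annotation: the bare value
    # is automatically wrapped into {"some_data": 42}
    return 42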

Next, go to commands.py and uncomment the example command. This command is called ComputeData and describes the following workflow:

[Figure: Schema of the workflow described by this command]

The method setup_tasks of your command is called at initialization. In this method, pipe your tasks together to set up the workflow corresponding to the command. Let’s look at setup_tasks of the command ComputeData:

from outflow.core.commands import Command, RootCommand

from .tasks import GenData, PrintData

@RootCommand.subcommand(db_untracked=True)
class ComputeData(Command):
    def setup_tasks(self):

        # instantiate tasks
        gen_data = GenData()
        print_data = PrintData()

        # set up the workflow
        gen_data >> print_data

You can see that to set up your workflow, you use the >> operator between the tasks you want to pipe. This both creates the dependency between the two tasks and sends the result of the first task to the second.

You should be able to run this command with:

$ python manage.py compute_data

All registered commands are callable by default using their snake-cased name (ComputeData becomes compute_data). The name parameter of the @RootCommand.subcommand() decorator lets you override the CLI name of the command.
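
For example, the following sketch (changing only the decorator arguments of the command above) would expose ComputeData on the command line as compute instead of compute_data:

@RootCommand.subcommand(name="compute", db_untracked=True)
class ComputeData(Command):
    ...

The command would then be invoked with python manage.py compute.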

Modify the workflow

We will edit the workflow by adding a new task between GenData and PrintData:

[Figure: Schema of the new workflow]

Add a new task

Start by adding the new task to the tasks.py file:

import time

@Task.as_task
def Compute(some_data: int) -> {"computation_result": int}:

    logger.info("Running computation")
    # Simulate a big computation
    time.sleep(4)
    result = some_data * 2

    # return the result for the next task
    return {"computation_result": result}

and edit the parameter of the task PrintData to account for the new output name:

@Task.as_task
def PrintData(computation_result: int):
    logger.info(f"Result of the computation : {computation_result}")

Edit a command

Then edit the command by inserting our new task in the workflow definition:

from .tasks import GenData, PrintData, Compute

@RootCommand.subcommand(db_untracked=True)
class ComputeData(Command):
    def setup_tasks(self):

        # instantiate tasks
        gen_data = GenData()
        print_data = PrintData()
        compute = Compute()

        # setup the workflow
        gen_data >> compute >> print_data

        # return the terminating task(s)
        return print_data

The generated value 42 is now doubled by Compute before being printed. You can now run the command again and see that the output is different:

$ python manage.py compute_data
2020-11-17 18:19:58,369 - tuto.data_reduction.tasks - tasks.py:15 - INFO - Result of the computation: 84

Adding a CLI argument

This command is not very interesting because it outputs the same value every time. To make it a little more interesting, you can add a command line argument that controls the value by which the task multiplies the generated data.

First, uncomment the method add_arguments of our command:

def add_arguments(self):
    # my arg1
    self.add_argument(
        '--my_arg1',
        help="""
        Arg1 help
        """,
    )

and edit the argument so that it is meaningful and behaves as we want (a mandatory integer argument). The add_argument() method of Command works like the add_argument() method of argparse:

def add_arguments(self):
    self.add_argument(
        '--multiplier',
        help="""
        Value by which the generated value will be multiplied
        """,
        type=int,
        required=True
    )
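
Since add_argument() mirrors argparse, the usual argparse options are also available. As a sketch, the same argument could instead be made optional by replacing required=True with an arbitrary default value:

def add_arguments(self):
    self.add_argument(
        '--multiplier',
        help="""
        Value by which the generated value will be multiplied
        """,
        type=int,
        default=2,  # illustrative default, makes the argument optional
    )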

Then, we will edit the Compute task to use this command line argument.

The command line arguments are accessible through the pipeline context.

A word about the pipeline context

The pipeline context is an object that contains:

  • the current settings and configuration (see settings and configuration)

  • the command line arguments

  • a reference to the database session

You can import the pipeline context anywhere, but you can only access it within the pipeline scope.

Let’s edit our task Compute to access the command line arguments:

from outflow.core.pipeline import context

@Task.as_task
def Compute(some_data: int) -> {"computation_result": int}:

    multiplier = context.args.multiplier  # get the multiplier from the cli arguments

    time.sleep(4)
    # Compute the product of "some_data" and the multiplier
    result = some_data * multiplier

    # return the result for the next task
    return {"computation_result": result}

The final workflow looks like this:

[Figure: Schema of the final workflow]

Finally, let’s try our new command:

$ python manage.py compute_data --multiplier 3
tuto.data_reduction.tasks - tasks.py:15 - INFO - Result of the computation: 126

Congratulations, you have finished part 2 of the Outflow tutorial! In the next chapter, we will see how to create database models and query databases with Outflow.