==========================================================
Create a pipeline with Outflow, part 2: Tasks and commands
==========================================================

The main feature of the Outflow framework is to manage tasks and their execution. In this chapter, we will see how to create tasks, how to execute them in a given order, and how to pipe data between them.

Getting started with tasks
==========================

To help you get started, the plugin generated by the command you ran in the previous chapter contains two example tasks and a command.

First, go to the *tasks.py* file in the plugin directory. You will find two tasks: *GenData*, which outputs a value, and *PrintData*, which prints this value using the Outflow logger.

Let's look at one of the example tasks, GenData:

.. code-block:: python

    from outflow.core.tasks import as_task


    @as_task
    def GenData():
        raw_data = 42
        return {"raw_data": raw_data}

As you can see, you can easily create Outflow tasks from a regular python function decorated with the :code:`@as_task` decorator. The decorator turns this function into a Task subclass called GenData, which is why its name is written in CamelCase.

Tasks must return a dictionary whose keys match the inputs of the downstream tasks. Here the key is :code:`raw_data`, like the input of the task PrintData below.

Outflow is here to help you build robust pipelines; for this reason, tasks need to define input targets and output targets. Before running your workflow, Outflow uses these targets to check that the inputs and outputs of your tasks match each other. Outflow aims to be robust, but not at the cost of the user experience, which is why in simple cases like this one Outflow will try to guess the inputs and outputs of your task. More about task inputs and outputs later in this chapter.

Commands
========

In Outflow, commands are entry points to your workflows. Let's go to *commands.py* and look at the example command. This command is called *ComputeData* and executes the workflow:

.. figure:: ../images/tutorial02_diagram_1.png
    :align: center

    Schema of the workflow executed by this command

.. code-block:: python

    from outflow.core.commands import Command, RootCommand

    from .tasks import GenData, PrintData


    @RootCommand.subcommand(db_untracked=True)
    class ComputeData(Command):
        def setup_tasks(self):
            # setup the workflow
            GenData() | PrintData()

You can see that in order to set up your workflow, you use the operator :code:`|` between the tasks that you want to pipe. This operator is overloaded for the Task class: it creates the dependency between tasks, and at runtime the result of the task on the left of the operator is sent to the task on the right.

.. warning::
    Remember that inside the :code:`setup_tasks()` method, nothing is executed yet. In this method, you build the dependency graph between your tasks. It is only after the execution of this setup method that Outflow will run each task in the right order.

You should be able to run this command with:

.. code-block::

    $ python manage.py compute_data

All registered commands are by default callable with their snake case name. The :code:`name` parameter of the :code:`@RootCommand.subcommand()` decorator is available to override the CLI name of the command, as sketched below.
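For instance, here is a minimal sketch of a renamed command. The value :code:`"compute"` is just an illustration; the rest is the example command from *commands.py* unchanged:

.. code-block:: python

    from outflow.core.commands import Command, RootCommand

    from .tasks import GenData, PrintData


    # Hypothetical variant: expose the command on the CLI as "compute"
    # instead of its automatic snake case name "compute_data"
    @RootCommand.subcommand(name="compute", db_untracked=True)
    class ComputeData(Command):
        def setup_tasks(self):
            GenData() | PrintData()

With this variant, the command would be invoked with :code:`python manage.py compute`.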
Modify the workflow
===================

We will edit the workflow by adding two new tasks between *GenData* and *PrintData*:

.. figure:: ../images/tutorial02_diagram_2.png
    :align: center

    Schema of the new workflow

Add new tasks
-------------

Start by adding two tasks to the *tasks.py* file:

.. code-block:: python

    import time

    from outflow.core.logging import logger
    from outflow.core.tasks import as_task


    @as_task
    def Transform(raw_data):
        logger.info(f"Transforming data {raw_data=}")
        # Simulate a big computation
        time.sleep(2)
        result = raw_data * 2
        # return the result for the next task
        return {"transformed_data": result}


    @as_task
    def Compute(transformed_data):
        logger.info(f"Running computation for {transformed_data=}")
        time.sleep(2)
        result = transformed_data / 2
        return {"computation_result": result}

and edit the arguments of the task *PrintData* to account for the new output name:

.. code-block:: python

    @as_task
    def PrintData(computation_result):
        logger.info(f"Result of the computation: {computation_result}")

Edit a command
--------------

Then edit the command by inserting our new tasks in the workflow definition:

.. code-block:: python

    from .tasks import GenData, PrintData, Transform, Compute


    @RootCommand.subcommand(db_untracked=True)
    class ComputeData(Command):
        def setup_tasks(self):
            # setup the workflow
            GenData() | Transform() | Compute() | PrintData()

.. note::
    If you don't feel comfortable with the pipe operator, you can also use the bitwise right shift operator :code:`>>`. The two are equivalent for tasks.
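For example, the new workflow could equivalently be set up with:

.. code-block:: python

    # same dependency graph as with the pipe operator
    GenData() >> Transform() >> Compute() >> PrintData()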
You can now run the command again and see that the output is different:

.. code-block::

    $ python manage.py compute_data
    2020-11-17 18:19:58,369 - tuto.data_reduction.tasks - tasks.py:15 - INFO - Result of the computation: 42.0

Task targets
============

In Outflow, Targets are simple objects that have a name and a type. The type can either be a native type like :code:`int` or a type from the typing module. These target objects are used by Outflow to represent the inputs and outputs of each task.

You will most likely not need to manipulate these Targets directly, as Outflow performs automatic target detection:

- For input targets, automatic target detection is done by looking at the arguments of the decorated function.
- For output targets, this is done by looking at the dictionary after the return statement.

Try to replace :code:`return {"raw_data": raw_data}` with :code:`return raw_data` and you will see that if you try to run this workflow, it raises an error like this: :code:`Could not automatically determine outputs of task gen_data`.

Specify target types
--------------------

At runtime, Outflow will not only check that the names of targets match each other, but also that the type of each target matches the type of the corresponding object inside the dictionary returned by the task. If not specified by the user, Outflow considers targets to be of type :code:`typing.Any`, meaning the check will always pass.

However, it is strongly recommended to type your task inputs and outputs. Imagine you make a mistake, or a library has an unexpected behaviour and you don't return the right object: your commands could fail in the middle of a task. Inside the task GenData, try to change the value of :code:`raw_data` from the int :code:`42` to the string :code:`"42"`. Your command will run but will not print the expected value.

To type your input targets, use the usual python type hinting with the colon character. Let's annotate the task Compute:

.. code-block:: python

    @as_task
    #                          ↓ add this type hint annotation
    def Compute(transformed_data: int):
        ...

Now, your task Compute will only run if the type of :code:`transformed_data` is an int. Try running the command again and it will now fail with an error like: :code:`type of argument "transformed_data" must be int; got str instead`.

To type task outputs, put a typed dictionary in the return annotation of the function. Let's annotate the task GenData:

.. code-block:: python

    @as_task
    #             ↓ add this return type annotation
    def GenData() -> {"raw_data": int}:
        raw_data = "42"  # still keep the wrong type here
        return {"raw_data": raw_data}

Now if you call the command once again, the execution will fail even earlier, when GenData returns: :code:`type of dict item "raw_data" for return_value must be int; got str instead`. You can now put the right value back in raw_data: :code:`raw_data = 42`. This runtime type checking is really useful and will save you time in the long run.

You have seen how to define input and output targets along with their types. Using the :code:`as_task` decorator and annotations is the recommended way to define tasks and their targets. If you do not feel comfortable with this syntax, or if your task needs special behaviour, it is always possible to subclass Task manually and use the :code:`setup_targets` method. Check the commented tasks at the bottom of the tasks.py file for an example of tasks using the explicit syntax. The *tasks* page of the user documentation also goes over this.

Tasks with only one output
--------------------------

For a simple task like GenData that returns a dictionary with only one key, Outflow allows you to return the object directly instead of a dictionary, like so: :code:`return raw_data`.

.. code-block:: python

    @as_task
    def GenData() -> {"raw_data": int}:
        raw_data = 42
        return raw_data  # return the object directly
        # return {"raw_data": raw_data}

But remember, this shortcut is possible **only** if you correctly specified a typed dictionary with only one key in the return annotation of your task.

Adding task parameters
======================

This command is not very interesting because it outputs the same value every time. To make it a little more interesting, you can add a task parameter that decides by how much the task multiplies our generated value.

A task parameter is a special kind of task input. As opposed to regular task inputs, which come from the upstream tasks, a parameter comes from the pipeline configuration file.

Edit the *Compute* task and add a parameter:

.. code-block:: python

    from outflow.core.types import Parameter


    @as_task
    def Compute(transformed_data: int, multiplier: Parameter(int)) -> {"computation_result": int}:
        time.sleep(4)
        # Compute the product of "transformed_data" and the multiplier
        result = transformed_data * multiplier
        return result

You define a parameter by using the custom type outflow.core.types.Parameter in the type hint annotation of your task input.

Then go to the *config.yml* file of your pipeline to add a value for :code:`multiplier`. In this file, you will find a commented section called :code:`parameters`. This is the section where Outflow looks for parameters defined as above. Uncomment the section, then replace :code:`my_task` with :code:`compute` (here we use the automatic lowercase name of the task). Finally, replace :code:`my_param1: param_value1` with :code:`multiplier: 3`:

.. code-block:: yaml

    ...
    parameters:
        compute:
            multiplier: 3
    ...
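For reference, here is roughly what *tasks.py* looks like at this point of the tutorial. This is a sketch assembled from the snippets above; your generated file may also contain the commented explicit-syntax examples mentioned earlier:

.. code-block:: python

    import time

    from outflow.core.logging import logger
    from outflow.core.tasks import as_task
    from outflow.core.types import Parameter


    @as_task
    def GenData() -> {"raw_data": int}:
        raw_data = 42
        return raw_data  # single typed output: return the object directly


    @as_task
    def Transform(raw_data):
        logger.info(f"Transforming data {raw_data=}")
        # Simulate a big computation
        time.sleep(2)
        return {"transformed_data": raw_data * 2}


    @as_task
    def Compute(transformed_data: int, multiplier: Parameter(int)) -> {"computation_result": int}:
        time.sleep(4)
        # Compute the product of "transformed_data" and the multiplier
        return transformed_data * multiplier


    @as_task
    def PrintData(computation_result):
        logger.info(f"Result of the computation: {computation_result}")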
Finally, let's try the command again, now with a task that has a parameter:

.. code-block::

    $ python manage.py compute_data
    tuto.data_reduction.tasks - tasks.py:15 - INFO - Result of the computation: 252

Parameters are useful to keep track of values that can change between pipeline executions, as they are recorded in the Outflow internal database (not yet set up at this point of the tutorial). This allows you to check with which parameter values a given command was run in the past. This is also useful for the workflow cache that we will see in the next section.

.. note::
    The class Task has a method :code:`add_parameter()` that you can use in the :code:`setup_targets()` method if you want to use the explicit syntax.

Conclusion
==========

Congratulations, you finished the Outflow tutorial part 2! In the next chapter, we will see how to use workflows.