Tasks

Tasks are the smallest building blocks of your pipelines.

Create a task from a function

The as_task decorator converts a function into a Task subclass that you can use in your workflow definition just like tasks defined by subclassing Task. You can use this decorator both with and without parentheses.

from outflow.core.tasks import as_task

@as_task  # or, equivalently, @as_task()
def MyTask(a, b):
    return {"result": a+b}
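The sketch below shows one common way a decorator can support both forms. It is a minimal, self-contained illustration rather than Outflow's implementation: `BaseTask` and this `as_task` are hypothetical stand-ins. The decorator builds a new class, named after the function, whose run method is the decorated function.

```python
from typing import Any, Callable, Dict, Optional


class BaseTask:
    """Hypothetical stand-in for Outflow's Task class (illustration only)."""

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        raise NotImplementedError

    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        # Calling an instance forwards the keyword inputs to run()
        return self.run(**kwargs)


def as_task(func: Optional[Callable] = None):
    """Turn a function into a BaseTask subclass; usable as @as_task or @as_task()."""

    def decorator(f: Callable) -> type:
        # staticmethod so that run() does not receive `self`
        return type(f.__name__, (BaseTask,), {"run": staticmethod(f)})

    if func is None:
        # Used with parentheses: @as_task() must return the real decorator
        return decorator
    # Used without parentheses: @as_task receives the function directly
    return decorator(func)


@as_task
def MyTask(a, b):
    return {"result": a + b}


@as_task()
def OtherTask(a, b):
    return {"result": a * b}
```

With `@as_task`, Python passes the function itself to the decorator, while `@as_task()` calls it with no argument first; checking whether `func` is `None` is what lets one callable handle both cases.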

Create a task by subclassing Task

from outflow.core.tasks import Task

class MyTask(Task):
    def run(self, a: int, b: int) -> {"result": int}:
        return {"result": a+b}

Create a Task template

If you need all of your tasks to share a special behaviour, you can subclass Task and decorate your functions with the as_task method of that subclass. Here is an example of tasks that always log their name before running.

from outflow.core.tasks import Task
from outflow.core.logging import logger


class LogTask(Task):
    def __call__(self, *args, **kwargs):
        logger.info(f"Running task {self.name}")
        return super().__call__(*args, **kwargs)

@LogTask.as_task
def MyTask():
    pass
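This pattern works because as_task is called on the template class, so the generated class can inherit from it. A minimal sketch of the idea, using hypothetical stand-ins (`BaseTask` and its `as_task` class method are not Outflow's code) and the standard logging module in place of Outflow's logger:

```python
import logging
from typing import Any, Callable, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sketch")


class BaseTask:
    """Hypothetical stand-in for Outflow's Task class (illustration only)."""

    @property
    def name(self) -> str:
        # Outflow uses a snake_case name by default; this sketch uses the class name
        return type(self).__name__

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        raise NotImplementedError

    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        return self.run(**kwargs)

    @classmethod
    def as_task(cls, func: Callable) -> type:
        # `cls` is the class the decorator is called on, so
        # @LogTask.as_task produces a subclass of LogTask
        return type(func.__name__, (cls,), {"run": staticmethod(func)})


class LogTask(BaseTask):
    def __call__(self, **kwargs: Any) -> Dict[str, Any]:
        logger.info("Running task %s", self.name)
        return super().__call__(**kwargs)


@LogTask.as_task
def MyTask():
    return {"result": 42}
```

Calling `MyTask()()` goes through `LogTask.__call__`, which logs the task name and then delegates to the base `__call__`.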

Running a task directly

You might want to run a task manually, for example from the outflow shell. In this case, you can call a task instance directly, passing all required task inputs as keyword arguments.

Inside a Python shell:

>>> from outflow.core.tasks import as_task
>>> @as_task
... def Add(a: int, b: int) -> {"result": int}:
...     return a + b
>>> add = Add()
>>> add(a=2, b=4)
{'result': 6}

Targets

Targets represent the inputs and outputs of your tasks. There are three types of targets: inputs, outputs, and parameters.

Target typing

In every target definition syntax, the type is optional (but highly recommended) and defaults to typing.Any.

Parameters

Parameters are special inputs that come from the pipeline configuration file. When the task is executed, Outflow looks for the task name in the parameter field of the configuration file. Note that, by default, the task name is the snake_case version of the decorated function's name. A parameter must be present in the configuration file and cannot have a default value.
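The sketch below illustrates the lookup described above. It assumes a configuration already loaded into a dictionary with a top-level `parameters` mapping keyed by task name; that layout, the helpers `snake_case` and `task_parameters`, and the regular expression used for the conversion are all hypothetical and may not match Outflow exactly.

```python
import re
from typing import Any, Dict, List


def snake_case(name: str) -> str:
    """Convert a CamelCase name such as 'MyTask' to 'my_task' (simplified)."""
    # Insert an underscore before each capital letter that follows a
    # lowercase letter or a digit, then lowercase everything
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name).lower()


def task_parameters(config: Dict[str, Any], func_name: str, required: List[str]) -> Dict[str, Any]:
    """Return the required parameters of a task, failing if any is missing."""
    task_name = snake_case(func_name)
    section = config.get("parameters", {}).get(task_name, {})
    # Parameters have no default value, so a missing entry is an error
    missing = [p for p in required if p not in section]
    if missing:
        raise KeyError(f"missing parameters for task {task_name!r}: {missing}")
    return {p: section[p] for p in required}


# Hypothetical configuration layout, already loaded into a dict
config = {"parameters": {"my_task": {"threshold": 3}}}
```

For example, `task_parameters(config, "MyTask", ["threshold"])` returns `{"threshold": 3}`, while asking for a parameter that is absent from the `my_task` entry raises a `KeyError` instead of falling back to a default.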

Task definition alternatives

Let’s take the example of a task that computes the sum of two numbers a and b and returns the result:


from outflow.core.tasks import Task

class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        self.add_output("result", int)

    def run(self, a, b):
        return {"result": a+b}

The Task class has a setup_targets method that the framework calls automatically. As shown in the example above, you can override this method to declare the targets yourself. The default implementation tries to infer the targets from the annotations of the run method.

You can also take a hybrid approach: call super().setup_targets() to let Outflow detect targets automatically, and declare the remaining targets manually.
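The hook and the hybrid approach can be modelled in plain Python. The sketch below is a simplified, hypothetical model: the class `SketchTask` and its `inputs`/`outputs` dictionaries are not part of Outflow. Its `__init__` calls `setup_targets`, and the default implementation reads the run annotations with the standard `inspect` module, falling back to typing.Any for unannotated arguments and keeping any target already declared manually.

```python
import inspect
from typing import Any, Dict


class SketchTask:
    """Hypothetical, simplified model of the setup_targets hook (not Outflow's code)."""

    def __init__(self) -> None:
        self.inputs: Dict[str, Any] = {}
        self.outputs: Dict[str, Any] = {}
        # The hook runs automatically when the task is created
        self.setup_targets()

    def add_input(self, name: str, type_: Any = Any) -> None:
        self.inputs[name] = type_

    def add_output(self, name: str, type_: Any = Any) -> None:
        self.outputs[name] = type_

    def setup_targets(self) -> None:
        # Default behaviour: infer targets from the run() annotations,
        # keeping any target that was already declared manually
        signature = inspect.signature(self.run)  # bound method, so no `self`
        for name, param in signature.parameters.items():
            if param.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD):
                continue  # *args / **kwargs are not targets
            if name not in self.inputs:
                annotation = param.annotation
                self.add_input(name, Any if annotation is inspect.Parameter.empty else annotation)
        returned = signature.return_annotation
        if isinstance(returned, dict):  # e.g. -> {"result": int}
            for name, type_ in returned.items():
                self.outputs.setdefault(name, type_)

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        raise NotImplementedError


class HybridTask(SketchTask):
    def setup_targets(self) -> None:
        self.add_input("a", int)  # declared manually
        super().setup_targets()   # "b" and "result" are inferred

    def run(self, a, b: float) -> {"result": float}:
        return {"result": a + b}
```

Here `HybridTask().inputs` is `{"a": int, "b": float}` and `HybridTask().outputs` is `{"result": float}`: the manual declaration of `a` takes precedence over the unannotated argument, which would otherwise default to typing.Any.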

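Automatic inputs

If you do not override setup_targets, Outflow creates the input targets from the arguments of the run method, using their annotations as types.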


from outflow.core.tasks import Task

class MyTask(Task):
    def run(self, a: int, b: int):
        return {"result": a+b}

Automatic output targets from annotations

Outflow creates output targets from the return annotation of the run method. This is the recommended approach, since it lets you specify the type of each output.


from outflow.core.tasks import Task

class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        super().setup_targets() # use automatic output detection
        # self.add_output("result", int)

    def run(self, a, b) -> {"result": int}:
        return {"result": a+b}

Automatic output targets from return statement

If Outflow does not find a return annotation, it inspects the return statement of your run method and uses the keys of the returned dictionary to add the output targets automatically. In this case, the type of each output target is typing.Any.
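One way to recover the keys of a returned dictionary is to parse the source code with the standard `ast` module. The helper `outputs_from_return` below is hypothetical, not Outflow's implementation, and only understands `return` statements whose value is a dictionary literal with string keys. For a real method, the source string could come from `inspect.getsource(MyTask.run)`.

```python
import ast
import textwrap
from typing import List


def outputs_from_return(source: str) -> List[str]:
    """Collect the string keys of dictionary literals returned in `source`."""
    tree = ast.parse(textwrap.dedent(source))
    names: List[str] = []
    for node in ast.walk(tree):
        # Only `return {...}` with a literal dictionary can be analysed statically
        if isinstance(node, ast.Return) and isinstance(node.value, ast.Dict):
            for key in node.value.keys:
                # `key` is None for `**other` unpacking, which is skipped
                if isinstance(key, ast.Constant) and isinstance(key.value, str):
                    if key.value not in names:
                        names.append(key.value)
    return names


RUN_SOURCE = '''
    def run(self, a, b):
        return {"result": a+b}
'''
```

Calling `outputs_from_return(RUN_SOURCE)` gives `["result"]`. A run method that returns something computed at runtime, such as `return compute()`, yields no names, which is one reason a return annotation is the more reliable option.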


from outflow.core.tasks import Task

class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        super().setup_targets() # use automatic output detection from return statement
        # self.add_output("result", int)

    def run(self, a, b):
        return {"result": a+b}

Implicit return

If your task has a single output whose type is not a dictionary, you can return the value directly from the run method instead of wrapping it in a dictionary.

from outflow.core.tasks import Task

class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        self.add_output("result", int)

    def run(self, a, b):
        return a+b
        # equivalent to: return {"result": a+b}
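Conceptually, an implicit return means wrapping the bare value under the name of the single output. The hypothetical helper `normalize_result` below (not Outflow's API) sketches that rule:

```python
from typing import Any, Dict, List


def normalize_result(result: Any, output_names: List[str]) -> Dict[str, Any]:
    """Wrap a bare return value into the {output_name: value} form."""
    if len(output_names) == 1 and not isinstance(result, dict):
        # Single, non-dictionary output: the bare value is the output itself
        return {output_names[0]: result}
    if not isinstance(result, dict):
        raise TypeError("tasks with several outputs must return a dictionary")
    # Already in the explicit {"name": value} form
    return result
```

The `isinstance(result, dict)` check also suggests why the shortcut is limited to non-dictionary outputs: a returned dictionary could not be told apart from the explicit `{"result": ...}` form.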