Tasks
Tasks are the smallest building blocks of your pipelines.
Create a task from a function
The as_task decorator converts a function into a Task subclass that you can use in your workflow definition just like a task defined by subclassing Task. You can use this decorator both with and without parentheses.
from outflow.core.tasks import as_task
@as_task
def MyTask(a, b):
    return {"result": a+b}
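A decorator that works both with and without parentheses usually checks whether it received the decorated function directly. The following minimal sketch illustrates that pattern and the function-to-class conversion with the built-in type(). The Task base class and as_task function here are hypothetical stand-ins written for illustration, not Outflow's implementation.

```python
from typing import Any, Callable, Optional


class Task:
    """Hypothetical minimal base class, NOT Outflow's Task."""

    def run(self, *args: Any, **kwargs: Any) -> Any:
        raise NotImplementedError


def as_task(func: Optional[Callable] = None):
    """Turn a function into a Task subclass; works as @as_task or @as_task()."""

    def wrap(f: Callable) -> type:
        # Build a new class named after the function; staticmethod lets f
        # act as the class's `run` method without receiving `self`.
        return type(f.__name__, (Task,), {"run": staticmethod(f)})

    if func is None:
        # Used with parentheses, @as_task(): return the actual decorator.
        return wrap
    # Used without parentheses, @as_task: func is the decorated function.
    return wrap(func)


@as_task
def MyTask(a, b):
    return {"result": a + b}


@as_task()
def Double(x):
    return {"result": 2 * x}


print(issubclass(MyTask, Task), MyTask().run(1, 2))  # True {'result': 3}
```

Wrapping the function in staticmethod is one simple design choice: the generated class exposes the original function as run without adding a self argument to it.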
Create a task by subclassing Task
from outflow.core.tasks import Task
class MyTask(Task):
    def run(self, a: int, b: int) -> {"result": int}:
        return {"result": a+b}
Create a Task template
If you need a custom behaviour shared by several of your tasks, you can subclass Task and decorate your task functions with the as_task method of that subclass. Here is an example with tasks that always log their name before running.
from outflow.core.tasks import Task
from outflow.core.logging import logger
class LogTask(Task):
    def __call__(self, *args, **kwargs):
        logger.info(f"Running task {self.name}")
        return super().__call__(*args, **kwargs)
@LogTask.as_task
def MyTask():
    pass
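LogTask.as_task produces logging tasks because a class method receives the class it was called on. Here is a minimal sketch of that mechanism, assuming a hypothetical Task base class with an as_task class method and a __call__ that forwards to run. It is not Outflow's code, and it uses the standard logging module instead of outflow.core.logging.

```python
import logging
from typing import Any, Callable

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Task:
    """Hypothetical minimal base class, NOT Outflow's Task."""

    name = "task"

    def run(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    def __call__(self, **kwargs: Any) -> Any:
        return self.run(**kwargs)

    @classmethod
    def as_task(cls, func: Callable) -> type:
        # `cls` is the class the decorator is accessed through, so
        # @LogTask.as_task builds a LogTask subclass and inherits its __call__.
        # Simplification: the task name is the function name as written.
        return type(
            func.__name__,
            (cls,),
            {"run": staticmethod(func), "name": func.__name__},
        )


class LogTask(Task):
    def __call__(self, **kwargs: Any) -> Any:
        logger.info(f"Running task {self.name}")
        return super().__call__(**kwargs)


@LogTask.as_task
def MyTask():
    return {"status": "done"}


print(MyTask()())  # prints {'status': 'done'} after logging the task name
```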
Running a task directly
You might want to run a task manually, for example from the Outflow shell. In this case, call a task instance directly and pass all the required task inputs as keyword arguments.
Inside a Python shell:
>>> from outflow.core.tasks import as_task
>>> @as_task
... def Add(a: int, b: int) -> {"result": int}:
...     return a+b
...
>>> add = Add()
>>> add(a=2, b=4)
{'result': 6}
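To illustrate calling a task instance with keyword inputs, here is a minimal sketch with a hypothetical Task base class whose __call__ checks the inputs against the arguments of run before delegating to it. It does not depend on Outflow, and the validation rule is an assumption made for illustration.

```python
import inspect
from typing import Any, Dict


class Task:
    """Hypothetical minimal base class, NOT Outflow's Task."""

    def run(self, **kwargs: Any) -> Dict[str, Any]:
        raise NotImplementedError

    def __call__(self, **inputs: Any) -> Dict[str, Any]:
        # Inputs are keyword-only here; check them against run()'s arguments.
        expected = set(inspect.signature(self.run).parameters)
        missing = expected - set(inputs)
        if missing:
            raise TypeError(f"missing task inputs: {sorted(missing)}")
        return self.run(**inputs)


class Add(Task):
    def run(self, a: int, b: int) -> Dict[str, int]:
        return {"result": a + b}


add = Add()
print(add(a=2, b=4))  # {'result': 6}
```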
Targets
Targets represent the inputs and outputs of your tasks. There are three types of targets: inputs, outputs, and parameters.
Target typing
In all target definition syntaxes, the type is optional (but highly recommended) and defaults to typing.Any.
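One way to model the three kinds of targets and the typing.Any default is a small dataclass. This is a hypothetical representation chosen for illustration; the TargetKind and Target names and their fields are not part of Outflow's documented API.

```python
import typing
from dataclasses import dataclass
from enum import Enum
from typing import Any


class TargetKind(Enum):
    INPUT = "input"
    OUTPUT = "output"
    PARAMETER = "parameter"


@dataclass(frozen=True)
class Target:
    """Hypothetical target record; the field names are illustrative only."""

    name: str
    kind: TargetKind
    type_hint: Any = typing.Any  # an untyped target defaults to typing.Any


a = Target("a", TargetKind.INPUT, int)
result = Target("result", TargetKind.OUTPUT)
print(result.type_hint is typing.Any)  # True
```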
Parameters
Parameters are special inputs that come from the pipeline configuration file. When the task is executed, Outflow looks up the name of the task in the parameter field of the configuration file. Note that, by default, the task name is the snake_case version of the decorated function's name.
A parameter must be present in the configuration file and cannot have a default value.
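The parameter lookup described above can be sketched in plain Python, assuming the configuration file has already been loaded into a dictionary. The snake_case and resolve_parameters helpers are hypothetical, and the key layout follows the wording of this section rather than a verified Outflow schema.

```python
import re
from typing import Any, Dict, List


def snake_case(name: str) -> str:
    """Convert a CamelCase name to snake_case, e.g. 'MyTask' -> 'my_task'."""
    # Put an underscore before each capital that follows a lowercase letter or digit.
    return re.sub(r"(?<=[a-z0-9])([A-Z])", r"_\1", name).lower()


def resolve_parameters(
    config: Dict[str, Any], func_name: str, required: List[str]
) -> Dict[str, Any]:
    """Hypothetical lookup of a task's parameters in an already-loaded config."""
    task_name = snake_case(func_name)
    # The "parameter" key follows the wording above; the real layout may differ.
    section = config.get("parameter", {}).get(task_name, {})
    missing = [p for p in required if p not in section]
    if missing:
        # No default values: every parameter must appear in the configuration.
        raise KeyError(f"missing parameters for task {task_name!r}: {missing}")
    return {p: section[p] for p in required}


config = {"parameter": {"my_task": {"threshold": 0.5}}}
print(resolve_parameters(config, "MyTask", ["threshold"]))  # {'threshold': 0.5}
```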
Task definition alternatives
Let’s take the example of a task that computes the sum of two numbers a and b and returns the result:
from outflow.core.tasks import Task
class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        self.add_output("result", int)
    def run(self, a, b):
        return {"result": a+b}
The Task class has a setup_targets method that the framework calls automatically. As the example above shows, you can override this method to declare the targets yourself. The default implementation tries to infer the targets from the annotations of the run method.
You can also take a hybrid approach: call super().setup_targets() to let Outflow detect some targets automatically, and declare the others manually.
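To make the automatic call of setup_targets concrete, here is a minimal sketch of a hypothetical base class whose constructor invokes the hook, assuming targets are stored as plain name-to-type dictionaries. Automatic inference is deliberately left out of this sketch, and the inputs and outputs attributes are illustrative, not Outflow's internals.

```python
import typing
from typing import Any, Dict


class Task:
    """Hypothetical minimal base class, NOT Outflow's Task."""

    def __init__(self) -> None:
        self.inputs: Dict[str, Any] = {}
        self.outputs: Dict[str, Any] = {}
        # The hook runs automatically whenever a task is instantiated.
        self.setup_targets()

    def setup_targets(self) -> None:
        """Default hook; automatic inference from annotations is omitted here."""

    def add_input(self, name: str, type_hint: Any = typing.Any) -> None:
        self.inputs[name] = type_hint

    def add_output(self, name: str, type_hint: Any = typing.Any) -> None:
        self.outputs[name] = type_hint


class MyTask(Task):
    def setup_targets(self) -> None:
        self.add_input("a", int)
        self.add_input("b", int)
        self.add_output("result", int)

    def run(self, a, b):
        return {"result": a + b}


task = MyTask()
print(task.inputs, task.outputs)  # {'a': <class 'int'>, 'b': <class 'int'>} {'result': <class 'int'>}
```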
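Automatic inputs
If you do not override setup_targets, Outflow infers the input targets from the arguments of the run method, using their annotations as target types.
from outflow.core.tasks import Task
class MyTask(Task):
    def run(self, a: int, b: int):
        return {"result": a+b}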
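Inferring inputs from the run method can be done with the standard inspect module. The minimal sketch below, built around a hypothetical infer_inputs helper, maps each argument to its annotation and falls back to typing.Any for unannotated arguments. It shows the general technique, not Outflow's actual code.

```python
import inspect
import typing
from typing import Any, Callable, Dict


def infer_inputs(run: Callable) -> Dict[str, Any]:
    """Map each argument of `run` to its annotation, or typing.Any if unannotated."""
    inputs: Dict[str, Any] = {}
    for name, param in inspect.signature(run).parameters.items():
        if name == "self":
            continue  # skip the instance argument of an unbound method
        if param.annotation is inspect.Parameter.empty:
            inputs[name] = typing.Any
        else:
            inputs[name] = param.annotation
    return inputs


class MyTask:
    def run(self, a: int, b: int):
        return {"result": a + b}


print(infer_inputs(MyTask.run))  # {'a': <class 'int'>, 'b': <class 'int'>}
```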
Automatic output targets from annotations
Outflow creates output targets from the return annotation of the run method. This is the recommended approach, since it lets you type your outputs.
from outflow.core.tasks import Task
class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        super().setup_targets()  # use automatic output detection
        # no need for: self.add_output("result", int)
    def run(self, a, b) -> {"result": int}:
        return {"result": a+b}
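A return annotation such as {"result": int} is an ordinary dictionary that Python stores on the function, so it can be read back with inspect.signature. The hypothetical infer_outputs_from_annotation helper below sketches that idea; it is an illustration, not Outflow's implementation.

```python
import inspect
from typing import Any, Callable, Dict


def infer_outputs_from_annotation(run: Callable) -> Dict[str, Any]:
    """Return the `-> {name: type}` annotation of `run`, or {} if there is none."""
    annotation = inspect.signature(run).return_annotation
    if annotation is inspect.Signature.empty:
        return {}
    if not isinstance(annotation, dict):
        raise TypeError("expected a return annotation of the form {name: type}")
    return dict(annotation)


class MyTask:
    def run(self, a, b) -> {"result": int}:
        return {"result": a + b}


print(infer_outputs_from_annotation(MyTask.run))  # {'result': <class 'int'>}
```

This relies on the annotation being evaluated: in a module that uses `from __future__ import annotations`, inspect.signature returns the annotation as a string by default, and this sketch would reject it.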
Automatic output targets from return statement
If Outflow does not find a return annotation, it inspects the return statement of your run method and uses the keys of the returned dictionary to add the output targets automatically. In this case, the type of each target is typing.Any.
from outflow.core.tasks import Task
class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        super().setup_targets()  # use automatic output detection from return statement
        # no need for: self.add_output("result", int)
    def run(self, a, b):
        return {"result": a+b}
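Detecting outputs from a return statement amounts to static inspection of the source code. The following sketch uses the standard ast module to collect the string keys of dictionary literals returned by a run method, typing each one as typing.Any. The infer_outputs_from_return helper is hypothetical and only handles literal dictionaries.

```python
import ast
import typing
from typing import Any, Dict

RUN_SOURCE = '''
def run(self, a, b):
    return {"result": a+b}
'''


def infer_outputs_from_return(source: str) -> Dict[str, Any]:
    """Collect string keys of dict literals returned in `source`, typed as Any."""
    outputs: Dict[str, Any] = {}
    # ast.walk visits every node, including nested functions (a simplification).
    for node in ast.walk(ast.parse(source)):
        # Only `return {...}` with a literal dict is understood here.
        if isinstance(node, ast.Return) and isinstance(node.value, ast.Dict):
            for key in node.value.keys:
                # key is None for `**other` unpacking; skip non-string keys too.
                if isinstance(key, ast.Constant) and isinstance(key.value, str):
                    outputs[key.value] = typing.Any
    return outputs


print(infer_outputs_from_return(RUN_SOURCE))  # {'result': typing.Any}
```

In practice the source would come from inspect.getsource, which needs the defining file to be available and returns indented text for methods (hence a textwrap.dedent before parsing); the sketch parses a string to stay self-contained.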
Implicit return
If your task has only one output and the type of this output is not a dictionary, you can return the value directly from the run method.
from outflow.core.tasks import Task
class MyTask(Task):
    def setup_targets(self):
        self.add_input("a", int)
        self.add_input("b", int)
        self.add_output("result", int)
    def run(self, a, b):
        return a+b
        # equivalent to: return {"result": a+b}
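The implicit return rule can be expressed as a small normalization step applied to the value returned by run. The normalize_result helper below is a hypothetical sketch of that rule, not Outflow's code.

```python
from typing import Any, Dict, List


def normalize_result(result: Any, outputs: List[str]) -> Dict[str, Any]:
    """Wrap a bare return value when the task declares exactly one output."""
    if len(outputs) == 1 and not isinstance(result, dict):
        return {outputs[0]: result}
    if not isinstance(result, dict):
        raise TypeError("tasks with several outputs must return a dict")
    return result


print(normalize_result(6, ["result"]))              # {'result': 6}
print(normalize_result({"result": 6}, ["result"]))  # {'result': 6}
```

The restriction to non-dictionary outputs is what keeps the rule unambiguous: if the single output were itself a dictionary, a bare return value could not be told apart from a dictionary of outputs.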