Tasks¶
Built-in tasks¶
IPythonTask¶
This task spawns an IPython shell, giving you interactive access to everything a task has access to, notably the pipeline context and the objects available at this stage of the workflow.
The pipeline context is available, as usual, with from outflow.core.pipeline import context.
The objects returned by the previous task are in the variable kwargs.
Example use¶
Let’s say you have a workflow like this one:
from outflow.core.tasks import as_task
from outflow.core.logging import logger
from outflow.core.commands import Command, RootCommand


@as_task
def TaskA() -> {"some_output": str}:
    value = "some_value"
    logger.info(f"Value in TaskA : {value}")
    return {"some_output": value}


@as_task
def TaskB(some_output):
    logger.info(f"Value in TaskB : {some_output}")


@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_tasks(self):
        TaskA() | TaskB()
Suppose you want to inspect or edit the values of your objects between tasks A and B. Edit the workflow and add an InteractiveTask between the two:
...
from outflow.library.tasks import InteractiveTask


class SomeCommand(Command):
    def setup_tasks(self):
        A = TaskA()
        B = TaskB()
        interpreter = InteractiveTask()
        A | interpreter | B
Now, if you run this command, you can view and edit the value stored under “some_output”:
$ python3 manage.py some_command
outflow_tests.plugin_a.tasks - tasks.py:10 - INFO - Value in TaskA : some_value
|> You are inside an outflow interactive task. You can access the outputs of the previous task through the dictionary 'kwargs'.
If you edit the values in 'kwargs', the changes will be passed to the next task.
To quit, do NOT use exit(), use EOF instead (usually CTRL+D)
|> kwargs
{'some_output': 'some_value'}
|> kwargs["some_output"] = "some_other_value"
|> ^D
now exiting InteractiveConsole...
outflow_tests.plugin_a.tasks - tasks.py:15 - INFO - Value in TaskB : some_other_value
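The pass-through behaviour seen in this session can be modelled with the Python standard library alone. The sketch below is not outflow's implementation: it only assumes that the previous outputs are exposed as a dictionary named kwargs and that the same dictionary is forwarded afterwards. The helper name run_interactive_step and the scripted commands list are hypothetical, used here so the example runs without a terminal.

```python
import code


def run_interactive_step(kwargs, commands):
    """Run scripted console lines against `kwargs` and return the dict.

    Hypothetical helper: the `commands` list stands in for a user typing
    into the interactive shell.
    """
    # The console namespace exposes the same dict object under the name
    # 'kwargs', so in-place edits made by the commands reach the caller.
    console = code.InteractiveConsole(locals={"kwargs": kwargs})
    for line in commands:
        console.push(line)  # execute one complete line of source
    return kwargs


outputs_of_task_a = {"some_output": "some_value"}
inputs_of_task_b = run_interactive_step(
    outputs_of_task_a,
    ['kwargs["some_output"] = "some_other_value"'],
)
print(inputs_of_task_b)
```

In this sketch only in-place edits of the dictionary propagate; rebinding the name kwargs to a new object inside the console would not change what the helper returns.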
IfThenElse¶
The IfThenElse construct splits a workflow into two branches and executes one of them depending on a user-defined condition. The input targets of the first task of both branches should be the outputs of the task before the branching.
IfThenElse is a function that takes a Callable (the condition) and returns a tuple of three tasks:
the first is the “if” in which you pipe the inputs
the second is the “then”, out of which you pipe the tasks to execute if the condition succeeds
the third is the “else”
Either the “then” task and its children or the “else” task and its children are executed; the other branch is skipped (i.e. its tasks return a Skipped object).
The condition must have this signature:
def condition(**kwargs):
    # kwargs contains all the inputs of the task piped to the "if"
    # return either True or False
    ...
Your condition can use the pipeline arguments, the database, or the values returned by the previous task.
It is possible to merge the two branches of an IfThenElse construct using the MergeTask.
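The branching rule described above can be sketched in plain Python, without outflow. This is a simplified model under stated assumptions, not the library's code: the Skipped class here is a local stand-in for outflow's marker, and run_if_then_else, then_branch and else_branch are hypothetical names. It shows the condition receiving the upstream outputs as keyword arguments, and exactly one branch producing a real result.

```python
class Skipped:
    """Local stand-in for a 'branch did not run' marker (not outflow's class)."""


def run_if_then_else(condition, then_branch, else_branch, **inputs):
    """Call `condition` with the inputs and run exactly one branch.

    Returns (then_result, else_result); the branch not taken yields Skipped().
    """
    if condition(**inputs):
        return then_branch(**inputs), Skipped()
    return Skipped(), else_branch(**inputs)


def condition(**kwargs):
    # kwargs holds everything produced by the task piped into the "if"
    return kwargs["choice"]


def then_branch(choice):
    return {"output": "b"}


def else_branch(choice):
    return {"output": "c"}


then_result, else_result = run_if_then_else(
    condition, then_branch, else_branch, choice=True
)
print(then_result)
print(isinstance(else_result, Skipped))
```

Both branch functions accept the same keyword argument, mirroring the requirement that the first task of each branch has the same input targets.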
Example usage¶
Branches containing tasks with targets¶
The first task of each branch should have the same input targets:
from outflow.library.tasks import IfThenElse
from outflow.core.tasks import as_task
from outflow.core.logging import logger
from outflow.core.commands import Command, RootCommand
import random


@as_task
def A() -> {"choice": bool}:
    choice = random.choice([True, False])
    return {"choice": choice}


@as_task
def B(choice: bool) -> {"output": str}:
    logger.info("Executing B")
    return {"output": "b"}


@as_task
def C(choice: bool) -> {"output": str}:
    logger.info("Executing C")
    return {"output": "c"}


@RootCommand.subcommand()
class SomeCommand(Command):
    @staticmethod
    def condition(**kwargs):
        # kwargs contains the outputs of A
        return kwargs["choice"]

    def setup_tasks(self):
        a = A()
        b = B()
        c = C()
        if_t, then_t, else_t = IfThenElse(self.condition)
        a | if_t      # inputs go into the "if"
        then_t | b    # executed when the condition is True
        else_t | c    # executed when the condition is False
MergeTask¶
The MergeTask can be used to merge multiple branches of a workflow. These branches can be either the result of an IfThenElse construct or manual branching using the piping syntax.
The MergeTask returns the only non-Skipped result of the different branches.
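The selection rule can be illustrated with a few lines of plain Python. This is a sketch of the idea only, not outflow's MergeTask: the Skipped class is a local stand-in, the function name merge is hypothetical, and raising an error when zero or several branches produced a result is an assumption made for this example.

```python
class Skipped:
    """Local stand-in for a 'branch did not run' marker (not outflow's class)."""


def merge(*branch_results):
    """Return the single branch result that is not a Skipped marker."""
    live = [r for r in branch_results if not isinstance(r, Skipped)]
    if len(live) != 1:
        # Assumption for this sketch: exactly one branch must have run.
        raise ValueError(
            f"expected exactly one non-skipped branch, got {len(live)}"
        )
    return live[0]


merged = merge(Skipped(), {"out1": "a", "out2": "c"})
print(merged)
```

Whatever follows the merge then receives the outputs of the branch that actually ran, regardless of which branch it was.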
Merge IfThenElse branches¶
@as_task
def A() -> {"out1": str}:
    return {"out1": "a"}


@as_task
def B(out1: str) -> {"out1": str, "out2": str}:
    return {"out1": out1, "out2": "b"}


@as_task
def C(out1: str) -> {"out1": str, "out2": str}:
    return {"out1": out1, "out2": "c"}


@as_task
def LastTask(out1: str, out2: str):
    logger.info(f"{out1=} , {out2=}")
    # will log either
    # out1='a' , out2='b'
    # or
    # out1='a' , out2='c'


@RootCommand.subcommand()
class SomeConditionalCommand(Command):
    @staticmethod
    def condition(**kwargs):
        # some condition returning True or False
        ...

    def setup_tasks(self):
        a = A()
        b = B()
        c = C()
        if_t, then_t, else_t = IfThenElse(self.condition)
        merge_task = MergeTask()
        last_task = LastTask()
        a | if_t
        then_t | b | merge_task
        else_t | c | merge_task
        merge_task | last_task
Merge manual branches¶
Merging manual branches works the same way, but you have to return Skipped() yourself from one branch or the other.
from outflow.core.types import Skipped