# Tasks ## Built-in tasks ### IPythonTask This task spawn an ipython shell, so that you have interactive access to everything that a task has access to, notably the pipeline context and the objects at this stage of the workflow. The pipeline context is available with `from outflow.core.pipeline import context` as usual. The objects returned by the previous task are in the variable `kwargs`. #### Example use Let's say you have a workflow like this one : ```python from outflow.core.tasks import Task from outflow.core.logging import logger from outflow.core.commands import Command, RootCommand @as_task def TaskA() -> {"some_output": str}: value = "some_value" logger.info(f"Value in TaskA : {value}") return {"some_output": value} @as_task def TaskB(some_output): logger.info(f"Value in TaskB : {some_output}") @RootCommand.subcommand() class SomeCommand(Command): def setup_task(self): TaskA() | TaskB() ``` And you want to visualize or edit the values of your objects between the tasks A and B. Edit the workflow and add an InteractiveTask between the two: ```python ... from outflow.library.tasks import InteractiveTask class SomeCommand(Command): def setup_task(self): A = TaskA() B = TaskB() intepreter = InteractiveTask() A | intepreter | B ``` Now if you run this command, you can view and edit the value inside "some_output" : ``` $ python3 manage.py some_command outflow_tests.plugin_a.tasks - tasks.py:10 - INFO - Value in TaskA : some_value |> You are inside an outflow interactive task. You can access the outputs of the previous task through the dictionary 'kwargs'. If you edit the values in 'kwargs', the changes will be passed to the next task. To quit, do NOT use exit(), use EOF instead (usually CTRL+D) kwargs {'some_output': 'some_value'} |> kwargs["some_output"] = "some_other_value" |> ^D now exiting InteractiveConsole... outflow_tests.plugin_a.tasks - tasks.py:15 - INFO - Value in TaskB : some_other_value ``` ### IfThenElse The IfThenElse construct split a workflow into two branches, and execute either of them depending on a user-defined condition. The input targets of the first task of both branches should be the output of the task before the branching. `IfThenElse` is a function that takes a Callable (the condition) and return a tuple of three tasks: - the first is the "if" in which you pipe the inputs - the second is the "then", out of which you pipe the tasks to execute if the condition succeeds - the third is the "else" Either the "then" or the "else" its children will be executed, the other and its children will be skipped (ie return a Skipped object). The condition must have this signature : ```python def condition(**kwargs): # kwargs contains all the inputs of the task piped to the "if" # return either True or False ``` Your condition can use the pipeline arguments, the database, or the values returned by the previous task. It is possible to merge the two branches of an IfElse construct using the [MergeTask](MergeTask). #### Example usage ##### Branches containing tasks with targets The first task of each branch should have the same input targets: ```python from outflow.library.tasks import IfThenElse from outflow.core.tasks import Task from outflow.core.logging import logger from outflow.core.commands import Command, RootCommand import random @as_task def A() -> {"choice": bool}: choice = random.choice([True, False]) return {"choice": choice} @as_task def B(choice: bool) -> {"output": str}: logger.info("Executing B") return {"output": "b"} @as_task def C(choice: bool) -> {"output": str}: logger.info("Executing C") return {"output": "c"} @RootCommand.subcommand() class SomeCommand(Command): @staticmethod def condition(**kwargs): return kwargs["choice"] def setup_tasks(self): a = A() b = B() c = C() if_t, then_t, else_t = IfThenElse(self.condition) a | if_t then_t | b else_t | c ``` (MergeTask)= ### MergeTask The MergeTask can be used to merge multiple branches of a workflow. These branches can be either the result of an IfThenElse construct or manual branching using the piping syntax. The MergeTask returns the only non-Skipped result of the different branches. #### Merge IfThenElse branches ```python @as_task def A() -> {"out1": str}: return {"out1": "a"} @as_task def B(out1: str) -> {"out1": str, "out2": str}: return {"out1": out1, "out2": "b"} @as_task def C(out1: str) -> {"out1": str, "out2": str}: return {"out1": out1, "out2": "c"} @as_task def LastTask(out1: str, out2: str): logger.info(f"{out1=} , {out2=}") # will print either # out1 = a , out2 = b # or # out1 = a , out2 = c @RootCommand.subcommand() class SomeConditionalCommand(Command): @staticmethod def condition(**kwargs): # some condition def setup_tasks(self): a = A() b = B() c = C() if_t, then_t, else_t = IfElse(self.condition) merge_task = MergeTask() last_task = LastTask() a | if_t then_workflow | b | merge_task else_workflow | c | merge_task merge_task | last_task ``` #### Merge manual branches Merging manual branching works the same, but you have to manually return Skipped() from either one branch or the other. `from outflow.core.types import Skipped`