Tasks

Built-in tasks

LoopTask

The LoopTask allows you to repeat a workflow, either a given number of iterations, or indefinitely.

Usage

This will repeat the workflow A >> B 3 times:

from outflow.library.tasks import LoopTask

@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_task(self):
        with LoopTask(iterations=3):
            TaskA() >> TaskB()

infinite=True will repeat the workflow A >> B until an exception is raised, or the processed killed, ctrl-c pressed, or exit_pipeline() called.

@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_task(self):
        with LoopTask(infinite=True):
            TaskA() >> TaskB()

Combining the LoopTask with a Sleep Task is useful for automatic processing:

@Task.as_task
def Sleep(seconds=0):
    logger.info(f"Sleeping for {seconds} seconds")
    time.sleep(seconds)

@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_task(self):
        with LoopTask(infinite=True):
            Sleep(seconds=60)
            TaskA() >> TaskB()

IPythonTask

This task spawn an ipython shell, so that you have interactive access to everything that a task has access to, notably the pipeline context and the objects at this stage of the workflow.

The pipeline context is available with from outflow.core.pipeline import context as usual.

The objects returned by the previous task are in the variable kwargs.

Example use

Let’s say you have a workflow like this one :

from outflow.core.tasks import Task
from outflow.core.logging import logger
from outflow.core.commands import Command, RootCommand


@Task.as_task()
def TaskA() -> {"some_output": str}:
    value = "some_value"
    logger.info(f"Value in TaskA : {value}")
    return {"some_output": value}

@Task.as_task()
def TaskB(some_output):
    logger.info(f"Value in TaskB : {some_output}")


@RootCommand.subcommand()
class SomeCommand(Command):
    def setup_task(self):
        A = TaskA()
        B = TaskB()
        A >> B

And you want to visualize or edit the values of your objects between the tasks A and B. Edit the workflow and add an InteractiveTask between the two:

...
from outflow.library.tasks import InteractiveTask

class SomeCommand(Command):
    def setup_task(self):
        A = TaskA()
        B = TaskB()
        intepreter = InteractiveTask()
        A >> intepreter >> B

Now if you run this command, you can view and edit the value inside “some_output” :

$ python3 manage.py some_command
outflow_tests.plugin_a.tasks - tasks.py:10 - INFO - Value in TaskA : some_value
>>> You are inside an outflow interactive task. You can access the outputs of the previous task through the dictionary 'kwargs'.
If you edit the values in 'kwargs', the changes will be passed to the next task.
To quit, do NOT use exit(), use EOF instead (usually CTRL+D)

kwargs
{'some_output': 'some_value'}
>>> kwargs["some_output"] = "some_other_value"
>>> ^D
now exiting InteractiveConsole...
outflow_tests.plugin_a.tasks - tasks.py:15 - INFO - Value in TaskB : some_other_value

IfThenElse

The IfThenElse construct split a workflow into two branches, and execute either of them depending on a user-defined condition. The input targets of the first task of both branches should be the output of the task before the branching.

IfThenElse is a function that takes a Callable (the condition) and return a tuple of three tasks:

  • the first is the “if” in which you pipe the inputs

  • the second is the “then”, out of which you pipe the tasks to execute if the condition succeeds

  • the third is the “else”

Either the “then” or the “else” its children will be executed, the other and its children will be skipped (ie return a Skipped object).

The condition must have this signature :

def condition(**kwargs):
    # kwargs contains all the inputs of the task piped to the "if"
    # return either True or False

Your condition can use the pipeline arguments, the database, or the values returned by the previous task.

It is possible to merge the two branches of an IfElse construct using the MergeTask.

Example usage

Branches containing tasks with targets

The first task of each branch should have the same input targets:

from outflow.library.tasks import IfThenElse
from outflow.core.tasks import Task
from outflow.core.logging import logger
from outflow.core.commands import Command, RootCommand
import random


@Task.as_task()
def A() -> {"choice": bool}:
    choice = random.choice([True, False])
    return {"choice": choice}

@Task.as_task()
def B(choice: bool) -> {"output": str}:
    logger.info("Executing B")
    return {"output": "b"}

@Task.as_task()
def C(choice: bool) -> {"output": str}:
    logger.info("Executing C")
    return {"output": "c"}

@RootCommand.subcommand()
class SomeCommand(Command):
    @staticmethod
    def condition(**kwargs):
        return kwargs["choice"]

    def setup_tasks(self):
        a = A()
        b = B()
        c = C()

        if_t, then_t, else_t = IfThenElse(self.condition)

        a >> if_t

        then_t >> b
        else_t >> c

(MergeTask)=

MergeTask

The MergeTask can be used to merge multiple branches of a workflow. These branches can be either the result of an IfThenElse construct or manual branching using the piping syntax.

The MergeTask returns the only non-Skipped result of the different branches.

Merge IfThenElse branches


@Task.as_task()
def A() -> {"out1": str}:
    return {"out1": "a"}


@Task.as_task()
def B(out1: str) -> {"out1": str, "out2": str}:
    return {"out1": out1, "out2": "b"}


@Task.as_task()
def C(out1: str) -> {"out1": str, "out2": str}:
    return {"out1": out1, "out2": "c"}

@Task.as_task()
def LastTask(out1: str, out2: str):
    logger.info(f"{out1=} , {out2=}")
    # will print either
    # out1 = a , out2 = b
    # or
    # out1 = a , out2 = c


@RootCommand.subcommand()
class SomeConditionalCommand(Command):
    @staticmethod
    def condition(**kwargs):
        # some condition

    def setup_tasks(self):
        a = A()
        b = B()
        c = C()
        if_t, then_t, else_t = IfElse(self.condition)
        merge_task = MergeTask()
        last_task = LastTask()

        a >> if_t

        then_workflow >> b >> merge_task
        else_workflow >> c >> merge_task

        merge_task >> last_task

Merge manual branches

Merging manual branching works the same, but you have to manually return Skipped() from either one branch or the other.

from outflow.core.types import Skipped