Workflows

In Outflow, workflows and tasks are both what we call Blocks. Both have targets (inputs and outputs; parameters exist for tasks only), and both have a run() method.

A task's run() is the function that you decorate (or the method that you define in your subclass). For a workflow, the default run() behaviour is to run the sequence of tasks it contains. MapWorkflows are special workflows: their run() method runs the contained task sequence once for each element of the input they are mapped on.
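
The three run() behaviours described above can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: ToyTask, ToyWorkflow and ToyMapWorkflow are hypothetical names invented for illustration, they are not Outflow classes, and they ignore targets, parameters and scheduling entirely.

```python
from typing import Any, Callable, Iterable, List


class ToyTask:
    """Hypothetical stand-in for a task: run() is simply the wrapped function."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def run(self, value: Any) -> Any:
        return self.func(value)


class ToyWorkflow:
    """Hypothetical stand-in for a workflow: run() runs the tasks in sequence."""

    def __init__(self, tasks: List[ToyTask]) -> None:
        self.tasks = tasks

    def run(self, value: Any) -> Any:
        for task in self.tasks:
            value = task.run(value)  # the output of one task feeds the next
        return value


class ToyMapWorkflow(ToyWorkflow):
    """Hypothetical stand-in for a MapWorkflow: one pass per input element."""

    def run(self, values: Iterable[Any]) -> List[Any]:
        results = []
        for value in values:
            results.append(super().run(value))  # full task sequence per element
        return results


double = ToyTask(lambda x: x * 2)
increment = ToyTask(lambda x: x + 1)

single = ToyWorkflow([double, increment]).run(3)
mapped = ToyMapWorkflow([double, increment]).run([1, 2, 3])
```

Here the plain workflow runs the sequence once on its input, while the mapped variant runs the same sequence three times, once per element of the list.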

Workflow definition

There are multiple ways to define workflows:

Using the as_workflow decorator

from outflow.core.workflow import as_workflow

@as_workflow
def process():
    Extract() | Transform() | Load()


@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        process()  # call the decorated function to instantiate and register the workflow

You can use the as_workflow decorator with or without parentheses.
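
For readers curious how one decorator can accept both forms, here is a minimal sketch of the usual Python pattern. It is an assumption about the general technique, not a copy of Outflow's implementation: toy_as_workflow and its options attribute are hypothetical names used only for illustration.

```python
import functools
from typing import Any, Callable, Optional


def toy_as_workflow(func: Optional[Callable[..., Any]] = None, **options: Any):
    """Hypothetical decorator usable as @toy_as_workflow or @toy_as_workflow(...)."""

    def decorate(f: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return f(*args, **kwargs)

        wrapper.options = options  # keep the keyword options for inspection
        return wrapper

    if func is not None:
        # Bare form: Python passed the decorated function directly.
        return decorate(func)
    # Called form: return the real decorator, which Python applies next.
    return decorate


@toy_as_workflow
def bare():
    return "bare"


@toy_as_workflow()
def called():
    return "called"


@toy_as_workflow(cache=True)
def with_options():
    return "with options"
```

The trick is that the first positional argument is the decorated function in the bare form and absent in the called form, so a single entry point can tell the two apart.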

Using the explicit syntax

The as_workflow decorator is syntactic sugar for something you can do manually if you prefer. Here is the same workflow with the explicit syntax.


@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        process = Workflow()
        process.start() # start registering tasks
        # everything declared between these two calls will belong to the workflow "process"
        Extract() | Transform() | Load()
        process.stop() # stop registering
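
To make the start()/stop() idea concrete, the following stand-alone sketch shows one way such registration could work: tasks attach themselves to whichever workflow is active when they are created, and the | operator records an edge between two tasks. RegisteringWorkflow and RegisteredTask are hypothetical names, and the mechanism is an assumption for illustration rather than Outflow's actual internals.

```python
from typing import List, Optional


class RegisteringWorkflow:
    """Hypothetical container that collects tasks created while it is active."""

    _active: Optional["RegisteringWorkflow"] = None  # workflow currently registering

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: List["RegisteredTask"] = []

    def start(self) -> None:
        RegisteringWorkflow._active = self  # start registering tasks

    def stop(self) -> None:
        RegisteringWorkflow._active = None  # stop registering


class RegisteredTask:
    """Hypothetical task that registers itself with the active workflow."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.downstream: List["RegisteredTask"] = []
        if RegisteringWorkflow._active is not None:
            RegisteringWorkflow._active.tasks.append(self)

    def __or__(self, other: "RegisteredTask") -> "RegisteredTask":
        # a | b records the edge a -> b and returns b so that chains keep going.
        self.downstream.append(other)
        return other


process = RegisteringWorkflow("process")
process.start()
extract = RegisteredTask("extract")
transform = RegisteredTask("transform")
load = RegisteredTask("load")
extract | transform | load
process.stop()

outside = RegisteredTask("outside")  # created after stop(): not registered
```

In this model, only the three tasks created between start() and stop() end up in process.tasks, which mirrors the comment in the example above.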

External task in a workflow

A task inside a workflow may need the output of a task outside this workflow. For example, the Transform task may need the result of a big computation that we want to run only once (for example when using a MapWorkflow).

With the decorator syntax:


@as_workflow
def process(big_computation):
    transform = Transform()

    big_computation | transform
    Extract() | transform | Load()


@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        big_computation = BigComputation()
        process(big_computation)  # call the decorated function to instantiate and register the workflow

With the explicit syntax:


@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        big_computation = BigComputation()

        process = Workflow()
        process.start() # start registering tasks
        # everything declared between these two calls will belong to the workflow "process"
        transform = Transform()
        big_computation | transform
        Extract() | transform | Load()
        process.stop() # stop registering
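
The motivation for keeping the big computation outside the workflow can be shown with plain functions. This is a sketch of the idea only: run_mapped, transform, big_computation and the calls counter are hypothetical helpers, and nothing here reflects how Outflow actually schedules tasks.

```python
from typing import Callable, Dict, List

calls: Dict[str, int] = {"big_computation": 0}  # counts how often the external step runs


def big_computation() -> int:
    """Hypothetical expensive step that lives outside the mapped workflow."""
    calls["big_computation"] += 1
    return 1000


def transform(item: int, offset: int) -> int:
    """Hypothetical per-item step that needs the external result."""
    return item + offset


def run_mapped(items: List[int], external: Callable[[], int]) -> List[int]:
    offset = external()  # evaluated once, outside the per-item loop
    return [transform(item, offset) for item in items]


results = run_mapped([1, 2, 3], big_computation)
```

The per-item step runs three times, but the external computation is evaluated a single time and its result is shared by every iteration.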

Using decorator on special workflow classes

If you need special workflow behaviour, you can subclass Workflow and decorate a function with the as_workflow method of your subclass.

For example, here is a workflow class that tosses a coin and runs only half of the time:


class RandomWorkflow(Workflow):
    def _run(self, **inputs):
        import random

        if random.choice([True, False]):
            logger.info("Heads, I run")
            return super()._run(**inputs)
        else:
            logger.info("Tails, I sleep")
            return {output: None for output in self.outputs} # return None for all output targets


# create a random workflow
@RandomWorkflow.as_workflow(cache=True, output_name="computation_result")
def compute_workflow():
    Extract() | Transform() | Load()
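
Because the coin toss makes such a workflow non-deterministic, it can help to make the source of randomness replaceable when testing. The sketch below shows the same override pattern on a stand-alone toy class. ToyBaseWorkflow, ToyRandomWorkflow, FixedCoin and the rng argument are all hypothetical and are not part of Outflow.

```python
import random
from typing import Any, Dict, List, Optional


class ToyBaseWorkflow:
    """Hypothetical base class with a single output target."""

    outputs: List[str] = ["computation_result"]

    def _run(self, **inputs: Any) -> Dict[str, Any]:
        return {"computation_result": sum(inputs.values())}


class ToyRandomWorkflow(ToyBaseWorkflow):
    """Runs the parent behaviour only when the coin toss comes up heads."""

    def __init__(self, rng: Optional[Any] = None) -> None:
        # Any object with a choice() method works, which makes tests deterministic.
        self.rng = rng or random.Random()

    def _run(self, **inputs: Any) -> Dict[str, Any]:
        if self.rng.choice([True, False]):
            return super()._run(**inputs)
        # Tails: return None for all output targets.
        return {output: None for output in self.outputs}


class FixedCoin:
    """Hypothetical test double that always returns the same toss."""

    def __init__(self, result: bool) -> None:
        self.result = result

    def choice(self, options: List[bool]) -> bool:
        return self.result


heads = ToyRandomWorkflow(rng=FixedCoin(True))._run(a=1, b=2)
tails = ToyRandomWorkflow(rng=FixedCoin(False))._run(a=1, b=2)
```

With a fixed coin, both branches can be exercised on demand instead of relying on chance.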