Workflows
In Outflow, workflows and tasks are both what we call blocks. They share the concept of targets (inputs and outputs; parameters exist for tasks only), and they both have a run() method.
For a task, run() is the function that you decorate (or that you define in your subclass). For a workflow, the default run() behaviour is to run the sequence of tasks it contains. MapWorkflows are special workflows: their run() method runs the contained task sequence once for each item of the input they are mapped on.
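To make the three run() behaviours concrete, here is a minimal toy model in plain Python. The classes ToyTask, ToyWorkflow and ToyMapWorkflow are hypothetical stand-ins, not Outflow's API: targets are simplified to keyword arguments, and the map_on parameter naming the mapped input is an assumption made for illustration.

```python
# Toy model of the run() semantics described above (hypothetical classes,
# not Outflow's implementation).
from typing import Any, Callable, Dict, List


class ToyTask:
    """A task's run() is simply the wrapped function."""

    def __init__(self, func: Callable[..., Dict[str, Any]]):
        self.func = func

    def run(self, **inputs: Any) -> Dict[str, Any]:
        return self.func(**inputs)


class ToyWorkflow:
    """Default run(): execute the contained tasks in sequence,
    feeding each task's outputs to the next task as inputs."""

    def __init__(self, tasks: List[ToyTask]):
        self.tasks = tasks

    def run(self, **inputs: Any) -> Dict[str, Any]:
        data = inputs
        for task in self.tasks:
            data = task.run(**data)
        return data


class ToyMapWorkflow(ToyWorkflow):
    """run(): execute the whole task sequence once per item of the mapped input."""

    def __init__(self, tasks: List[ToyTask], map_on: str):
        super().__init__(tasks)
        self.map_on = map_on  # name of the input to iterate over (hypothetical)

    def run(self, **inputs: Any) -> Dict[str, Any]:
        items = inputs.pop(self.map_on)
        results = []
        for item in items:
            # run the plain sequential behaviour on a single item
            results.append(super().run(**inputs, **{self.map_on: item}))
        return {"results": results}


double = ToyTask(lambda x: {"x": x * 2})
increment = ToyTask(lambda x: {"x": x + 1})

print(ToyWorkflow([double, increment]).run(x=3))  # {'x': 7}
print(ToyMapWorkflow([double, increment], map_on="x").run(x=[1, 2, 3]))
# {'results': [{'x': 3}, {'x': 5}, {'x': 7}]}
```

The map variant reuses the sequential run() unchanged and only adds the loop, which mirrors the description of a MapWorkflow as a workflow that repeats its task sequence.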
Workflow definition
There are several ways to define workflows:
Using the as_workflow decorator
from outflow.core.workflow import as_workflow


@as_workflow
def process():
    # Extract, Transform and Load are tasks defined elsewhere;
    # the | operator feeds the output of each task to the next one
    Extract() | Transform() | Load()


@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        process()  # call the decorated function to instantiate and register the workflow
You can use the as_workflow decorator with or without parentheses (@as_workflow or @as_workflow()).
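Supporting both forms is a common Python idiom: the decorator checks whether it received the decorated function directly. The sketch below uses a hypothetical as_workflow_sketch, not Outflow's implementation; instead of registering a workflow, it only records the options it was given.

```python
# Hypothetical sketch of a decorator usable as @as_workflow_sketch,
# @as_workflow_sketch() or @as_workflow_sketch(option=...). Not Outflow's code.
import functools
from typing import Any, Callable, Optional


def as_workflow_sketch(func: Optional[Callable] = None, **options: Any):
    def decorator(f: Callable) -> Callable:
        @functools.wraps(f)
        def wrapper(*args: Any, **kwargs: Any):
            # A real implementation would register the blocks created by f
            # in a new workflow; this sketch only reports what it received.
            return {"name": f.__name__, "options": options, "result": f(*args, **kwargs)}
        return wrapper

    if func is not None:
        # bare form: Python passed the decorated function directly
        return decorator(func)
    # parenthesized form: return the real decorator, which receives the function next
    return decorator


@as_workflow_sketch
def process():
    return "ran"


@as_workflow_sketch(cache=True)
def other():
    return "ran too"


print(process())  # {'name': 'process', 'options': {}, 'result': 'ran'}
print(other())    # {'name': 'other', 'options': {'cache': True}, 'result': 'ran too'}
```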
Using the explicit syntax
The as_workflow decorator is syntactic sugar for something you can also do manually if you prefer. Here is the same workflow written with the explicit syntax:
@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        process = Workflow()
        process.start()  # start registering tasks
        # everything declared between these two calls will belong to the workflow "process"
        Extract() | Transform() | Load()
        process.stop()  # stop registering
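One way to picture what start() and stop() do is a "current workflow" pointer that newly created blocks register themselves with. The ToyBlock and ToyWorkflow classes below are hypothetical and much simpler than Outflow's; in particular, the | operator is modelled only as recording a downstream link.

```python
# Hypothetical sketch of start()/stop() registration (not Outflow's code):
# every block created while a workflow is started is recorded in that workflow.
from typing import List, Optional


class ToyBlock:
    current: Optional["ToyWorkflow"] = None  # workflow currently registering blocks

    def __init__(self, name: str):
        self.name = name
        self.downstream: List["ToyBlock"] = []
        if ToyBlock.current is not None:
            ToyBlock.current.blocks.append(self)

    def __or__(self, other: "ToyBlock") -> "ToyBlock":
        # `a | b` makes b downstream of a and returns b, so `a | b | c` chains
        self.downstream.append(other)
        return other


class ToyWorkflow(ToyBlock):
    def __init__(self, name: str):
        super().__init__(name)  # a workflow may itself be registered in an enclosing one
        self.blocks: List[ToyBlock] = []
        self._previous: Optional["ToyWorkflow"] = None

    def start(self) -> None:
        # remember the enclosing workflow so that nested workflows work
        self._previous = ToyBlock.current
        ToyBlock.current = self

    def stop(self) -> None:
        ToyBlock.current = self._previous


process = ToyWorkflow("process")
process.start()
ToyBlock("Extract") | ToyBlock("Transform") | ToyBlock("Load")
process.stop()
ToyBlock("Outside")  # created after stop(): not registered

print([block.name for block in process.blocks])  # ['Extract', 'Transform', 'Load']
```

Seen this way, the decorator form amounts to calling start() before the decorated function body and stop() after it.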
External task in a workflow
Sometimes a task inside a workflow needs the output of a task outside this workflow. For example, the Transform task may need the result of a big computation that should run only once, even when the workflow itself runs several times (as with a MapWorkflow, which runs its tasks once per mapped item).
With the decorator syntax:
@as_workflow
def process(big_computation):
    transform = Transform()
    big_computation | transform  # Transform receives the output of the external task
    Extract() | transform | Load()


@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        big_computation = BigComputation()  # created outside the workflow
        process(big_computation)  # call the decorated function to instantiate and register the workflow
With the explicit syntax:
@RootCommand.subcommand()
class MyCommand(Command):
    def setup_tasks(self):
        big_computation = BigComputation()  # created before start(), so outside the workflow
        process = Workflow()
        process.start()  # start registering tasks
        # everything declared between these two calls will belong to the workflow "process"
        transform = Transform()
        big_computation | transform  # Transform receives the output of the external task
        Extract() | transform | Load()
        process.stop()  # stop registering
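The benefit of keeping BigComputation outside the workflow can be illustrated in plain Python. The functions below are hypothetical stand-ins for the tasks, and the list comprehension plays the role of a MapWorkflow iterating over three items: the expensive step runs once, while Transform runs once per item.

```python
# Hypothetical illustration (plain Python, not Outflow): the external
# computation runs once and every mapped iteration reuses its result.
calls = {"big_computation": 0, "transform": 0}


def big_computation() -> int:
    calls["big_computation"] += 1
    return 1000  # stands in for an expensive result


def extract(item: int) -> int:
    return item


def transform(value: int, big_result: int) -> int:
    calls["transform"] += 1
    return value + big_result


def load(value: int) -> int:
    return value


def process(item: int, big_result: int) -> int:
    # body of the mapped workflow: Extract | Transform | Load,
    # with Transform also receiving the external big_result
    return load(transform(extract(item), big_result))


big_result = big_computation()  # outside the workflow: computed once
results = [process(item, big_result) for item in [1, 2, 3]]
print(results)  # [1001, 1002, 1003]
print(calls)    # {'big_computation': 1, 'transform': 3}
```

Had big_computation() been called inside process, the counter would reach 3 instead of 1.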
Using the decorator on special workflow classes
If you need special workflow behaviour, you can subclass Workflow and use the as_workflow method of your subclass as a decorator.
For example, here is a workflow class that tosses a coin and runs only half of the time:
import logging
import random

logger = logging.getLogger(__name__)


class RandomWorkflow(Workflow):
    def _run(self, **inputs):
        # toss a coin: run the workflow normally on heads, skip it on tails
        if random.choice([True, False]):
            logger.info("Heads, I run")
            return super()._run(**inputs)
        else:
            logger.info("Tails, I sleep")
            return {output: None for output in self.outputs}  # return None for all output targets


# create a random workflow
@RandomWorkflow.as_workflow(cache=True, output_name="computation_result")
def compute_workflow():
    Extract() | Transform() | Load()
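Why calling as_workflow on a subclass yields an instance of that subclass can be sketched with a classmethod-based decorator factory. Everything below (ToyWorkflow, ToyRandomWorkflow, the body/outputs/options attributes, the injectable rng) is hypothetical and only mirrors the shape of the example above. The cache option is accepted but ignored.

```python
# Hypothetical sketch (not Outflow's code): as_workflow as a classmethod, so
# calling it on a subclass builds instances of that subclass.
import random
from typing import Any, Callable, Dict, List


class ToyWorkflow:
    def __init__(self, body: Callable[[], Any], outputs: List[str], **options: Any):
        self.body = body
        self.outputs = outputs
        self.options = options  # e.g. cache=True; stored but unused in this sketch

    def _run(self, **inputs: Any) -> Dict[str, Any]:
        # default behaviour: run the body, expose its result under every output name
        result = self.body()
        return {output: result for output in self.outputs}

    @classmethod
    def as_workflow(cls, output_name: str = "result", **options: Any):
        def decorator(body: Callable[..., Any]) -> Callable[..., "ToyWorkflow"]:
            def factory(*args: Any, **kwargs: Any) -> "ToyWorkflow":
                # `cls` is the class as_workflow was called on, e.g. ToyRandomWorkflow
                return cls(lambda: body(*args, **kwargs), outputs=[output_name], **options)
            return factory
        return decorator


class ToyRandomWorkflow(ToyWorkflow):
    rng = random.Random()  # replace with random.Random(seed) for reproducible runs

    def _run(self, **inputs: Any) -> Dict[str, Any]:
        if self.rng.choice([True, False]):
            return super()._run(**inputs)
        return {output: None for output in self.outputs}


@ToyRandomWorkflow.as_workflow(cache=True, output_name="computation_result")
def compute_workflow():
    return 42


workflow = compute_workflow()  # calling the decorated function instantiates the workflow
print(type(workflow).__name__)  # ToyRandomWorkflow
print(workflow._run())  # {'computation_result': 42} or {'computation_result': None}
```

Making the random generator a class attribute is a design choice for the sketch: tests can swap in a seeded random.Random without touching _run.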