Create a pipeline with Outflow, part 3: Workflow and caching
In the previous chapter, we used the database as a manual caching mechanism. Outflow supports automatic output caching using the Workflow object.
Until now, we have talked about workflows as simple sequences of tasks. However, Outflow provides a Workflow class with several useful features beyond a plain sequence of tasks. Mainly, Workflows make it easy to reuse common sequences of tasks in multiple commands. Workflows also support caching their outputs, so a workflow is not run again if its inputs and task parameters did not change.
Now let's turn the Transform and Compute tasks into a workflow object:
```python
from outflow.core.workflow import as_workflow

from .tasks import GenData, PrintData, Transform, Compute


# create a workflow
@as_workflow
def process_data():
    Transform() | Compute()


@RootCommand.subcommand(db_untracked=True)
class ComputeData(Command):
    def setup_tasks(self):
        # set up the main workflow using the workflow defined outside the command
        GenData() | process_data() | PrintData()
```
Workflows behave like tasks: they have input and output targets, and can be piped to tasks or other workflows. The input targets of a workflow are copied automatically from its first task, and its output targets from its last task.
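To make this concrete, here is a minimal, self-contained sketch of the idea (not Outflow's actual internals): two toy `Task` objects are piped with `|`, and the resulting workflow-like object exposes the first task's inputs and the last task's outputs. The class names and target lists are illustrative only.

```python
# Minimal sketch of the pipe semantics described above (not Outflow's real code).
class Task:
    def __init__(self, name, inputs, outputs):
        self.name, self.inputs, self.outputs = name, inputs, outputs

    def __or__(self, other):
        # piping two tasks yields a workflow-like object
        return Workflow([self, other])


class Workflow:
    def __init__(self, tasks):
        self.tasks = tasks
        # input targets copied from the first task, outputs from the last
        self.inputs = tasks[0].inputs
        self.outputs = tasks[-1].outputs

    def __or__(self, other):
        # a workflow can itself be piped to further tasks
        return Workflow(self.tasks + [other])


transform = Task("Transform", inputs=["raw_data"], outputs=["clean_data"])
compute = Task("Compute", inputs=["clean_data"], outputs=["result"])
wf = transform | compute
print(wf.inputs, wf.outputs)  # ['raw_data'] ['result']
```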
Set up like this, the command should behave exactly as before (try it). One difference is that you can now reuse the process_data workflow in multiple commands if needed.
Workflows have a built-in cache mechanism that stores their outputs and avoids running a workflow again when it is called with exactly the same input values and the tasks inside it have the same parameter values.
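The general idea behind such a cache can be sketched in a few lines of plain Python: hash the inputs and parameters into a key, and skip execution when a result for that key is already on disk. This is a conceptual illustration, not Outflow's implementation; `cached_call` and its arguments are hypothetical names.

```python
import hashlib
import json
import os
import tempfile

# a fresh on-disk cache directory for this illustration
CACHE_DIR = tempfile.mkdtemp(prefix="workflow_cache_")


def cached_call(func, inputs, parameters):
    """Run func(**inputs) only if this (inputs, parameters) pair was not seen before."""
    # the cache key covers both input values and task parameters
    key = hashlib.sha256(
        json.dumps({"inputs": inputs, "parameters": parameters}, sort_keys=True).encode()
    ).hexdigest()
    path = os.path.join(CACHE_DIR, key + ".json")
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)  # cache hit: reuse the stored output
    result = func(**inputs)  # cache miss: actually execute
    with open(path, "w") as f:
        json.dump(result, f)
    return result
```

Calling `cached_call` twice with identical inputs and parameters executes the function only once; changing either part of the key triggers a new run.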
Workflow cached outputs are stored on disk. By default, the location is
settings.TEMP_DIR / "workflow_cache".
You can change this location either with the
workflow_cache_dir key in config.yml, or by editing the TEMP_DIR value in your settings.py file.
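For example, a config.yml entry could look like the following; the path shown is a placeholder, so substitute a directory that exists on your system:

```yaml
# config.yml (hypothetical path — adjust to your setup)
workflow_cache_dir: /data/my_pipeline/workflow_cache
```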
To activate workflow caching, simply add
cache=True to the as_workflow decorator:
```python
@as_workflow(cache=True)
def process_data():
    Transform() | Compute()
```
Now, if you run the command twice in a row, the logs should show that only the PrintData task is executed the second time.
To check that the cache system works as intended, try either changing the return value of the GenData task, or changing the value of the
multiplier parameter of the Compute task. In both cases, the workflow should detect that an input changed and run again.
If you want to explicitly ignore cached results, use the --overwrite-cache command-line argument. Outflow does not try to detect changes in your code, so you must use this flag if you change your code without changing anything else.