Create a pipeline with Outflow, part 3: Workflow and caching

In the previous chapter, we used the database as a manual caching mechanism. Outflow supports automatic output caching using the Workflow object.

Until now, we have talked about workflows as sequences of tasks. However, Outflow also has a Workflow class that offers several useful features beyond a simple sequence of tasks. Mainly, workflows make it easy to reuse common sequences of tasks in multiple commands. Workflows also support caching their outputs, so that a workflow is not run again if its inputs and task parameters did not change.

Workflows

Now let’s wrap the Transform and Compute tasks in a workflow object:

from outflow.core.workflow import as_workflow
from .tasks import GenData, PrintData, Transform, Compute


# create a workflow
@as_workflow
def process_data():
    Transform() | Compute()


@RootCommand.subcommand(db_untracked=True)
class ComputeData(Command):
    def setup_tasks(self):
        # set up the main workflow using the workflow defined outside the command
        GenData() | process_data() | PrintData()

Workflows behave like tasks in that they have input and output targets, and can be piped to tasks or to other workflows. The input targets of a workflow are automatically copied from the first task of the workflow, and the output targets are copied from its last task.

Set up like this, the command should behave exactly as before (try it). The difference is that you can now reuse the process_data workflow in multiple commands if needed.
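To picture how a workflow can expose the inputs of its first task and the outputs of its last task, here is a minimal toy model in plain Python. It is only a sketch: ToyTask and ToyWorkflow are hypothetical classes written for this illustration and are not Outflow's API, and the key names raw, clean and result are made up.

```python
# Toy model only: these classes are hypothetical and are not Outflow's API.


class ToyTask:
    """A named step that declares the keys it reads and the keys it writes."""

    def __init__(self, name, inputs, outputs, func):
        self.name = name
        self.inputs = list(inputs)
        self.outputs = list(outputs)
        self.func = func

    def run(self, **kwargs):
        # Pass only the declared inputs to the wrapped function.
        return self.func(**{key: kwargs[key] for key in self.inputs})

    def __or__(self, other):
        # "a | b" builds a sequence that itself behaves like a task.
        return ToyWorkflow([self, other])


class ToyWorkflow(ToyTask):
    """A sequence of steps exposing the first step's inputs and the last step's outputs."""

    def __init__(self, steps):
        self.steps = []
        for step in steps:
            # Flatten nested workflows so that chaining stays linear.
            self.steps.extend(step.steps if isinstance(step, ToyWorkflow) else [step])
        self.name = " | ".join(step.name for step in self.steps)
        self.inputs = list(self.steps[0].inputs)
        self.outputs = list(self.steps[-1].outputs)

    def run(self, **kwargs):
        data = dict(kwargs)
        for step in self.steps:
            # Each step adds its outputs to the data seen by later steps.
            data.update(step.run(**data))
        return {key: data[key] for key in self.outputs}


gen_data = ToyTask("GenData", [], ["raw"], lambda: {"raw": 20})
transform = ToyTask("Transform", ["raw"], ["clean"], lambda raw: {"clean": raw + 1})
compute = ToyTask("Compute", ["clean"], ["result"], lambda clean: {"result": clean * 2})

# The reusable sub-sequence, then the full pipeline that pipes into it.
process_data = transform | compute
pipeline = gen_data | process_data

print(process_data.inputs, process_data.outputs)  # ['raw'] ['result']
print(pipeline.run())  # {'result': 42}
```

In this toy model, process_data can be piped into any other pipeline that provides a raw value, which mirrors the reuse described above.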

Workflow caching

Workflows have a built-in cache mechanism that stores the outputs of a workflow and avoids running it again when it is called with exactly the same input values and the tasks inside it have the same Parameter values.
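One common way to implement this kind of check is to derive a cache key from the input values and the task parameters. The sketch below illustrates the idea only; it is not a description of how Outflow computes its keys. The cache_key function and the raw input name are hypothetical, and the values are assumed to be JSON-serializable.

```python
import hashlib
import json


def cache_key(workflow_name, inputs, parameters):
    """Return a stable digest of the workflow name, its inputs and its task parameters."""
    payload = json.dumps(
        {"workflow": workflow_name, "inputs": inputs, "parameters": parameters},
        sort_keys=True,  # key order must not affect the digest
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


base = cache_key("process_data", {"raw": 42}, {"Compute": {"multiplier": 2}})
same = cache_key("process_data", {"raw": 42}, {"Compute": {"multiplier": 2}})
new_input = cache_key("process_data", {"raw": 43}, {"Compute": {"multiplier": 2}})
new_param = cache_key("process_data", {"raw": 42}, {"Compute": {"multiplier": 3}})

print(base == same)       # True: same inputs and parameters, the cache can be reused
print(base == new_input)  # False: an input value changed
print(base == new_param)  # False: a task parameter changed
```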

Cached workflow outputs are stored on disk. By default, the location is settings.TEMP_DIR / "workflow_cache". You can change this location either with the workflow_cache_dir key in config.yml, or by editing the TEMP_DIR value in your settings.py file.
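The lookup described above can be pictured with a few lines of Python. This is a sketch under assumptions: resolve_cache_dir is a hypothetical helper, not an Outflow function, and it assumes that an explicit workflow_cache_dir value takes precedence over the default derived from TEMP_DIR.

```python
from pathlib import Path


def resolve_cache_dir(config, temp_dir):
    """Return the configured cache directory, or the default one under temp_dir."""
    configured = config.get("workflow_cache_dir")
    if configured:
        # Assumption: an explicit configuration value wins over the default.
        return Path(configured)
    return Path(temp_dir) / "workflow_cache"


default_dir = resolve_cache_dir({}, "/tmp/my_pipeline")
custom_dir = resolve_cache_dir({"workflow_cache_dir": "/data/cache"}, "/tmp/my_pipeline")

print(default_dir == Path("/tmp/my_pipeline") / "workflow_cache")  # True
print(custom_dir == Path("/data/cache"))  # True
```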

To activate workflow caching, you only have to pass cache=True to the as_workflow decorator:

@as_workflow(cache=True)
def process_data():
    Transform() | Compute()

Now, if you run the command twice in a row, the logs should show that only the PrintData task is executed the second time.

To check that the cache system is working as intended, you can either change the return value of the GenData task, or change the value of the multiplier parameter of the Compute task. In both cases, the workflow should detect the change and execute again.

Overwrite cache

If you want to explicitly ignore cached results, you can pass the --overwrite-cache argument on the command line. Outflow does not try to detect changes in your code, so you will have to use this flag if you change your code without changing any input or parameter value.
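The following sketch shows why a code change alone goes unnoticed and what an overwrite flag does about it. It is a simplified model, not Outflow's implementation: run_cached is a hypothetical function, the cache is an in-memory dictionary instead of files on disk, and the key is a plain tuple of the workflow name, the input value and the multiplier.

```python
def run_cached(cache, key, compute, overwrite=False):
    """Return the cached value for key, recomputing when it is missing or overwrite is set."""
    if overwrite or key not in cache:
        cache[key] = compute()
    return cache[key]


cache = {}
key = ("process_data", 21, 2)  # (workflow name, input value, multiplier)

first = run_cached(cache, key, lambda: 21 * 2)
# The code changed (an offset was added) but the key did not: the stale result is returned.
stale = run_cached(cache, key, lambda: 21 * 2 + 1)
# Forcing a recomputation refreshes the cached entry.
fresh = run_cached(cache, key, lambda: 21 * 2 + 1, overwrite=True)

print(first, stale, fresh)  # 42 42 43
```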