Create a pipeline with Outflow, part 5: Models and database

In this chapter, we will learn how to use the database by creating models for our plugin, generating migration, and insert and query the database using our model.

Database creation

The generated config.yml file already contains the information to a local sqlite database called outflow.db that will live in the root directory of the pipeline.

Outflow will use this database to store the tasks and their executions. Our plugin models will also create tables in this database.

Models

Outflow models are sqlalchemy models integrated with the framework. You may already know the concept of models from Django or Flask. If not, the basic concept and use case will be explained here. See the sqlalchemy documentation for more information.

What is a model

Models are python classes that represent your database layout. Either for querying, or even to create it (using migrations). Each model represent a database table, its constraints and relationships. Let’s uncomment the example model in model.py in the directory models of our plugin :

from sqlalchemy import Column, INTEGER
from outflow.core.db import Model


class MyTable(Model):
    id_my_table = Column(INTEGER(), primary_key=True)
    my_column = Column(INTEGER(), nullable=False, unique=True)

As you can see, there is not much difference with a standard sqlalchemy model.

  • The baseclass for all your models must be outflow.core.db.Model class.

  • The table name in the database is defaults to the class name in snake case (i.e. “MyTable” -> “my_table”). To override the table name, set the __tablename__ class attribute.

Each model has a number of class variables, each of these model fields represent a database table column.

Each field is represented by an instance of a sqlalchemy.Column class – e.g., This tells sqlalchemy what type of data each field holds.

The name of each Column instance (e.g. my_column) is the field’s name, in machine-friendly format. You’ll use this value in your Python code, and your database will use it as the column name.

Writing a model for our plugin

In our data_reduction plugin, we have a task that makes some computation on the generated data and a command line argument. Let’s assume this computation is very big and takes time. We could use the database to cache the result of the computation, so that a pipeline execution with the same input data won’t need to compute the result again.

To achieve this, let’s modify the example model to match our needs :

from sqlalchemy import Column, INTEGER, UniqueConstraint
from outflow.core.db import Model

class ComputationResult(Model):
    id_computation_result = Column(INTEGER(), primary_key=True)
    input_value = Column(INTEGER(), nullable=False)
    multiplier = Column(INTEGER(), nullable=False)
    result = Column(INTEGER(), nullable=False)

    __table_args__ = (UniqueConstraint("input_value", "multiplier"),)

If you wish, rename the file model.py into computation_result.py

Create the database layout from models

We now have a model that could record the computation result. However, our database is still empty: no data of course but also not even tables.

The next step is to generate migrations for our plugin.

Migrations ?

Migrations are python scripts that execute SQL queries that modify the database layout. They are very useful to keep your data intact when you want to modify your database layout. Migrations should be added to your versioning system.

Outflow integrates alembic to automatically generate migrations of your plugins. Alembic compares the layout of the database with the models, and will generate the migration that will modify the database to match the current models.

Generate the migration

Before generating new migrations, you have to apply the initial Outflow migrations that creates tables needed for the internals of the framework:

python manage.py management db upgrade heads

To generate the migration that creates our table computation_result, type the following command :

python manage.py management db make_migrations --plugin tuto.data_reduction --message="Add table 'computation_result'"

This will create a new file in the directory models/versions/default of your plugins named something like ddaf105ff7b5_add_table_computation_result.py. Let’s open it and take a look:

"""Add table 'computation_result'

Revision ID: a0d8cd78f167
Revises:
Create Date: 2020-11-18 16:21:37.862087

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "a0d8cd78f167"
down_revision = None
branch_labels = ("tuto.data_reduction",)
depends_on = None


def upgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "computation_result",
        sa.Column("id_computation_result", sa.INTEGER(), nullable=False),
        sa.Column("input_value", sa.INTEGER(), nullable=False),
        sa.Column("multiplier", sa.INTEGER(), nullable=False),
        sa.Column("result", sa.INTEGER(), nullable=False),
        sa.PrimaryKeyConstraint("id_computation_result"),
        sa.UniqueConstraint("input_value", "multiplier"),
    )
    op.grant_permissions("computation_result")
    # ### end Alembic commands ###


def downgrade():
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("computation_result")
    # ### end Alembic commands ###

You can see that alembic generated a python script with two functions: upgrade and downgrade. As you probably guessed, they correspond to the modifications needed to go to one revision to the other (either up or down). Alembic’s name for migration is revision.

Alembic is an amazing tool but it is not perfect. There are some changes that it cannot detect, because it is not always possible to guess what the user wants from two different models (for example it cannot distinguish between renaming a table or deleting it and creating an new one). For this reason, you should always check the migrations generated by alembic and edit it if needed.

Apply the migration

To apply the generated migration, call the following command :

$ python manage.py management db upgrade heads

heads is a shortcut to the last migration of the database.

A word about the pipeline context

To access the database, you will need to use the pipeline context.

The pipeline context is an object that contains:

  • the settings and configuration of your pipeline (see settings and configuration)

  • the command line arguments

  • a reference to the database session

You can import the pipeline context anywhere, but you can only access it within the pipeline scope : i.e. inside the code of your tasks and inside the setup_tasks() method of the command.

To access the database inside the task, we will use the context.session object.

Access our database table

Before trying to access our models, we need to edit our command. You may have noticed the argument db_untracked=True in the command definition. This arguments allows to run commands without any databases. Now that we have set up a database, go to commands.py and remove this argument:

# remove the argument  ↓
@RootCommand.subcommand()
class ComputeData(Command):
    def setup_tasks(self):
        ...

Now that our database is in sync with our model, we can start inserting and querying the newly created table. We will edit the task Compute to check if the computation was already done before, and if not do it and insert it in the database.

As you may remember from the last chapter, the database session is in the pipeline context :

import time

from outflow.core.logging import logger
from outflow.core.tasks import Task
from outflow.core.pipeline import context
from sqlalchemy.orm.exc import NoResultFound

from .models.computation_result import ComputationResult


@as_task
def Compute(raw_data: int) -> {"computation_result": int}:
    # get the session of the default database
    session = context.session

    multiplier = context.args.multiplier

    # Check if the result of the computation is already in the database
    try:
        # query the database with our model ComputationResult
        computation_result_obj = session.query(ComputationResult) \
            .filter_by(input_value=raw_data, multiplier=multiplier).one()

        logger.info("Result found in database")

        # get the result from the model object (ie from the row in the table)
        computation_result = computation_result_obj.result

    except NoResultFound:
        # Result not in the database: compute the value like before
        logger.info("Result not found in database, computing result and inserting")

        # simulate a big computation
        time.sleep(3)
        computation_result = raw_data * multiplier

        # create an object ComputationResult
        computation_result_obj = ComputationResult(
            input_value=raw_data,
            multiplier=multiplier,
            result=computation_result
        )

        # and insert it in the database
        session.add(computation_result_obj)
        session.commit()

    # return the result for the next task
    return {"computation_result": computation_result}

Now, run our command python manage.py compute_data --multiplier 2 multiple times and you will see that the second time and afterward, the execution will be much faster since we already have the result cached in the database.

This is the end of part 3 of the tutorial. In this chapter, you have learned how to configure access to the database, create models and migrations, and finally how to query the database using your models.

The next chapter is about the most exciting feature of outflow: parallelization of workflows on multiple cores, and on multiple cluster nodes!