============================================================
Create a pipeline with Outflow, part 5: Models and database
============================================================

In this chapter, we will learn how to use the database by creating models for our plugin, generating migrations, and inserting into and querying the database using our models.

Database creation
=================

The generated *config.yml* file already contains the connection information for a local SQLite database called :code:`outflow.db` that will live in the root directory of the pipeline.

Outflow will use this database to store the tasks and their executions. Our plugin models will also create tables in this database.

Models
======

Outflow models are sqlalchemy models integrated with the framework. You may already know the concept of models from Django or Flask. If not, the basic concepts and use cases are explained here. See the `sqlalchemy documentation <https://docs.sqlalchemy.org>`_ for more information.

What is a model
---------------

Models are Python classes that represent your database layout, used both for querying the database and for creating it (using migrations). Each model represents a database table, along with its constraints and relationships.

Let's uncomment the example model in *model.py* in the *models* directory of our plugin:

.. code-block:: python

    from sqlalchemy import Column, INTEGER

    from outflow.core.db import Model


    class MyTable(Model):
        id_my_table = Column(INTEGER(), primary_key=True)
        my_column = Column(INTEGER(), nullable=False, unique=True)

As you can see, there is not much difference from a standard sqlalchemy model:

- The base class for all your models must be :code:`outflow.core.db.Model`.
- The table name in the database defaults to the class name converted to snake case (e.g. "MyTable" -> "my_table"). To override the table name, set the :code:`__tablename__` class attribute, as shown in the sketch below.
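For example, if you want the table to keep a name that differs from the generated one, you can set :code:`__tablename__` explicitly. Here is a minimal sketch (the name :code:`legacy_my_table` is a made-up example):

.. code-block:: python

    from sqlalchemy import Column, INTEGER

    from outflow.core.db import Model


    class MyTable(Model):
        # hypothetical explicit name, overriding the default "my_table"
        __tablename__ = "legacy_my_table"

        id_my_table = Column(INTEGER(), primary_key=True)
        my_column = Column(INTEGER(), nullable=False, unique=True)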
Each model has a number of class variables, and each of these model *fields* represents a database table *column*. Each field is an instance of the :code:`sqlalchemy.Column` class, whose type (e.g. :code:`INTEGER()`) tells sqlalchemy what kind of data the field holds. The name of each Column instance (e.g. :code:`my_column`) is the field's name, in machine-friendly format. You'll use this value in your Python code, and your database will use it as the column name.

Writing a model for our plugin
------------------------------

In our *data_reduction* plugin, we have a task that performs some computation on the generated data and on a command line argument. Let's assume this computation is very heavy and takes time. We could use the database to cache the result of the computation, so that a pipeline execution with the same input data won't need to compute the result again.

To achieve this, let's modify the example model to match our needs:

.. code-block:: python

    from sqlalchemy import Column, INTEGER, UniqueConstraint

    from outflow.core.db import Model


    class ComputationResult(Model):
        id_computation_result = Column(INTEGER(), primary_key=True)
        input_value = Column(INTEGER(), nullable=False)
        multiplier = Column(INTEGER(), nullable=False)
        result = Column(INTEGER(), nullable=False)
        __table_args__ = (UniqueConstraint("input_value", "multiplier"),)

If you wish, rename the file *model.py* to *computation_result.py*.

Create the database layout from models
--------------------------------------

We now have a model that can record computation results. However, our database is still empty: it contains no data, of course, but not even the tables. The next step is to generate migrations for our plugin.

Migrations?
^^^^^^^^^^^^

Migrations are Python scripts that execute the SQL queries needed to modify the database layout. They are very useful to keep your data intact when you want to change your database layout. Migrations should be added to your version control system.

Outflow integrates `alembic <https://alembic.sqlalchemy.org>`_ to automatically generate migrations for your plugins. Alembic compares the layout of the database with the models, and generates the migration that will modify the database to match the current models.

Generate the migration
^^^^^^^^^^^^^^^^^^^^^^

Before generating new migrations, you have to apply the initial Outflow migrations that create the tables needed for the internals of the framework:

.. code-block::

    python manage.py management db upgrade heads

To generate the migration that creates our :code:`computation_result` table, type the following command:

.. code-block::

    python manage.py management db make_migrations --plugin tuto.data_reduction --message="Add table 'computation_result'"

This will create a new file in the directory *models/versions/default* of your plugin, named something like *a0d8cd78f167_add_table_computation_result.py*. Let's open it and take a look:

.. code-block:: python

    """Add table 'computation_result'

    Revision ID: a0d8cd78f167
    Revises:
    Create Date: 2020-11-18 16:21:37.862087

    """
    from alembic import op
    import sqlalchemy as sa

    # revision identifiers, used by Alembic.
    revision = "a0d8cd78f167"
    down_revision = None
    branch_labels = ("tuto.data_reduction",)
    depends_on = None


    def upgrade():
        # ### commands auto generated by Alembic - please adjust! ###
        op.create_table(
            "computation_result",
            sa.Column("id_computation_result", sa.INTEGER(), nullable=False),
            sa.Column("input_value", sa.INTEGER(), nullable=False),
            sa.Column("multiplier", sa.INTEGER(), nullable=False),
            sa.Column("result", sa.INTEGER(), nullable=False),
            sa.PrimaryKeyConstraint("id_computation_result"),
            sa.UniqueConstraint("input_value", "multiplier"),
        )
        op.grant_permissions("computation_result")
        # ### end Alembic commands ###


    def downgrade():
        # ### commands auto generated by Alembic - please adjust! ###
        op.drop_table("computation_result")
        # ### end Alembic commands ###

You can see that alembic generated a Python script with two functions: :code:`upgrade` and :code:`downgrade`. As you probably guessed, they correspond to the modifications needed to go from one revision to the other (either up or down). Alembic's name for a migration is *revision*.

Alembic is an amazing tool but it is not perfect. There are some changes that it cannot detect, because it is not always possible to guess what the user wants from two different models (for example, it cannot distinguish between renaming a table and deleting it to create a new one). For this reason, you should always review the migrations generated by alembic and edit them if needed.

Apply the migration
^^^^^^^^^^^^^^^^^^^

To apply the generated migration, call the following command:

.. code-block::

    $ python manage.py management db upgrade heads

*heads* is a shortcut for the latest revision of each migration branch in the database.

A word about the pipeline context
=================================

To access the database, you will need to use the pipeline context. The pipeline context is an object that contains:

+ the settings and configuration of your pipeline (see `settings `_ and `configuration `_)
+ the command line arguments
+ a reference to the database session

You can *import* the pipeline context anywhere, but you can only *access* it within the pipeline scope, i.e. inside the code of your tasks and inside the :code:`setup_tasks()` method of a command. To access the database inside a task, we will use the :code:`context.session` object.
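To make this concrete, here is a minimal sketch of a task reading the context (the task name :code:`ShowMultiplier` is made up for illustration; it assumes the :code:`multiplier` command line argument defined earlier in this tutorial):

.. code-block:: python

    from outflow.core.logging import logger
    from outflow.core.pipeline import context
    from outflow.core.tasks import as_task


    @as_task
    def ShowMultiplier() -> {"multiplier": int}:
        # the context can be imported anywhere, but it is only
        # usable here, inside the pipeline scope
        multiplier = context.args.multiplier  # command line arguments
        logger.info(f"multiplier argument: {multiplier}")

        # reference to the database session
        session = context.session
        logger.info(f"database session: {session}")

        return {"multiplier": multiplier}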
Access our database table
=========================

Before trying to access our models, we need to edit our command. You may have noticed the argument :code:`db_untracked=True` in the command definition. This argument allows commands to run without any database. Now that we have set up a database, go to *commands.py* and remove this argument:

.. code-block:: python

    # remove the argument ↓
    @RootCommand.subcommand()
    class ComputeData(Command):
        def setup_tasks(self):
            ...

Now that our database is in sync with our model, we can start inserting into and querying the newly created table. We will edit the task :code:`Compute` to check if the computation has already been done before and, if not, do it and insert the result into the database. As explained above, the database session is available in the pipeline context:

.. code-block:: python

    import time

    from outflow.core.logging import logger
    from outflow.core.pipeline import context
    from outflow.core.tasks import as_task
    from sqlalchemy.orm.exc import NoResultFound

    from .models.computation_result import ComputationResult


    @as_task
    def Compute(raw_data: int) -> {"computation_result": int}:
        # get the session of the default database
        session = context.session
        multiplier = context.args.multiplier

        # check if the result of the computation is already in the database
        try:
            # query the database with our model ComputationResult
            computation_result_obj = (
                session.query(ComputationResult)
                .filter_by(input_value=raw_data, multiplier=multiplier)
                .one()
            )
            logger.info("Result found in database")

            # get the result from the model object (ie from the row in the table)
            computation_result = computation_result_obj.result
        except NoResultFound:
            # result not in the database: compute the value like before
            logger.info("Result not found in database, computing result and inserting")

            # simulate a big computation
            time.sleep(3)
            computation_result = raw_data * multiplier

            # create a ComputationResult object
            computation_result_obj = ComputationResult(
                input_value=raw_data, multiplier=multiplier, result=computation_result
            )

            # and insert it into the database
            session.add(computation_result_obj)
            session.commit()

        # return the result for the next task
        return {"computation_result": computation_result}

Now, run our command :code:`python manage.py compute_data --multiplier 2` multiple times: from the second run onward, the execution is much faster since the result is already cached in the database.

This is the end of part 5 of the tutorial. In this chapter, you have learned how to configure access to the database, create models and migrations, and finally how to query the database using your models. The next chapter is about the most exciting feature of Outflow: parallelization of workflows on multiple cores, and on multiple cluster nodes!