Create a pipeline with Outflow, part 5: Testing

We’ve built a basic pipeline, and we’ll now create some automated tests for it.

Introducing automated testing

What are automated tests?

Tests are routines that check the operation of your code.

Testing operates at different levels. Some tests might apply to a tiny detail (does a particular task method return values as expected?) while others examine the overall operation of the software (does a workflow (a sequence of task) produce the desired result?). That’s no different from the kind of testing you did earlier in Models and database section, using the shell command to examine the behavior of a task in the pipeline context.

What’s different in automated tests is that the testing work is done for you by the system. You create a set of tests once, and then as you make changes to your plugin, you can check that your code still works as you originally intended, without having to perform time consuming manual testing.

Why you need to create tests

So why create tests, and why now?

Tests will save you time

Up to a certain point, checking that it seems to work will be a satisfactory test. In a more sophisticated plugin, you might have dozens of complex interactions between components.

A change in any of those components could have unexpected consequences on the plugin’s behavior. Checking that it still seems to work could mean running through your code’s functionality with twenty different variations of your test data to make sure you haven’t broken something - not a good use of your time.

That’s especially true when automated tests could do this for you in seconds. If something’s gone wrong, tests will also assist in identifying the code that’s causing the unexpected behavior.

Sometimes it may seem a chore to tear yourself away from your productive, creative programming work to face the unglamorous and unexciting business of writing tests, particularly when you know your code is working properly.

However, the task of writing tests is a lot more fulfilling than spending hours testing your application manually or trying to identify the cause of a newly-introduced problem.

Tests don’t just identify problems, they prevent them

It’s a mistake to think of tests merely as a negative aspect of development.

Without tests, the purpose or intended behavior of an application might be rather opaque. Even when it’s your own code, you will sometimes find yourself poking around in it trying to find out what exactly it’s doing.

Tests change that; they light up your code from the inside, and when something goes wrong, they focus light on the part that has gone wrong - even if you hadn’t even realized it had gone wrong.

Tests make your code more attractive

You might have created a brilliant piece of software, but you will find that many other developers will refuse to look at it because it lacks tests; without tests, they won’t trust it.

That other developers want to see tests in your software before they take it seriously is yet another reason for you to start writing tests.

Tests help teams work together

The previous points are written from the point of view of a single developer maintaining an application. Complex applications will be maintained by teams. Tests guarantee that colleagues don’t inadvertently break your code (and that you don’t break theirs without knowing).

Basic testing strategies

There are many ways to approach writing tests.

Some programmers follow a discipline called test-driven development; they actually write their tests before they write their code. This might seem counter-intuitive, but in fact it’s similar to what most people will often do anyway: they describe a problem, then create some code to solve it. Test-driven development formalizes the problem in a Python test case.

More often, a newcomer to testing will create some code and later decide that it should have some tests. Perhaps it would have been better to write some tests earlier, but it’s never too late to get started.

Sometimes it’s difficult to figure out where to get started with writing tests. If you have written several thousand lines of Python, choosing something to test might not be easy. In such a case, it’s fruitful to write your first test the next time you make a change, either when you add a new feature or fix a bug.

So let’s do that right away.

Tutorial code modifications

For this part of the tutorial, we have modified the code in order to be able to test parallelized and non-parallelize code.

Tasks written at the end of Part 3 are renamed into ComputeOneData, GenOneData and PrintOneData. Functions at the end of part 4 are ComputeMoreData, GenMoreData and PrintMoredata.

Commands are also renamed in to compute_one_data and compute_more_data :

$ python manage.py compute_one_data --multiplier 3
tuto.data_reduction.tasks - tasks.py:91 - INFO - Result of the computation: 126
$ python manage.py compute_more_data --multiplier 3
tuto.data_reduction.tasks - tasks.py:45 - INFO - Result found in database
tuto.data_reduction.tasks - tasks.py:55 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:55 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:55 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:96 - INFO - Result of the mapped computation: [[{'computation_result': 126}], [{'computation_result': 129}], [{'computation_result': 132}], [{'computation_result': 135}]]

This way, we will learn how to test tasks and commands, with and without parallelized workflow.

Pytest add-ons and configuration

pytest-cov

This will display which percentage of your code your tests cover. The higher it is, the better it is. This cannot be the only indicator to take into account : you can make tests that will execute every line of code, but if you do not make wise assert, it will be useless. But this can highlight that some parts have been forgotten into you tests. pip install pytest-cov if needed.

The configuration file is .coveragerc. You should create this file with

[run]
omit = plugins/data_reduction/tuto/data_reduction/tests/*

This will exclude tests from the coverage (I wonder why it is not the case by default).

Output will be written in a directory named htmlcov.

pytest-postgresql

This package will provide fixtures needed to perform tests with a real running Postgresql database. This will be used for command tests. It should be installed with outflow.

pytest-sugar

Pretty-printing for the tests output. It is not mandatory but I like it 😉.

pytest.ini

[pytest]
# This will set the log level to display. Here INFO
log_cli = True
log_cli_level = INFO
log_level = INFO
# This will be used by pytest-postgresql to connect to the database
postgresql_user = flo
postgresql_password = xxx
postgresql_host = localhost

Testing simple tasks

We identify a bug

GenOneData and GenMoreData are written to return respectively 42 and [42, 43, 44, 45]. Suppose that was erroneous. They should return 40 and [40, 50, 60, 70].

Create a test to expose the bug

Conventional places for an application’s tests is in the plugin’s tests.py file or in the tests/ directory; the testing system will automatically find tests in any file whose name begins with test.

Put the following in the tests/test_1_gen.py file in the data_reduction application:

from outflow.core.test import TaskTestCase

class TestDataReductionGenTasks(TaskTestCase):

    def test_gen_one(self):
        from tuto.data_reduction.tasks import GenOneData

        # --- initialize the task ---
        self.task = GenOneData()
        self.config = {}

        # --- run the task ---
        result = self.run_task()

        # --- make assertions ---
        # test the result
        assert isinstance(result, dict)
        assert 'some_data' in result
        assert result == {'some_data': 40}

This will test that:

  • the result is a dictionary

  • it has the key some_data

  • the value of some_data is 40

Here we have created a outflow.core.test.TaskTestCase subclass. The task is run without arguments.

Running tests

In the terminal, we can run our test in the pipeline root directory using:

$ pytest

and you’ll see something like:

$ pytest
collecting ...

_____ TestDataReductionGenTasks.test_gen_one _____
plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py:17: in test_gen_one
    assert result == {'some_data': 40}
E   AssertionError: assert {'some_data': 42} == {'some_data': 40}
E     Differing items:
E     {'some_data': 42} != {'some_data': 40}
E     Full diff:
E     - {'some_data': 40}
E     ?                ^
E     + {'some_data': 42}
E     ?                ^

 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_one ⨯   100% ██████████
===== short test summary info =====
FAILED plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks::test_gen_one -
AssertionError: assert {'some_data': 42} == {'some_data': 40}

Results (0.78s):
       1 failed
         - plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py:5 TestDataReductionGenTasks.test_gen_one

What happened is this:

  • pytest looked for tests in the data_reduction plugin

  • it found a class whose name starts with “Test”

  • it looked for test methods - ones whose names begin with test

  • just before executing test_gen_one, TaskTestCase creates a pipeline context

  • … and using the assert statement, it checked the tested feature

The test informs us which test failed and even the line on which the failure occurred.

You can make as many assert in you test as you want. If one fails, all the test will be reported as failed, and the following assertions inside this test will not be evaluated.

Fixing the bug

Replace 42 by 40 in the tasks.py file. Run the test again :

$ pytest --tb=short plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks::test_gen_one
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_one   100% ██████████

Results (0.66s):
       1 passed

Repeat for GenMoreData

Do the same for testing GenMoreData : create a test test_gen_more that will raise that GenMoreData does not return the expected values.

$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_one ✓     50% █████
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_more ✓   100% ██████████

Results (0.65s):
       2 passed

Testing complex tasks

The tasks ComputeOneData and ComputeMoreData are complex tasks because they involve a python code and an interaction with a database. As we do not want to test the database connection, we will mock the database responses.

Testing ComputeOneData

Create a new file named tests/test_2_compute_one.py. The numbering is not mandatory but tests are executed in the alphabetic order of the file names. I like to test things in a logical order : it allows to break tests early if one fails and if I know that the other ones will fail in a same manner.

The complete code will be shown at the end of the section

We will mock the session linked to the database with a test decorator :

@mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
def test_compute_one_already_in_db(self, mock_session):
       (...)

Every request to the database will return silently as if everything is ok.

Testing when result is already in database

To simulate the query (in tasks.py):

computation_result_obj = session.query(ComputationResult) \
   .filter_by(input_value=some_data, multiplier=multiplier).one()

we will declare the ComputationResult that we are waiting for:

db_result = ComputationResult(
   input_value = data,
   multiplier = mult,
   result = data * mult
)

and tell that this will be the returned value of session.query.filter_by.one:

mock_session\
   .return_value.query\
   .return_value.filter_by\
   .return_value.one.return_value = db_result

As the task is running outside the command, some context has to be defined :

from outflow.core.pipeline import context, config

context.force_dry_run = False
context.db_untracked = False
context._models = []
config["databases"] = mock.MagicMock()
context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)

Like before, setup and run the task :

self.task = ComputeOne()
result = self.run_task(some_data = data)

We can then make the needed assertions:

assert isinstance(result, dict)
assert 'computation_result' in result
assert result == {'computation_result': data * mult}

As the query to the database returned a ComputationResult this means that the result was already in database. Then no insertion should have been made. To ensure this was the case, we can examine the call traceback:

# filtering "add" calls with a ComputationResult object as parameter
call_add_computation_result = [
    call.args[0]
    for call in mock_session.return_value.add.call_args_list
    if isinstance(call.args[0], ComputationResult)]

call_args_list returns all the add calls that have been made to the session. We filter the results because outflow uses also the session to log each job into the tables public.task. We are only interested in the add into the table computation_result.

For this test, no ComputationResult should be added in database. Then :

assert len(call_add_computation_result) == 0

Testing when the result is not in database

In this case, the query should not return a ComputationResult object but instead raise a NoResultFound exception :

from sqlalchemy.orm.exc import NoResultFound
mock_session\
    .return_value.query\
    .return_value.filter_by\
    .return_value.one.side_effect = NoResultFound

Note that we use the method side_effect instead of return_value for a function.

The same assertions can be made for this test, but for the len(call_add_computation_result) the expected value will be different as the ComputationResult should have been inserted in database :

assert len(call_add_computation_result) == 1

At this stage, pytest should output :

$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py ✓✓            50% █████
 plugins/data_reduction/tuto/data_reduction/tests/test_2_compute_one.py ✓✓   100% ██████████

Results (4.30s):
       4 passed

The output may differ depending on your pytest.ini (here log_cli = False).

Complete code

import pytest
from unittest import mock
from random import sample, randrange
from argparse import Namespace

from outflow.core.test import TaskTestCase
from outflow.core.pipeline import context, config

from tuto.data_reduction.tasks import ComputeOne
from tuto.data_reduction.models.computation_result import ComputationResult

class TestDataReductionComputeTasks(TaskTestCase):

    @pytest.fixture(autouse=True)
    def setup_context(self, with_pipeline_context_manager):
        context.force_dry_run = False
        context.db_untracked = False
        context._models = []
        config["databases"] = mock.MagicMock()

    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_one_already_in_db(self, mock_session):

        # --- define test data ---
        data = randrange(100)
        mult = randrange(10)
        db_result = ComputationResult(
            input_value = data,
            multiplier = mult,
            result = data * mult
        )

        # --- define mock session queries
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.return_value = db_result

        # -- define args
        context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)
        self.task = ComputeOne()

        # --- run the task ---
        result = self.run_task(some_data = data)

        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # --- make assertions ---
        assert isinstance(result, dict)
        assert 'computation_result' in result
        assert result == {'computation_result': data * mult}
        assert len(call_add_computation_result) == 0

    # @mock.patch('tuto.data_reduction.tasks.context')
    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_one_not_in_db(self, mock_session):

        # --- initialize the task ---
        from sqlalchemy.orm.exc import NoResultFound

        # --- define test data ---
        data = randrange(100)
        mult = randrange(10)
        db_result = ComputationResult(
            input_value = data,
            multiplier = mult,
            result = data * mult
        )

        # --- define mock session queries
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.side_effect = NoResultFound

        # -- define args
        context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)
        self.task = ComputeOne()

        # --- run the task ---
        result = self.run_task(some_data = data)

        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # --- make assertions ---
        assert isinstance(result, dict)
        assert 'computation_result' in result
        assert result == {'computation_result': data * mult}
        assert len(call_add_computation_result) == 1

Testing ComputeMoreData

Testing ComputeMoreData will be similar to ComputeOneData. We have to tell to the mocked session the array of values that will be returned by session.query.filter_by.one.

Testing when all the data are already in database

Define the values to be returned :

db_result_list = [ComputationResult(
   input_value = data_array[i],
   multiplier = mult,
   result = data_array[i] * mult
) for i in range(nb_data) ]

where data_array is the array that will be given to the task, and nb_data the length of this array.

Give them to the mocked session :

mock_session\
   .return_value.query\
   .return_value.filter_by\
   .return_value.one.side_effect = db_result_list

For testing the values returned by the task, we cannot make a simple comparison because the different tasks distributed on different CPUs may not return in the same order.

We have then to test:

  • the length of the result array is nb_data

  • each expected returned value is in result

assert len(result['map_computation_result']) == nb_data

for i in range(nb_data):
   data = data_array[i]
   assert [{'computation_result': data * mult}] in result['map_computation_result']

Finally, don’t forget to test that no add calls were made to insert ComputationResult in database :

assert len(call_add_computation_result) == 0

Testing when some data are not in database

For this case, just replace one ComputationResult by a NoResultFound. For example :

from random import randrange
item_not_in_db = randrange(nb_data)
db_result_list[i] = NoResultFound

And in this case, there should be 1 add call to insert ComputationResult in database :

assert len(call_add_computation_result) == 1

Finally, pytest will return:

$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py ✓✓             33% ███▍
 plugins/data_reduction/tuto/data_reduction/tests/test_2_compute_one.py ✓✓     67% ██████▋
 plugins/data_reduction/tuto/data_reduction/tests/test_3_compute_more.py ✓✓   100% ██████████

Results (12.72s):
       6 passed

Complete code

import pytest
from random import sample, randrange
from argparse import Namespace
from unittest import mock

from outflow.core.test import TaskTestCase
from outflow.library.tasks import MapTask

from outflow.core.pipeline import context, config

from tuto.data_reduction.models.computation_result import ComputationResult
from tuto.data_reduction.tasks import ComputeMore

class TestDataReductionComputeMoreTasks(TaskTestCase):

    @pytest.fixture(autouse=True)
    def setup_context(self, with_pipeline_context_manager):
        context.force_dry_run = False
        context.db_untracked = False
        context._models = []
        config["databases"] = mock.MagicMock()
        config["backend"] = "ray"
        config["ray"] = {"cluster_type": "local"}

    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_more(self, mock_session):
        """
        Testing when all data are already in database
        Request to DB returns a ComputationResult
        """

        # --- define test data ---
        nb_data = randrange(1,5)
        data_array = sample(range(100), k=nb_data)
        mult = randrange(10)

        # --- define the data returned by the request to the DB ---
        db_result_list = [ComputationResult(
            input_value = data_array[i],
            multiplier = mult,
            result = data_array[i] * mult
        ) for i in range(nb_data) ]

        # --- define mock session queries
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.side_effect = db_result_list

        # -- define args
        context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)

        # --- initialize the task ---
        mapped_computation = MapTask(ComputeMore(), output_name="map_computation_result")
        self.task = mapped_computation

        # --- run the task ---
        result = self.run_task(some_data_array = data_array)

        # --- make assertions ---

        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # all results are already be in base, since we mock a result in base
        assert len(call_add_computation_result) == 0

        # check the type of result
        assert isinstance(result, dict)
        assert 'map_computation_result' in result

        # we should get as many results as input data
        assert len(result['map_computation_result']) == nb_data

        # verifying that every expected result is returned
        for i in range(nb_data):
            data = data_array[i]
            assert [{'computation_result': data * mult}] in result['map_computation_result']


    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_more_with_results_not_in_base(self, mock_session):
        """
        Testing when some data are not already in database
        Request to DB returns a ComputationResult or a NoResultFound exception
        """

        # --- define test data ---
        nb_data = randrange(2,5)
        nb_data_not_in_db = randrange(1,nb_data)
        data_not_in_db = sample(range(nb_data), k=nb_data_not_in_db)
        data_array = sample(range(100), k=nb_data)
        mult = randrange(10)
        db_result_list = []

        # --- define the data returned by the request to the DB ---
        from sqlalchemy.orm.exc import NoResultFound
        for i in range(nb_data):
            if i in data_not_in_db:
                db_result_list.append(NoResultFound)
            else:
                db_result_list.append(ComputationResult(
                    input_value = data_array[i],
                    multiplier = mult,
                    result = data_array[i] * mult
                ))

        # --- define mock session queries
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.side_effect = db_result_list

        # -- define args
        context.force_dry_run = False
        context.db_untracked = False
        context._models = []
        config["databases"] = mock.MagicMock()
        config["backend"] = "ray"
        context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)

        # --- initialize the task ---
        mapped_computation = MapTask(ComputeMore(), output_name="map_computation_result")
        self.task = mapped_computation

        # --- run the task ---
        result = self.run_task(some_data_array = data_array)

        # --- make assertions ---

        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # nb_data_not_in_db should be added since they were not found
        assert len(call_add_computation_result) == nb_data_not_in_db

        # check the type of result
        assert isinstance(result, dict)
        assert 'map_computation_result' in result

        # we should get as many results as input data
        assert len(result['map_computation_result']) == nb_data

        # verifying that every expected result is returned
        for i in range(nb_data):
            data = data_array[i]
            assert [{'computation_result': data * mult}] in result['map_computation_result']

Testing commands

The aim of testing command is to verify that tasks are executed one after the other as defined. The cases tested at the tasks level do not need to be tested again.

Regular commands

In the first part of the tutorial, we defined a command data_reduction which only prints a “Hello world”. There is no many things to test for this command, but it will give the skeleton for testing a command :

class TestDataReductionNoCmd(CommandTestCase):
    def test_data_reduction(self):

        # --- initialize the command ---
        from tuto.data_reduction.commands import DataReduction
        self.root_command_class = DataReduction
        arg_list = []

        # --- run the command ---
        return_code, result = self.run_command(arg_list)

        # --- make assertions ---
        assert return_code == 0
        assert result == [{"None": None}]

Commands involving an interaction with a Postgresql database

Outflow comes with a PostgresCommandTestCase class that will setup a fresh database for us before each test and that will drop it after use.

Create a new file test test_4_commands.py.

from outflow.core.test.test_cases import (PostgresCommandTestCase,
                                          postgresql_fixture)

class TestDataReductionCmd(PostgresCommandTestCase):

    PLUGINS = ['outflow.management', 'tuto.data_reduction']

In order to be able to use the database, migrations has to be applied before each test (since a new database is created each time).

We will use a fixture to do this automatically (that will prevent duplicated code).

    # Automatically run a db upgrade heads before each test
    @pytest.fixture(autouse=True)
    def setup_db_upgrade(self, with_pipeline_context_manager):
        # -- commands to be executed
        db_upgrade = ['management', 'db', 'upgrade', 'heads', '-ll', 'INFO']
        self.run_command(db_upgrade, force_dry_run = False)

Testing the migration

# --- test if the upgrade is ok
def test_db_upgrade(self):
   with self.pipeline_db_session() as session:
      # The table computation_result has be to created
      try:
         c = session.query(ComputationResult).count()
         # and it should be empty
         assert c == 0
      except Exception as e:
         assert False, e

To verify that this test is relevant, comment the line self.run_command(db_upgrade) in the setup_db_upgrade() function.

pytest will then fail with :

FAILED plugins/data_reduction/tuto/data_reduction/tests/test_4_commands.py::TestDataReductionCmd::test_db_upgrade -
AssertionError: ProgrammingError('(psycopg2.errors.UndefinedTable) relation "computation_result" does not exist

This way, you are sure that if the migration fails for any reason, you will be notified. Do not forget to comment out the line self.run_command(db_upgrade) afterwards.

Testing compute_one_data

Testing this command is easy. Just define the command:

command = [
    'compute_one_data',
    '--multiplier',
    f'{multiplier}',
    '-ll',
    'INFO',
]

Run the command:

return_code, result = self.run_command(command, force_dry_run = False)

And test we get the expected results:

assert return_code == 0
assert result[0]['computation_result'] == 40 * multiplier

Testing compute_more_data

For this command, the Ray (TODO change to parallel backend) backend has to be activated. Before running the command, configuration has to be updated :

custom_config = {
    "backend": "ray",
    "ray": {"cluster_type": "local"}
}
config.update(custom_config)

The other parts of the tests remain the same : assert the return_code and that each expected value is present in the result array :

assert return_code == 0
for i in range(40, 71, 10):
   res = [{'computation_result': i * multiplier}]
   assert res in result[0]['map_computation_result']

Finally, pytest should return:

$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py ✓✓            20% ██
 plugins/data_reduction/tuto/data_reduction/tests/test_2_compute_one.py ✓✓    40% ████
 plugins/data_reduction/tuto/data_reduction/tests/test_3_compute_more.py ✓✓   60% ██████
 plugins/data_reduction/tuto/data_reduction/tests/test_4_commands.py ✓✓✓✓    100% ██████████

Results (22.31s):
      10 passed

Congratulations !

Test coverage

In addition you can check the coverage of your tests running pytest --cov=tuto.data_reduction --cov-report html. This will generate an HTML report in the htmlcov/ directory.

With this tutorial, you should be able to reach 100% of coverage.

| Module | statements | missing | excluded | coverage | |————————————————————————-|————|———|———-|———-| | Total | 96 | 0 | 0 | 100% | | plugins/data_reduction/tuto/data_reduction/__init__.py | 0 | 0 | 0 | 100% | | plugins/data_reduction/tuto/data_reduction/commands.py | 30 | 0 | 0 | 100% | | plugins/data_reduction/tuto/data_reduction/models/__init__.py | 7 | 0 | 0 | 100% | | plugins/data_reduction/tuto/data_reduction/models/computation_result.py | 8 | 0 | 0 | 100% | | plugins/data_reduction/tuto/data_reduction/tasks.py | 51 | 0 | 0 | 100% |