# Create a pipeline with Outflow, part 5: Testing

We've built a basic pipeline, and we'll now create some automated tests for it.

## Introducing automated testing

### What are automated tests?

Tests are routines that check the operation of your code.

Testing operates at different levels. Some tests might apply to a tiny detail (does a particular task method return values as expected?) while others examine the overall operation of the software (does a workflow, that is a sequence of tasks, produce the desired result?). That's no different from the kind of testing you did earlier in the [Models and database](part3_models_and_database) section, using the [shell](ShellCommand) command to examine the behavior of a task in the pipeline context.

What's different in automated tests is that the testing work is done for you by the system. You create a set of tests once, and then as you make changes to your plugin, you can check that your code still works as you originally intended, without having to perform time-consuming manual testing.

### Why you need to create tests

So why create tests, and why now?

#### Tests will save you time

Up to a certain point, _checking that it seems to work_ will be a satisfactory test. In a more sophisticated plugin, you might have dozens of complex interactions between components.

A change in any of those components could have unexpected consequences on the plugin's behavior. Checking that it still _seems to work_ could mean running through your code's functionality with twenty different variations of your test data to make sure you haven't broken something - not a good use of your time.

That's especially true when automated tests could do this for you in seconds. If something's gone wrong, tests will also assist in identifying the code that's causing the unexpected behavior.
Sometimes it may seem a chore to tear yourself away from your productive, creative programming work to face the unglamorous and unexciting business of writing tests, particularly when you know your code is working properly.

However, the task of writing tests is a lot more fulfilling than spending hours testing your application manually or trying to identify the cause of a newly-introduced problem.

#### Tests don't just identify problems, they prevent them

It's a mistake to think of tests merely as a negative aspect of development.

Without tests, the purpose or intended behavior of an application might be rather opaque. Even when it's your own code, you will sometimes find yourself poking around in it trying to find out what exactly it's doing.

Tests change that; they light up your code from the inside, and when something goes wrong, they focus light on the part that has gone wrong - even if you hadn't realized it had gone wrong.

#### Tests make your code more attractive

You might have created a brilliant piece of software, but you will find that many other developers will refuse to look at it because it lacks tests; without tests, they won't trust it.

That other developers want to see tests in your software before they take it seriously is yet another reason for you to start writing tests.

#### Tests help teams work together

The previous points are written from the point of view of a single developer maintaining an application. Complex applications will be maintained by teams. Tests help ensure that colleagues don't inadvertently break your code (and that you don't break theirs without knowing).

## Basic testing strategies

There are many ways to approach writing tests.

Some programmers follow a discipline called [test-driven development](https://en.wikipedia.org/wiki/Test-driven_development); they actually write their tests before they write their code. This might seem counter-intuitive, but in fact it's similar to what most people will often do anyway: they describe a problem, then create some code to solve it. Test-driven development formalizes the problem in a Python test case.

More often, a newcomer to testing will create some code and later decide that it should have some tests. Perhaps it would have been better to write some tests earlier, but it's never too late to get started.

Sometimes it's difficult to figure out where to get started with writing tests. If you have written several thousand lines of Python, choosing something to test might not be easy. In such a case, it's fruitful to write your first test the next time you make a change, either when you add a new feature or fix a bug.

So let's do that right away.

## Tutorial code modifications

For this part of the tutorial, we have modified the code in order to be able to test both parallelized and non-parallelized code.

Tasks written at the end of Part 3 are renamed to `ComputeOneData`, `GenOneData` and `PrintOneData`. Tasks written at the end of Part 4 are `ComputeMoreData`, `GenMoreData` and `PrintMoredata`. Commands are also renamed to `compute_one_data` and `compute_more_data`:

```
$ python manage.py compute_one_data --multiplier 3
tuto.data_reduction.tasks - tasks.py:91 - INFO - Result of the computation: 126

$ python manage.py compute_more_data --multiplier 3
tuto.data_reduction.tasks - tasks.py:45 - INFO - Result found in database
tuto.data_reduction.tasks - tasks.py:55 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:55 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:55 - INFO - Result not found in database, computing result and inserting
tuto.data_reduction.tasks - tasks.py:96 - INFO - Result of the mapped computation: [[{'computation_result': 126}], [{'computation_result': 129}], [{'computation_result': 132}], [{'computation_result': 135}]]
```

This way, we will learn how to test tasks and commands, with and without a parallelized workflow.

## Pytest add-ons and configuration

### pytest-cov

This add-on displays the percentage of your code that your tests cover. The higher, the better. Coverage cannot be the only indicator to take into account: tests can execute every line of code and still be useless if their `assert` statements check nothing meaningful. It can, however, highlight parts of the code that your tests have forgotten.

Run `pip install pytest-cov` if needed.

The configuration file is `.coveragerc`. You should create this file with

```
[run]
omit = plugins/data_reduction/tuto/data_reduction/tests/*
```

This will exclude tests from the coverage (I wonder why it is not the case by default).

Output will be written in a directory named `htmlcov`.

### pytest-postgresql

This package provides the fixtures needed to run tests against a real, running PostgreSQL database. It will be used for the command tests. It should be installed with `outflow`.

### pytest-sugar

Pretty-printing for the test output. It is not mandatory but I like it 😉.
### `pytest.ini`

```ini
[pytest]
# This will set the log level to display. Here INFO
log_cli = True
log_cli_level = INFO
log_level = INFO

# This will be used by pytest-postgresql to connect to the database
postgresql_user = flo
postgresql_password = xxx
postgresql_host = localhost
```

## Testing simple tasks

### We identify a bug

`GenOneData` and `GenMoreData` are written to return respectively `42` and `[42, 43, 44, 45]`. Suppose that was erroneous. They should return `40` and `[40, 50, 60, 70]`.

### Create a test to expose the bug

The conventional place for a plugin's tests is the plugin's `tests.py` file or its `tests/` directory; the testing system will automatically find tests in any file whose name begins with `test`.

Put the following in the `tests/test_1_gen.py` file in the `data_reduction` plugin:

```python
from outflow.core.test import TaskTestCase


class TestDataReductionGenTasks(TaskTestCase):
    def test_gen_one(self):
        from tuto.data_reduction.tasks import GenOneData

        # --- initialize the task ---
        self.task = GenOneData()
        self.config = {}
        # --- run the task ---
        result = self.run_task()
        # --- make assertions ---
        # test the result
        assert isinstance(result, dict)
        assert 'some_data' in result
        assert result == {'some_data': 40}
```

This will test that:

- the result is a dictionary
- it has the key `some_data`
- the value of `some_data` is 40

Here we have created an `outflow.core.test.TaskTestCase` subclass. The task is run without arguments.

### Running tests

In the terminal, we can run our test from the pipeline root directory using:

```
$ pytest
```

and you'll see something like:

```
$ pytest
collecting ...

_____ TestDataReductionGenTasks.test_gen_one _____
plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py:17: in test_gen_one
    assert result == {'some_data': 40}
E   AssertionError: assert {'some_data': 42} == {'some_data': 40}
E     Differing items:
E     {'some_data': 42} != {'some_data': 40}
E     Full diff:
E     - {'some_data': 40}
E     ?                ^
E     + {'some_data': 42}
E     ?                ^

 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_one ⨯    100% ██████████
===== short test summary info =====
FAILED plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks::test_gen_one - AssertionError: assert {'some_data': 42} == {'some_data': 40}

Results (0.78s):
       1 failed
         - plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py:5 TestDataReductionGenTasks.test_gen_one
```

What happened is this:

- `pytest` looked for tests in the `data_reduction` plugin
- it found a class whose [name starts with "Test"](https://docs.pytest.org/en/6.2.x/goodpractices.html#test-discovery)
- it looked for test methods - ones whose names begin with `test`
- just before executing `test_gen_one`, `TaskTestCase` created a pipeline context
- ... and using the `assert` statement, it checked the tested feature

The report tells us which test failed and even the line on which the failure occurred.

You can write as many `assert` statements in your test as you want. If one fails, the whole test is reported as failed, and the assertions that follow it inside this test are not evaluated.

### Fixing the bug

Replace 42 with 40 in the `tasks.py` file. Run the test again:

```
$ pytest --tb=short plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks::test_gen_one
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_one    100% ██████████

Results (0.66s):
       1 passed
```

### Repeat for `GenMoreData`

Do the same for `GenMoreData`: create a test `test_gen_more` that reveals that `GenMoreData` does not return the expected values, then fix the task.

```
$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_one ✓     50% █████
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py::TestDataReductionGenTasks.test_gen_more ✓   100% ██████████

Results (0.65s):
       2 passed
```

## Testing complex tasks

The tasks `ComputeOneData` and `ComputeMoreData` are complex tasks because they combine Python code with an interaction with a database. As we do not want to test the database connection, we will mock the database responses.

### Testing ComputeOneData

Create a new file named `tests/test_2_compute_one.py`. The numbering is not mandatory, but tests are executed in the alphabetical order of the file names. I like to test things in a logical order: it lets me stop early when one test fails and I know that the following ones will fail in the same way.

The complete code will be shown at the end of the section.

We will mock the session linked to the database with a test decorator:

```python
@mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
def test_compute_one_already_in_db(self, mock_session):
    (...)
```

With this mock in place, every request to the database returns silently, as if everything went fine.
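To make the role of `new_callable=mock.PropertyMock` more concrete, here is a minimal standalone sketch that does not depend on Outflow. The `Databases` class and the `count_rows` function below are hypothetical stand-ins written for this illustration only; they are not the real Outflow classes. The sketch shows that the mock's `return_value` is what the `session` property evaluates to, and that chained calls on it can be configured one `return_value` at a time.

```python
from unittest import mock


class Databases:
    """Hypothetical stand-in for a class exposing a `session` property."""

    @property
    def session(self):
        raise RuntimeError("no real database available in this sketch")


def count_rows(databases):
    # Code under test: reads the property, then calls methods on the session.
    return databases.session.query("some_table").count()


with mock.patch.object(Databases, "session",
                       new_callable=mock.PropertyMock) as mock_session:
    # `mock_session.return_value` is the object returned by `databases.session`.
    mock_session.return_value.query.return_value.count.return_value = 3
    rows = count_rows(Databases())
    # Calls that were never configured return another mock instead of failing.
    unconfigured = Databases().session.add("anything")

print(rows)  # 3
```

Outside the `with` block the original property is restored, so the patch cannot leak from one test into the next. The decorator form used in this tutorial behaves the same way for the duration of the decorated test.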
#### Testing when the result is already in database

To simulate the query (in `tasks.py`):

```python
computation_result_obj = session.query(ComputationResult) \
    .filter_by(input_value=some_data, multiplier=multiplier).one()
```

we declare the `ComputationResult` that we expect:

```python
db_result = ComputationResult(
    input_value = data,
    multiplier = mult,
    result = data * mult
)
```

and tell the mock that this is the value returned by `session.query.filter_by.one`:

```python
mock_session\
    .return_value.query\
    .return_value.filter_by\
    .return_value.one.return_value = db_result
```

As the task runs outside of a command, some context has to be defined:

```python
from argparse import Namespace
from unittest import mock

from outflow.core.pipeline import context, config

context.force_dry_run = False
context.db_untracked = False
context._models = []
config["databases"] = mock.MagicMock()
context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)
```

Like before, set up and run the task:

```python
self.task = ComputeOne()
result = self.run_task(some_data = data)
```

We can then make the needed assertions:

```python
assert isinstance(result, dict)
assert 'computation_result' in result
assert result == {'computation_result': data * mult}
```

As the query to the database returned a `ComputationResult`, the result was already in database, so no insertion should have been made. To ensure this was the case, we can examine the calls recorded by the mock:

```python
# filtering "add" calls with a ComputationResult object as parameter
call_add_computation_result = [
    call.args[0]
    for call in mock_session.return_value.add.call_args_list
    if isinstance(call.args[0], ComputationResult)]
```

`call_args_list` lists all the `add` calls that have been made to the session. We filter the results because Outflow also uses the session to log each job into the table `public.task`. We are only interested in the `add` calls that target the table `computation_result`.
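The filtering idea can be tried in isolation with a plain `MagicMock`. In the sketch below, `ComputationResult` and `JobLog` are hypothetical placeholder classes standing in for the real model and for the rows the framework adds on its own. The `call.args` attribute requires Python 3.8 or later.

```python
from unittest import mock


class ComputationResult:
    """Hypothetical stand-in for the real model class."""

    def __init__(self, result):
        self.result = result


class JobLog:
    """Hypothetical stand-in for rows added by the framework itself."""


session = mock.MagicMock()

# Three insertions are recorded, only one of which concerns our model.
session.add(JobLog())
session.add(ComputationResult(126))
session.add(JobLog())

# Keep only the first positional argument of the calls we care about.
added_results = [
    call.args[0]
    for call in session.add.call_args_list
    if isinstance(call.args[0], ComputationResult)
]

print(len(session.add.call_args_list))  # 3
print(len(added_results))               # 1
```

Asserting on the filtered list rather than on `session.add.call_count` keeps the test independent of how many bookkeeping rows the framework writes.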
For this test, no `ComputationResult` should be added to the database, so:

```python
assert len(call_add_computation_result) == 0
```

#### Testing when the result is not in database

In this case, the query should not return a `ComputationResult` object but instead raise a `NoResultFound` exception:

```python
from sqlalchemy.orm.exc import NoResultFound

mock_session\
    .return_value.query\
    .return_value.filter_by\
    .return_value.one.side_effect = NoResultFound
```

Note that we set the `side_effect` attribute instead of `return_value`: when `side_effect` is an exception class, calling the mocked function raises that exception rather than returning a value.

The same assertions can be made for this test, except for `len(call_add_computation_result)`: the expected value is different, as the `ComputationResult` should have been inserted in database:

```python
assert len(call_add_computation_result) == 1
```

At this stage, `pytest` should output:

```
$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py ✓✓            50% █████
 plugins/data_reduction/tuto/data_reduction/tests/test_2_compute_one.py ✓✓   100% ██████████

Results (4.30s):
       4 passed
```

The output may differ depending on your `pytest.ini` (here `log_cli = False`).
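The difference between `return_value` and `side_effect` can be observed without Outflow or SQLAlchemy. The sketch below is an illustration under stated assumptions: `NoResultFound` is a local placeholder exception, and `get_or_compute` is a hypothetical function that only loosely mirrors the "look up, then compute and insert on a miss" shape of the task. It is not the tutorial's actual task code.

```python
from unittest import mock


class NoResultFound(Exception):
    """Hypothetical stand-in for sqlalchemy.orm.exc.NoResultFound."""


def get_or_compute(session, value, multiplier):
    # Look the result up first; compute and insert it only on a miss.
    try:
        return session.query("result").filter_by(value=value).one()
    except NoResultFound:
        computed = value * multiplier
        session.add(computed)
        return computed


session = mock.MagicMock()
one = session.query.return_value.filter_by.return_value.one

# A "hit": one() returns the configured value and nothing is inserted.
one.return_value = 999
hit = get_or_compute(session, 42, 3)
adds_after_hit = session.add.call_count

# A "miss": once side_effect is set, calling one() raises instead.
one.side_effect = NoResultFound
miss = get_or_compute(session, 42, 3)
adds_after_miss = session.add.call_count

print(hit, adds_after_hit)    # 999 0
print(miss, adds_after_miss)  # 126 1
```

When both attributes are set, an exception given as `side_effect` takes precedence over `return_value`, which is why the second call raises even though `return_value` is still 999.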
#### Complete code

```python
import pytest
from unittest import mock
from random import randrange
from argparse import Namespace

from outflow.core.test import TaskTestCase
from outflow.core.pipeline import context, config

from tuto.data_reduction.tasks import ComputeOne
from tuto.data_reduction.models.computation_result import ComputationResult


class TestDataReductionComputeTasks(TaskTestCase):

    @pytest.fixture(autouse=True)
    def setup_context(self, with_pipeline_context_manager):
        context.force_dry_run = False
        context.db_untracked = False
        context._models = []
        config["databases"] = mock.MagicMock()

    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_one_already_in_db(self, mock_session):
        # --- define test data ---
        data = randrange(100)
        mult = randrange(10)

        db_result = ComputationResult(
            input_value = data,
            multiplier = mult,
            result = data * mult
        )

        # --- define mock session queries
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.return_value = db_result

        # -- define args
        context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)

        # --- initialize the task ---
        self.task = ComputeOne()

        # --- run the task ---
        result = self.run_task(some_data = data)

        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # --- make assertions ---
        assert isinstance(result, dict)
        assert 'computation_result' in result
        assert result == {'computation_result': data * mult}
        assert len(call_add_computation_result) == 0

    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_one_not_in_db(self, mock_session):
        from sqlalchemy.orm.exc import NoResultFound

        # --- define test data ---
        data = randrange(100)
        mult = randrange(10)

        # --- define mock session queries: the lookup raises NoResultFound
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.side_effect = NoResultFound

        # -- define args
        context.args = Namespace(multiplier = mult, dry_run = False, db_untracked = False)

        # --- initialize the task ---
        self.task = ComputeOne()

        # --- run the task ---
        result = self.run_task(some_data = data)

        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # --- make assertions ---
        assert isinstance(result, dict)
        assert 'computation_result' in result
        assert result == {'computation_result': data * mult}
        assert len(call_add_computation_result) == 1
```

### Testing ComputeMoreData

Testing `ComputeMoreData` is similar to testing `ComputeOneData`. We have to give the mocked session the list of values that successive calls to `session.query.filter_by.one` will return.

#### Testing when all the data are already in database

Define the values to be returned:

```python
db_result_list = [ComputationResult(
    input_value = data_array[i],
    multiplier = mult,
    result = data_array[i] * mult
) for i in range(nb_data)]
```

where `data_array` is the array that will be given to the task, and `nb_data` the length of this array.

Give them to the mocked session:

```python
mock_session\
    .return_value.query\
    .return_value.filter_by\
    .return_value.one.side_effect = db_result_list
```

To test the values returned by the task, we cannot make a simple comparison, because the tasks distributed over different CPUs may not return in the same order.
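As a small standalone illustration of why a direct comparison is fragile, the sketch below reverses a list of expected results to simulate workers finishing in a different order. The values reuse the `[40, 50, 60, 70]` inputs from earlier in this tutorial with an arbitrary multiplier of 3; the reversal is only a deterministic stand-in for the real, unpredictable ordering.

```python
mult = 3
data_array = [40, 50, 60, 70]

# What we expect, in input order.
expected = [[{'computation_result': d * mult}] for d in data_array]

# Simulate parallel workers returning in another order.
returned = list(reversed(expected))

# A direct comparison depends on the order and fails here ...
same_order = (returned == expected)

# ... while a length check plus membership checks do not.
same_length = (len(returned) == len(expected))
all_present = all(item in returned for item in expected)

print(same_order, same_length, all_present)  # False True True
```

The length check matters: without it, a result list containing extra, unexpected items would still pass the membership checks.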
We then have to test that:

- the length of the result array is `nb_data`
- each expected returned value is in `result`

```python
assert len(result['map_computation_result']) == nb_data
for i in range(nb_data):
    data = data_array[i]
    assert [{'computation_result': data * mult}] in result['map_computation_result']
```

Finally, don't forget to test that no `add` call was made to insert a `ComputationResult` in database:

```python
assert len(call_add_computation_result) == 0
```

#### Testing when some data are not in database

For this case, just replace one `ComputationResult` with a `NoResultFound`. For example:

```python
from random import randrange
from sqlalchemy.orm.exc import NoResultFound

item_not_in_db = randrange(nb_data)
db_result_list[item_not_in_db] = NoResultFound
```

In this case, there should be exactly one `add` call to insert a `ComputationResult` in database:

```python
assert len(call_add_computation_result) == 1
```

Finally, `pytest` will return:

```
$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py ✓✓             33% ███▍
 plugins/data_reduction/tuto/data_reduction/tests/test_2_compute_one.py ✓✓     67% ██████▋
 plugins/data_reduction/tuto/data_reduction/tests/test_3_compute_more.py ✓✓   100% ██████████

Results (12.72s):
       6 passed
```

#### Complete code

```python
import pytest
from random import sample, randrange
from argparse import Namespace
from unittest import mock

from outflow.core.test import TaskTestCase
from outflow.library.workflows import MapWorkflow
from outflow.core.pipeline import context, config

from tuto.data_reduction.models.computation_result import ComputationResult
from tuto.data_reduction.tasks import ComputeMore


class TestDataReductionComputeMoreTasks(TaskTestCase):

    @pytest.fixture(autouse=True)
    def setup_context(self, with_pipeline_context_manager):
        context.force_dry_run = False
        context.db_untracked = False
        context._models = []
        config["databases"] = mock.MagicMock()
        config["backend"] = "ray"
        config["ray"] = {"cluster_type": "local"}

    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_more(self, mock_session):
        """
        Testing when all data are already in database
        Request to DB returns a ComputationResult
        """
        # --- define test data ---
        nb_data = randrange(1, 5)
        data_array = sample(range(100), k=nb_data)
        mult = randrange(10)

        # --- define the data returned by the request to the DB ---
        db_result_list = [ComputationResult(
            input_value=data_array[i],
            multiplier=mult,
            result=data_array[i] * mult
        ) for i in range(nb_data)]

        # --- define mock session queries
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.side_effect = db_result_list

        # -- define args
        context.args = Namespace(multiplier=mult, dry_run=False, db_untracked=False)

        # --- initialize the task ---
        mapped_computation = MapWorkflow(ComputeMore(), output_name="map_computation_result")
        self.task = mapped_computation

        # --- run the task ---
        result = self.run_task(some_data_array=data_array)

        # --- make assertions ---
        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # all results are already in the database, since we mocked them as found
        assert len(call_add_computation_result) == 0

        # check the type of result
        assert isinstance(result, dict)
        assert 'map_computation_result' in result

        # we should get as many results as input data
        assert len(result['map_computation_result']) == nb_data

        # verifying that every expected result is returned
        for i in range(nb_data):
            data = data_array[i]
            assert [{'computation_result': data * mult}] in result['map_computation_result']

    @mock.patch('outflow.core.db.database.Databases.session', new_callable=mock.PropertyMock)
    def test_compute_more_with_results_not_in_base(self, mock_session):
        """
        Testing when some data are not already in database
        Request to DB returns a ComputationResult or a NoResultFound exception
        """
        # --- define test data ---
        nb_data = randrange(2, 5)
        nb_data_not_in_db = randrange(1, nb_data)
        data_not_in_db = sample(range(nb_data), k=nb_data_not_in_db)
        data_array = sample(range(100), k=nb_data)
        mult = randrange(10)
        db_result_list = []

        # --- define the data returned by the request to the DB ---
        from sqlalchemy.orm.exc import NoResultFound
        for i in range(nb_data):
            if i in data_not_in_db:
                db_result_list.append(NoResultFound)
            else:
                db_result_list.append(ComputationResult(
                    input_value=data_array[i],
                    multiplier=mult,
                    result=data_array[i] * mult
                ))

        # --- define mock session queries
        mock_session\
            .return_value.query\
            .return_value.filter_by\
            .return_value.one.side_effect = db_result_list

        # -- define args
        context.force_dry_run = False
        context.db_untracked = False
        context._models = []
        config["databases"] = mock.MagicMock()
        config["backend"] = "ray"
        context.args = Namespace(multiplier=mult, dry_run=False, db_untracked=False)

        # --- initialize the task ---
        mapped_computation = MapWorkflow(ComputeMore(), output_name="map_computation_result")
        self.task = mapped_computation

        # --- run the task ---
        result = self.run_task(some_data_array=data_array)

        # --- make assertions ---
        # filtering "add" calls with a ComputationResult object as parameter
        call_add_computation_result = [
            call.args[0]
            for call in mock_session.return_value.add.call_args_list
            if isinstance(call.args[0], ComputationResult)]

        # nb_data_not_in_db should be added since they were not found
        assert len(call_add_computation_result) == nb_data_not_in_db

        # check the type of result
        assert isinstance(result, dict)
        assert 'map_computation_result' in result

        # we should get as many results as input data
        assert len(result['map_computation_result']) == nb_data

        # verifying that every expected result is returned
        for i in range(nb_data):
            data = data_array[i]
            assert [{'computation_result': data * mult}] in result['map_computation_result']
```

## Testing commands

The aim of testing a command is to verify that its tasks are executed one after the other as defined. The cases tested at the task level do not need to be tested again.

### Regular commands

In the first part of the tutorial, we defined a command `data_reduction` which only prints "Hello world". There is not much to test for this command, but it provides the skeleton for testing a command:

```python
# Assumption: CommandTestCase is exported from the same package as TaskTestCase
from outflow.core.test import CommandTestCase


class TestDataReductionNoCmd(CommandTestCase):
    def test_data_reduction(self):
        # --- initialize the command ---
        from tuto.data_reduction.commands import DataReduction

        self.root_command_class = DataReduction
        arg_list = []

        # --- run the command ---
        return_code, result = self.run_command(arg_list)

        # --- make assertions ---
        assert return_code == 0
        assert result == [{"None": None}]
```

### Commands involving an interaction with a PostgreSQL database

Outflow comes with a `PostgresCommandTestCase` class that sets up a fresh database for us before each test and drops it after use.

Create a new test file `test_4_commands.py`.

```python
from outflow.core.test.test_cases import (PostgresCommandTestCase, postgresql_fixture)


class TestDataReductionCmd(PostgresCommandTestCase):
    PLUGINS = ['outflow.management', 'tuto.data_reduction']
```

In order to be able to use the database, migrations have to be applied before each test (since a new database is created each time). We will use a fixture to do this automatically, which avoids duplicated code.
```python
    # Automatically run a db upgrade heads before each test
    @pytest.fixture(autouse=True)
    def setup_db_upgrade(self, with_pipeline_context_manager):
        # -- commands to be executed
        db_upgrade = ['management', 'db', 'upgrade', 'heads', '-ll', 'INFO']
        self.run_command(db_upgrade, force_dry_run = False)
```

#### Testing the migration

```python
    # --- test if the upgrade is ok
    def test_db_upgrade(self):
        with self.pipeline_db_session() as session:
            # The table computation_result has to be created
            try:
                c = session.query(ComputationResult).count()
                # and it should be empty
                assert c == 0
            except Exception as e:
                assert False, e
```

To verify that this test is relevant, comment out the `self.run_command(db_upgrade, force_dry_run = False)` line in the `setup_db_upgrade()` function. `pytest` will then fail with:

```
FAILED plugins/data_reduction/tuto/data_reduction/tests/test_4_commands.py::TestDataReductionCmd::test_db_upgrade - AssertionError: ProgrammingError('(psycopg2.errors.UndefinedTable) relation "computation_result" does not exist
```

This way, you are sure that if the migration fails for any reason, you will be notified.

Do not forget to uncomment the `self.run_command(...)` line afterwards.

#### Testing `compute_one_data`

Testing this command is easy. Just define the command:

```python
command = [
    'compute_one_data',
    '--multiplier', f'{multiplier}',
    '-ll', 'INFO',
]
```

Run the command:

```python
return_code, result = self.run_command(command, force_dry_run = False)
```

And test that we get the expected results:

```python
assert return_code == 0
assert result[0]['computation_result'] == 40 * multiplier
```

#### Testing `compute_more_data`

For this command, the Ray backend has to be activated.
Before running the command, the configuration has to be updated:

```python
custom_config = {
    "backend": "ray",
    "ray": {"cluster_type": "local"}
}
config.update(custom_config)
```

The other parts of the test remain the same: assert the `return_code` and check that each expected value is present in the result array:

```python
assert return_code == 0
for i in range(40, 71, 10):
    res = [{'computation_result': i * multiplier}]
    assert res in result[0]['map_computation_result']
```

Finally, `pytest` should return:

```
$ pytest
collecting ...
 plugins/data_reduction/tuto/data_reduction/tests/test_1_gen.py ✓✓             20% ██
 plugins/data_reduction/tuto/data_reduction/tests/test_2_compute_one.py ✓✓     40% ████
 plugins/data_reduction/tuto/data_reduction/tests/test_3_compute_more.py ✓✓    60% ██████
 plugins/data_reduction/tuto/data_reduction/tests/test_4_commands.py ✓✓✓✓     100% ██████████

Results (22.31s):
      10 passed
```

Congratulations!

## Test coverage

In addition, you can check the coverage of your tests by running `pytest --cov=tuto.data_reduction --cov-report html`. This will generate an HTML report in the `htmlcov/` directory.

With this tutorial, you should be able to reach 100% coverage.

| Module                                                                    | statements | missing | excluded | coverage |
| ------------------------------------------------------------------------- | ---------- | ------- | -------- | -------- |
| Total                                                                     | 96         | 0       | 0        | 100%     |
| `plugins/data_reduction/tuto/data_reduction/__init__.py`                  | 0          | 0       | 0        | 100%     |
| `plugins/data_reduction/tuto/data_reduction/commands.py`                  | 30         | 0       | 0        | 100%     |
| `plugins/data_reduction/tuto/data_reduction/models/__init__.py`           | 7          | 0       | 0        | 100%     |
| `plugins/data_reduction/tuto/data_reduction/models/computation_result.py` | 8          | 0       | 0        | 100%     |
| `plugins/data_reduction/tuto/data_reduction/tasks.py`                     | 51         | 0       | 0        | 100%     |