Testing in Apache Airflow

Today we are going to look at two ways of testing in Apache Airflow. Historically, testing has been a headache for users of this well-known framework: the tight coupling between user code and the library made it practically impossible to write a simple unit test. To decouple the code, at Damavis we have used, and still use, clean architectures that let us separate the business logic from the framework to some extent. Integration tests, however, remained very limited because of the way the library is designed.

In this post, we will see two approaches that the Airflow developers provide for testing code built on their framework.

Code integrity test

This type of test is only intended to detect errors that prevent the code from loading, that is, Python syntax errors or failing module imports. It is similar to what an IDE like PyCharm tells you at development time.

The main difference is that your development and test environments may differ for some reason, so this test also verifies that all dependencies and Python paths are set up correctly. IDEs such as IntelliJ often add an environment variable, apply a configuration or set the PYTHONPATH transparently for the developer.

Let’s see how this applies in a real case. In the following example DAG, IntelliJ flags an error in one of the imports.

This kind of error, like any syntax error, should surface as soon as we run our integrity tests. Let’s see it.

Here is the test module that loads all the DAGs of the project. This is done through the DagBag class, which builds a kind of store from which a specific DAG can later be retrieved with the get_dag method. The test checks two things: first, that all the DAGs can be loaded and none of them has integrity errors; second, that the DAG named example_dag_daily has 3 tasks assigned.

# To run the tests:
pytest tests/*

When we run the tests, the DagBag object tries to load the DAGs from a working Airflow environment by default, so a correctly configured AIRFLOW_HOME must be defined. If AIRFLOW_HOME is not set, Airflow falls back to the default path, which on Ubuntu is ~/airflow.
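One possible way to make the test environment explicit is to set these variables before Airflow is imported, for example at the top of a conftest.py. The sketch below is only an assumption about how a project might be laid out: the function name and the airflow_home and dags folder names are hypothetical.

```python
import os
from pathlib import Path


def configure_airflow_env(project_root):
    """Point Airflow at a project-local workspace unless one is already configured."""
    root = Path(project_root)
    # setdefault keeps any AIRFLOW_HOME already exported by the shell or the CI job.
    os.environ.setdefault("AIRFLOW_HOME", str(root / "airflow_home"))  # hypothetical folder
    # AIRFLOW__<SECTION>__<KEY> variables override the matching airflow.cfg entries.
    os.environ.setdefault("AIRFLOW__CORE__DAGS_FOLDER", str(root / "dags"))  # hypothetical folder
    os.environ.setdefault("AIRFLOW__CORE__LOAD_EXAMPLES", "False")
    return os.environ["AIRFLOW_HOME"]
```

The function has to run before the first import of airflow, because Airflow reads its configuration at import time.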

Operator test

Sometimes, during the development of an Airflow project, we write our own operators so that we can reuse them later in different DAGs.

An operator can be executed programmatically within an example DAG, and in such a test it is often necessary to read Connections and Variables. Let’s see an example of this.

In the following custom operator, an HTTP GET call is made to a URI taken from a Connection object, and the result is printed together with the value of an Airflow Variable.

Let’s see the test related to the custom operator.

In this test, two fixtures run before the test_my_custom_operator method. The first one mocks the get_connection method, which is inherited from BaseHook, so that it returns the mock_connection object. The second one injects an environment variable for the Variable that is queried later, print_variable. The name of that environment variable must therefore follow the convention Airflow expects, with the AIRFLOW_VAR_ prefix.

The call to plugins.operators.custom_operator.requests.get, which is the one that performs the HTTP request, is also mocked.
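The naming convention for the injected Variable can be sketched with the standard library alone. The two helper functions are hypothetical names of our own; the only Airflow behaviour assumed here is that a Variable key is looked up in the environment as AIRFLOW_VAR_ followed by the key in upper case.

```python
import os
from unittest import mock


def airflow_var_env_name(key):
    """Name of the environment variable Airflow checks for the Variable `key`."""
    return f"AIRFLOW_VAR_{key.upper()}"


def injected_variable(key, value):
    """Context manager exposing `key` as an Airflow Variable for the duration of a test."""
    return mock.patch.dict(os.environ, {airflow_var_env_name(key): value})


with injected_variable("print_variable", "hello from the test"):
    # Inside this block, Variable.get("print_variable") would resolve to the value
    # above before the metadata database is consulted.
    injected = os.environ["AIRFLOW_VAR_PRINT_VARIABLE"]
```

With pytest, the same effect is usually obtained with the monkeypatch fixture and its setenv method, which also restores the environment when the test ends.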

In this case, we are not using the context inside the execute method. If it were needed, we would have to pass a mock context containing the objects that the operator reads, for example the execution_date.
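To illustrate the idea of a hand-built context, here is a small sketch. DateAwareTask is a hypothetical, framework-free stand-in and not a real Airflow operator; it only shows that the context can be a plain dictionary holding the keys that execute reads. Note that newer Airflow releases expose this value as logical_date.

```python
from datetime import datetime, timezone


class DateAwareTask:
    """Hypothetical stand-in for an operator whose execute() reads the context."""

    def execute(self, context):
        # Only the keys the operator actually reads need to be present in the mock context.
        return f"run for {context['execution_date']:%Y-%m-%d}"


mock_context = {"execution_date": datetime(2021, 1, 1, tzinfo=timezone.utc)}
result = DateAwareTask().execute(mock_context)
```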

With all this, the execute method of CustomOperator runs, does its programmed work and finally prints the mocked result of the HTTP call. This is just an example; real operators can become much more complex, with several calls and waits. All of this can be checked on the test side: here, the test verifies that a single call is made to the .json() method of the response object.
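The assertion pattern described above can be sketched without Airflow or requests installed. FakeCustomOperator is a hypothetical stand-in for the CustomOperator of this post, and the URI and message are made-up values; the point is only how the mock records the calls so the test can inspect them.

```python
from unittest import mock


class FakeCustomOperator:
    """Framework-free stand-in for CustomOperator (hypothetical, not the real class)."""

    def __init__(self, http_get, uri, message):
        self.http_get = http_get  # requests.get in the real operator
        self.uri = uri            # read from the Airflow Connection in the real operator
        self.message = message    # read from the Airflow Variable in the real operator

    def execute(self, context=None):
        response = self.http_get(self.uri)
        payload = response.json()
        print(self.message, payload)
        return payload


# The mock plays the role of requests.get and returns a response whose json() is canned.
mock_get = mock.MagicMock(name="requests.get")
mock_get.return_value.json.return_value = {"status": "ok"}

operator = FakeCustomOperator(mock_get, "http://example.invalid/api", "response:")
result = operator.execute()

# The test side can now check exactly how the HTTP layer was used.
mock_get.assert_called_once_with("http://example.invalid/api")
mock_get.return_value.json.assert_called_once_with()
```

In the real test, the same mock would be installed by patching plugins.operators.custom_operator.requests.get, so that the operator code itself does not need to change.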

Conclusion

In this post, we have seen how to test in Airflow, on the one hand, the integrity of the code and, on the other, a custom operator created to be reused in different DAGs. Go to the project on GitHub.

Óscar García