Testing en Apache Airflow

Introducción

Hoy vamos a hablar de dos formas de realizar testing en Apache Airflow. Históricamente, realizar tests en Airflow ha sido algo que nos ha traído de cabeza a todos los usuarios del famoso framework. El acoplamiento del código con la librería hacía imposible poder realizar un simple test unitario. Para poder desacoplar el código, desde Damavis utilizábamos y utilizamos arquitecturas clean que nos permitían hasta cierto punto separar la lógica de negocio del framework, pero cuando llegabas a los tests de integración te veías muy limitado a la hora de desarrollarlos por cómo está pensada la librería. 

En este post, vamos a ver dos formas que los desarrolladores de Airflow nos brindan para poder aplicar testing utilizando su framework.

Test de integridad del código

Este tipo de test pretende únicamente detectar aquellos fallos relacionados con la ejecución del código, es decir, que no exista ningún error gramatical de Python o de importación de los módulos. Es algo parecido a lo que un IDE como Pycharm te dirá durante el tiempo de desarrollo. 

La mayor diferencia está en que es posible que tu entorno de desarrollo y entorno de pruebas difiera por alguna razón y puedas probar que todas las dependencias y Python paths están correctamente escritas. Muchas veces, con IDEs como intellij, podemos automáticamente añadir una variable de entorno, una configuración o establecer el PythonPath de manera transparente al desarrollador.

Veamos cómo se aplica en un caso real. Dado el siguiente DAG de ejemplo, podemos observar cómo intellij detecta un fallo entre los imports.

Este tipo de errores o cualquier otro de sintaxis debe aparecer en el momento que lanzamos nuestros tests de integridad, veámoslo.

Aquí tendríamos el modelo de test que nos permitirá cargar todos los DAGs del proyecto. Esto se consigue mediante la clase DagBag, que nos permite generar una especie de almacén donde, después, mediante el método get_dag, podremos buscar un DAG concreto. En este test se está comprobando dos puntos diferentes: el primero, que efectivamente se pueden cargar todos los DAGs y ninguno tiene errores de integridad. El segundo punto sería que el DAG con nombre example_dag_daily tiene 3 tasks asignadas.

# Para ejecutar los tests:
pytest tests/*

Cuando lancemos los tests, el objeto de BagDag tratará de cargar los DAGs por defecto desde un entorno ejecutable de Airflow, es decir, es necesario haber definido un AIRFLOW_HOME correctamente configurado. Si no existiese dicho workspace, tratará de cargarlo de la ruta por defecto, que en Ubuntu sería ~/airflow

Test de un operador

En ocasiones, durante el desarrollo de un proyecto Airflow, realizamos operadores propios para poder reutilizarlos posteriormente en los diferentes DAGs.

Existe una forma de poder ejecutar un operador dentro de un DAG de ejemplo. En esta ocasión, la ejecución se realiza de manera programática y, muchas veces, es necesario poder consultar Connections y Variables. Veamos un ejemplo de ello.

Dado el siguiente caso de custom operator, se hace una llamada http con el método get a una uri sacada de un objeto Connection y se imprime mediante un valor de una Variable de Airflow.

Veamos el test relacionado con el custom operator.

En dicho test se realizan dos fixture que son invocados antes del método test_my_custom_operator. El primer fixture pretende mockear método get_connection, que es un método heredado de BaseHook para que devuelva el objeto mock_connection. El segundo fixture pretende inyectar una variable de entorno que posteriormente será consultada, print_variable, por lo tanto la inyección se debe autocompletar como se introduce realmente en Airflow, con el prefijo AIRFLOW_VAR_. 

También se realiza un mock sobre la llamada del método plugins.operators.custom_operator.requests.get que es el que materializa la llamada http. 

En este caso, no estamos utilizando el contexto dentro del método execute, pero si fuera necesario, habría que pasarle un contexto mock de los objetos que se desean consultar desde el operador, como por ejemplo el execution_date.

Con todo ello, podemos ver cómo se ejecuta el método execute de CustomOperator haciendo el trabajo programado y finalmente imprimiendo el mock de la llamada http. Este es solamente un ejemplo, pero puede llegar a ser muy complejo con diferentes llamadas y esperas. Todo ello se puede examinar del lado del test, en este caso, se observa que se hace una llamada una sola vez al método .json() del objeto response.

Conclusión

En este post, hemos comprobado cómo se pueden realizar tests en Airflow para evaluar, por un lado, la integridad del código y, por otro, un operador propio que hayamos creado para reutilizarlo en distintos DAGs. Accede al proyecto en Github.

Hasta aquí nuestro post de hoy. Si te ha parecido interesante, te animamos a visitar la categoría Software para ver artículos similares y a compartirlo en redes con tus contactos. ¡Hasta pronto!
Óscar García
Óscar García
Artículos: 8