Airflow Avanzado. Dependencias Entre Tareas y Sensores Cross-DAG

Dependencias entre tareas y sensores Cross-DAG con Airflow. Cómo solucionar problemas relacionados con la complejidad de la ingeniería de datos.

En este artículo vamos a contarte algunas formas de solucionar problemas relacionados con la complejidad de la ingeniería de datos en sí misma. Un DAG de Airflow puede llegar a ser muy complejo si empezamos a incluir todas las dependencias en el mismo, y además, esta estrategia nos permite desacoplar los procesos, por ejemplo, por equipos de ingenieros de datos, por departamentos, o cualquier otro criterio. 

A continuación, te proponemos un ejemplo sobre un caso hipotético, ver la problemática y resolverlo.

Dos departamentos, un proceso.

Imaginemos que nuestra compañía tiene dos departamentos donde es necesario tener unos procesos diarios separados, pero que son interdependientes. Vamos a describir estos supuestos procesos, con sus tiempos de procesado y podremos observar el problema. Supongamos que la interdependencia se encuentra en los Informes, donde en cada uno de ellos se tiene en cuenta el proceso del otro.

Operativa

  • Calculo de ventas (1h)
  • Calculo de compras proveedor (1h)
  • Informe

Finanzas

  • Anotación de las ventas (5m)
  • Validación de ventas con cuentas (2h)
  • Anotación de las compras + otros gastos (5m)
  • Informe

Este sería el código del DAG y su representación en la UI de Airflow:

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'problem_a'
DEFAULT_ARGS = {
    'owner': 'Operations+Finance',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['operations+airflow@example.com', 'finance+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:

    calculate_revenue = DummyOperator(task_id='operations_calculate_revenue',
                                      dag=dag)

    income_bookkeep = DummyOperator(task_id='finances_income_bookkeep',
                                    dag=dag)

    validate_income = DummyOperator(task_id='finances_validate_income',
                                    dag=dag)

    calculate_expenses = DummyOperator(task_id='operations_calculate_expenses',
                                       dag=dag)

    outcome_bookkeep = DummyOperator(task_id='finances_outcome_bookkeep',
                                     dag=dag)

    operations_a_report = DummyOperator(task_id='operations_a_report',
                                        dag=dag)

    finance_a_report = DummyOperator(task_id='finance_a_report',
                                     dag=dag)

    calculate_revenue >> income_bookkeep >> validate_income
    calculate_expenses >> outcome_bookkeep

    validate_income >> finance_a_report
    outcome_bookkeep >> finance_a_report

    income_bookkeep >> operations_a_report
    outcome_bookkeep >> operations_a_report

Aquí podemos observar como tenemos, en realidad 2 procesos, con dependencias, en un mismo DAG

Dos departamentos, dos procesos.

El objetivo de este ejercicio, es dividir este DAG en 2, pero queremos mantener las dependencias. Para ello, deberemos seguir una estrategia concreta, en este caso, hemos seleccionado el DAG de operativa como el principal, y el de finanzas como secundario. Básicamente porque el de finanzas, depende primero de las tareas de operativa.

Para desarrollar la solución, vamos a hacer uso de 2 Operadores de Airflow , TriggerDagRunOperator, que sirve para lanzar la ejecución de un DAG externo, y ExternalTaskSensor, que se utiliza para hacer una espera de una Tarea de un DAG externo.

Proceso de Operaciones

# operations_a.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago

DAG_NAME = 'operations_a'
DEFAULT_ARGS = {
    'owner': 'Operations',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['operations+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:
    calculate_revenue = DummyOperator(task_id='calculate_revenue',
                                      dag=dag)

    trigger_finances_a = TriggerDagRunOperator(task_id='trigger_finances_a',
                                               dag=dag,
                                               trigger_dag_id='finances_a',
                                               execution_date='{{ execution_date }}')

    calculate_expenses = DummyOperator(task_id='calculate_expenses',
                                       dag=dag)

    wait_finances_a_expenses_bookkept = ExternalTaskSensor(
        task_id='wait_finances_a_outcome_bookeep',
        dag=dag,
        external_dag_id='finances_a',
        external_task_id='outcome_bookkeep')

    operations_a_report = DummyOperator(task_id='operations_a_report',
                                        dag=dag)

    calculate_revenue >> trigger_finances_a >> calculate_expenses
    calculate_expenses >> wait_finances_a_expenses_bookkept >> operations_a_report

Aquí, podemos observar que los Operadores encargados de lanzar un DAG externo, se muestra en un color rosado, y los Operadores de sensor de tarea externa, en azul oscuro.

Proceso de finanzas

# finances_a.py
from datetime import timedelta from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.sensors.external_task import ExternalTaskSensor from airflow.utils.dates import days_ago DAG_NAME = 'finances_a' DEFAULT_ARGS = { 'owner': 'Airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email': ['finances+airflow@example.com'], 'retries': 3, 'retry_delay': timedelta(seconds=10), 'email_on_failure': False, 'email_on_retry': False } with DAG(dag_id=DAG_NAME, default_args=DEFAULT_ARGS, dagrun_timeout=timedelta(minutes=10), schedule_interval=None) as dag: income_bookkeep = DummyOperator(task_id='income_bookkeep', dag=dag) validate_income = DummyOperator(task_id='validate_income', dag=dag) wait_operations_a_calculate_expenses = ExternalTaskSensor( task_id='wait_operations_a_calculate_expenses', dag=dag, external_dag_id='operations_a', external_task_id='calculate_expenses', timeout=60) outcome_bookkeep = DummyOperator(task_id='outcome_bookkeep', dag=dag) finances_a_report = DummyOperator(task_id='finances_a_report', dag=dag) income_bookkeep >> validate_income >> wait_operations_a_calculate_expenses wait_operations_a_calculate_expenses >> finances_a_report

En este caso, vemos el sensor de la tarea externa, en color azul.

¡Solucionado!

De esta forma, cuando se ejecute el DAG de Operativa, éste se encargará de lanzar el DAG de finanzas a su debido momento, y los departamentos pueden seguir evolucionando sus procesos independientemente y teniendo en cuenta sólo las dependencias que tienen el uno del otro.

Puedes descargarte el código completo desde el repositorio damavis/advanced-airflow

Si te ha parecido útil este post, te animamos a ver más artículos similares de la categoría Data Engineering en nuestro blog y a compartirlo con tus contactos. ¡Nos vemos en redes!
Cristòfol Torrens
Cristòfol Torrens
Artículos: 8