En este artículo vamos a contarte algunas formas de solucionar problemas relacionados con la complejidad de la ingeniería de datos en sí misma. Un DAG de Airflow puede llegar a ser muy complejo si empezamos a incluir todas las dependencias en el mismo, y además, esta estrategia nos permite desacoplar los procesos, por ejemplo, por equipos de ingenieros de datos, por departamentos, o cualquier otro criterio.
A continuación, te proponemos un ejemplo sobre un caso hipotético, ver la problemática y resolverlo.
Dos departamentos, un proceso
Imaginemos que nuestra compañía tiene dos departamentos donde es necesario tener unos procesos diarios separados, pero que son interdependientes. Vamos a describir estos supuestos procesos, con sus tiempos de procesado y podremos observar el problema. Supongamos que la interdependencia se encuentra en los Informes, donde en cada uno de ellos se tiene en cuenta el proceso del otro.
Operativa
- Calculo de ventas (1h)
- Calculo de compras proveedor (1h)
- Informe
Finanzas
- Anotación de las ventas (5m)
- Validación de ventas con cuentas (2h)
- Anotación de las compras + otros gastos (5m)
- Informe
Este sería el código del DAG y su representación en la UI de Airflow:
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago
DAG_NAME = 'problem_a'
DEFAULT_ARGS = {
'owner': 'Operations+Finance',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['operations+airflow@example.com', 'finance+airflow@example.com'],
'retries': 3,
'retry_delay': timedelta(minutes=10),
'email_on_failure': False,
'email_on_retry': False
}
with DAG(dag_id=DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(minutes=10),
schedule_interval=None) as dag:
calculate_revenue = DummyOperator(task_id='operations_calculate_revenue',
dag=dag)
income_bookkeep = DummyOperator(task_id='finances_income_bookkeep',
dag=dag)
validate_income = DummyOperator(task_id='finances_validate_income',
dag=dag)
calculate_expenses = DummyOperator(task_id='operations_calculate_expenses',
dag=dag)
outcome_bookkeep = DummyOperator(task_id='finances_outcome_bookkeep',
dag=dag)
operations_a_report = DummyOperator(task_id='operations_a_report',
dag=dag)
finance_a_report = DummyOperator(task_id='finance_a_report',
dag=dag)
calculate_revenue >> income_bookkeep >> validate_income
calculate_expenses >> outcome_bookkeep
validate_income >> finance_a_report
outcome_bookkeep >> finance_a_report
income_bookkeep >> operations_a_report
outcome_bookkeep >> operations_a_report
Aquí podemos observar como tenemos, en realidad 2 procesos, con dependencias, en un mismo DAG
Dos departamentos, dos procesos
El objetivo de este ejercicio, es dividir este DAG en 2, pero queremos mantener las dependencias. Para ello, deberemos seguir una estrategia concreta, en este caso, hemos seleccionado el DAG de operativa como el principal, y el de finanzas como secundario. Básicamente porque el de finanzas, depende primero de las tareas de operativa.
Para desarrollar la solución, vamos a hacer uso de 2 Operadores de Airflow , TriggerDagRunOperator, que sirve para lanzar la ejecución de un DAG externo, y ExternalTaskSensor, que se utiliza para hacer una espera de una Tarea de un DAG externo.
Proceso de Operaciones
# operations_a.py
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago
DAG_NAME = 'operations_a'
DEFAULT_ARGS = {
'owner': 'Operations',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['operations+airflow@example.com'],
'retries': 3,
'retry_delay': timedelta(minutes=10),
'email_on_failure': False,
'email_on_retry': False
}
with DAG(dag_id=DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(minutes=10),
schedule_interval=None) as dag:
calculate_revenue = DummyOperator(task_id='calculate_revenue',
dag=dag)
trigger_finances_a = TriggerDagRunOperator(task_id='trigger_finances_a',
dag=dag,
trigger_dag_id='finances_a',
execution_date='{{ execution_date }}')
calculate_expenses = DummyOperator(task_id='calculate_expenses',
dag=dag)
wait_finances_a_expenses_bookkept = ExternalTaskSensor(
task_id='wait_finances_a_outcome_bookeep',
dag=dag,
external_dag_id='finances_a',
external_task_id='outcome_bookkeep')
operations_a_report = DummyOperator(task_id='operations_a_report',
dag=dag)
calculate_revenue >> trigger_finances_a >> calculate_expenses
calculate_expenses >> wait_finances_a_expenses_bookkept >> operations_a_report
Aquí, podemos observar que los Operadores encargados de lanzar un DAG externo, se muestra en un color rosado, y los Operadores de sensor de tarea externa, en azul oscuro.
Proceso de finanzas
# finances_a.pyfrom datetime import timedelta from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.sensors.external_task import ExternalTaskSensor from airflow.utils.dates import days_ago DAG_NAME = 'finances_a' DEFAULT_ARGS = { 'owner': 'Airflow', 'depends_on_past': False, 'start_date': days_ago(2), 'email': ['finances+airflow@example.com'], 'retries': 3, 'retry_delay': timedelta(seconds=10), 'email_on_failure': False, 'email_on_retry': False } with DAG(dag_id=DAG_NAME, default_args=DEFAULT_ARGS, dagrun_timeout=timedelta(minutes=10), schedule_interval=None) as dag: income_bookkeep = DummyOperator(task_id='income_bookkeep', dag=dag) validate_income = DummyOperator(task_id='validate_income', dag=dag) wait_operations_a_calculate_expenses = ExternalTaskSensor( task_id='wait_operations_a_calculate_expenses', dag=dag, external_dag_id='operations_a', external_task_id='calculate_expenses', timeout=60) outcome_bookkeep = DummyOperator(task_id='outcome_bookkeep', dag=dag) finances_a_report = DummyOperator(task_id='finances_a_report', dag=dag) income_bookkeep >> validate_income >> wait_operations_a_calculate_expenses wait_operations_a_calculate_expenses >> finances_a_report
En este caso, vemos el sensor de la tarea externa, en color azul.
¡Solucionado!
De esta forma, cuando se ejecute el DAG de Operativa, éste se encargará de lanzar el DAG de finanzas a su debido momento, y los departamentos pueden seguir evolucionando sus procesos independientemente y teniendo en cuenta sólo las dependencias que tienen el uno del otro.
Puedes descargarte el código completo desde el repositorio damavis/advanced-airflow