Advanced Airflow: Cross-DAG tasks and sensor dependencies

In this article, we look at some ways to deal with the complexity that is inherent to data engineering. An Airflow DAG can become very complex if we include every dependency in it; splitting it into several DAGs, as we do here, also lets us decouple processes, for example by data engineering team, by department, or by any other criterion.

Here is a hypothetical case: let's look at the problem and then solve it.

Two departments, one process

Let’s imagine that our company has two departments that need separate daily processes, but which are interdependent. We will describe these hypothetical processes, with their processing times, so that the problem becomes apparent. Let’s assume that the interdependence is in the reports, where each one takes the other department’s process into account.

Operational

  • Sales Calculation (1h)
  • Vendor purchase calculation (1h)
  • Report

Finance

  • Sales bookkeeping (5m)
  • Validation of sales with accounts (2h)
  • Bookkeeping of purchases + other expenses (5m)
  • Report

This would be the DAG code and its representation in the Airflow UI:

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'problem_a'
DEFAULT_ARGS = {
    'owner': 'Operations+Finance',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['operations+airflow@example.com', 'finance+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:

    calculate_revenue = DummyOperator(task_id='operations_calculate_revenue',
                                      dag=dag)

    income_bookkeep = DummyOperator(task_id='finances_income_bookkeep',
                                    dag=dag)

    validate_income = DummyOperator(task_id='finances_validate_income',
                                    dag=dag)

    calculate_expenses = DummyOperator(task_id='operations_calculate_expenses',
                                       dag=dag)

    outcome_bookkeep = DummyOperator(task_id='finances_outcome_bookkeep',
                                     dag=dag)

    operations_a_report = DummyOperator(task_id='operations_a_report',
                                        dag=dag)

    finance_a_report = DummyOperator(task_id='finance_a_report',
                                     dag=dag)

    calculate_revenue >> income_bookkeep >> validate_income
    calculate_expenses >> outcome_bookkeep

    validate_income >> finance_a_report
    outcome_bookkeep >> finance_a_report

    income_bookkeep >> operations_a_report
    outcome_bookkeep >> operations_a_report

Here we can see that we have, in fact, two processes with dependencies within the same DAG.

Two departments, two processes

The objective of this exercise is to split this DAG in two while maintaining the dependencies between the tasks. To do this, we have to follow a specific strategy: in this case, we have selected the operations DAG as the main one and the finance DAG as the secondary one, essentially because the finance DAG depends first on the operations tasks.

To develop the solution, we will make use of two Airflow operators: TriggerDagRunOperator, which is used to launch the execution of an external DAG, and ExternalTaskSensor, which is used to wait for a task of an external DAG to complete.
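
Both operators ship with the Airflow 2 core. Before looking at the full DAGs, here is a minimal sketch of how the two fit together; the DAG and task ids are illustrative, not part of the example:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

# In the triggering DAG: launch the other DAG, propagating this run's
# execution date so both runs share the same logical date.
trigger = TriggerDagRunOperator(
    task_id='trigger_other_dag',
    trigger_dag_id='other_dag',
    execution_date='{{ execution_date }}')

# In either DAG: block until a task of the other DAG has succeeded in the
# run with the matching execution date.
wait = ExternalTaskSensor(
    task_id='wait_other_dag_task',
    external_dag_id='other_dag',
    external_task_id='some_task')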

Operations process

# operations_a.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago

DAG_NAME = 'operations_a'
DEFAULT_ARGS = {
    'owner': 'Operations',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['operations+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:
    calculate_revenue = DummyOperator(task_id='calculate_revenue',
                                      dag=dag)

    # Launch finances_a, propagating this run's execution date so that the
    # ExternalTaskSensors in both DAGs can match the corresponding runs.
    trigger_finances_a = TriggerDagRunOperator(task_id='trigger_finances_a',
                                               dag=dag,
                                               trigger_dag_id='finances_a',
                                               execution_date='{{ execution_date }}')

    calculate_expenses = DummyOperator(task_id='calculate_expenses',
                                       dag=dag)

    # Wait until finances_a has bookkept the purchases before reporting.
    wait_finances_a_expenses_bookkept = ExternalTaskSensor(
        task_id='wait_finances_a_outcome_bookkeep',
        dag=dag,
        external_dag_id='finances_a',
        external_task_id='outcome_bookkeep')

    operations_a_report = DummyOperator(task_id='operations_a_report',
                                        dag=dag)

    calculate_revenue >> trigger_finances_a >> calculate_expenses
    calculate_expenses >> wait_finances_a_expenses_bookkept >> operations_a_report

Here, we can observe that the operator in charge of launching an external DAG is shown in pink in the Airflow UI, and the external task sensor operator in dark blue.
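
A detail worth underlining: by default, ExternalTaskSensor looks for a task instance of the external DAG with the same execution date as its own DAG run. That is exactly why trigger_finances_a passes execution_date='{{ execution_date }}': it makes the finances_a run share the logical date of the operations_a run, so the sensors on both sides can find each other. If the two DAGs ran on different schedules instead, the offset would have to be declared through the sensor's execution_delta (or execution_date_fn) parameter. A hypothetical sketch, assuming finances_a ran 30 minutes after operations_a:

from datetime import timedelta

from airflow.sensors.external_task import ExternalTaskSensor

# Not part of the example above: a sensor for a DAG scheduled 30 minutes
# after the external one. The sensor looks at its own execution date minus
# execution_delta when searching for the external task instance.
wait_shifted = ExternalTaskSensor(
    task_id='wait_operations_a_calculate_expenses',
    external_dag_id='operations_a',
    external_task_id='calculate_expenses',
    execution_delta=timedelta(minutes=30))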

Finance process

# finances_a.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago

DAG_NAME = 'finances_a'
DEFAULT_ARGS = {
    'owner': 'Finance',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['finances+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(seconds=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:

    income_bookkeep = DummyOperator(task_id='income_bookkeep',
                                    dag=dag)

    validate_income = DummyOperator(task_id='validate_income',
                                    dag=dag)

    # Wait for operations_a to finish calculating expenses before bookkeeping
    # them; timeout=60 makes the sensor fail fast and rely on task retries
    # instead of poking indefinitely.
    wait_operations_a_calculate_expenses = ExternalTaskSensor(
        task_id='wait_operations_a_calculate_expenses',
        dag=dag,
        external_dag_id='operations_a',
        external_task_id='calculate_expenses',
        timeout=60)

    outcome_bookkeep = DummyOperator(task_id='outcome_bookkeep',
                                     dag=dag)

    finances_a_report = DummyOperator(task_id='finances_a_report',
                                      dag=dag)

    income_bookkeep >> validate_income >> finances_a_report
    wait_operations_a_calculate_expenses >> outcome_bookkeep >> finances_a_report

In this case, we see the external task sensor, again in dark blue.
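
Note the timeout=60 on this sensor: together with the retries in DEFAULT_ARGS, it makes the sensor fail fast and try again rather than poking indefinitely. In its default 'poke' mode, a sensor holds a worker slot for the whole wait, so for long waits it can be worth switching to reschedule mode, in which the slot is released between checks. A quick sketch of that variant, with interval and timeout values chosen only for illustration:

from airflow.sensors.external_task import ExternalTaskSensor

# Alternative to the sensor above: reschedule mode frees the worker slot
# between checks instead of holding it for the whole wait.
wait_operations_a_calculate_expenses = ExternalTaskSensor(
    task_id='wait_operations_a_calculate_expenses',
    external_dag_id='operations_a',
    external_task_id='calculate_expenses',
    mode='reschedule',   # release the worker slot between pokes
    poke_interval=60,    # check the external task once per minute
    timeout=60 * 60)     # give up after an hour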

Solved!

In this way, when the operations DAG is executed, it is responsible for launching the finance DAG at the right moment, and each department can continue to evolve its processes independently, taking into account only the dependencies they have on each other.
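
As a final note, in simpler cases where the secondary DAG does not need a sensor pointing back at the primary one, TriggerDagRunOperator can itself block until the triggered run finishes, through its wait_for_completion flag (available since Airflow 2.1). It would not fit our example, where finances_a waits on a task that runs after the trigger, but as a hypothetical sketch:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# One-way dependency only: the trigger itself waits for the whole
# finances_a run to complete before letting the DAG continue.
trigger_and_wait = TriggerDagRunOperator(
    task_id='trigger_finances_a',
    trigger_dag_id='finances_a',
    execution_date='{{ execution_date }}',
    wait_for_completion=True,   # block until the triggered run finishes
    poke_interval=30)           # how often to check the run state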

You can download the complete code from our repository damavis/advanced-airflow.

If you found this post useful, we encourage you to read more articles from the Data Engineering category on our blog and to share it with your contacts. See you on social media!
Cristòfol Torrens