In this article we look at a way to deal with the complexity that data engineering pipelines accumulate. An Airflow DAG can become very hard to maintain if we put every dependency into it; splitting it into smaller, interconnected DAGs lets us decouple processes by team of data engineers, by department, or by any other criterion.
Here is a hypothetical case: let's state the problem and then solve it.
Two departments, one process
Let's imagine that our company has two departments that need separate daily processes, but which are interdependent. We will describe these hypothetical processes, with their processing times, so we can see the problem. Let's assume the interdependence lies in the reports: each department's report takes the other department's process into account.
Operations
- Sales calculation (1h)
- Vendor purchase calculation (1h)
- Report
Finance
- Sales bookkeeping (5m)
- Validation of sales against accounts (2h)
- Bookkeeping of purchases + other expenses (5m)
- Report
This would be the DAG code and its representation in the Airflow UI:
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.dates import days_ago

DAG_NAME = 'problem_a'

DEFAULT_ARGS = {
    'owner': 'Operations+Finance',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['operations+airflow@example.com', 'finance+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:

    calculate_revenue = DummyOperator(task_id='operations_calculate_revenue',
                                      dag=dag)
    income_bookkeep = DummyOperator(task_id='finances_income_bookkeep',
                                    dag=dag)
    validate_income = DummyOperator(task_id='finances_validate_income',
                                    dag=dag)
    calculate_expenses = DummyOperator(task_id='operations_calculate_expenses',
                                       dag=dag)
    outcome_bookkeep = DummyOperator(task_id='finances_outcome_bookkeep',
                                     dag=dag)
    operations_a_report = DummyOperator(task_id='operations_a_report',
                                        dag=dag)
    finance_a_report = DummyOperator(task_id='finance_a_report',
                                     dag=dag)

    calculate_revenue >> income_bookkeep >> validate_income
    calculate_expenses >> outcome_bookkeep
    validate_income >> finance_a_report
    outcome_bookkeep >> finance_a_report
    income_bookkeep >> operations_a_report
    outcome_bookkeep >> operations_a_report
Here we can see that we actually have two processes with dependencies between them, living in the same DAG.
Two departments, two processes
The objective of this exercise is to split this DAG in two while keeping the dependencies. To do this we need a strategy: in this case we have chosen the operations DAG as the main one and the finance DAG as the secondary one, essentially because the first finance task depends on an operations task.
To build the solution, we will use two Airflow operators: TriggerDagRunOperator, which launches the execution of an external DAG, and ExternalTaskSensor, which waits for a task of an external DAG to complete.
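Before going into the two real DAGs, here is a minimal, generic sketch of how these two operators fit together (the DAG and task ids here are placeholders, not the ones used below):

from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Launch another DAG, passing our own execution date so both runs line up.
trigger = TriggerDagRunOperator(
    task_id='trigger_other_dag',
    trigger_dag_id='other_dag',               # DAG to launch
    execution_date='{{ execution_date }}')    # templated; reuse this run's date

# Block until a specific task in the other DAG has succeeded.
# By default the sensor looks for a run with the same execution date,
# which is exactly the run the trigger above created.
wait = ExternalTaskSensor(
    task_id='wait_other_dag_task',
    external_dag_id='other_dag',
    external_task_id='some_task')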
Operations Process
# operations_a.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago

DAG_NAME = 'operations_a'

DEFAULT_ARGS = {
    'owner': 'Operations',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['operations+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:

    calculate_revenue = DummyOperator(task_id='calculate_revenue',
                                      dag=dag)
    # Launch the finance DAG, reusing this run's execution date so that the
    # sensors in both DAGs can find each other's runs.
    trigger_finances_a = TriggerDagRunOperator(task_id='trigger_finances_a',
                                               dag=dag,
                                               trigger_dag_id='finances_a',
                                               execution_date='{{ execution_date }}')
    calculate_expenses = DummyOperator(task_id='calculate_expenses',
                                       dag=dag)
    # Wait until the finance DAG has finished bookkeeping the expenses.
    wait_finances_a_expenses_bookkept = ExternalTaskSensor(
        task_id='wait_finances_a_outcome_bookkeep',
        dag=dag,
        external_dag_id='finances_a',
        external_task_id='outcome_bookkeep')
    operations_a_report = DummyOperator(task_id='operations_a_report',
                                        dag=dag)

    calculate_revenue >> trigger_finances_a >> calculate_expenses
    calculate_expenses >> wait_finances_a_expenses_bookkept >> operations_a_report
Here we can see that, in the Airflow UI, the operators in charge of launching an external DAG are shown in pink, and the external task sensor operators in dark blue.
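As a side note, if the operations process only had to resume after the entire finance DAG run had finished (rather than after a single task), recent Airflow versions let the trigger itself block via wait_for_completion. This is not what the code above does, just a possible variant:

# Sketch of an alternative: block the triggering task until the whole
# triggered DAG run finishes, instead of using a separate sensor.
trigger_finances_a = TriggerDagRunOperator(
    task_id='trigger_finances_a',
    trigger_dag_id='finances_a',
    execution_date='{{ execution_date }}',
    wait_for_completion=True,   # poll the triggered run until it completes
    poke_interval=60)           # seconds between checks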
Finance Process
# finances_a.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.dates import days_ago

DAG_NAME = 'finances_a'

DEFAULT_ARGS = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['finances+airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(seconds=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(minutes=10),
         schedule_interval=None) as dag:

    income_bookkeep = DummyOperator(task_id='income_bookkeep',
                                    dag=dag)
    validate_income = DummyOperator(task_id='validate_income',
                                    dag=dag)
    # Wait until the operations DAG has calculated the expenses.
    wait_operations_a_calculate_expenses = ExternalTaskSensor(
        task_id='wait_operations_a_calculate_expenses',
        dag=dag,
        external_dag_id='operations_a',
        external_task_id='calculate_expenses',
        timeout=60)
    outcome_bookkeep = DummyOperator(task_id='outcome_bookkeep',
                                     dag=dag)
    finances_a_report = DummyOperator(task_id='finances_a_report',
                                      dag=dag)

    income_bookkeep >> validate_income >> wait_operations_a_calculate_expenses
    wait_operations_a_calculate_expenses >> outcome_bookkeep >> finances_a_report
In this case, we see the external task sensor, in blue.
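One detail worth keeping in mind: ExternalTaskSensor matches the external task instance by execution date. In this example the dates match because the finance DAG is triggered with the operations DAG's own execution_date; if the two DAGs were instead scheduled independently with an offset, the sensor would need that offset spelled out. A minimal sketch, assuming a hypothetical 30-minute offset between the two schedules:

from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

# Sketch: if operations_a ran on its own schedule 30 minutes before finances_a,
# the sensor would have to subtract that offset to find the matching run.
wait_operations_a_calculate_expenses = ExternalTaskSensor(
    task_id='wait_operations_a_calculate_expenses',
    external_dag_id='operations_a',
    external_task_id='calculate_expenses',
    execution_delta=timedelta(minutes=30),  # external run's date = this date - 30m
    timeout=60)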
Solved!
In this way, when the operations DAG runs, it takes care of launching the finance DAG at the right moment, and each department can continue to evolve its own process independently, taking into account only the dependencies they have on each other.
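Since each team now owns its own file, a small safeguard is to check that both DAGs still import correctly after any change. A minimal sketch of such a check (pytest and a standard dags folder are assumed; this is not part of the article's code):

# Sanity check: ensure both DAGs still load without import errors
# after either team changes its own file.
from airflow.models import DagBag

def test_dags_load_without_errors():
    dag_bag = DagBag(include_examples=False)
    assert dag_bag.import_errors == {}
    assert dag_bag.get_dag('operations_a') is not None
    assert dag_bag.get_dag('finances_a') is not None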
You can download the complete code from our repository: damavis/advanced-airflow.