Apache Airflow sensors in detail

Apache Airflow is an open source workflow orchestration tool widely used in the field of data engineering. You can take a look at our other blog post, Basics on Apache Airflow, for an introduction.

In this article, we go into detail on a special type of operator: the sensor. A sensor is an operator used when a DAG (Directed Acyclic Graph) needs to wait for a condition to be met before continuing with its tasks. This condition can be of almost any type: waiting for a certain amount of time to pass (TimeDeltaSensor), waiting for a file to exist (FileSensor) or waiting until a task in another DAG has finished (ExternalTaskSensor).

How Apache Airflow sensors work

A sensor is just another operator: it serves as a template to generate tasks within a DAG. If we look at the code of the base sensor provided by Airflow (BaseSensorOperator), we can see that it inherits from the base operator (BaseOperator). 

The behavior of sensors can be understood by looking at how the BaseSensorOperator class implements the execute method. A while loop reevaluates the sensor condition until it succeeds (or an exception is raised). The condition itself is evaluated in the abstract method poke, which each sensor derived from this base class must implement. In short, a normal operator implements the execute method, while a sensor implements the poke method.
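As an illustration, a minimal custom sensor might look like the following sketch (the FileExistsSensor name and its filepath parameter are our own invention; note that Airflow already ships a FileSensor for this use case):

import os

from airflow.sensors.base import BaseSensorOperator


class FileExistsSensor(BaseSensorOperator):
    """Minimal custom sensor: succeeds once a given path exists."""

    def __init__(self, filepath: str, **kwargs):
        super().__init__(**kwargs)
        self.filepath = filepath

    def poke(self, context) -> bool:
        # Called by BaseSensorOperator.execute on each loop iteration:
        # returning True ends the wait, returning False causes another sleep.
        return os.path.exists(self.filepath)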

Besides the arguments specific to each sensor implementation (such as TimeDeltaSensor, FileSensor or ExternalTaskSensor), the base sensor offers some interesting parameters that modify the behavior of the execute method.

When the condition of the poke method is not met and no exception is raised, the sensor sleeps before continuing with the next iteration of the loop. The poke_interval parameter allows you to adjust this time, while the exponential_backoff parameter makes the wait grow exponentially between iterations (this can be useful to avoid overloading a service).

In this example, you can see the log of the sensor_time_delta task defined below when the DAG is triggered manually:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync


DEFAULT_DAG_ARGS = {
    'start_date': datetime(2023, 6, 4),
}

dag = DAG(
    'sensors_overview',
    default_args=DEFAULT_DAG_ARGS,
    schedule_interval=None
)

sensor_time_delta = TimeDeltaSensor(
    task_id='sensor_time_delta',
    dag=dag,
    delta=timedelta(minutes=1),
    poke_interval=10,  # seconds
)

If we apply exponential_backoff=True to this same sensor, the log shows the wait between consecutive pokes growing after each failed check.
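As a reference, a variant of the same task with backoff enabled could be declared like this (the sensor_backoff task_id is ours):

sensor_backoff = TimeDeltaSensor(
    task_id='sensor_backoff',
    dag=dag,
    delta=timedelta(minutes=1),
    poke_interval=10,  # base interval in seconds
    exponential_backoff=True,  # each failed poke waits longer than the last
)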

On the other hand, the timeout parameter allows you to break the loop when a time limit is exceeded and mark the task as failed (or as skipped, when the soft_fail parameter is used).

In the following example, we set the timeout parameter to 30 seconds. This task always fails because the sensor's success condition (a one-minute delta) takes longer than the timeout.

sensor_fail = TimeDeltaSensor(
    task_id='sensor_fail',
    dag=dag,
    delta=timedelta(minutes=1),
    poke_interval=10,  # seconds
    timeout=30,  # seconds
)

When run, the task is marked as FAILED once the 30-second timeout expires.

If we apply soft_fail=True to this same sensor, the task is marked as SKIPPED instead.
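In code, this variant could be declared as follows (the sensor_soft_fail task_id is ours):

sensor_soft_fail = TimeDeltaSensor(
    task_id='sensor_soft_fail',
    dag=dag,
    delta=timedelta(minutes=1),
    poke_interval=10,  # seconds
    timeout=30,  # seconds
    soft_fail=True,  # mark the task as skipped instead of failed on timeout
)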

Modes

Perhaps it has gone unnoticed so far, but the sensor as described above can become a problem under certain conditions. A sensor whose condition never resolves is, after all, a running task, and a running task means a busy Airflow worker, in many cases doing nothing but waiting for the next interval. This is a drawback with any long-running task that, for example, waits for a response from a service; in the case of sensors it is much more noticeable, especially if a long timeout has been configured.

For this reason, sensors in Airflow accept another parameter: mode. This parameter allows you to switch from the default mode (poke) to reschedule mode (reschedule). In this new mode, when the sensor condition is not met, an AirflowRescheduleException is raised, which breaks the loop and puts the task in the up_for_reschedule state (shown in cyan). The task is sent to the Airflow scheduler to be rescheduled after the next poke_interval. In this way, the worker is freed during the waiting periods and can devote its resources to other work.

In the following example, the previous sensor mode is set to reschedule:

sensor_reschedule = TimeDeltaSensor(
    task_id='sensor_reschedule',
    dag=dag,
    delta=timedelta(minutes=1),
    poke_interval=10,  # seconds
    mode='reschedule',
)

In the Airflow UI, you can see that the task is marked with the UP_FOR_RESCHEDULE status during the waiting intervals. In the log, you can see how, after 10 seconds (the poke interval), the task is queued and starts running again.

The reschedule mode is useful for sensors that may take a long time to fulfill their condition. It is also of interest if you intend to run many sensors at once. However, it should not be used for sensors with a very short poke_interval, as this could overload the Airflow scheduler. The documentation recommends intervals longer than one minute.

Deferrable operators

The reschedule mode manages to free resources by marking the task as up_for_reschedule and rescheduling it for the next interval. However, it has its limits:

  • The intervals are fixed (or grow exponentially). In some cases, it would be more convenient to react to an external event than to poll on a fixed schedule.
  • This mode is exclusive to sensors. It would be ideal if it were a generalizable mechanism for any long task that waits for some condition external to Airflow.

With these two points in mind, Airflow goes a step further and offers deferrable operators, a mechanism that can be applied to any operator. When an operator defers, the task is marked in purple in the UI and a trigger is created: a small piece of asynchronous Python code that waits for a condition to be met. Each trigger is associated with a callback, defined as a method of the operator. When the trigger fires, the task resumes by executing that method, but on a new instance of the operator (the state of the original instance is not preserved).

In this way, the waiting process is separated from the operator, and the worker can be released during the wait. The trigger is serialized and sent to a dedicated Airflow component called the triggerer, which has to be deployed alongside the other components (scheduler, webserver, etc.).
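To make the mechanism concrete, a minimal deferrable operator might look like the following sketch (the WaitOneMinuteOperator name is ours; it assumes Airflow 2.2+, where BaseOperator.defer and TimeDeltaTrigger are available):

from datetime import timedelta

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import TimeDeltaTrigger


class WaitOneMinuteOperator(BaseOperator):
    """Hypothetical operator that waits without blocking a worker slot."""

    def execute(self, context):
        # Hand the wait over to the triggerer and free the worker;
        # method_name is the callback invoked when the trigger fires.
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(minutes=1)),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event=None):
        # Runs on a *new* operator instance once the trigger fires;
        # attributes set in execute() above are not preserved.
        return 'done'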

In the following example, the task is created using TimeDeltaSensorAsync, the deferrable version of the sensor:

sensor_defer = TimeDeltaSensorAsync(
    task_id='sensor_defer',
    dag=dag,
    delta=timedelta(minutes=1)
)

It can be seen that, in this case, the task is marked as DEFERRED until the condition (the timedelta) is met.

If the triggerer is not configured, the task will remain in this state until the timeout is reached.

Conclusion

In this post, we have seen in detail how Apache Airflow sensors work. Sensors allow you to create tasks that wait for a condition to be met. The parameters of the base sensor allow you to modify how often the sensor condition is evaluated.

Next, we looked at one of the main problems of sensors: being tasks that consist only of waiting, they occupy a worker's resources without making use of them. We have seen two possible solutions. The first is to configure the sensor in reschedule mode: when the sensor condition is not met, the task is rescheduled to run again after a time interval. In this way, the worker's resources can be used elsewhere, and the task will run again when the scheduler assigns it.

The second option is to use deferrable operators. This option is much more general, since it is based on asynchronous triggers (not only on time intervals) and can be applied to any operator. In order to use deferrable operators, an additional Airflow component called the triggerer must be deployed.

If you liked the post, we encourage you to visit the Software category to see other articles similar to this one and to share it on social networks. See you soon!
Guillermo Camps