Sensores de Apache Airflow en detalle

Introducción

Apache Airflow es una herramienta de código abierto para la orquestación de flujos de trabajo muy utilizada en el ámbito de la ingeniería de datos. Puedes echar un vistazo a este otro post donde hacíamos una introducción a los Conceptos Básicos de Apache Airflow

En este artículo, entramos en detalle en un tipo de operador especial: el sensor. El sensor es un operador que se utiliza cuando en un DAG (Directed Acyclic Graph) se requiere esperar hasta que se cumpla una condición para continuar con las tareas. Esta condición puede ser de cualquier tipo, como por ejemplo, esperar a que pase cierto tiempo (TimeDeltaSensor), la existencia de un fichero (FileSensor) o aguardar a que otra tarea en otro DAG haya terminado (ExternalTaskSensor).

Cómo funcionan los sensores de Apache Airflow

Un sensor es un operador más: sirve como plantilla para generar tareas dentro de un DAG. Si vemos el código del sensor base que proporciona Airflow (BaseSensorOperator), podemos ver qué hereda del operador base (BaseOperator). 

El comportamiento de los sensores se puede entender mirando la implementación del método execute que hace la clase BaseSensorOperator. Un bucle while se encarga de reevaluar la condición del sensor hasta que ésta sea exitosa (o salte alguna excepción). La evaluación de esta condición se ejecuta en el método abstracto poke, que será implementado por cada sensor que derive de esta clase base. Resumiendo, las implementaciones de un operador normal implementan el método execute, mientras que en el sensor se implementa el método poke.

Aparte de los argumentos que permiten las implementaciones de los sensores (como TimeDeltaSensor, FileSensor o ExternalTaskSensor), el sensor base ofrece algunos parámetros interesantes que modifican el comportamiento del método execute

Cuando no se cumple la condición del método poke ni salta ninguna excepción, el sensor espera un tiempo dormido (sleep) antes de seguir con la siguiente iteración del bucle. El parámetro poke_interval permite ajustar este tiempo mientras que el parámetro exponential_backoff hace que este tiempo incremente exponencialmente (esto puede ser útil para evitar sobrecargar un servicio). 

Este código de ejemplo muestra cómo se puede ver el log de la siguiente tarea sensor_time_delta, si se realiza una ejecución manual del DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync


DEFAULT_DAG_ARGS = {
    'start_date': datetime(2023, 6, 4),
}

dag = DAG(
    'sensors_overview',
    default_args=DEFAULT_DAG_ARGS,
    schedule_interval=None
)

sensor_time_delta = TimeDeltaSensor(
   task_id='sensor_time_delta',
   dag=dag,
   delta=timedelta(minutes=1),
   poke_interval=10,  # seconds
)

Si a este mismo sensor le aplicamos exponential_backoff=True, el log es el siguiente:

Por otro lado, el parámetro timeout permite romper el bucle en caso de que se sobrepase un límite de tiempo y marcar la tarea como fallida (u omitida, en caso de usar el parámetro soft_fail).

Para el próximo ejemplo, configuramos el parámetro timeout a 30 segundos. Esta tarea siempre falla ya que la condición de éxito del sensor es un tiempo superior a este timeout.

sensor_fail = TimeDeltaSensor(
    task_id='sensor_fail',
    dag=dag,
    delta=timedelta(minutes=1),
    poke_interval=10,  # seconds
    timeout=30,  # seconds
)

El resultado es el siguiente, con la tarea marcada como FAILED:

Si a este mismo sensor le aplicamos soft_fail=True, obtenemos el siguiente resultado, con la tarea marcada como SKIPPED:

Modos

Quizá no lo hemos notado previamente, pero el sensor, tal y como lo hemos visto antes, puede tener un problema si se dan ciertas condiciones. Un sensor cuya condición no resuelve es, al fin y al cabo, una tarea en ejecución, y esto supone un worker de Airflow ocupado, en muchos casos sin hacer nada (esperando al siguiente intervalo). Esto puede ser un inconveniente con cualquier tarea larga que, por ejemplo, espere la respuesta de un servicio. No obstante, en el caso del sensor, esto es mucho más obvio, especialmente si se ha configurado un timeout largo.

Por esta razón, los sensores en Airflow permiten otro parámetro: mode. Este parámetro permite cambiar del modo por defecto (poke) al modo de reejecución (reschedule). En este nuevo modo, cuando la condición del sensor no se cumple, se lanza la excepción AirflowRescheduleException que rompe el bucle y hace que la tarea se marque como estado up_for_reschedule (en color cian). La tarea se envía al scheduler de Airflow para que la reprograme para el siguiente poke_interval. De esta manera, se libera al worker del período entre los intervalos para poder dedicar sus recursos a otro trabajo.

En el siguiente ejemplo, se pone el modo del sensor anterior a reschedule

sensor_reschedule = TimeDeltaSensor(
    task_id='sensor_reschedule',
    dag=dag,
    delta=timedelta(minutes=1),
    poke_interval=10,  # seconds
    mode='reschedule',  # seconds
)

En la UI de Airflow se puede observar que la tarea se marca como estado UP_FOR_RESCHEDULE en los intervalos de espera. En el log, se puede ver cómo al pasar 10 segundos (intervalo de poke), la tarea vuelve a empezar a ejecutarse (se pone en cola).

Así pues, el modo de reejecución es útil para sensores que pueden tardar mucho tiempo en cumplir la condición. También es de interés utilizar este modo si se pretende utilizar muchos sensores a la vez. No obstante, no se debería utilizar en sensores con poke_interval muy cortos, ya que esto podría sobrecargar el scheduler de Airflow. En la documentación recomiendan intervalos más largos de 1 minuto.

Deferrable operators

El modo de reejecución consigue liberar recursos marcando la tarea como up_for_reschedule y reprogramándola para el siguiente intervalo. No obstante, tiene sus límites:

  • Los intervalos son fijos (o exponencialmente fijos). En algunos casos, sería conveniente.
  • Este modo es único de los sensores. Sería idóneo que fuera un proceso generalizable para aquellas tareas largas que esperan alguna condición externa a Airflow.

Con estos dos puntos en mente, Airflow decide dar un paso más y ofrece operadores aplazables (o deferrable operators), que puede aplicarse a cualquier operador. Cuando un operador es aplazado, se marca en color lila en la UI y se crea un disparador (o trigger), que consiste en un pequeño código asíncrono de Python que espera a que se cumpla una condición. Los disparadores van asociados a un callback, que se define a través de un método en el operador. Cuando se dispara, la tarea continúa ejecutando el método pero bajo una nueva instancia del operador (no mantiene el estado del mismo).

De esta manera, se consigue separar el proceso de espera del operador y ahora se puede liberar al worker de esta parada. El disparador se serializa y se envía a un nuevo componente de Airflow llamado triggerer, que se tiene que configurar tal y como se hace con otros componentes (scheduler, webserver, etc.).

A continuación, se crea una tarea utilizando esta vez TimeDeltaSensorAsync, que está preparado para ser aplazable:

sensor_defer = TimeDeltaSensorAsync(
    task_id='sensor_defer',
    dag=dag,
    delta=timedelta(minutes=1)
)

Se puede observar que, en este caso, la tarea se marca como DEFERRED hasta que se cumpla la condición (timedelta):

Si no está configurado el triggerer, esta tarea se quedará en este estado hasta que se llegue al tiempo de timeout.

Conclusión

En este post se ha visto en detalle cómo funcionan los sensores de Apache Airflow. Los sensores permiten crear tareas que esperan que una condición se cumpla. Los parámetros del sensor base permiten modificar la frecuencia en que se evalúa la condición del sensor. 

Seguidamente, se ha visto uno de los principales problemas de los sensores: al tener tareas que consisten solamente en esperar, se están ocupando los recursos del worker sin hacer uso de ellos. Se han visto dos posibles soluciones a este problema. El primero es configurar el sensor en modo de reejecución: cuando la condición del sensor no se cumple se reprograma la tarea para después de un intervalo de tiempo. De esta manera, los recursos del worker se pueden utilizar en otro sitio y la tarea volverá a ejecutarse cuando el scheduler la asigne.

La segunda opción es utilizar operadores aplazables. Esta opción es mucho más genérica, ya que se basa en disparadores asíncronos (y no solamente en intervalos de tiempo) y se puede utilizar en cualquier operador. Para poder utilizar operadores aplazables, hay que configurar un componente adicional de Airflow llamado triggerer.

Si te ha gustado el post, te animamos a que visites la categoría Software para ver otros artículos similares a este y a que lo compartas en las redes sociales. ¡Hasta pronto!
Guillermo Camps
Guillermo Camps
Artículos: 14