Dynamic Task Mapping in Airflow 2.3.0

Introduction

One of the most notable new features of Airflow 2.3.0 is Dynamic Task Mapping, which makes it possible to create tasks dynamically at runtime. As a result, the number of such tasks in a DAG can change based on the data handled during each run.

The result is similar to a for loop in which a task is invoked once for each element.
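As a rough plain-Python analogy (this is not Airflow code, and `run_mapped` is a hypothetical helper introduced only for illustration), the sketch below calls a function once per element of a list whose length is only known when it runs, and keys each result by its position in the list.

```python
from typing import Callable, Dict, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_mapped(func: Callable[[T], R], values: Iterable[T]) -> Dict[int, R]:
    """Hypothetical helper: call func once per element, keyed by position."""
    return {index: func(value) for index, value in enumerate(values)}


def add_one(x: int) -> int:
    return x + 1


# The number of calls is decided by the data, not hard-coded in the pipeline.
results = run_mapped(add_one, [1, 2, 3])
print(results)  # {0: 2, 1: 3, 2: 4}
```

Unlike this sequential loop, Airflow schedules each mapped task instance independently, so the instances can run in parallel.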

Syntax

To map a task, call its expand() method and pass the values to iterate over as a keyword argument holding a list or a dictionary.

@task
def add_one(x):
   return x + 1
add_one.expand(x=[1, 2, 3])  # Result: task1 -> 2, task2 -> 3, task3 -> 4

It is also possible to set constant arguments using the method partial().

@task
def add(x, n):
   return x + n
add.partial(n=2).expand(x=[1, 2, 3])  # Result: task1 -> 3, task2 -> 4, task3 -> 5
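The split between constant and varying arguments resembles `functools.partial` from the standard library. The following is only an analogy in plain Python, not Airflow code: `n` is fixed once and `x` varies per call.

```python
from functools import partial


def add(x: int, n: int) -> int:
    return x + n


# Fix n once, then vary x: the same split that partial() and expand() make.
add_two = partial(add, n=2)
results = [add_two(x=x) for x in [1, 2, 3]]
print(results)  # [3, 4, 5]
```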

Limitations

Currently, expand() only accepts a list or a dictionary, supplied either as an input parameter of the DAG or as the result of a previous task stored in XCom. If a different data type is passed, an UnmappableXComTypePushed exception is raised at runtime.
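One defensive pattern, sketched here under the assumption that an upstream task might produce a tuple, generator or single value, is to normalise the result into a list before returning it. `to_mappable` is a hypothetical helper written for this article, not part of Airflow.

```python
from typing import Any, Union


def to_mappable(value: Any) -> Union[list, dict]:
    """Hypothetical helper: coerce a task result into a list or dict."""
    if isinstance(value, (list, dict)):
        return value
    if isinstance(value, (str, bytes)):
        # A string is iterable, but mapping over its characters is rarely
        # what is wanted, so treat it as a single element.
        return [value]
    try:
        return list(value)  # tuples, generators, ranges...
    except TypeError:
        return [value]  # non-iterable scalar: wrap it


print(to_mappable(("a", "b")))  # ['a', 'b']
print(to_mappable("job-1"))     # ['job-1']
print(to_mappable(7))           # [7]
```

Calling such a helper on the return value of the upstream task keeps the mapped input within the two supported types.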

In addition, if one dynamic task fails, the whole set of tasks is marked as failed. Future updates may add functionality to improve error handling in dynamic tasks.

Simple example

Log processing is a field that can benefit from this functionality, because the amount of data to handle varies widely between runs. The DAG below reads a log file (randomly generated for this example), filters the lines with ERROR status and maps over that set so that each one is processed in a separate task:

import time

from airflow.decorators import task, dag
from datetime import datetime
from pathlib import Path


logs_file = Path("/data/dummy_logs/random_logs.log")


def get_process_id(line: str) -> str:
   # Strip the trailing newline so the id (last space-separated token) is clean
   return line.strip().split(" ")[-1]


@dag(start_date=datetime(2022, 5, 16),
    schedule_interval="@daily",
    dag_id="repair_components_dummy",
    catchup=False)
def my_dag():
   @task
   def get_failed_processes(log_file: Path) -> list[str]:
       failed_processes = []

       with open(log_file, 'r') as f:
           for line in f:
               if line.startswith("ERROR"):
                   failed_processes.append(get_process_id(line))

       return failed_processes

   @task
   def repair_process(process_id: str):
       time.sleep(0.5)  # Simulate computation time
       print(f"process {process_id} repaired")

   repair_process.expand(process_id=get_failed_processes(logs_file))

repair_logs_dummy = my_dag()
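The filtering step can be checked without a scheduler by running the same logic on a few in-memory lines. This is a minimal sketch: the sample lines and their layout (status first, process id last) are assumptions made for illustration, and the real log file may look different.

```python
from typing import Iterable, List


def get_process_id(line: str) -> str:
    # The id is assumed to be the last space-separated token on the line.
    return line.strip().split(" ")[-1]


def failed_process_ids(lines: Iterable[str]) -> List[str]:
    """Return the process ids of the lines whose status is ERROR."""
    return [get_process_id(line) for line in lines if line.startswith("ERROR")]


# Hypothetical sample lines; the real file layout may differ.
sample_lines = [
    "INFO 2022-05-16 started process 101\n",
    "ERROR 2022-05-16 crashed process 102\n",
    "ERROR 2022-05-16 timeout process 103\n",
]

ids = failed_process_ids(sample_lines)
print(ids)  # ['102', '103']
```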

MapReduce

Mapping tasks so that they can run in parallel is already powerful, but if we also chain the dynamically generated tasks to a downstream task, we can perform MapReduce operations. To do this, the task that follows the mapping simply needs to accept as an argument a list or dictionary that matches the output of the mapped tasks.

MapReduce example

In the following example we will show a DAG to count the number of error messages in several log files. Each of these files must be processed in a different task in order to obtain the maximum possible parallelism.

The DAG has three steps. The first task returns a list with the files in a log folder. A task is then created dynamically for each of these files to count the error logs it contains. Finally, a single task reduces the results of the dynamic tasks with the sum operation.

import time
import glob

from airflow.decorators import task, dag
from datetime import datetime
from pathlib import Path

logs_folder = Path("/data/dummy_logs/random_logs")


def get_process_id(line: str) -> str:
   return line.split(" ")[-1]


@dag(start_date=datetime(2022, 5, 16),
    schedule_interval="@daily",
    dag_id="map_reduce_logs_dummy",
    catchup=False)
def my_dag():
   @task
   def get_log_files(log_folder: Path | str) -> list[str]:
       return glob.glob(str(log_folder) + "/*")

   @task
   def get_num_of_failed_processes(log_file: str) -> int:
       failed_processes = 0

       with open(log_file, 'r') as f:
           for line in f:
               time.sleep(0.05)  # Simulate computation time
               if line.startswith("ERROR"):
                   failed_processes += 1

       return failed_processes

   @task
   def sum_processes(n: list[int]):
       print(sum(n))

   sum_processes(get_num_of_failed_processes.expand(log_file=get_log_files(logs_folder)))


map_reduce_logs_dummy = my_dag()
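The same three steps can be tried outside Airflow as plain functions. This is a sequential sketch for checking the logic only: the temporary folder, file names and log lines are hypothetical, and `count_errors` and `map_reduce_errors` are names chosen for this illustration.

```python
import glob
import tempfile
from pathlib import Path
from typing import List


def get_log_files(log_folder: str) -> List[str]:
    return sorted(glob.glob(str(Path(log_folder) / "*")))


def count_errors(log_file: str) -> int:
    with open(log_file, "r") as f:
        return sum(1 for line in f if line.startswith("ERROR"))


def map_reduce_errors(log_folder: str) -> int:
    files = get_log_files(log_folder)                # produce the inputs
    counts = [count_errors(path) for path in files]  # map: one call per file
    return sum(counts)                               # reduce: combine results


# Build a throwaway folder with two hypothetical log files.
with tempfile.TemporaryDirectory() as folder:
    Path(folder, "a.log").write_text("ERROR x\nINFO y\nERROR z\n")
    Path(folder, "b.log").write_text("INFO y\nERROR w\n")
    total = map_reduce_errors(folder)

print(total)  # 3
```

In the DAG, the list comprehension is what the mapping replaces, so each file is counted by its own task instance instead of inside one loop.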

Graphical interface

In the grid view (which replaces the tree view as of Airflow 2.3.0), dynamic tasks are shown as a single task, with a pair of square brackets appended to the name.

In the graph view they also appear as a single task. However, once the run has completed successfully, the number of dynamic tasks that were executed is shown.

If we select the dynamic task, a drop-down menu appears to select the specific task to obtain its details.

Configuration parameters

You can limit dynamic mapping with the following parameters:

  1. max_map_length: Maximum number of tasks that a single call to expand() can create. It is set in the [core] section of the Airflow configuration file and defaults to 1024.
  2. max_active_tis_per_dag: Maximum number of instances of the same task that can run in parallel. It is set on the task itself rather than in the configuration file, and it applies across all active runs of the DAG.
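When an upstream task may return more elements than max_map_length allows, one option is to group the elements into batches so that each mapped task handles several of them. The sketch below is an assumption about how this could be done, not an Airflow feature: `batch_for_mapping` is a hypothetical helper, and the mapped task would then need to accept a list instead of a single element.

```python
from math import ceil
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def batch_for_mapping(items: Sequence[T], max_tasks: int) -> List[List[T]]:
    """Hypothetical helper: split items into at most max_tasks batches."""
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    if not items:
        return []
    size = ceil(len(items) / max_tasks)
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


batches = batch_for_mapping(list(range(10)), max_tasks=4)
print(batches)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```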

Conclusions

In conclusion, the Dynamic Task Mapping introduced in Airflow 2.3.0 allows us to create tasks at runtime and so adapt a DAG to workloads with variable inputs, such as log processing. Moreover, by combining it with a reduce operation, we can build DAGs that follow the MapReduce model in a simple way and execute the mapped tasks in parallel.

Francisco Arenas