Plugin de Pentaho PDI para Airflow

Programa, orquesta y monitoriza tus tareas de Kettle con Airflow con este plugin de Pentaho.

Programa, orquesta y monitoriza tus tareas de Kettle con este plugin de Pentaho PDI para Airflow

En Damavis sabemos la importancia que tiene el procesado de los datos. Extraer, limpiar, transformar, agregar, cargar o cruzar múltiples fuentes de datos permiten a nuestros clientes disponer de Insights o Modelos predictivos usando Machine Learning. En definitiva, obtener valor a partir de esos datos.

Pero mantener, ejecutar, supervisar todas estas ETLs y Agregaciones puede convertirse en una tarea complicada, sobretodo, cuando empezamos a tener muchas dependencias entre estos procesos.

¿Que es Pentaho PDI (Kettle)?

Pentaho Data Integration es una herramienta principalmente diseñada para ETLs (Extraction, Transform, Load) sin necesidad de conocimientos de programación.

Gran parte de nuestros clientes ya gestionan este tipo de tareas con esta herramienta en Business Intelligence. Los motivos de la elección de esta herramienta pueden ser varios, pero los más destacables son, que es OpenSource, es muy completa y además fácil de usar, soportando cargas de trabajo tradicionales pero también de BigData o IoT.

¿Donde está el problema?

El problema está en que la versión OpenSource de Pentaho PDI carece de un orquestador de las tareas, y no podemos establecer relaciones entre distintas Transformaciones o Jobs, ni programar esas tareas para que se ejecuten a partir de una hora o respetando sus dependencias. Aquí es donde entra en juego Apache Airflow y el plugin desarrollado por Damavis.

¿Que es Apache Airflow?

Apache Airflow es una plataforma para crear, programar y monitorizar de forma centralizada flujos de trabajo de datos en batch. Esta herramienta resuelve problemas como por ejemplo, ejecutar las tareas con un scheduler, gestionar los reintentos en caso de error, hacer una gestión de error personalizado, establecer relaciones de dependencia entre tareas para optimizar los tiempos de ejecución de todo el pipeline y mucho más

El plugin

El plugin airflow-pentaho-plugin integra estas dos plataformas para permitir a los Ingenieros de Datos orquestarlo todo de forma elegante, y sobretodo, hacernos la vida más fácil.

A continuación explicamos cómo funciona y cómo puedes instalarlo en tu entorno de trabajo y empezar a aprovecharte de las ventajas que te ofrece.

¿Cómo funciona el plugin?

El funcionamiento del plugin es relativamente sencillo. Mediante el uso de la API REST del servicio de Carte de Pentaho PDI, Apache Airflow es capaz de lanzar y monitorizar tanto Jobs como Transformations, integrándose completamente. Esto significa que desde Airflow podremos ver el estado de estas tareas, cuánto han tardado, así como los logs.

Prerequisitos

  • Una instalación de Apache Airflow funcionando.
  • Una instalación de Pentaho PDI configurada.
  • Una instalación del servicio Carte funcionando.

Instalación

La instalación es sencilla y solo requiere la instalación de un paquete python, que podrás instalar usando la herramienta pip, como en el siguiente ejemplo. Recuerda que para usarlo durante el desarrollo, también deberás instalarlo en tu entorno de trabajo de Airflow.

~$ pip install airflow-pentaho-plugin

Configuración

Para poder usar el plugin deberemos configurar una nueva conexión en Airflow. Esto podemos hacerlo desde la Interfaz de usuario, conocida también como Airflow Webserver.

Desde la página inicial, pinchamos en Admin y después en Connections; aquí pincharemos en el boton Add, y rellenaremos el formulario de la siguiente forma:

CampoValor
Conn Idpdi_default
Conn TypeHTTP
HostNombre de nuestro host de Carte
LoginNombre de usuario del Repositorio de Pentaho Server
PasswordContraseña del usuario del Repositorio de Pentaho Server
Extra
{"rep": "Nombre del repositorio", "carte_username": "Usuario de Carte", "carte_password": "Contraseña de Carte"}
Valores o referencias de configuración de la conexión.

Una vez añadamos la conexión, ya podemos empezar a desarrollar DAGs de Airflow que ejecuten tareas de Pentaho PDI.

DAG de ejemplo

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator

"""
# For versions before 2.0
from airflow.operators.airflow_pentaho import KitchenOperator
from airflow.operators.airflow_pentaho import PanOperator
from airflow.operators.airflow_pentaho import CarteJobOperator
from airflow.operators.airflow_pentaho import CarteTransOperator
"""

DAG_NAME = "pdi_example_flow"
DEFAULT_ARGS = {
    'owner': 'Examples',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(hours=2),
         schedule_interval='30 0 * * *') as dag:

    trans1 = CarteTransOperator(
        dag=dag,
        task_id="trans1",
        trans="/home/bi/test_trans_1",
        params={"date": "{{ ds }}"})

    trans2 = CarteTransOperator(
        dag=dag,
        task_id="trans2",
        trans="/home/bi/test_trans_2",
        params={"date": "{{ ds }}"})

    job1 = CarteJobOperator(
        dag=dag,
        task_id="job1",
        job="/home/bi/test_job_1",
        params={"date": "{{ ds }}"})

    trans3 = CarteTransOperator(
        dag=dag,
        task_id="trans3",
        trans="/home/bi/test_trans_3",
        params={"date": "{{ ds }}"})

    trans4 = CarteTransOperator(
        dag=dag,
        task_id="trans4",
        trans="/home/bi/test_trans_4",
        params={"date": "{{ ds }}"})

    trans1 >> trans2 >> job1
    job1 >> trans3
    job1 >> trans4
  

Este sería el código interpretado en forma de DAG:

Pentaho
Podemos observar las tareas juntamente con sus dependencias y ver su estado.

¡Así de fácil!

Y de esta forma tan sencilla y a la vez elegante, puedes orquestar tus flujos de datos en batch creados en Pentaho PDI desde Airflow.

A continuación te dejamos algunos enlaces por si quieres saber más sobre nosotros, sobre el proyecto o contribuir:

Si te ha parecido útil este post, te animamos a visitar la categoría Data Engineering de nuestro blog para ver más artículos como este. No olvides compartirlo con tus contactos para que ellos también puedan leerlo y opinar. ¡Nos vemos en redes!

Cristòfol Torrens
Cristòfol Torrens
Artículos: 8