Programa, orquesta y monitoriza tus tareas de Kettle con este plugin de Pentaho PDI para Airflow
En Damavis sabemos la importancia que tiene el procesado de los datos. Extraer, limpiar, transformar, agregar, cargar o cruzar múltiples fuentes de datos permiten a nuestros clientes disponer de Insights o Modelos predictivos usando Machine Learning. En definitiva, obtener valor a partir de esos datos.
Pero mantener, ejecutar, supervisar todas estas ETLs y Agregaciones puede convertirse en una tarea complicada, sobretodo, cuando empezamos a tener muchas dependencias entre estos procesos.
¿Qué es Pentaho PDI (Kettle)?
Pentaho Data Integration es una herramienta principalmente diseñada para ETLs (Extraction, Transform, Load) sin necesidad de conocimientos de programación.
Gran parte de nuestros clientes ya gestionan este tipo de tareas con esta herramienta en Business Intelligence. Los motivos de la elección de esta herramienta pueden ser varios, pero los más destacables son, que es OpenSource, es muy completa y además fácil de usar, soportando cargas de trabajo tradicionales pero también de BigData o IoT.
¿Dónde está el problema?
El problema está en que la versión OpenSource de Pentaho PDI carece de un orquestador de las tareas, y no podemos establecer relaciones entre distintas Transformaciones o Jobs, ni programar esas tareas para que se ejecuten a partir de una hora o respetando sus dependencias. Aquí es donde entra en juego Apache Airflow y el plugin desarrollado por Damavis.
¿Qué es Apache Airflow?
Apache Airflow es una plataforma para crear, programar y monitorizar de forma centralizada flujos de trabajo de datos en batch. Esta herramienta resuelve problemas como por ejemplo, ejecutar las tareas con un scheduler, gestionar los reintentos en caso de error, hacer una gestión de error personalizado, establecer relaciones de dependencia entre tareas para optimizar los tiempos de ejecución de todo el pipeline y mucho más
El plugin
El plugin airflow-pentaho-plugin integra estas dos plataformas para permitir a los Ingenieros de Datos orquestarlo todo de forma elegante, y sobretodo, hacernos la vida más fácil.
A continuación explicamos cómo funciona y cómo puedes instalarlo en tu entorno de trabajo y empezar a aprovecharte de las ventajas que te ofrece.
¿Cómo funciona el plugin?
El funcionamiento del plugin es relativamente sencillo. Mediante el uso de la API REST del servicio de Carte de Pentaho PDI, Apache Airflow es capaz de lanzar y monitorizar tanto Jobs como Transformations, integrándose completamente. Esto significa que desde Airflow podremos ver el estado de estas tareas, cuánto han tardado, así como los logs.
Prerequisitos
- Una instalación de Apache Airflow funcionando.
- Una instalación de Pentaho PDI configurada.
- Una instalación del servicio Carte funcionando.
Instalación
La instalación es sencilla y solo requiere la instalación de un paquete python, que podrás instalar usando la herramienta pip, como en el siguiente ejemplo. Recuerda que para usarlo durante el desarrollo, también deberás instalarlo en tu entorno de trabajo de Airflow.
~$ pip install airflow-pentaho-plugin
Configuración
Para poder usar el plugin deberemos configurar una nueva conexión en Airflow. Esto podemos hacerlo desde la Interfaz de usuario, conocida también como Airflow Webserver.
Desde la página inicial, pinchamos en Admin y después en Connections; aquí pincharemos en el boton Add, y rellenaremos el formulario de la siguiente forma:
Campo | Valor |
Conn Id | pdi_default |
Conn Type | HTTP |
Host | Nombre de nuestro host de Carte |
Login | Nombre de usuario del Repositorio de Pentaho Server |
Password | Contraseña del usuario del Repositorio de Pentaho Server |
Extra |
|
Una vez añadamos la conexión, ya podemos empezar a desarrollar DAGs de Airflow que ejecuten tareas de Pentaho PDI.
DAG de ejemplo
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator
"""
# For versions before 2.0
from airflow.operators.airflow_pentaho import KitchenOperator
from airflow.operators.airflow_pentaho import PanOperator
from airflow.operators.airflow_pentaho import CarteJobOperator
from airflow.operators.airflow_pentaho import CarteTransOperator
"""
DAG_NAME = "pdi_example_flow"
DEFAULT_ARGS = {
'owner': 'Examples',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['airflow@example.com'],
'retries': 3,
'retry_delay': timedelta(minutes=10),
'email_on_failure': False,
'email_on_retry': False
}
with DAG(dag_id=DAG_NAME,
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=2),
schedule_interval='30 0 * * *') as dag:
trans1 = CarteTransOperator(
dag=dag,
task_id="trans1",
trans="/home/bi/test_trans_1",
params={"date": "{{ ds }}"})
trans2 = CarteTransOperator(
dag=dag,
task_id="trans2",
trans="/home/bi/test_trans_2",
params={"date": "{{ ds }}"})
job1 = CarteJobOperator(
dag=dag,
task_id="job1",
job="/home/bi/test_job_1",
params={"date": "{{ ds }}"})
trans3 = CarteTransOperator(
dag=dag,
task_id="trans3",
trans="/home/bi/test_trans_3",
params={"date": "{{ ds }}"})
trans4 = CarteTransOperator(
dag=dag,
task_id="trans4",
trans="/home/bi/test_trans_4",
params={"date": "{{ ds }}"})
trans1 >> trans2 >> job1
job1 >> trans3
job1 >> trans4
Este sería el código interpretado en forma de DAG:
¡Así de fácil!
Y de esta forma tan sencilla y a la vez elegante, puedes orquestar tus flujos de datos en batch creados en Pentaho PDI desde Airflow.
A continuación, te dejamos algunos enlaces por si quieres saber más sobre nosotros, sobre el proyecto o contribuir:
- https://damavis.com/es/team/
- https://github.com/damavis/airflow-pentaho-plugin
- https://pypi.org/project/airflow-pentaho-plugin/