Schedule, orchestrate and monitor your Kettle tasks from Airflow using this Pentaho plugin.
At Damavis we know the importance of data processing. Extracting, cleaning, transforming, aggregating, loading and cross-referencing multiple data sources allows our clients to obtain insights or build predictive models with machine learning; in short, to get value out of their data.
But maintaining, executing and monitoring all of these ETLs and aggregations can become a complicated task, especially once there are many dependencies between processes.
What is Pentaho PDI (Kettle)?
Pentaho Data Integration is a tool primarily designed for building ETL (Extract, Transform, Load) processes without the need for programming skills.
Many of our customers already use this tool for their Business Intelligence processes. There are many reasons to choose it, but the most notable are that it is open source, very complete and easy to use, and that it supports not only traditional workloads but also Big Data and IoT.
What’s the problem?
The problem is that the open source version of Pentaho PDI lacks a task orchestrator: we cannot establish relationships between different Transformations or Jobs, nor schedule those tasks to run at a given time while respecting their dependencies. This is where Apache Airflow and the plugin developed by Damavis come into play.
What is Apache Airflow?
Apache Airflow is a platform for authoring, scheduling and centrally monitoring batch data workflows. It solves problems such as running tasks on a schedule, managing retries in case of errors, handling errors in a custom way, establishing dependency relationships between tasks to optimise the execution time of the entire pipeline, and much more.
The plugin
The plugin airflow-pentaho-plugin integrates these two platforms to allow Data Engineers to orchestrate everything in an elegant way, and above all, to make our lives easier.
Below we explain how it works, how to install it in your work environment, and how to start taking advantage of the benefits it offers.
How does the plugin work?
The plugin works in a fairly straightforward way: using the REST API of the Pentaho PDI Carte service, Apache Airflow is able to launch and monitor both Jobs and Transformations as fully integrated tasks. This means that from Airflow we can see the status of these tasks, how long they took, and their logs.
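To give an idea of what happens under the hood, here is a minimal sketch of the kind of Carte REST calls the plugin wraps, assuming a Carte server with its default basic-auth credentials; the host, port and transformation path are purely illustrative, and in practice the plugin builds and parses these requests for you.
# Minimal sketch of the kind of Carte REST calls the plugin wraps.
# Host, port, credentials and the transformation path are illustrative.
import requests

CARTE = "http://carte-host:8081"
AUTH = ("cluster", "cluster")  # Carte's default basic-auth credentials

# Ask Carte to execute a transformation
requests.get(f"{CARTE}/kettle/executeTrans/",
             params={"trans": "/home/bi/test_trans_1"},
             auth=AUTH)

# Poll its status; xml=Y returns a machine-readable report
status = requests.get(f"{CARTE}/kettle/transStatus/",
                      params={"name": "test_trans_1", "xml": "Y"},
                      auth=AUTH)
print(status.text)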
Prerequisites
- A working Apache Airflow installation.
- A configured Pentaho PDI installation.
- A running installation of the Carte service.
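If you do not have a Carte server running yet, a local one can be started from the PDI installation directory, for example as below; the host and port are illustrative, and on Windows you would use carte.bat instead.
~$ ./carte.sh 127.0.0.1 8081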
Installation
Installation is simple and only requires a Python package, which you can install with pip, as in the example below. Remember that, besides the environment you use for development, you will also need to install it in your Airflow environment.
~$ pip install airflow-pentaho-plugin
Configuration
In order to use the plugin, we need to set up a new connection in Airflow. This can be done from the user interface, also known as the Airflow Webserver.
From the home page, click on Admin and then Connections; here click on the Add button, and fill in the form as follows:
| Field | Value |
| --- | --- |
| Conn Id | pdi_default |
| Conn Type | HTTP |
| Host | Hostname of our Carte server |
| Login | Pentaho Server repository user name |
| Password | Pentaho Server repository user password |
| Extra | |
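Alternatively, the same connection can be created from the command line. This is just a sketch assuming the Airflow 2.x CLI; the host and credentials are placeholders, and any extra settings the plugin expects can be passed as a JSON string with --conn-extra.
~$ airflow connections add 'pdi_default' \
       --conn-type 'http' \
       --conn-host 'carte-host' \
       --conn-login 'repository_user' \
       --conn-password 'repository_password'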
Once we add the connection, we can start developing Airflow DAGs that run Pentaho PDI tasks.
Example DAG
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_pentaho.operators.KitchenOperator import KitchenOperator
from airflow_pentaho.operators.PanOperator import PanOperator
from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow_pentaho.operators.CarteTransOperator import CarteTransOperator
"""
# For versions before 2.0
from airflow.operators.airflow_pentaho import KitchenOperator
from airflow.operators.airflow_pentaho import PanOperator
from airflow.operators.airflow_pentaho import CarteJobOperator
from airflow.operators.airflow_pentaho import CarteTransOperator
"""
DAG_NAME = "pdi_example_flow"
DEFAULT_ARGS = {
    'owner': 'Examples',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         dagrun_timeout=timedelta(hours=2),
         schedule_interval='30 0 * * *') as dag:

    # Each operator runs a PDI transformation or job on Carte,
    # passing the execution date as a PDI parameter.
    trans1 = CarteTransOperator(
        dag=dag,
        task_id="trans1",
        trans="/home/bi/test_trans_1",
        params={"date": "{{ ds }}"})

    trans2 = CarteTransOperator(
        dag=dag,
        task_id="trans2",
        trans="/home/bi/test_trans_2",
        params={"date": "{{ ds }}"})

    job1 = CarteJobOperator(
        dag=dag,
        task_id="job1",
        job="/home/bi/test_job_1",
        params={"date": "{{ ds }}"})

    trans3 = CarteTransOperator(
        dag=dag,
        task_id="trans3",
        trans="/home/bi/test_trans_3",
        params={"date": "{{ ds }}"})

    trans4 = CarteTransOperator(
        dag=dag,
        task_id="trans4",
        trans="/home/bi/test_trans_4",
        params={"date": "{{ ds }}"})

    # Dependencies: trans1 -> trans2 -> job1, then job1 fans out
    # to trans3 and trans4, which can run in parallel.
    trans1 >> trans2 >> job1
    job1 >> trans3
    job1 >> trans4
Interpreted by Airflow, this code produces the following DAG: trans1 runs first, followed by trans2 and then job1; once job1 finishes, trans3 and trans4 run in parallel.
It's that easy! In this simple yet elegant way, you can orchestrate from Airflow the batch data flows you have built in Pentaho PDI.
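Before scheduling it, you can also run the whole DAG once from the command line to verify that everything is wired correctly; this is a sketch assuming the Airflow 2.x CLI, with an illustrative execution date.
~$ airflow dags test pdi_example_flow 2021-01-01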
Here are some links in case you want to know more about us, about the project or to contribute:
- https://damavis.com/en/team/
- https://github.com/damavis/airflow-pentaho-plugin
- https://pypi.org/project/airflow-pentaho-plugin/