Plugin Apache Hop para Apache Airflow

Desde el lanzamiento de nuestro plugin PDI de Pentaho para Apache Airflow, hemos visto un cambio en la industria hacia el uso de Apache Hop para el procesamiento de datos.

¿Qué es Apache Hop?

Apache Hop comenzó (a finales de 2019) como un fork de Kettle PDI, es una pieza de software que se especializa en ETLs (Extracción, Transformación, Carga) a través de visuales en lugar de requerir conocimientos de programación.

Este enfoque en lo visual permite que cualquier tipo de persona pueda desarrollar ETLs de una manera fácil y comprensible.

¿Qué es Apache Airflow?

Apache Airflow es una plataforma de gestión de flujos de trabajo de código abierto para pipelines de ingeniería de datos.

Esto se hace a través de lo que se conoce como Grafos Acíclicos Dirigidos (DAGs) para controlar la orquestación del flujo de trabajo. Tanto las tareas como las dependencias se definen en Python y su programación y ejecución son controladas por Airflow.

El problema de Apache Hop y su solución 

El principal problema de Apache Hop es que no dispone de una forma de orquestar sus flujos de trabajo y pipelines. Pero es aquí donde Apache Airflow aporta una solución al respecto.

Apache Hop ha implementado una forma de ejecutar sus flujos de trabajo y pipelines utilizando el DockerOperator de Airflow. Esta solución está en la hoja de ruta de Apache Hop, pero aún no está completamente documentada. 

Mientras Apache Hop termina su versión oficial orientada a Docker, en Damavis hemos implementado un plugin para orquestar flujos de trabajo y pipelines que utiliza la API del servidor de Apache Hop.

El plugin

El airflow-hop-plugin integra Airflow y Hop para permitir a los ingenieros de datos orquestar sus tareas.

A continuación, explicamos cómo funciona y cómo empezar a sacarle partido.

Cómo funciona el plugin

El plugin utiliza la API REST de Apache Hop para ejecutar pipelines y flujos de trabajo dentro de un servidor Hop. Debido a esto, Apache Airflow puede rastrear el estado de la tarea, sus registros y su tiempo total de ejecución.

Requisitos previos

Para garantizar el uso correcto del plugin hay algunos prerrequisitos que deben cumplirse:

  • Hay que generar un archivo JSON que contenga los metadatos del proyecto. Para ello, solo tienes que seleccionar la opción Tools, luego Export metadata to JSON y asegurarte de que el archivo creado se llama «metadata.json».
  • Estructura del directorio de inicio de Apache Hop. Debido a algunas limitaciones técnicas, el directorio principal de Hop debe tener una estructura determinada para garantizar que el plugin sepa dónde encontrar la información necesaria para generar las llamadas.
hop # This is the hop home directory
├── ...
├── config
│   ├── hop-config.json
│   ├── example_environment.json # This is where you should 
│   │						save your environment files
│   ├── metadata
│   │   └── ...
│   └── projects
│       ├── ...
│       └── example_project # This is how your project's 
│           │			directory should look
│           ├── metadata.json
│           ├── metadata
│           │   └── ...
│           ├── example_directory
│           │   └── example_workflow.hwl
│           └── example_pipeline.hpl
├── ...

Instalación

La instalación de este plugin es bastante sencilla, solo hay que ejecutar el siguiente comando para instalar un paquete de python. Por favor, ten en cuenta que para utilizarlo correctamente también deberás instalarlo en tu entorno Airflow.

pip install airflow-hop-plugin

Configuración

Para configurar completamente el plugin, es necesario crear una nueva conexión de Airflow. Esto puede hacerse desde el webserver de Airflow.

Desde la página de inicio, hay que hacer clic en Admin y luego en Connections, luego hay que hacer clic en el botón Add y rellenar los siguientes campos con sus respectivos valores:

  • Connection ID: ‘hop_default’
  • Connection Type: HTTP
  • Host: Nombre del servidor Apache Hop
  • Puerto: Puerto del servidor Apache Hop
  • Login: Nombre de usuario del servidor de Apache Hop
  • Password: Contraseña del servidor de Apache Hop
  • Extra: {«hop_home»: «/path/to/hop-home/»}

Una vez hecho esto, puedes empezar a desarrollar DAGs de Airflow para ejecutar las tareas de Apache Hop.

Ejemplo de DAG

from airflow_hop.operators import HopPipelineOperator
from airflow_hop.operators import HopWorkflowOperator

# ... #

with DAG('sample_dag', start_date=datetime(2022,7,26),
        schedule_interval='@daily', catchup=False) as dag:

    # Define a pipeline
    first_pipe = HopPipelineOperator(
        task_id='first_pipe',
        pipeline='pipelines/first_pipeline.hpl',
        pipe_config='remote hop server',
        project_name='default',
        log_level='Basic')

    # Define a pipeline with parameters
    second_pipe = HopPipelineOperator(
        task_id='second_pipe',
        pipeline='pipelines/second_pipeline.hpl',
        pipe_config='remote hop server',
        project_name='default',
        log_level='Basic',
        params={'DATE':'{{ ds }}'}) # Date in yyyy-mm-dd format

    # Define a workflow with parameters
    work_test = HopWorkflowOperator(
        task_id='work_test',
        workflow='workflows/workflow_example.hwf',
        project_name='default',
        log_level='Basic',
        params={'DATE':'{{ ds }}'}) # Date in yyyy-mm-dd format

    first_pipe >> second_pipe >> work_test
Conclusión

De esta forma, podrás orquestar tus flujos de datos en batch creados en Apache Hop desde Airflow.

Aquí tienes algunos enlaces por si quieres saber más sobre nosotros, sobre el proyecto o para contribuir:

Si este artículo te ha parecido interesante, te animamos a visitar las categorías Data Engineering y Software para ver otros posts similares a este y a compartirlo en redes. ¡Hasta pronto!
Pere Alzamora
Pere Alzamora
Artículos: 4