Integración de DBT con Apache Spark

Introducción

En este post vamos a hablar de cómo DBT se integra con Spark y para qué puede resultarnos útil dicha integración.

DBT es un framework que nos facilita el diseño del modelado de datos a lo largo de los diferentes ciclos del mismo. DBT se usa normalmente para el modelado de datos con fines analíticos. De esta manera, utilizando DBT interponemos una capa de desarrollo que adopta un DSL muy cercano al del SQL tradicional para agilizar el desarrollo de ELT. 

Esta posibilidad, junto con su capacidad de ser transversal a diferentes almacenes de datos o Data Warehouses, son las principales virtudes que nos ofrece DBT frente al resto de alternativas.

Transformaciones de datos con DBT y Spark

A continuación, veremos un caso práctico de cómo utilizar Spark como motor de las transformaciones modeladas con DBT.

Lo primero que necesitaremos es levantar el entorno adecuado para desarrollar esta POC. En nuestro caso utilizaremos las siguientes dependencias binarias:

  • spark-3.0.2-bin-hadoop3.2
  • python 3.8
spark/sbin  ./start-thriftserver.sh

Con este comando, levantamos un servicio de Thrift Server en Spark que, a su vez, permite ejecutar consultas SQL interactivas en Spark a través de una conexión JDBC/ODBC. El servicio levantado expone por defecto el puerto 10000 que será utilizado para conectar DBT con Spark. Si nos conectamos mediante un IDE de desarrollo de bases de datos a la URI ‘jdbc:hive2://localhost:10000’, podremos realizar queries directamente sobre Spark, ese es el mismo principio que DBT aprovecha para poder llevar a cabo esta integración.

El proceso de modelado de base de datos es similar al de otros desarrollos con DBT, pero es importante destacar las capacidades específicas de la integración con Spark.

En primer lugar, me gustaría resaltar que permite desarrollar modelos que no solo pueden persistir en un soporte indexado tipo base de datos, sino que también se pueden almacenar en un sistema de archivos y en todos los formatos habituales (parquet, csv, orc, etc.). Esto nos brinda la flexibilidad de movernos a un paradigma de ETL, lo cual es particularmente interesante, ya que en muchas ocasiones es posible que necesitemos trabajar con un modelo de datos que no tenga soporte en un Data Warehouse.

Sin embargo, en la mayoría de las integraciones con DBT, encontramos que este último es una pieza imprescindible dentro de la arquitectura.

Proyecto de modelado de datos

En este repositorio de GitHub de Damavis podemos encontrar un código muy sencillo que nos permite levantar un entorno de DBT con las dependencias necesarias y también un proyecto simple de modelado. Por tanto, clonamos el repositorio en local y procedemos a instalar las dependencias de Python mediante Poetry. Aconsejo que se haga mediante un virtualenv.

pip install poetry 

poetry install

Una vez instaladas las dependencias, debemos crear nuestro profiles.yml, que contiene la conectividad entre DBT y el servicio de Spark abierto sobre el puerto 10000.

damavis_dbt_spark:
  target: local
  outputs:
    local:
      type: spark
      method: thrift
      host: localhost
      schema: test       
      port: 10000

DBT tiene varios tipos de conexión con Spark ODBC, Thrift, Http y Session:

  • ODBC es un método que nos permite conectarnos mediante el driver de conexión SQL de Databricks.
  • Thrift nos permite conectarnos a un server de Spark como podría ser un EMR de Amazon o un HDinsight.
  • Http nos permite conectarnos a un cluster pero mediante un servicio HTTP, actualmente (v1.6) solamente se puede integrar con el cluster interactivo de Databricks.
  • Session te permite conectar con una sesión de pySpark lanzada en local.

Ya tenemos configurada la conexión entre DBT y nuestro Spark en local. Veamos ahora el código fuente. En el fichero dbt_project.yml encontramos la primera peculiaridad del desarrollo con Spark:

models:
 damavis_dbt_spark:
   +file_format: csv
   +location_root: /hive/damavis
   +materialized: table

Aquí vemos que podemos especificar por defecto que nuestros modelos se persistan con un formato específico mediante “file_format” o que se almacena por defecto en una ruta específica “location_root”. DBT soporta múltiples formatos con Spark como parquet, delta, iceberg, hudi, csv, json, text, jdbc, orc, hive o libsvm.

Si ejecutamos dbt run, veremos que se crea la siguiente estructura de carpetas:

Podemos observar que el esquema de metainformación como es el fichero _SUCCESS se crea cuando el stage de Spark termina correctamente.

En el modelo example_partition_model vemos algunas de las opciones de las que disponemos para persistir los datos:

{{ config(
   partition_by=['year','month','day'],
   file_format='csv',
   options={'header': 'true'}
   buckets= 100
) }}


select
id,
2023 as year,
10 as month,
id as day
from {{ ref('example_view') }}

De esta forma, podemos especificar opciones de persistencia al igual que en Spark en el método “write.option(«header», True).csv(«path»)”. También podemos establecer opciones de ‘partition_by’, en nuestro caso por ‘year’, ‘month’ y ‘day’, o la cantidad de registros por cada uno de los ficheros que se almacenan mediante la opción ‘buckets’.

Conclusión

DBT nos permite desarrollar modelos dentro de un HDFS mediante el uso de Spark como motor de ejecución, permitiendo aprovechar las fortalezas de cada uno de los frameworks.

Por otro lado, DBT puede ayudarnos a modelar de una forma rápida, documentar, aliviar la curva de aprendizaje y, por último, gestionar operaciones relativamente complicadas de desarrollar en Spark como: pruebas de calidad e integridad de los datos en cada ejecución, sistemas de snapshots desde DBT, seeds, etc. Mientras que Spark, por su lado, nos permite utilizar el músculo de un cluster de Spark para tratar con una gran cantidad de datos sin necesidad de un sistema de un Datawarehouse.

Si este artículo te ha parecido interesante, te animamos a visitar la categoría Data Engineering para ver otros posts similares a este y a compartirlo en redes. ¡Hasta pronto!
Óscar García
Óscar García
Artículos: 8