La llegada de la versión 3.5 de Apache Spark ha supuesto una revolución en la forma en la que trabajamos con los UDFs. A pesar de que siempre han existido problemas con ellas, como ya abordamos en Evitando usar UDFs en Apache Spark, ahora podemos disfrutar de la optimización de nuestros UDFs mediante Apache Arrow.
- ¿Qué son las funciones definidas por usuario o UDFs?
- Cómo activar UDFs para Arrow en Spark 3.5
- Conclusión
¿Qué son las funciones definidas por usuario o UDFs?
Al trabajar con PySpark, las UDFs (User Defined Functions) son elementos que permiten ampliar y personalizar las operaciones que se realizan con los datos. Básicamente, son funciones personalizadas que se definen en Python y se aplican a las columnas de un DataFrame. El problema que subyace aquí es que las UDFs estándar pueden resultar un poco lentas si no se optimizan bien.
La dificultad radica en que Spark se ejecuta sobre la JVM, pero las UDFs se ejecutan en Python. Cada vez que se llama a la UDF, los datos deben serializarse, ejecutar la UDF en Python y hacer el camino inverso para que vuelva al contexto de la JVM.
Ahora, con Spark 3.5 que integra Apache Arrow para el proceso de UDFs, todo se acelera drásticamente. En este post, no entraremos en el detalle de por qué Arrow acelera el intercambio, ya que eso lo abordamos en la Introducción a Apache Arrow. No obstante, si aclararemos que, en resumen, Arrow nos permite mantener los datos en una memoria compartida entre JVM y el contexto de Python. Gracias a esta posibilidad, podemos ahorrar serializaciones y deserializaciones de datos.
Cómo activar UDFs para Arrow en Spark 3.5
Para habilitar el uso de las UDFs optimizadas para Apache Arrow en la versión 3.5 de Apache Spark, ejecutamos el siguiente comando:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
Hasta la versión 4.0, esta opción viene desactivada por defecto. A continuación, veremos mediante una serie de ejemplos cómo mejora el rendimiento con y sin el uso de Arrow.
Conversión de DataFrame de PySpark a Pandas
A continuación, veremos un ejemplo de cómo convertir un DataFrame de PySpark a uno de Pandas.
from pyspark.sql import SparkSession
import time
spark = SparkSession.builder \
.config("spark.driver.memory", "6g") \
.config("spark.driver.cores", "3") \
.getOrCreate()
df = spark.range(1, 1000000)
start = time.time()
pdf = df.toPandas()
end = time.time()
total_time = end - start
print(f"Total time without arrow enabled was {total_time}")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
start = time.time()
pdf = df.toPandas()
end = time.time()
total_time = end - start
print(f"Total time with arrow enabled was {total_time}")
Resultados de la ejecución:
Total time without arrow enabled was 5.553412675857544
Total time with arrow enabled was 0.46186256408691406Si observamos los resultados de la ejecución, el rendimiento a la hora de habilitar PyArrow aumenta de manera drástica.
Ejecución de UDFs estándar en PySpark
A continuación, probaremos otra implementación en el uso de UDFs definidos en PySpark. Una función UDF en PySpark ejecuta lógica personalizada en cada fila de un DataFrame. En este sentido, hay que tener en cuenta que los UDFs en PySpark no están optimizados para funcionar de manera distribuida. Generalmente, los UDFs requieren la serialización y deserialización de datos entre JVM (Java Virtual Machine, donde se ejecuta Spark) y Python, lo cual es costoso.
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, rand
from pyspark.sql.types import StringType
import time
spark = SparkSession.builder \
.config("spark.driver.memory", "6g") \
.config("spark.driver.cores", "3") \
.getOrCreate()
df = spark.range(1, 100000)
random_df = df.select(*((col(c) + rand(seed=1234)).alias(c) for c in df.columns))
@udf(returnType=StringType())
def add_prefix(index):
return f"Hi {index}"
start = time.time()
result = random_df.withColumn("nc", add_prefix(random_df["id"]))
result.collect()
end = time.time()
total_time = end - start
print(f"Total time without arrow was {total_time}")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
@udf(returnType=StringType())
def add_prefix_2(index):
return f"Hi {index}"
start = time.time()
result = random_df.withColumn("nc", add_prefix_2(random_df["id"]))
result.collect()
end = time.time()
total_time = end - start
print(f"Total time with arrow was {total_time}")
Resultados de la ejecución:
Total time without arrow was 2.7320878505706787
Total time with arrow was 0.7661206722259521Tal y como podemos observar, se produce otra mejora sustancial del rendimiento a la hora de hacer la conversión de integer a string para cada una de las ejecuciones.
Uso de la función pandas_udf
En este caso, veremos un último ejemplo en el que se utiliza la función pandas_udf.
import random
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, IntegerType
from pyspark.sql.functions import pandas_udf
import time
import pandas as pd
spark = SparkSession.builder.config("spark.driver.memory", "6g").config("spark.driver.cores",
"3").getOrCreate()
schema = StructType([
StructField("log_id", StringType(), True),
StructField("message", StringType(), True)
])
random_data = [(f"log_{random.randint(1000, 9999)}",
random.choice(
["Error occurred", "Operation successful", "Timeout", "Invalid input"]))
for _ in range(100000)]
df = spark.createDataFrame(random_data, schema)
@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series) -> pd.DataFrame:
return s1 + s2
start = time.time()
result_df = df.withColumn("result", func(df["log_id"], df["message"]))
result_df.collect()
end = time.time()
total_time = end - start
print(f"Total time without arrow was {total_time}")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
start = time.time()
result_df = df.withColumn("result", func(df["log_id"], df["message"]))
result_df.collect()
end = time.time()
total_time = end - start
print(f"Total time without arrow was {total_time}")
Resultados de la ejecución:
Total time without arrow was 6.746865749359131
Total time without arrow was 1.6970367431640625En PySpark, el decorador @pandas_udf permite definir funciones UDFs pero utilizando Pandas en lugar de operar fila por fila. Por otro lado, pandas_udf opera con las series de Pandas como parámetro. Este detalle posibilita procesar los datos en bloques y aprovechar los beneficios de Pandas Series.
Conclusión
En este post, hemos analizado cómo el uso de UDFs en PySpark ha sido históricamente una solución flexible pero ineficiente, especialmente por la necesidad de serializar y deserializar datos entre Python y la JVM de Spark.
Sin embargo, a partir de la versión 3.5 de Spark, la integración con Apache Arrow ha supuesto una mejora significativa en el rendimiento. Como hemos visto en los distintos ejemplos, habilitar Arrow acelera drásticamente las operaciones, tanto en la conversión de DataFrames de PySpark a Pandas como en la ejecución de UDFs.
Hasta aquí nuestro post de hoy. Si te ha parecido interesante, te animamos a compartirlo en redes con tus contactos. ¡Hasta pronto!

