El uso de Window en Apache Spark

Cómo usar Window en Apache Spark en su implementación en pySpark

Introducción

Al procesar datos muchas veces nos encontramos en una situación en la que queremos calcular variables sobre un cierto subconjunto de observaciones. Por ejemplo, podríamos estar interesados en la media por grupo o el valor máximo para cada grupo. La función groupBy disponible en muchos lenguajes de programación o de consulta permite que hagamos estos cálculos fácilmente, simplemente especificando las variables que definen un grupo y las funciones que se quieren aplicar dentro de cada grupo.  

Normalmente tras aplicar groupBy se obtiene una sola observación por cada uno de los grupos que se definen, lo que implica que si quisiéramos comparar los agregados obtenidos con las observaciones que los generaron habría que crear una función específica o utilizar un join de la tabla original con los resultados del groupBy.  

Para facilitar labores como esta, muchos lenguajes cuentan con la funcion Window. Esta tiene un funcionamiento similar a groupBy con la diferencia que Window no modifica el número de filas del dataframe. A continuación explicamos cómo usar Window en Apache Spark, en concreto en su implementación en pySpark.

Para comparar el comportamiento de groupBy con el de Window imaginemos el siguiente problema: Tenemos un conjunto de estudiantes y para cada uno tenemos la clase en la que estaba y la calificación obtenida. A partir de estos datos, queremos determinar si un alumno obtuvo una calificación superior al promedio de su clase.

Generemos unos datos simulados de prueba:

~~~~ python
notas = [('ES001', 'Ingles', 18), 
         ('ES002', 'Ingles', 14),
         ('ES003', 'Ingles', 19), 
         ('ES001', 'Literatura', 13), 
         ('ES002', 'Literatura', 16), 
         ('ES003', 'Literatura', 11), 
         ('ES001', 'Matematicas', 15), 
         ('ES002', 'Matematicas', 17), 
         ('ES003', 'Matematicas', 19)]

notasSpark = spark.createDataFrame(notas, schema=['estudiante', 'curso', 'nota']).cache() 

notasSpark.show()
~~~~

~~~~ console
+----------+-----------+----+
|estudiante|      curso|nota|
+----------+-----------+----+
|     ES001|     Ingles|  18|
|     ES002|     Ingles|  14|
|     ES003|     Ingles|  19|
|     ES001| Literatura|  13|
|     ES002| Literatura|  16|
|     ES003| Literatura|  11|
|     ES001|Matematicas|  15|
|     ES002|Matematicas|  17|
|     ES003|Matematicas|  19|
+----------+-----------+----+
~~~~

Para resolver este problema usando solamente groupBy primero agrupamos por clase para obtener el promedio y luego para unir esta variable con los datos originales usamos un join:

~~~~ python
result = notasSpark.join(notasSpark.groupBy('curso').agg(avg(col('nota')).alias('promedioClase')), 
                         on=['curso'],
                         how='left')

result = result.withColumn('superiorPromedio', when(col('nota')>col('promedioClase'), lit(1)).otherwise(lit(0)))

result.show()
~~~~

~~~~ console
+-----------+----------+----+------------------+----------------+
|      curso|estudiante|nota|     promedioClase|superiorPromedio|
+-----------+----------+----+------------------+----------------+
|     Ingles|     ES001|  18|              17.0|               1|
|     Ingles|     ES002|  14|              17.0|               0|
|     Ingles|     ES003|  19|              17.0|               1|
| Literatura|     ES001|  13|13.333333333333334|               0|
| Literatura|     ES002|  16|13.333333333333334|               1|
| Literatura|     ES003|  11|13.333333333333334|               0|
|Matematicas|     ES001|  15|              17.0|               0|
|Matematicas|     ES002|  17|              17.0|               0|
|Matematicas|     ES003|  19|              17.0|               1|
+-----------+----------+----+------------------+----------------+
~~~~

Este mismo ejemplo utilizando Window es mucho más sencillo ya que solo debemos definir la ventana en la que queremos aplicar la función y luego crear la variable usando la función over() indicando la ventana que queremos utilizar para el cálculo:

~~~~ python
win = Window.partitionBy('curso')

result = notasSpark.withColumn('promedioClase', avg(col('nota')).over(win))

result = result.withColumn('superiorPromedio', when(col('nota')>col('promedioClase'), lit(1)).otherwise(lit(0)))

result.show()
~~~~

~~~~ console
+----------+-----------+----+------------------+----------------+
|estudiante|      curso|nota|     promedioClase|superiorPromedio|
+----------+-----------+----+------------------+----------------+
|     ES001| Literatura|  13|13.333333333333334|               0|
|     ES002| Literatura|  16|13.333333333333334|               1|
|     ES003| Literatura|  11|13.333333333333334|               0|
|     ES001|     Ingles|  18|              17.0|               1|
|     ES002|     Ingles|  14|              17.0|               0|
|     ES003|     Ingles|  19|              17.0|               1|
|     ES001|Matematicas|  15|              17.0|               0|
|     ES002|Matematicas|  17|              17.0|               0|
|     ES003|Matematicas|  19|              17.0|               1|
+----------+-----------+----+------------------+----------------+
~~~~

El código con Window es mucho menos engorroso y más eficiente ya que no hay necesidad de hacer un join que muchas veces podría ser muy costoso computacionalmente.

Definición de ventanas

Spark nos provee de diferentes funciones que podemos combinar para establecer muchos tipos de ventana fácilmente:

partitionBy

Esta función es la que usamos en el ejemplo anterior y sirve para definir los grupos que tenemos dentro de los datos. Necesita como argumento una lista con las variables que delimitan tales grupos al igual que cuando usamos un groupBy.

orderBy

Nos permite ordenar los datos dentro de un grupo. Además, al usar orderBy solo o con partitionBy al definir la ventana, se consigue que los cálculos se realicen para todos las filas cuyo valor precede o es igual al de la fila actual. De esta forma, si dos filas tienen el mismo valor para la variable por la que se está ordenando, ambas entrarán en el cálculo y no solo una de ellas.

Para ejemplificar el uso de orderBy usemos los datos de contribución de diferentes sectores al PIB e imaginemos que queremos quedarnos con los sectores más grandes cuya aportación conjunta sea al menos de un 80%. Para esto podemos utilizar una ventana con orderBy para definir un orden descendente de acuerdo al porcentaje del PIB y sumar:

~~~~ python
pib = [('Agricultura', 5), 
       ('Telecomunicaciones', 25), 
       ('Hosteleria', 40), 
       ('Petroquimica', 25),
       ('Construccion', 5)]

pibSpark = spark.createDataFrame(pib, schema=['empresa', 'porcPIB'])

win = Window().orderBy(col('porcPIB').desc())
win2 = Window().orderBy(col('porcPIB').desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)


pibSpark.withColumn('aporteMasGrandes', sum(col('porcPIB')).over(win)).show()
~~~~

~~~~ console
+------------------+-------+----------------+
|           empresa|porcPIB|aporteMasGrandes|
+------------------+-------+----------------+
|        Hosteleria|     40|              40|
|Telecomunicaciones|     25|              90|
|      Petroquimica|     25|              90|
|       Agricultura|      5|             100|
|      Construccion|      5|             100|
+------------------+-------+----------------+
~~~~

Nótese que para Telecomunicaciones el valor de aportesMasGrandes es de 90=40+25+25, ya que orderBy no toma en cuenta que Petroquímica está en la línea siguiente sino que el valor 25 es el mismo en ambas filas. Otra característica importante que se desprende de este ejemplo, es que no es necesario usar partitionBy junto con orderBy. Si no se incluye partitionBy, el grupo es simplemente todos los datos.

orderBy también es útil cuando queremos usar el orden para crear variables. Por ejemplo, con los datos de notas que usamos en el apartado anterior podríamos crear un ranking de los alumnos de acuerdo a su nota dentro de cada clase:

~~~~ python
win = Window.partitionBy('curso').orderBy(col('nota').desc())

result = notasSpark.withColumn('ranking', rank().over(win))

result.show()
~~~~

~~~~ console
+----------+-----------+----+-------+
|estudiante|      curso|nota|ranking|
+----------+-----------+----+-------+
|     ES002| Literatura|  16|      1|
|     ES001| Literatura|  13|      2|
|     ES003| Literatura|  11|      3|
|     ES003|     Ingles|  19|      1|
|     ES001|     Ingles|  18|      2|
|     ES002|     Ingles|  14|      3|
|     ES003|Matematicas|  19|      1|
|     ES002|Matematicas|  17|      2|
|     ES001|Matematicas|  15|      3|
+----------+-----------+----+-------+
~~~~

rowsBetween

Esta función nos ayuda a definir ventanas dinámicas que dependen de la fila en la que se esté haciendo un cálculo y requiere dos argumentos que indican el comienzo y el final de la ventana dinámica. Estos dos valores son números enteros relativos a la fila actual, por ejemplo rowsBetween(-1, 1) indicaría que la ventana va de la fila anterior a la fila actual a la fila posterior a la fila actual. No obstante, Apache Spark provee y recomienda usar cuando sea posible diversas funciones para definir estos intervalos, de forma que se minimice el error humano y aumente la legibilidad del código: 

  • currentRow indica la fila actual donde se está realizando el cálculo.
  • unboundedPreceding indica la primera fila del grupo.
  • unboundedFollowing indica la última fila del grupo. 

Para observar el funcionamiento de rowsBetween, supongamos que tenemos los siguientes datos acerca del precio de una acción en el tiempo:

~~~~ python
precios = [('AAPL', '2021-01-01', 110), 
           ('AAPL', '2021-01-02', 120),
           ('AAPL', '2021-01-03', 110), 
           ('AAPL', '2021-01-04', 115), 
           ('AAPL', '2021-01-05', 150), 
           ('MSFT', '2021-01-01', 150), 
           ('MSFT', '2021-01-02', 130), 
           ('MSFT', '2021-01-03', 140), 
           ('MSFT', '2021-01-04', 120), 
           ('MSFT', '2021-01-05', 140)]

preciosSpark = spark.createDataFrame(precios, schema=['empresa', 'fecha', 'precio'])\
                    .withColumn('fecha', to_date(col('fecha'))).cache() 


preciosSpark.show()
~~~~

~~~~ console
+-------+----------+------+
|empresa|     fecha|precio|
+-------+----------+------+
|   AAPL|2021-01-01|   110|
|   AAPL|2021-01-02|   120|
|   AAPL|2021-01-03|   110|
|   AAPL|2021-01-04|   115|
|   AAPL|2021-01-05|   150|
|   MSFT|2021-01-01|   150|
|   MSFT|2021-01-02|   130|
|   MSFT|2021-01-03|   140|
|   MSFT|2021-01-04|   120|
|   MSFT|2021-01-05|   140|
+-------+----------+------+
~~~~

Si queremos calcular una media móvil desde la primera fecha hasta la fecha que se encuentra en la fila actual podríamos hacer lo siguiente:

~~~~ python
win = Window.partitionBy('empresa').orderBy('fecha').rowsBetween(Window.unboundedPreceding, Window.currentRow)
win2 = Window.partitionBy('empresa').orderBy('fecha')

preciosSpark.withColumn('mediaHistorica', avg(col('precio')).over(win))\
            .withColumn('mediaHistorica2', avg(col('precio')).over(win2)).show()
~~~~

~~~~ console
+-------+----------+------+------------------+------------------+
|empresa|     fecha|precio|    mediaHistorica|   mediaHistorica2|
+-------+----------+------+------------------+------------------+
|   AAPL|2021-01-01|   110|             110.0|             110.0|
|   AAPL|2021-01-02|   120|             115.0|             115.0|
|   AAPL|2021-01-03|   110|113.33333333333333|113.33333333333333|
|   AAPL|2021-01-04|   115|            113.75|            113.75|
|   AAPL|2021-01-05|   150|             121.0|             121.0|
|   MSFT|2021-01-01|   150|             150.0|             150.0|
|   MSFT|2021-01-02|   130|             140.0|             140.0|
|   MSFT|2021-01-03|   140|             140.0|             140.0|
|   MSFT|2021-01-04|   120|             135.0|             135.0|
|   MSFT|2021-01-05|   140|             136.0|             136.0|
+-------+----------+------+------------------+------------------+
~~~~

Véase que usando la ventana win o la ventana win2 se obtiene el mismo resultado ya que en este caso no existen fechas repetidas dentro de cada grupo por lo que el comportamiento de orderBy no se manifiesta. Sin embargo, si usamos el ejemplo con datos del PIB se nota que al usar rowsBetween los datos usados para calcular la suma son solo hasta la fila actual por lo que en aporteMasGrandes2 el valor de telecomunicaciones es solo 65 = 40 + 25 :

~~~~ python
win = Window().orderBy(col('porcPIB').desc())
win2 = Window().orderBy(col('porcPIB').desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

pibSpark.withColumn('aporteMasGrandes', sum(col('porcPIB')).over(win))\
        .withColumn('aporteMasGrandes2', sum(col('porcPIB')).over(win2)).show()
~~~~

~~~~ console
+------------------+-------+----------------+-----------------+
|           empresa|porcPIB|aporteMasGrandes|aporteMasGrandes2|
+------------------+-------+----------------+-----------------+
|        Hosteleria|     40|              40|               40|
|Telecomunicaciones|     25|              90|               65|
|      Petroquimica|     25|              90|               90|
|       Agricultura|      5|             100|               95|
|      Construccion|      5|             100|              100|
+------------------+-------+----------------+-----------------+
~~~~

rangeBetween

Esta función es parecida a rowsBetween pero trabaja con los valores de la variable usada dentro de orderBy en lugar de con las filas. Una ventana creada con rangeBetween tiene el mismo comportamiento que cuando se usa solo orderBy pero nos posibilita la definición de un intervalo diferente y no solo desde el primer valor al actual. Recibe dos argumentos que señalan cuantos valores por debajo y por encima del valor actual se quieren dentro de la ventana. 

Por ejemplo, si se tiene que para una fila el salario es 10 euros y se tiene una ventana definida por Window.orderBy(‘salario’).rangeBetween(-5, 5); si realizamos un cálculo sobre esta ventana el conjunto que se tomará en cuenta para esta línea será todas las filas cuyos salarios están entre 5 y 15 euros.

Usando los datos de los estudiantes podríamos querer compararlos con aquellos estudiantes que sacan notas similares, por ejemplo podríamos compararlos con el promedio de su clase pero solo de aquellos estudiantes que obtienen +-2 puntos que ellos. Usando rangeBetween el código sería:

~~~~ python
win = Window.partitionBy('curso').orderBy('nota').rangeBetween(-2, 2)

result = notasSpark.withColumn('promedioSimilares', mean(col('nota')).over(win))

result = result.withColumn('superiorPromedio', when(col('nota')>=col('promedioSimilares'), lit(1)).otherwise(lit(0)))

result.show()
~~~~

~~~~ console
+----------+-----------+----+-----------------+----------------+
|estudiante|      curso|nota|promedioSimilares|superiorPromedio|
+----------+-----------+----+-----------------+----------------+
|     ES003| Literatura|  11|             12.0|               0|
|     ES001| Literatura|  13|             12.0|               1|
|     ES002| Literatura|  16|             16.0|               1|
|     ES002|     Ingles|  14|             14.0|               1|
|     ES001|     Ingles|  18|             18.5|               0|
|     ES003|     Ingles|  19|             18.5|               1|
|     ES001|Matematicas|  15|             16.0|               0|
|     ES002|Matematicas|  17|             17.0|               1|
|     ES003|Matematicas|  19|             18.0|               1|
+----------+-----------+----+-----------------+----------------+
~~~~

Funciones para crear variables con ventanas

En Apache Spark podriamos dividir en dos grandes grupos las funciones que pueden usarse sobre una ventana. Además, los usuarios pueden definir sus propias funciones al igual que al usar groupBy (el uso de udfs debe evitarse ya que estas tienden a tener un desempeño muy pobre).

Funciones analíticas

Devuelven un valor para cada fila de un conjunto que puede ser distinto para cada una de ellas.

  • rank → Nos permite asignar a cada fila dentro de un grupo un número natural que indica el lugar que tiene la fila de acuerdo al valor de la variable usada en el orderBy. Si dos filas están empatadas, se les asigna el mismo número en el ranking y se salta un número para la siguiente fila.
  • denseRank → Tiene la misma finalidad que rank solo que cuando hay empates no se salta un número para la fila posterior a las empatadas
  • percentRank → Nos proporciona el percentil que corresponde a la fila cuando se ordenan los datos de acuerdo a la variable usada en orderBy
  • rowNumber → Nos permite asignar a cada fila dentro de un grupo un número natural diferente que indica el lugar que tiene la fila de acuerdo al valor de la variable usada en el orderBy. Si dos filas están empatadas se le asigna el ranking aleatoriamente por lo que los resultados no siempre son iguales.
  • nTile(n) → Sirve para distribuir las filas de un grupo entre n grupos. Es muy útil cuando se quiere hacer una variable ordinal a partir de una variable numérica; por ejemplo 4 niveles de ingreso con mismo número de observaciones cada uno.
  • lag(col, n, default) → Nos proporciona el valor que tiene la columna col, n filas antes de la fila actual. Si n es un número negativo implica que se obtiene el valor que tiene la columna col, n filas posteriores a la fila actual. Si el grupo no tiene suficientes filas anteriores o posteriores a la fila actual lag devuelve nan a menos que se llame la función con el parámetro opcional default en cuyo caso devolverá este valor. El orden de las filas viene dado por la columna usada en el orderBy de la ventana por lo que al usar lag es necesario que la ventana contenga esta cláusula.
  • lead(col, n, default) → Es igual a lag solo que los números positivos implican filas posteriores y los números negativos filas anteriores. 
  • cumeDist → Proporciona la probabilidad acumulada de cada fila de acuerdo a la variable usada en orderBy. En otras palabras, nos dice la proporción de filas cuyo valor en la variable usada en el orderBy es menor o igual al valor de esa variable para la fila actual.
  • last(col, ignorenulls) → Devuelve el último valor que se observa en la columna col antes de la fila actual dentro de la ventana especificada. Requiere que la ventana tenga un orderBy. El argumento ignorenulls=True permite ignorar todos los valores faltantes lo que hace que se devuelva el primer valor no nulo anterior a la fila actual.

Funciones de agregación

Todas aquellas funciones que compactan un conjunto de datos en un único valor que representa al conjunto, tales como las funciones sum, min, max, avg y count.

Ejemplos de uso adicionales

rangeBetween y ventanas de fechas

Para aplicar rangeBetween para crear ventanas de fechas basta con transformar las fechas a timestamp y dividir por el número de segundos en el periodo que queremos calcular la ventana, por ejemplo dividiendo entre 86400 para días o 3600 para horas. En el siguiente ejemplo si quisiéramos encontrar la media móvil de tres días no podríamos utilizar rowsBetween como antes ya que tenemos fechas faltantes pero podemos utilizar rangeBetween:

~~~~ python
precios2 = [('AAPL', '2021-01-05', 110), 
            ('AAPL', '2021-01-07', 120),
            ('AAPL', '2021-01-13', 110), 
            ('AAPL', '2021-01-14', 115), 
            ('AAPL', '2021-01-15', 150), 
            ('MSFT', '2021-01-01', 150), 
            ('MSFT', '2021-01-05', 130), 
            ('MSFT', '2021-01-06', 140), 
            ('MSFT', '2021-01-09', 120), 
            ('MSFT', '2021-01-12', 140)]

preciosSpark2 = spark.createDataFrame(precios2, schema=['empresa', 'fecha', 'precio'])\
                     .withColumn('fecha', to_date(col('fecha'))).cache() 

preciosSpark2.show()
~~~~

~~~~ console
+-------+----------+------+
|empresa|     fecha|precio|
+-------+----------+------+
|   AAPL|2021-01-05|   110|
|   AAPL|2021-01-07|   120|
|   AAPL|2021-01-13|   110|
|   AAPL|2021-01-14|   115|
|   AAPL|2021-01-15|   150|
|   MSFT|2021-01-01|   150|
|   MSFT|2021-01-05|   130|
|   MSFT|2021-01-06|   140|
|   MSFT|2021-01-09|   120|
|   MSFT|2021-01-12|   140|
+-------+----------+------+
~~~~

~~~~ python
dias = lambda i: i * 86400 
win = Window.partitionBy('empresa')\
            .orderBy(col('fecha').cast("timestamp").cast("long"))\
            .rangeBetween(-dias(3), 0)

preciosSpark2.withColumn('mediaMovil3Dias', avg(col('precio')).over(win)).show()
~~~~

~~~~ console
+-------+----------+------+---------------+
|empresa|     fecha|precio|mediaMovil3Dias|
+-------+----------+------+---------------+
|   AAPL|2021-01-05|   110|          110.0|
|   AAPL|2021-01-07|   120|          115.0|
|   AAPL|2021-01-13|   110|          110.0|
|   AAPL|2021-01-14|   115|          112.5|
|   AAPL|2021-01-15|   150|          125.0|
|   MSFT|2021-01-01|   150|          150.0|
|   MSFT|2021-01-05|   130|          130.0|
|   MSFT|2021-01-06|   140|          135.0|
|   MSFT|2021-01-09|   120|          130.0|
|   MSFT|2021-01-12|   140|          130.0|
+-------+----------+------+---------------+
~~~~

Creando rezagos para modelos de series temporales

Si tenemos un dataset que no tiene fechas faltantes entonces obtener una variable rezagada es tan sencillo como usar lag con la ventana que se desea. Ejemplifiquemos aplicando este procedimiento al conjunto de preciosSpark:

~~~~ python
dias = lambda i: i * 86400 
win = Window.partitionBy('empresa')\
            .orderBy(col('fecha').cast("timestamp").cast("long"))
preciosSpark2.withColumn('precio_1', lag(col('precio')).over(win)).show()
~~~~

~~~~ console
+-------+----------+------+--------+
|empresa|     fecha|precio|precio_1|
+-------+----------+------+--------+
|   AAPL|2021-01-05|   110|    null|
|   AAPL|2021-01-07|   120|     110|
|   AAPL|2021-01-13|   110|     120|
|   AAPL|2021-01-14|   115|     110|
|   AAPL|2021-01-15|   150|     115|
|   MSFT|2021-01-01|   150|    null|
|   MSFT|2021-01-05|   130|     150|
|   MSFT|2021-01-06|   140|     130|
|   MSFT|2021-01-09|   120|     140|
|   MSFT|2021-01-12|   140|     120|
+-------+----------+------+--------+
~~~~

Sin embargo, en el caso del dataset preciosSpark2 no es posible implementar esta metodología ya que el valor de la fila anterior puede corresponder tanto a 1 dia antes como a varios días antes. En este caso podríamos usar cualquier función de agregación (por ejemplo sum) junto con una ventana que contenga rangeBetween para limitar las fechas en las que se calcula la función a exactamente el número de días anteriores que queremos:

~~~~ python
dias = lambda i: i * 86400 
win = Window.partitionBy('empresa')\
            .orderBy(col('fecha').cast("timestamp").cast("long"))\
            .rangeBetween(-dias(1),-dias(1))
preciosSpark2.withColumn('precio_1', sum(col('precio')).over(win)).show()
~~~~

~~~~ console
+-------+----------+------+--------+
|empresa|     fecha|precio|precio_1|
+-------+----------+------+--------+
|   AAPL|2021-01-05|   110|    null|
|   AAPL|2021-01-07|   120|    null|
|   AAPL|2021-01-13|   110|    null|
|   AAPL|2021-01-14|   115|     110|
|   AAPL|2021-01-15|   150|     115|
|   MSFT|2021-01-01|   150|    null|
|   MSFT|2021-01-05|   130|    null|
|   MSFT|2021-01-06|   140|     130|
|   MSFT|2021-01-09|   120|    null|
|   MSFT|2021-01-12|   140|    null|
+-------+----------+------+--------+
~~~~

Tiempo que ha pasado desde un evento

En muchas situaciones nos encontramos con datasets donde tenemos en cada fila un evento y queremos saber la fecha entre algunos de ellos. Por ejemplo, podríamos tener un dataset con los logs de ciertas maquinarias junto con los fallos que han tenido:

~~~~ python
eventos = [('m1', '2021-01-01', 'e1'), 
           ('m1', '2021-02-02', 'e1'),
           ('m1', '2021-03-03', 'e2'), 
           ('m1', '2021-03-04', 'critico'), 
           ('m1', '2021-07-05', 'e1'), 
           ('m2', '2021-01-01', 'e1'), 
           ('m2', '2021-06-02', 'critico'), 
           ('m2', '2021-08-03', 'e2'), 
           ('m2', '2021-09-04', 'e1'), 
           ('m2', '2021-10-17', 'critico'),
           ('m2', '2021-10-25', 'e1')]

eventosSpark = spark.createDataFrame(eventos, schema=['maquina', 'fecha', 'error'])\
                    .withColumn('fecha', to_date(col('fecha'))).cache() 
    
eventosSpark.show()
~~~~

~~~~ console
+-------+----------+-------+
|maquina|     fecha|  error|
+-------+----------+-------+
|     m1|2021-01-01|     e1|
|     m1|2021-02-02|     e1|
|     m1|2021-03-03|     e2|
|     m1|2021-03-04|critico|
|     m1|2021-07-05|     e1|
|     m2|2021-01-01|     e1|
|     m2|2021-06-02|critico|
|     m2|2021-08-03|     e2|
|     m2|2021-09-04|     e1|
|     m2|2021-10-17|critico|
|     m2|2021-10-25|     e1|
+-------+----------+-------+
~~~~

Si estamos interesados en el tiempo que ha pasado desde cada uno de los tipos de errores hasta un error crítico podríamos usar la función last junto con algunas variables auxiliares que tienen valores faltantes a menos que el error sea de cierto tipo:

~~~~ python
errores = ['e1', 'e2']

# Creamos variables que tienen la fecha si el error es de cierto tipo 
for i in errores:
    eventosSpark = eventosSpark.withColumn('fecha_'+i, when(col('error')==i, col('fecha')).otherwise(None))
    
    
# Calculamos el tiempo desde cada tipo de error al error critico
win = Window.partitionBy('maquina').orderBy('fecha')
for i in errores:
    eventosSpark = eventosSpark.withColumn('fechaUlt_'+i, last(col('fecha_'+i), True).over(win))\
                               .withColumn('tiempoUlt_'+i, 
                                           (unix_timestamp(col('fecha'))-unix_timestamp(col('fechaUlt_'+i)))/86400)

eventosSpark.filter(col('error')=='critico').show()
~~~~

~~~~ console
+-------+----------+-------+--------+--------+-----------+------------+-----------+------------+
|maquina|     fecha|  error|fecha_e1|fecha_e2|fechaUlt_e1|tiempoUlt_e1|fechaUlt_e2|tiempoUlt_e2|
+-------+----------+-------+--------+--------+-----------+------------+-----------+------------+
|     m1|2021-03-04|critico|    null|    null| 2021-02-02|        30.0| 2021-03-03|         1.0|
|     m2|2021-06-02|critico|    null|    null| 2021-01-01|       152.0|       null|        null|
|     m2|2021-10-17|critico|    null|    null| 2021-09-04|        43.0| 2021-08-03|        75.0|
+-------+----------+-------+--------+--------+-----------+------------+-----------+------------+
~~~~

Conclusión

En este post hemos aprendido qué son las ventanas en Spark, la sintaxis para su definición y las funciones más comunes utilizadas con ellas. Además, hemos visto ejemplos de tareas que se pueden resolver con ventanas así como las ventajas de su uso.

En post futuros hablaremos de cómo definir un modelo de demanda simple y una estrategia recursiva para encontrar el precio óptimo de venta. Mientras tanto, te animamos a visitar la categoría Data Science del blog de Damavis y ver otros artículos similares a este.

Te invitamos a compartir este artículo con tus amigos. Recuerda etiquetarnos para hacernos llegar tu opinión (@DamavisStudio). ¡Nos vemos en redes!
Carlos Rodriguez
Carlos Rodriguez
Artículos: 10