Llamadas bloqueantes y programación asíncrona con Scala

Introducción

Este post trata sobre el impacto que puede tener el código bloqueante en una aplicación y la importancia de utilizar librerías que soportan nativamente la programación asíncrona. El objetivo es que se entienda que una llamada bloqueante es siempre bloqueante. 

En el artículo, utilizamos el lenguaje de programación Scala con su herramienta para trabajar con código asíncrono, los Futuros. Sin embargo, el concepto es extrapolable a otros escenarios.

Llamadas bloqueantes

Supongamos que tenemos un servicio externo a nuestra aplicación del que queremos obtener un resultado. Esto en la práctica puede ser una simple llamada a una base de datos.

object Service {
   def callBlocking(): Int = {
     Thread.sleep(1000) // simulate long computation
     99
} 

El método callBLocking() simula la llamada al servicio bloqueando el hilo por 1 segundo. ¿Por qué es esto un problema? Supongamos que queremos hacer 100 llamadas a este servicio:

val startTime = System.currentTimeMillis()

(1 to 100).map { n =>
      logger.info("Starting iteration {}", n)
      val result = Service.callBlocking()
      logger.info("Ending iteration {}", n)
      result
}

val endTime = System.currentTimeMillis()
val totalTime = endTime - startTime
logger.info("Total time taken: {} ms", totalTime)

El resultado es:

13:19:40.054 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 1
13:19:41.057 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 1
13:19:41.058 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 2
13:19:42.059 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 2
13:19:42.059 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 3
13:19:43.060 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 3
13:19:43.060 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 4
13:19:44.061 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 4
13:19:44.061 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 5
13:19:45.062 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 5
13:19:45.063 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 6
13:19:46.063 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 6
...
...
...
13:21:19.146 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 99
13:21:19.147 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 100
13:21:20.147 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 100
13:21:20.150 [sbt-bg-threads-1] INFO Main$ -- Total time taken: 100122 ms

Se puede ver que las 100 llamadas al servicio se están ejecutando de manera secuencial en el mismo hilo, necesitando un total de 100122 ms para completarlas. ¿Esto parece bastante mejorable, no?

Figura 1: Cada caja representa una llamada al servicio. Las llamadas se hacen de manera secuencial en el mismo hilo.

Llamadas bloqueantes con Futuros

El cambio más directo es envolver la llamada bloqueante en un Futuro, para que se ejecute en un hilo diferente.

implicit val ec = ExecutionContext.fromExecutorService {   
Executors.newFixedThreadPool(8)
}

val startTime = System.currentTimeMillis()

val futures = (1 to 100).map { n =>
      Future {
            logger.info("Starting iteration {}", n)
            val result = Service.callBlocking()  
            logger.info("Ending iteration {}", n)
            result
      }
}

val result = Future.sequence(futures)
result.onComplete { _ =>
       val endTime = System.currentTimeMillis()
       val totalTime = endTime - startTime logger.info("Total time taken: {} ms", totalTime)
}

Se utiliza un conjunto de 8 hilos para ejecutar los futuros. El resultado es:

13:30:57.320 [pool-8-thread-5] INFO Main$ -- Starting iteration 5
13:30:57.320 [pool-8-thread-8] INFO Main$ -- Starting iteration 8
13:30:57.319 [pool-8-thread-6] INFO Main$ -- Starting iteration 6
13:30:57.319 [pool-8-thread-3] INFO Main$ -- Starting iteration 3
13:30:57.320 [pool-8-thread-2] INFO Main$ -- Starting iteration 2
13:30:57.319 [pool-8-thread-1] INFO Main$ -- Starting iteration 1
13:30:57.320 [pool-8-thread-4] INFO Main$ -- Starting iteration 4
13:30:57.320 [pool-8-thread-7] INFO Main$ -- Starting iteration 7
13:30:58.322 [pool-8-thread-8] INFO Main$ -- Ending iteration 8
13:30:58.322 [pool-8-thread-5] INFO Main$ -- Ending iteration 5
13:30:58.322 [pool-8-thread-6] INFO Main$ -- Ending iteration 6
13:30:58.324 [pool-8-thread-6] INFO Main$ -- Starting iteration 11
13:30:58.323 [pool-8-thread-3] INFO Main$ -- Ending iteration 3
13:30:58.323 [pool-8-thread-8] INFO Main$ -- Starting iteration 9
13:30:58.323 [pool-8-thread-2] INFO Main$ -- Ending iteration 2
13:30:58.323 [pool-8-thread-1] INFO Main$ -- Ending iteration 1
13:30:58.323 [pool-8-thread-4] INFO Main$ -- Ending iteration 4
13:30:58.323 [pool-8-thread-7] INFO Main$ -- Ending iteration 7
13:30:58.324 [pool-8-thread-5] INFO Main$ -- Starting iteration 10
13:30:58.325 [pool-8-thread-3] INFO Main$ -- Starting iteration 12
13:30:58.326 [pool-8-thread-2] INFO Main$ -- Starting iteration 13
...
...
...
13:31:10.335 [pool-8-thread-2] INFO Main$ -- Ending iteration 97
13:31:10.335 [pool-8-thread-6] INFO Main$ -- Ending iteration 98
13:31:10.335 [pool-8-thread-8] INFO Main$ -- Ending iteration 99
13:31:10.336 [pool-8-thread-5] INFO Main$ -- Ending iteration 100
13:31:10.336 [pool-8-thread-5] INFO Main$ -- Total time taken: 13057 ms

Se puede ver cómo las llamadas se van ejecutando de manera concurrente, necesitando 13057 ms para completarlas. Es una mejora considerable pero, ¿se puede hacer mejor? ¿Se están ejecutando de manera concurrente? En los logs se puede ver cómo las llamadas se van completando en batches de 8. Esto es porque el pool de hilos es de 8 y las llamadas bloqueantes acaban bloqueándolos todos. Esto es importante de entender: el código bloqueante es bloqueante. 

En este caso, el Thread.sleep(1000) cuando se ejecuta bloquea el thread por 1 segundo, lo que hace que no se pueda ejecutar otra cosa hasta que acabe. Envolver una llamada bloqueante en un Futuro no lo transforma en no bloqueante de manera mágica.

Figura 2. Se utiliza un pol de 4 hilos para hacer las llamadas. Estas se hacen en batches de 4.

En la práctica, en vez de utilizar un contexto de ejecución propio, es muy común utilizar el contexto de ejecución global de Scala, pero la idea es la misma. El contexto de ejecución global crea un conjunto de hilos igual al número de procesadores de la máquina, en mi caso de 16 hilos.

implicit val ec = ExecutionContext.global
val poolSize = global match {
      case ec: ExecutionContextExecutorService =>
        ec.asInstanceOf[ForkJoinPool].getParallelism()
}
logger.warn("Global EC pool size: {}", poolSize)

val startTime = System.currentTimeMillis()
val futures = (1 to 100).map { n =>
      Future {
     		logger.info("Starting iteration {}", n)
     		val result = Service.callBlocking()
     		logger.info("Ending iteration {}", n)
     		result
    }
}

val result = Future.sequence(futures)
result.onComplete { _ =>
      val endTime = System.currentTimeMillis()
      val totalTime = endTime - startTime
      logger.info("Total time taken: {} ms", totalTime)
}

El resultado es idéntico al anterior, pero con más hilos. 

13:42:32.074 [sbt-bg-threads-1] WARN Main$ -- Global EC pool size: 16
13:42:32.119 [scala-execution-context-global-167] INFO Main$ -- Starting iteration 1
13:42:32.119 [scala-execution-context-global-168] INFO Main$ -- Starting iteration 2
13:42:32.119 [scala-execution-context-global-169] INFO Main$ -- Starting iteration 3
13:42:32.119 [scala-execution-context-global-170] INFO Main$ -- Starting iteration 4
13:42:32.119 [scala-execution-context-global-172] INFO Main$ -- Starting iteration 5
13:42:32.120 [scala-execution-context-global-171] INFO Main$ -- Starting iteration 6
13:42:32.120 [scala-execution-context-global-173] INFO Main$ -- Starting iteration 7
13:42:32.120 [scala-execution-context-global-174] INFO Main$ -- Starting iteration 8
13:42:32.120 [scala-execution-context-global-175] INFO Main$ -- Starting iteration 9
13:42:32.121 [scala-execution-context-global-176] INFO Main$ -- Starting iteration 10
13:42:32.121 [scala-execution-context-global-177] INFO Main$ -- Starting iteration 11
13:42:32.121 [scala-execution-context-global-178] INFO Main$ -- Starting iteration 12
13:42:32.121 [scala-execution-context-global-179] INFO Main$ -- Starting iteration 13
13:42:32.121 [scala-execution-context-global-181] INFO Main$ -- Starting iteration 15
13:42:32.121 [scala-execution-context-global-180] INFO Main$ -- Starting iteration 14
13:42:32.121 [scala-execution-context-global-182] INFO Main$ -- Starting iteration 16
13:42:33.120 [scala-execution-context-global-170] INFO Main$ -- Ending iteration 4
13:42:33.120 [scala-execution-context-global-169] INFO Main$ -- Ending iteration 3
13:42:33.120 [scala-execution-context-global-167] INFO Main$ -- Ending iteration 1
...
...
...
13:42:39.132 [scala-execution-context-global-170] INFO Main$ -- Ending iteration 98
13:42:39.132 [scala-execution-context-global-167] INFO Main$ -- Ending iteration 99
13:42:39.133 [scala-execution-context-global-178] INFO Main$ -- Ending iteration 100
13:42:39.133 [scala-execution-context-global-178] INFO Main$ -- Total time taken: 7057 ms

El tiempo necesario es de 7057 ms, algo mejor que el caso anterior ya que se están procesando los futuros en batches de 16. Sin embargo, el problema es similar.

El contexto de ejecución global de escala presenta la posibilidad de exceder el número de hilos si el código se envuelve con el método scala.concurrent.blocking. El método blocking funciona de tal manera que se ejecutará en un nuevo hilo. 

implicit val ec = ExecutionContext.global

val startTime = System.currentTimeMillis()

val futures = (1 to 100).map { n =>
     Future {
           logger.info("Starting iteration {}", n)
           val result = blocking { Service.callBlocking() }
           logger.info("Ending iteration {}", n)
           result
     }
}
val result = Future.sequence(futures)
result.onComplete { _ =>
     val endTime = System.currentTimeMillis()
     val totalTime = endTime - startTime
     logger.info("Total time taken: {} ms", totalTime)
}

El resultado es:

18:45:16.048 [scala-execution-context-global-180] INFO Main$ -- Starting iteration 16
18:45:16.047 [scala-execution-context-global-166] INFO Main$ -- Starting iteration 1
18:45:16.048 [scala-execution-context-global-177] INFO Main$ -- Starting iteration 11
18:45:16.048 [scala-execution-context-global-176] INFO Main$ -- Starting iteration 10
18:45:16.047 [scala-execution-context-global-167] INFO Main$ -- Starting iteration 2
18:45:16.047 [scala-execution-context-global-173] INFO Main$ -- Starting iteration 7
18:45:16.048 [scala-execution-context-global-172] INFO Main$ -- Starting iteration 8
18:45:16.048 [scala-execution-context-global-174] INFO Main$ -- Starting iteration 9
18:45:16.048 [scala-execution-context-global-175] INFO Main$ -- Starting iteration 12
18:45:16.048 [scala-execution-context-global-178] INFO Main$ -- Starting iteration 13
18:45:16.047 [scala-execution-context-global-168] INFO Main$ -- Starting iteration 3
18:45:16.047 [scala-execution-context-global-169] INFO Main$ -- Starting iteration 4
18:45:16.048 [scala-execution-context-global-179] INFO Main$ -- Starting iteration 14
18:45:16.047 [scala-execution-context-global-170] INFO Main$ -- Starting iteration 5
18:45:16.047 [scala-execution-context-global-171] INFO Main$ -- Starting iteration 6
18:45:16.048 [scala-execution-context-global-181] INFO Main$ -- Starting iteration 15
18:45:16.051 [scala-execution-context-global-182] INFO Main$ -- Starting iteration 17
18:45:16.051 [scala-execution-context-global-183] INFO Main$ -- Starting iteration 18
18:45:16.051 [scala-execution-context-global-184] INFO Main$ -- Starting iteration 19
18:45:16.052 [scala-execution-context-global-185] INFO Main$ -- Starting iteration 20
...
...
...
18:45:17.067 [scala-execution-context-global-259] INFO Main$ -- Ending iteration 94
18:45:17.067 [scala-execution-context-global-260] INFO Main$ -- Ending iteration 95
18:45:17.067 [scala-execution-context-global-261] INFO Main$ -- Ending iteration 96
18:45:17.068 [scala-execution-context-global-262] INFO Main$ -- Ending iteration 97
18:45:17.068 [scala-execution-context-global-263] INFO Main$ -- Ending iteration 98
18:45:17.068 [scala-execution-context-global-264] INFO Main$ -- Ending iteration 99
18:45:17.069 [scala-execution-context-global-265] INFO Main$ -- Ending iteration 100
18:45:17.069 [scala-execution-context-global-265] INFO Main$ -- Total time taken: 1069 ms

Se puede ver que se crea un hilo por llamada al servicio. Todos los futuros se ejecutan de manera concurrente y el tiempo necesario para hacer las 100 llamadas es de 1069 ms, considerablemente más rápido que el resto. Sin embargo, esto que parece funcionar bien con 100 llamadas al servicio, no parece escalar demasiado bien para procesar, por ejemplo, cientos de miles de llamadas. Es inviable seguir creando un nuevo hilo para cada llamada al servicio sin ningún límite.

Figura 3. Se crea un nuevo hilo para cada llamada.

Llamadas no bloqueantes

Todos los problemas vienen porque la llamada al servicio callBlocking() es bloqueante. Simulemos ahora una llamada asíncrona a este servicio:

private object Service {
    def callBlocking(): Int = {
        Thread.sleep(1000)
        99
    }

    def callNonBlocking(implicit ec: ExecutionContext): Future[Int] = {
        val promise = Promise[Int]()
        val executor: ScheduledExecutorService =       Executors.newSingleThreadScheduledExecutor()
        executor.schedule(new Runnable {
        override def run(): Unit = {
            promise.success(99)
            executor.shutdown()
        }}, 1, java.util.concurrent.TimeUnit.SECONDS)
    promise.future
    }
}

El código programa una tarea que se ejecutará en el futuro (en un segundo) sin bloquear el hilo. Utilizando este callNonBLocking() tenemos:

implicit val ec = ExecutionContext.fromExecutorService {
    Executors.newFixedThreadPool(8)
}
val startTime = System.currentTimeMillis()
val futures = (1 to total).map { n =>

logger.info("Starting iteration {}", n)
val result = Service.callNonBlocking
result.onComplete(_ =>
    logger.info("Ending iteration {}", n))
    result
}

val result = Future.sequence(futures)
result.onComplete { _ =>
    val endTime = System.currentTimeMillis()
    val totalTime = endTime - startTime

    logger.info("Total time taken: {} ms", totalTime)
}

Los logs:

19:09:59.384 [pool-8-thread-4] INFO Main$ -- Starting iteration 4
19:09:59.384 [pool-8-thread-3] INFO Main$ -- Starting iteration 3
19:09:59.384 [pool-8-thread-2] INFO Main$ -- Starting iteration 2
19:09:59.384 [pool-8-thread-7] INFO Main$ -- Starting iteration 7
19:09:59.384 [pool-8-thread-8] INFO Main$ -- Starting iteration 8
19:09:59.384 [pool-8-thread-1] INFO Main$ -- Starting iteration 1
19:09:59.384 [pool-8-thread-6] INFO Main$ -- Starting iteration 6
19:09:59.384 [pool-8-thread-5] INFO Main$ -- Starting iteration 5
19:09:59.387 [pool-8-thread-4] INFO Main$ -- Starting iteration 9
19:09:59.387 [pool-8-thread-3] INFO Main$ -- Starting iteration 10
19:09:59.387 [pool-8-thread-2] INFO Main$ -- Starting iteration 11
19:09:59.387 [pool-8-thread-7] INFO Main$ -- Starting iteration 12
19:09:59.387 [pool-8-thread-8] INFO Main$ -- Starting iteration 13
19:09:59.387 [pool-8-thread-1] INFO Main$ -- Starting iteration 14
19:09:59.387 [pool-8-thread-6] INFO Main$ -- Starting iteration 15
...
...
...
19:10:41.860 [pool-8-thread-2] INFO Main$ -- Ending iteration 1
19:10:41.861 [pool-8-thread-1] INFO Main$ -- Ending iteration 3
19:10:41.861 [pool-8-thread-5] INFO Main$ -- Ending iteration 5
19:10:41.861 [pool-8-thread-6] INFO Main$ -- Ending iteration 2
19:10:41.861 [pool-8-thread-2] INFO Main$ -- Ending iteration 4
19:10:41.861 [pool-8-thread-3] INFO Main$ -- Ending iteration 6
19:10:41.861 [pool-8-thread-7] INFO Main$ -- Ending iteration 7
19:10:41.862 [pool-8-thread-8] INFO Main$ -- Ending iteration 8
19:10:41.862 [pool-8-thread-4] INFO Main$ -- Ending iteration 9
19:10:41.862 [pool-8-thread-1] INFO Main$ -- Ending iteration 10
19:10:41.863 [pool-8-thread-2] INFO Main$ -- Ending iteration 11
19:10:41.863 [pool-8-thread-5] INFO Main$ -- Ending iteration 12
...
...
...
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 88
19:10:41.875 [pool-8-thread-4] INFO Main$ -- Ending iteration 89
19:10:41.875 [pool-8-thread-5] INFO Main$ -- Ending iteration 90
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 91
19:10:41.875 [pool-8-thread-2] INFO Main$ -- Ending iteration 92
19:10:41.875 [pool-8-thread-5] INFO Main$ -- Ending iteration 93
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 94
19:10:41.875 [pool-8-thread-2] INFO Main$ -- Ending iteration 95
19:10:41.875 [pool-8-thread-5] INFO Main$ -- Ending iteration 96
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 97
19:10:41.876 [pool-8-thread-2] INFO Main$ -- Ending iteration 98
19:10:41.876 [pool-8-thread-5] INFO Main$ -- Ending iteration 99
19:10:41.876 [pool-8-thread-1] INFO Main$ -- Ending iteration 100
19:10:41.876 [pool-8-thread-6] INFO Main$ -- Total time taken: 1044 ms

Se puede ver cómo evitando la llamada bloqueante al servicio, con un thread pool de 8 hilos podemos procesar las 100 llamadas en 1044 ms. Esta solución escala mucho mejor que la anterior usando el blocking ya que no requiere la creación de tantos hilos. Recalcar que cada llamada al servicio sigue requiriendo de 1 segundo para completarse, la única diferencia es que no bloquea el hilo.

Figura 4. La llamada al servicio se hace de manera no bloqueante. Las cajas verdes simbolizan la llamada al método y las rojas cuando ésta finaliza.

La siguiente tabla muestra una comparativa de los diferentes métodos presentados:

Figura 5. Tiempo de ejecución de diferentes métodos en función del número de llamadas al servicio.

Conclusión

Hemos visto con un sencillo ejemplo que el código bloqueante es siempre bloqueante, y cómo encapsularlo en un Futuro no resuelve el problema de raíz. Idealmente, se debería evitar el código bloqueante utilizando frameworks, librerías y servicios que soporten nativamente la asincronía. Si es inevitable hacer llamadas bloqueantes, lo recomendable es crear un contexto de ejecución específico para este tipo de operaciones con un número fijo de hilos para que el resto de la aplicación siga ejecutándose.

Hasta aquí nuestro post de hoy. Si te ha parecido interesante, te animamos a visitar la categoría Data Engineering para ver todos los posts relacionados y a compartirlo en redes con tus contactos. ¡Hasta pronto!
Paul Sasieta
Paul Sasieta
Artículos: 4