Blocking calls and asynchronous programming with Scala 

Introduction

This post is about the impact that blocking code can have on an application and the importance of using libraries that natively support asynchronous programming. The goal is to understand that a blocking call is always blocking.

In the article, we use the Scala programming language with its tool for working with asynchronous code, Futures. However, the concept is extrapolable to other scenarios.

Blocking calls

Let’s suppose that we have a service external to our application from which we want to obtain a result. This in practice can be a simple call to a database.

object Service {
   def callBlocking(): Int = {
     Thread.sleep(1000) // simulate long computation
     99
} 

The callBLocking() method simulates calling the service by blocking the thread for 1 second. Why is this a problem? Suppose we want to make 100 calls to this service:

val startTime = System.currentTimeMillis()

(1 to 100).map { n =>
      logger.info("Starting iteration {}", n)
      val result = Service.callBlocking()
      logger.info("Ending iteration {}", n)
      result
}

val endTime = System.currentTimeMillis()
val totalTime = endTime - startTime
logger.info("Total time taken: {} ms", totalTime)

The result is:

13:19:40.054 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 1
13:19:41.057 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 1
13:19:41.058 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 2
13:19:42.059 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 2
13:19:42.059 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 3
13:19:43.060 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 3
13:19:43.060 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 4
13:19:44.061 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 4
13:19:44.061 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 5
13:19:45.062 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 5
13:19:45.063 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 6
13:19:46.063 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 6
...
...
...
13:21:19.146 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 99
13:21:19.147 [sbt-bg-threads-1] INFO Main$ -- Starting iteration 100
13:21:20.147 [sbt-bg-threads-1] INFO Main$ -- Ending iteration 100
13:21:20.150 [sbt-bg-threads-1] INFO Main$ -- Total time taken: 100122 ms

You can see that the 100 calls to the service are being executed sequentially on the same thread, taking a total of 100122 ms to complete. This seems quite improvable, doesn’t it?


Figure 1: Each box represents a call for service. The calls are made sequentially in the same thread.

Blocking calls with Futures

The most straightforward change is to wrap the blocking call in a Future, so that it runs on a different thread.

implicit val ec = ExecutionContext.fromExecutorService {   
Executors.newFixedThreadPool(8)
}

val startTime = System.currentTimeMillis()

val futures = (1 to 100).map { n =>
     Future {
           logger.info("Starting iteration {}", n)
           val result = Service.callBlocking()  
           logger.info("Ending iteration {}", n)
           result
     }
}

val result = Future.sequence(futures)
result.onComplete { _ =>
      val endTime = System.currentTimeMillis()
      val totalTime = endTime - startTime logger.info("Total time taken: {} ms", totalTime)
}

A set of 8 threads is used to execute the futures. The result is:

13:30:57.320 [pool-8-thread-5] INFO Main$ -- Starting iteration 5
13:30:57.320 [pool-8-thread-8] INFO Main$ -- Starting iteration 8
13:30:57.319 [pool-8-thread-6] INFO Main$ -- Starting iteration 6
13:30:57.319 [pool-8-thread-3] INFO Main$ -- Starting iteration 3
13:30:57.320 [pool-8-thread-2] INFO Main$ -- Starting iteration 2
13:30:57.319 [pool-8-thread-1] INFO Main$ -- Starting iteration 1
13:30:57.320 [pool-8-thread-4] INFO Main$ -- Starting iteration 4
13:30:57.320 [pool-8-thread-7] INFO Main$ -- Starting iteration 7
13:30:58.322 [pool-8-thread-8] INFO Main$ -- Ending iteration 8
13:30:58.322 [pool-8-thread-5] INFO Main$ -- Ending iteration 5
13:30:58.322 [pool-8-thread-6] INFO Main$ -- Ending iteration 6
13:30:58.324 [pool-8-thread-6] INFO Main$ -- Starting iteration 11
13:30:58.323 [pool-8-thread-3] INFO Main$ -- Ending iteration 3
13:30:58.323 [pool-8-thread-8] INFO Main$ -- Starting iteration 9
13:30:58.323 [pool-8-thread-2] INFO Main$ -- Ending iteration 2
13:30:58.323 [pool-8-thread-1] INFO Main$ -- Ending iteration 1
13:30:58.323 [pool-8-thread-4] INFO Main$ -- Ending iteration 4
13:30:58.323 [pool-8-thread-7] INFO Main$ -- Ending iteration 7
13:30:58.324 [pool-8-thread-5] INFO Main$ -- Starting iteration 10
13:30:58.325 [pool-8-thread-3] INFO Main$ -- Starting iteration 12
13:30:58.326 [pool-8-thread-2] INFO Main$ -- Starting iteration 13
...
...
...
13:31:10.335 [pool-8-thread-2] INFO Main$ -- Ending iteration 97
13:31:10.335 [pool-8-thread-6] INFO Main$ -- Ending iteration 98
13:31:10.335 [pool-8-thread-8] INFO Main$ -- Ending iteration 99
13:31:10.336 [pool-8-thread-5] INFO Main$ -- Ending iteration 100
13:31:10.336 [pool-8-thread-5] INFO Main$ -- Total time taken: 13057 ms

You can see how the calls are running concurrently, taking 13057 ms to complete. This is a considerable improvement, but can it be done better? Are they being executed concurrently? In the logs you can see how the calls are being completed in batches of 8. This is because the thread pool is 8 and the blocking calls end up blocking all of them. This is important to understand: blocking code is blocking. 

In this case, Thread.sleep(1000) when executed blocks the thread for 1 second, which makes it impossible to run anything else until it finishes. Wrapping a blocking call in a Future does not magically transform it into non-blocking.


Figure 2. A 4-wire pol is used to make the calls. These are made in batches of 4.

In practice, instead of using your own execution context, it is very common to use Scala’s global execution context, but the idea is the same. The global execution context creates a set of threads equal to the number of processors on the machine, in my case 16 threads.

implicit val ec = ExecutionContext.global
val poolSize = global match {
     case ec: ExecutionContextExecutorService =>
        ec.asInstanceOf[ForkJoinPool].getParallelism()
}
logger.warn("Global EC pool size: {}", poolSize)

val startTime = System.currentTimeMillis()
val futures = (1 to 100).map { n =>
     Future {
     		logger.info("Starting iteration {}", n)
     		val result = Service.callBlocking()
     		logger.info("Ending iteration {}", n)
     		result
    }
}

val result = Future.sequence(futures)
result.onComplete { _ =>
      val endTime = System.currentTimeMillis()
      val totalTime = endTime - startTime
      logger.info("Total time taken: {} ms", totalTime)
}

The result is identical to the previous one, but with more threads.

13:42:32.074 [sbt-bg-threads-1] WARN Main$ -- Global EC pool size: 16
13:42:32.119 [scala-execution-context-global-167] INFO Main$ -- Starting iteration 1
13:42:32.119 [scala-execution-context-global-168] INFO Main$ -- Starting iteration 2
13:42:32.119 [scala-execution-context-global-169] INFO Main$ -- Starting iteration 3
13:42:32.119 [scala-execution-context-global-170] INFO Main$ -- Starting iteration 4
13:42:32.119 [scala-execution-context-global-172] INFO Main$ -- Starting iteration 5
13:42:32.120 [scala-execution-context-global-171] INFO Main$ -- Starting iteration 6
13:42:32.120 [scala-execution-context-global-173] INFO Main$ -- Starting iteration 7
13:42:32.120 [scala-execution-context-global-174] INFO Main$ -- Starting iteration 8
13:42:32.120 [scala-execution-context-global-175] INFO Main$ -- Starting iteration 9
13:42:32.121 [scala-execution-context-global-176] INFO Main$ -- Starting iteration 10
13:42:32.121 [scala-execution-context-global-177] INFO Main$ -- Starting iteration 11
13:42:32.121 [scala-execution-context-global-178] INFO Main$ -- Starting iteration 12
13:42:32.121 [scala-execution-context-global-179] INFO Main$ -- Starting iteration 13
13:42:32.121 [scala-execution-context-global-181] INFO Main$ -- Starting iteration 15
13:42:32.121 [scala-execution-context-global-180] INFO Main$ -- Starting iteration 14
13:42:32.121 [scala-execution-context-global-182] INFO Main$ -- Starting iteration 16
13:42:33.120 [scala-execution-context-global-170] INFO Main$ -- Ending iteration 4
13:42:33.120 [scala-execution-context-global-169] INFO Main$ -- Ending iteration 3
13:42:33.120 [scala-execution-context-global-167] INFO Main$ -- Ending iteration 1
...
...
...
13:42:39.132 [scala-execution-context-global-170] INFO Main$ -- Ending iteration 98
13:42:39.132 [scala-execution-context-global-167] INFO Main$ -- Ending iteration 99
13:42:39.133 [scala-execution-context-global-178] INFO Main$ -- Ending iteration 100
13:42:39.133 [scala-execution-context-global-178] INFO Main$ -- Total time taken: 7057 ms

The time required is 7057 ms, which is somewhat better than the previous case since the futures are being processed in batches of 16. However, the problem is similar.

The global execution context of scaling presents the possibility of exceeding the number of threads if the code is wrapped with the scala.concurrent.blocking method. The blocking method works in such a way that it will run on a new thread.

implicit val ec = ExecutionContext.global

val startTime = System.currentTimeMillis()

val futures = (1 to 100).map { n =>
     Future {
          logger.info("Starting iteration {}", n)
          val result = blocking { Service.callBlocking() }
          logger.info("Ending iteration {}", n)
          result
     }
}
val result = Future.sequence(futures)
result.onComplete { _ =>
     val endTime = System.currentTimeMillis()
     val totalTime = endTime - startTime
     logger.info("Total time taken: {} ms", totalTime)
}

The result is:

18:45:16.048 [scala-execution-context-global-180] INFO Main$ -- Starting iteration 16
18:45:16.047 [scala-execution-context-global-166] INFO Main$ -- Starting iteration 1
18:45:16.048 [scala-execution-context-global-177] INFO Main$ -- Starting iteration 11
18:45:16.048 [scala-execution-context-global-176] INFO Main$ -- Starting iteration 10
18:45:16.047 [scala-execution-context-global-167] INFO Main$ -- Starting iteration 2
18:45:16.047 [scala-execution-context-global-173] INFO Main$ -- Starting iteration 7
18:45:16.048 [scala-execution-context-global-172] INFO Main$ -- Starting iteration 8
18:45:16.048 [scala-execution-context-global-174] INFO Main$ -- Starting iteration 9
18:45:16.048 [scala-execution-context-global-175] INFO Main$ -- Starting iteration 12
18:45:16.048 [scala-execution-context-global-178] INFO Main$ -- Starting iteration 13
18:45:16.047 [scala-execution-context-global-168] INFO Main$ -- Starting iteration 3
18:45:16.047 [scala-execution-context-global-169] INFO Main$ -- Starting iteration 4
18:45:16.048 [scala-execution-context-global-179] INFO Main$ -- Starting iteration 14
18:45:16.047 [scala-execution-context-global-170] INFO Main$ -- Starting iteration 5
18:45:16.047 [scala-execution-context-global-171] INFO Main$ -- Starting iteration 6
18:45:16.048 [scala-execution-context-global-181] INFO Main$ -- Starting iteration 15
18:45:16.051 [scala-execution-context-global-182] INFO Main$ -- Starting iteration 17
18:45:16.051 [scala-execution-context-global-183] INFO Main$ -- Starting iteration 18
18:45:16.051 [scala-execution-context-global-184] INFO Main$ -- Starting iteration 19
18:45:16.052 [scala-execution-context-global-185] INFO Main$ -- Starting iteration 20
...
...
...
18:45:17.067 [scala-execution-context-global-259] INFO Main$ -- Ending iteration 94
18:45:17.067 [scala-execution-context-global-260] INFO Main$ -- Ending iteration 95
18:45:17.067 [scala-execution-context-global-261] INFO Main$ -- Ending iteration 96
18:45:17.068 [scala-execution-context-global-262] INFO Main$ -- Ending iteration 97
18:45:17.068 [scala-execution-context-global-263] INFO Main$ -- Ending iteration 98
18:45:17.068 [scala-execution-context-global-264] INFO Main$ -- Ending iteration 99
18:45:17.069 [scala-execution-context-global-265] INFO Main$ -- Ending iteration 100
18:45:17.069 [scala-execution-context-global-265] INFO Main$ -- Total time taken: 1069 ms

It can be seen that one thread is created per service call. All futures run concurrently and the time required to make the 100 calls is 1069 ms, considerably faster than the rest. However, this which seems to work well with 100 service calls, does not seem to scale too well for processing, say, hundreds of thousands of calls. It is unfeasible to keep creating a new thread for each call to service without any limit.


Figure 3. A new thread is created for each call.

Non-blocking calls

All the problems come because the call to the callBlocking() service is blocking. Let us now simulate an asynchronous call to this service:

private object Service {
    def callBlocking(): Int = {
        Thread.sleep(1000)
        99
    }

    def callNonBlocking(implicit ec: ExecutionContext): Future[Int] = {
        val promise = Promise[Int]()
        val executor: ScheduledExecutorService =       Executors.newSingleThreadScheduledExecutor()
        executor.schedule(new Runnable {
        override def run(): Unit = {
            promise.success(99)
            executor.shutdown()
        }}, 1, java.util.concurrent.TimeUnit.SECONDS)
    promise.future
    }
}

Logs:

19:09:59.384 [pool-8-thread-4] INFO Main$ -- Starting iteration 4
19:09:59.384 [pool-8-thread-3] INFO Main$ -- Starting iteration 3
19:09:59.384 [pool-8-thread-2] INFO Main$ -- Starting iteration 2
19:09:59.384 [pool-8-thread-7] INFO Main$ -- Starting iteration 7
19:09:59.384 [pool-8-thread-8] INFO Main$ -- Starting iteration 8
19:09:59.384 [pool-8-thread-1] INFO Main$ -- Starting iteration 1
19:09:59.384 [pool-8-thread-6] INFO Main$ -- Starting iteration 6
19:09:59.384 [pool-8-thread-5] INFO Main$ -- Starting iteration 5
19:09:59.387 [pool-8-thread-4] INFO Main$ -- Starting iteration 9
19:09:59.387 [pool-8-thread-3] INFO Main$ -- Starting iteration 10
19:09:59.387 [pool-8-thread-2] INFO Main$ -- Starting iteration 11
19:09:59.387 [pool-8-thread-7] INFO Main$ -- Starting iteration 12
19:09:59.387 [pool-8-thread-8] INFO Main$ -- Starting iteration 13
19:09:59.387 [pool-8-thread-1] INFO Main$ -- Starting iteration 14
19:09:59.387 [pool-8-thread-6] INFO Main$ -- Starting iteration 15
...
...
...
19:10:41.860 [pool-8-thread-2] INFO Main$ -- Ending iteration 1
19:10:41.861 [pool-8-thread-1] INFO Main$ -- Ending iteration 3
19:10:41.861 [pool-8-thread-5] INFO Main$ -- Ending iteration 5
19:10:41.861 [pool-8-thread-6] INFO Main$ -- Ending iteration 2
19:10:41.861 [pool-8-thread-2] INFO Main$ -- Ending iteration 4
19:10:41.861 [pool-8-thread-3] INFO Main$ -- Ending iteration 6
19:10:41.861 [pool-8-thread-7] INFO Main$ -- Ending iteration 7
19:10:41.862 [pool-8-thread-8] INFO Main$ -- Ending iteration 8
19:10:41.862 [pool-8-thread-4] INFO Main$ -- Ending iteration 9
19:10:41.862 [pool-8-thread-1] INFO Main$ -- Ending iteration 10
19:10:41.863 [pool-8-thread-2] INFO Main$ -- Ending iteration 11
19:10:41.863 [pool-8-thread-5] INFO Main$ -- Ending iteration 12
...
...
...
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 88
19:10:41.875 [pool-8-thread-4] INFO Main$ -- Ending iteration 89
19:10:41.875 [pool-8-thread-5] INFO Main$ -- Ending iteration 90
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 91
19:10:41.875 [pool-8-thread-2] INFO Main$ -- Ending iteration 92
19:10:41.875 [pool-8-thread-5] INFO Main$ -- Ending iteration 93
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 94
19:10:41.875 [pool-8-thread-2] INFO Main$ -- Ending iteration 95
19:10:41.875 [pool-8-thread-5] INFO Main$ -- Ending iteration 96
19:10:41.875 [pool-8-thread-8] INFO Main$ -- Ending iteration 97
19:10:41.876 [pool-8-thread-2] INFO Main$ -- Ending iteration 98
19:10:41.876 [pool-8-thread-5] INFO Main$ -- Ending iteration 99
19:10:41.876 [pool-8-thread-1] INFO Main$ -- Ending iteration 100
19:10:41.876 [pool-8-thread-6] INFO Main$ -- Total time taken: 1044 ms

You can see how by avoiding the blocking call to the service, with a thread pool of 8 threads we can process the 100 calls in 1044 ms. This solution scales much better than the previous one using blocking as it does not require the creation of so many threads. Note that each call to the service still requires 1 second to complete, the only difference is that it does not block the thread.


Figure 4. The call to the service is made in a non-blocking manner. The green boxes symbolize the method call and the red boxes symbolize when the method call is completed.

The following table shows a comparison of the different methods presented:


Figure 5. Execution time of different methods as a function of the number of calls to the service.

Conclusion

We have seen with a simple example that blocking code is always blocking, and how encapsulating it in a Future does not solve the root problem. Ideally, blocking code should be avoided by using frameworks, libraries and services that natively support asynchrony. If blocking calls are unavoidable, it is advisable to create a specific execution context for this type of operation with a fixed number of threads so that the rest of the application continues to run.

So far, this is the end of today’s post. If you found it interesting, we encourage you to visit the Data Engineering category to see all the related posts and to share it in networks with your contacts. See you soon!
Paul Sasieta
Paul Sasieta
Articles: 4