Async Fetch Data from N Sources and Combine In Kotlin
Fetch data from multiple sources asynchronously using Kotlin coroutines and combine the results
Use case
Let’s consider a use case where user profile information comes from multiple sources:
- User bio information from an HTTP API endpoint
- User profile pictures from an S3 bucket
- User documents from an FTP server
We want to use Kotlin coroutines to fetch the profiles of 10 users from these 3 different sources asynchronously and return the combined result.
Program
We will do the following to achieve this:
- We create 3 functions that fetch the bio, pictures and documents asynchronously using GlobalScope.async, each of which returns a Deferred result that completes sometime in the future. These 3 functions are:
  - fetchBioOverHttpAsync
  - fetchPicturesFromS3BucketAsync
  - fetchDocumentsFromFtpServerAsync
- Next, we create a function fetchUserProfileAsync that fetches a user profile asynchronously: it starts the 3 fetches above, waits for their results using await(), combines them, and returns a Deferred result.
- Last, we create a function fetchUserProfiles that fetches multiple user profiles: it starts one fetchUserProfileAsync call per user, waits for each result using await(), and returns the list of user profiles.
- We also create a coroutine dispatcher backed by a fixed thread pool of size 10 using the Java executor service, Executors.newFixedThreadPool(10).asCoroutineDispatcher(), and use it to run all our asynchronous operations.
We use the suffix Async in the names of functions that return a Deferred result, which is a good practice for identifying asynchronous functions.
package com.example.concurrency

import kotlinx.coroutines.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Dispatcher backed by a fixed pool of 10 threads, shared by every coroutine below
val executor = Executors.newFixedThreadPool(10).asCoroutineDispatcher()

data class Profile(val bio: Bio, val picture: List<String>, val documents: List<String>)
data class Bio(val name: String, val age: Int, val gender: String, val location: String)

fun main() {
    runBlocking {
        val timeElapsed = measureTimeMillis {
            // fetch profile for 10 users
            fetchUserProfiles(listOf("andrew", "billy", "charlie", "david", "emma", "flora", "gavin", "harry", "idris", "jack"))
                .forEach(::println)
        }
        println("Time elapsed: $timeElapsed")
    }
    // shut down the thread pool so that the JVM can exit
    executor.close()
}

// Start one coroutine per user first, then await the results in order
suspend fun fetchUserProfiles(profileIds: List<String>): List<Profile> =
    profileIds.map { fetchUserProfileAsync(it) }.map { it.await() }

fun fetchUserProfileAsync(profileId: String): Deferred<Profile> = GlobalScope.async(executor) {
    // all three fetches are started before any of them is awaited
    val bio = fetchBioOverHttpAsync(profileId)
    val picture = fetchPicturesFromS3BucketAsync(profileId)
    val documents = fetchDocumentsFromFtpServerAsync(profileId)
    Profile(bio.await(), picture.await(), documents.await())
}

fun fetchBioOverHttpAsync(id: String): Deferred<Bio> = GlobalScope.async(executor) {
    println("fetchBioOverHttpAsync ${Thread.currentThread().name}")
    // delay to simulate 1 sec to fetch bio
    delay(1000)
    // Here write code to fetch bio from API
    Bio(id, (1..100).random(), listOf("male", "female", "na").random(), "location ${('a'..'z').random()}")
}

fun fetchPicturesFromS3BucketAsync(id: String): Deferred<List<String>> = GlobalScope.async(executor) {
    println("fetchPictureFromDBAsync ${Thread.currentThread().name}")
    // delay to simulate 2 sec to fetch pictures
    delay(2000)
    // Here write code to fetch pictures from S3 bucket
    listOf("picture of $id")
}

fun fetchDocumentsFromFtpServerAsync(id: String): Deferred<List<String>> = GlobalScope.async(executor) {
    println("fetchDocumentsFromFtpAsync ${Thread.currentThread().name}")
    // delay to simulate 5 sec to fetch documents
    delay(5000)
    // Here write code to fetch documents from FTP server
    listOf("document for $id")
}
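The listing above launches every coroutine in GlobalScope, which recent kotlinx.coroutines releases mark as a delicate API because such coroutines are not tied to any parent. As a hedged alternative, and not the approach this article uses, the same fan-out can be sketched with coroutineScope so that a failure or cancellation in one fetch also cancels its siblings. The fetchBio, fetchPictures and fetchDocuments helpers, the simplified Bio class and the shortened delays are assumptions made for illustration only.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

data class Bio(val name: String)
data class Profile(val bio: Bio, val pictures: List<String>, val documents: List<String>)

// Hypothetical stand-ins for the real HTTP, S3 and FTP calls (delays shortened).
suspend fun fetchBio(id: String): Bio { delay(100); return Bio(id) }
suspend fun fetchPictures(id: String): List<String> { delay(200); return listOf("picture of $id") }
suspend fun fetchDocuments(id: String): List<String> { delay(500); return listOf("document for $id") }

// Starts the three fetches concurrently as children of this scope, then combines them.
suspend fun fetchUserProfile(id: String): Profile = coroutineScope {
    val bio = async { fetchBio(id) }
    val pictures = async { fetchPictures(id) }
    val documents = async { fetchDocuments(id) }
    Profile(bio.await(), pictures.await(), documents.await())
}

// Fetches all profiles concurrently; awaitAll keeps the input order.
suspend fun fetchUserProfiles(ids: List<String>): List<Profile> = coroutineScope {
    ids.map { async { fetchUserProfile(it) } }.awaitAll()
}

fun main() = runBlocking {
    fetchUserProfiles(listOf("andrew", "billy")).forEach(::println)
}
```

Here the functions are plain suspend functions rather than functions returning Deferred, so the Async suffix is dropped and the caller decides where concurrency is introduced.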
Run
/jdk-11.0.10.jdk/Contents/Home/bin/java com.example.concurrency.FetchProfileCoroutineAsyncKt
fetchBioOverHttpAsync pool-1-thread-6
fetchBioOverHttpAsync pool-1-thread-8
fetchBioOverHttpAsync pool-1-thread-9
fetchBioOverHttpAsync pool-1-thread-10
fetchBioOverHttpAsync pool-1-thread-2
fetchBioOverHttpAsync pool-1-thread-4
fetchPictureFromDBAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-3
fetchPictureFromDBAsync pool-1-thread-7
fetchPictureFromDBAsync pool-1-thread-3
fetchPictureFromDBAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchDocumentsFromFtpAsync pool-1-thread-7
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchBioOverHttpAsync pool-1-thread-7
fetchBioOverHttpAsync pool-1-thread-3
fetchBioOverHttpAsync pool-1-thread-5
fetchBioOverHttpAsync pool-1-thread-8
fetchDocumentsFromFtpAsync pool-1-thread-5
fetchPictureFromDBAsync pool-1-thread-9
fetchPictureFromDBAsync pool-1-thread-2
fetchDocumentsFromFtpAsync pool-1-thread-10
fetchDocumentsFromFtpAsync pool-1-thread-3
fetchDocumentsFromFtpAsync pool-1-thread-4
fetchPictureFromDBAsync pool-1-thread-7
fetchPictureFromDBAsync pool-1-thread-1
Profile(bio=Bio(name=andrew, age=46, gender=male, location=location o), picture=[picture of andrew], documents=[document for andrew])
Profile(bio=Bio(name=billy, age=78, gender=na, location=location b), picture=[picture of billy], documents=[document for billy])
Profile(bio=Bio(name=charlie, age=69, gender=male, location=location n), picture=[picture of charlie], documents=[document for charlie])
Profile(bio=Bio(name=david, age=18, gender=female, location=location u), picture=[picture of david], documents=[document for david])
Profile(bio=Bio(name=emma, age=66, gender=female, location=location p), picture=[picture of emma], documents=[document for emma])
Profile(bio=Bio(name=flora, age=65, gender=female, location=location v), picture=[picture of flora], documents=[document for flora])
Profile(bio=Bio(name=gavin, age=76, gender=na, location=location v), picture=[picture of gavin], documents=[document for gavin])
Profile(bio=Bio(name=harry, age=32, gender=female, location=location i), picture=[picture of harry], documents=[document for harry])
Profile(bio=Bio(name=idris, age=18, gender=female, location=location b), picture=[picture of idris], documents=[document for idris])
Profile(bio=Bio(name=jack, age=42, gender=na, location=location p), picture=[picture of jack], documents=[document for jack])
Time elapsed: 5038
We simulated fetching the user bio, pictures and documents with delays of 1s, 2s and 5s respectively, a total of 8s per user profile if the three fetches ran one after another (80s for 10 users). With the help of asynchronous programming using coroutines, we were able to fetch 10 user profiles in around 5s.
The total time taken (~5s) is almost equal to the delay of the slowest function, i.e. fetching documents (5s). Isn’t it amazing?
In Kotlin, each async operation started with GlobalScope.async {} is a coroutine, which returns a Deferred result. Unlike threads, coroutines are not bound to any particular thread. A coroutine can start executing in one thread, suspend execution, and resume on a different thread.
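To observe this, here is a minimal sketch that records the thread name before and after a suspension point. The function name threadsBeforeAndAfterDelay and the pool size of 4 are assumptions chosen for illustration; whether the two names actually differ depends on which pool thread happens to pick up the resumed coroutine.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

// Returns the names of the threads that ran the coroutine before and after it suspended.
fun threadsBeforeAndAfterDelay(): Pair<String, String> {
    // use {} closes the dispatcher, which shuts the pool down, when the block ends.
    return Executors.newFixedThreadPool(4).asCoroutineDispatcher().use { pool ->
        runBlocking {
            withContext(pool) {
                val before = Thread.currentThread().name
                delay(100) // suspension point: the pool thread is released here
                val after = Thread.currentThread().name
                before to after
            }
        }
    }
}

fun main() {
    val (before, after) = threadsBeforeAndAfterDelay()
    // The two names may or may not differ from run to run.
    println("before delay: $before, after delay: $after")
}
```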
Thread vs Coroutines
Kotlin coroutines are often described as a lightweight alternative to Java threads. Coroutines still run on a Java thread pool behind the scenes, but they differ in how they perform tasks.
Thread
When a thread performs a task that blocks for some time (an IO operation or a network call), the thread has to wait. Other threads in the thread pool can take turns performing other tasks while the blocked thread is waiting. The operating system determines thread scheduling and context switching, which adds overhead.
Coroutines
When a coroutine running on a thread has to wait for some time (an IO operation or a network call made through a suspending function), the coroutine is suspended and the same thread can run other coroutines while the first one is waiting. The Kotlin coroutine machinery, rather than the operating system, determines when to switch coroutines, which makes switching lightweight.
In our program, while the coroutine fetchDocumentsFromFtpServerAsync waits for 5s, it is suspended, and the current thread is free to run other coroutines in that period, say fetchBioOverHttpAsync and fetchPicturesFromS3BucketAsync, which complete after 1s and 2s. The thread then resumes fetchDocumentsFromFtpServerAsync once its wait is over. In this way, the coroutines are able to fetch the bio, pictures and documents all in 5s using a single thread. This is just an illustration; in practice, these coroutines might be run by different threads, but you get the gist. Coroutines provide a very high level of concurrency because they are non-blocking and avoid much of the overhead of switching threads.
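The single-thread scenario described above can be sketched as follows. This is an illustration under stated assumptions, not part of the original program: the function name runOnSingleThread is hypothetical, and the delays are scaled down to one fifth (200, 400 and 1000 ms instead of 1s, 2s and 5s). All three coroutines share one thread, yet the elapsed time should be close to the longest delay rather than the sum.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Runs three simulated fetches as coroutines on ONE thread and returns the elapsed milliseconds.
fun runOnSingleThread(): Long =
    Executors.newSingleThreadExecutor().asCoroutineDispatcher().use { singleThread ->
        measureTimeMillis {
            runBlocking(singleThread) {
                listOf(200L, 400L, 1000L)
                    .map { millis ->
                        async {
                            delay(millis) // suspends the coroutine, does not block the thread
                            Thread.currentThread().name
                        }
                    }
                    .awaitAll()
                    .forEach { println("resumed on $it") }
            }
        }
    }

fun main() {
    println("Elapsed: ${runOnSingleThread()} ms")
}
```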
If you use a single blocking thread without coroutines, as in plain Java, it would take 8s (1s + 2s + 5s) to fetch the bio, pictures and documents.
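For comparison, here is a hedged sketch of the blocking case using only the Java executor API. The function name runSleepingTasksOnSingleThread is hypothetical, Thread.sleep stands in for a blocking IO call, and the delays are again scaled down to one fifth. Because each task holds the only thread while it sleeps, the tasks run one after another and the elapsed time should be close to the sum of the delays.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

// Runs three simulated fetches as blocking tasks on ONE thread and returns the elapsed milliseconds.
fun runSleepingTasksOnSingleThread(): Long {
    val executor = Executors.newSingleThreadExecutor()
    try {
        return measureTimeMillis {
            val tasks = listOf(200L, 400L, 1000L).map { millis ->
                Callable {
                    Thread.sleep(millis) // blocks the thread for the whole wait
                    millis
                }
            }
            // invokeAll blocks the caller until every task has finished.
            executor.invokeAll(tasks)
        }
    } finally {
        executor.shutdown()
    }
}

fun main() {
    println("Elapsed: ${runSleepingTasksOnSingleThread()} ms")
}
```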