Async Fetch Data from N Sources and Combine In Java
Fetch data from multiple sources asynchronously using a fixed thread pool from the Java executor framework, then combine the results
Use case
Let’s consider a use case where user profile information comes from multiple sources:
- User bio information from an HTTP API endpoint
- User profile pictures from an S3 bucket
- User documents from an FTP server
We want to use Java multi-threading to fetch the profiles of 5 users from these 3 different sources asynchronously and return the combined result.
Program
We will do the following to achieve this:
- We create 3 functions to fetch bio, pictures and documents asynchronously using CompletableFuture.supplyAsync. Each returns a CompletableFuture, a handle to a result that becomes available in the future. These 3 functions are:
  fetchBioOverHttpAsync
  fetchPicturesFromS3BucketAsync
  fetchDocumentsFromFtpServerAsync
- Next, we create a function fetchUserProfileAsync to fetch a user profile asynchronously. It waits for the results of the above 3 functions using CompletableFuture::join, combines them into a Profile, and returns a CompletableFuture<Profile>.
- Last, we create a function fetchUserProfiles to fetch multiple user profiles. It starts all the profile fetches first, then collects their results using CompletableFuture::join and returns the list of user profiles.
- We also create a fixed thread pool of size 10 using the Java executor service, Executors.newFixedThreadPool(10), and run all our asynchronous operations on it.
We use the suffix Async in the names of functions that return a CompletableFuture, which is a good practice for identifying asynchronous functions.
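The combining step described above can also be written without calling join inside a pool thread. The following is a minimal sketch of that alternative using thenCombine, not the approach used in the program below. The class CombineSketch, its fetch functions (which return fixed values immediately) and the use of a plain String in place of the Profile class are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CombineSketch {

    // Hypothetical stand-ins for the three fetch functions; each returns a fixed value.
    static CompletableFuture<String> fetchBioAsync(String id, ExecutorService ex) {
        return CompletableFuture.supplyAsync(() -> "bio of " + id, ex);
    }

    static CompletableFuture<List<String>> fetchPicturesAsync(String id, ExecutorService ex) {
        return CompletableFuture.supplyAsync(() -> List.of("picture of " + id), ex);
    }

    static CompletableFuture<List<String>> fetchDocumentsAsync(String id, ExecutorService ex) {
        return CompletableFuture.supplyAsync(() -> List.of("document of " + id), ex);
    }

    // Starts all three fetches, then combines their results once all have completed.
    // No casts are needed and no pool thread is held while waiting.
    static CompletableFuture<String> fetchProfileAsync(String id, ExecutorService ex) {
        CompletableFuture<String> bio = fetchBioAsync(id, ex);
        CompletableFuture<List<String>> pictures = fetchPicturesAsync(id, ex);
        CompletableFuture<List<String>> documents = fetchDocumentsAsync(id, ex);
        return bio.thenCombine(pictures, (b, p) -> b + ", " + p)
                  .thenCombine(documents, (bp, d) -> "Profile{" + bp + ", " + d + "}");
    }

    // Convenience wrapper that owns the executor and blocks only in the calling thread.
    static String fetchProfile(String id) {
        ExecutorService ex = Executors.newFixedThreadPool(3);
        try {
            return fetchProfileAsync(id, ex).join();
        } finally {
            ex.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints Profile{bio of andrew, [picture of andrew], [document of andrew]}
        System.out.println(fetchProfile("andrew"));
    }
}
```

With this style the only blocking join happens in the caller, so the pool size does not have to account for tasks that are merely waiting on other tasks.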
package com.example.thread;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FetchProfileDataFromNSourcesAsync {
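    // Fixed pool of 10 threads shared by all asynchronous operations.
    // Each of the 5 profile tasks blocks a thread in join(), so the pool
    // must be larger than 5 for the individual fetches to make progress.
    ExecutorService executor = Executors.newFixedThreadPool(10);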

    public static void main(String[] args) {
        Instant start = Instant.now();
        FetchProfileDataFromNSourcesAsync async = new FetchProfileDataFromNSourcesAsync();
        List<Profile> profiles = async.fetchUserProfiles(Arrays.asList("andrew", "billy", "charlie", "david", "emma"));
        Instant finish = Instant.now();
        long timeElapsed = Duration.between(start, finish).toMillis();
        System.out.println("Time elapsed " + timeElapsed);
        System.out.println("Profiles " + profiles);
        // The pool threads are non-daemon and the executor is never shut down,
        // so exit explicitly to stop the JVM.
        System.exit(0);
    }

    List<Profile> fetchUserProfiles(List<String> profileIds) {
        // Start all profile fetches first so that they run concurrently.
        List<CompletableFuture<Profile>> futures = profileIds.stream()
                .map(this::fetchUserProfileAsync)
                .collect(Collectors.toList());
        // Then wait for each result, in the order of the input list.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    CompletableFuture<Profile> fetchUserProfileAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchUserProfile " + Thread.currentThread().getName());
            // All three fetches are started before the first join is called.
            List<Object> result = Stream.of(fetchBioOverHttpAsync(profileId), fetchPicturesFromS3BucketAsync(profileId), fetchDocumentsFromFtpServerAsync(profileId))
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            // The casts rely on the list order matching the order of the futures above.
            return new Profile((Bio) result.get(0), (List<String>) result.get(1), (List<String>) result.get(2));
        }, executor);
    }

    CompletableFuture<Bio> fetchBioOverHttpAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchBioOverHttpAsync " + Thread.currentThread().getName());
            // Thread.sleep to simulate 1 sec to fetch bio
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can see the interruption
                Thread.currentThread().interrupt();
            }
            // Here write code to fetch bio from API
            Random random = new Random();
            List<String> gender = Arrays.asList("male", "female", "na");
            char[] alphabet = "abcdefghijklmnopqrstuvwxyz".toCharArray();
            Collections.shuffle(gender);
            // nextInt(alphabet.length) covers every letter from 'a' to 'z'
            return new Bio(profileId, random.nextInt(100), gender.get(0), "location " + alphabet[random.nextInt(alphabet.length)]);
        }, executor);
    }

    CompletableFuture<List<String>> fetchPicturesFromS3BucketAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchPicturesFromS3BucketAsync " + Thread.currentThread().getName());
            // Thread.sleep to simulate 2 sec to fetch pictures
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Here write code to fetch pictures from S3 bucket
            return Arrays.asList("picture of " + profileId);
        }, executor);
    }

    CompletableFuture<List<String>> fetchDocumentsFromFtpServerAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchDocumentsFromFtpServerAsync " + Thread.currentThread().getName());
            // Thread.sleep to simulate 5 sec to fetch documents
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Here write code to fetch documents from FTP server
            return Arrays.asList("document of " + profileId);
        }, executor);
    }
}

class Profile {
    Bio bio;
    List<String> pictures;
    List<String> documents;

    public Profile(Bio bio, List<String> pictures, List<String> documents) {
        this.bio = bio;
        this.pictures = pictures;
        this.documents = documents;
    }

    @Override
    public String toString() {
        return "Profile{" +
                "bio=" + bio +
                ", pictures=" + pictures +
                ", documents=" + documents +
                '}';
    }
}

class Bio {
    String name;
    Integer age;
    String gender;
    String location;

    public Bio(String name, Integer age, String gender, String location) {
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.location = location;
    }

    @Override
    public String toString() {
        return "Bio{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", gender='" + gender + '\'' +
                ", location='" + location + '\'' +
                '}';
    }
}
Run
/jdk-11.0.10.jdk/Contents/Home/bin/java com.example.thread.FetchProfileDataFromNSourcesAsync
fetchUserProfile pool-1-thread-5
fetchUserProfile pool-1-thread-4
fetchUserProfile pool-1-thread-1
fetchUserProfile pool-1-thread-2
fetchUserProfile pool-1-thread-3
fetchBioOverHttpAsync pool-1-thread-6
fetchBioOverHttpAsync pool-1-thread-9
fetchBioOverHttpAsync pool-1-thread-10
fetchBioOverHttpAsync pool-1-thread-8
fetchBioOverHttpAsync pool-1-thread-7
fetchPicturesFromS3BucketAsync pool-1-thread-9
fetchPicturesFromS3BucketAsync pool-1-thread-6
fetchPicturesFromS3BucketAsync pool-1-thread-8
fetchPicturesFromS3BucketAsync pool-1-thread-10
fetchPicturesFromS3BucketAsync pool-1-thread-7
fetchDocumentsFromFtpServerAsync pool-1-thread-9
fetchDocumentsFromFtpServerAsync pool-1-thread-6
fetchDocumentsFromFtpServerAsync pool-1-thread-10
fetchDocumentsFromFtpServerAsync pool-1-thread-8
fetchDocumentsFromFtpServerAsync pool-1-thread-7
Time elapsed 8058
Profiles [Profile{bio=Bio{name='andrew', age=50, gender='male', location='location c'}, pictures=[picture of andrew], documents=[document of andrew]},
Profile{bio=Bio{name='billy', age=33, gender='female', location='location x'}, pictures=[picture of billy], documents=[document of billy]},
Profile{bio=Bio{name='charlie', age=27, gender='na', location='location n'}, pictures=[picture of charlie], documents=[document of charlie]},
Profile{bio=Bio{name='david', age=38, gender='male', location='location y'}, pictures=[picture of david], documents=[document of david]},
Profile{bio=Bio{name='emma', age=10, gender='male', location='location d'}, pictures=[picture of emma], documents=[document of emma]}]
Process finished with exit code 0
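We simulated fetching the user bio, pictures and documents with delays of 1s, 2s and 5s respectively, so fetching a single user profile sequentially takes 8s, and fetching 5 profiles one after another would take about 40s. With the help of asynchronous programming, we were able to fetch all 5 user profiles in about 8s (8058 ms in the run above). Note that in this run threads 1 to 5 were occupied by the fetchUserProfile tasks waiting in join, so threads 6 to 10 each ran a bio, a pictures and a documents fetch back to back; that appears to be why the total is close to 8s rather than the 5s of the slowest source.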
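To see how elapsed time relates to the slowest source when every fetch has its own thread, here is a minimal, self-contained sketch that is separate from the program above. The class TimingSketch and the functions slowFetchAsync and timeConcurrentFetch are hypothetical, and the delays are scaled down to 100 ms, 200 ms and 500 ms. Under these assumptions the elapsed time should be close to 500 ms (the slowest fetch), not 800 ms (the sum), although the exact figure varies from run to run.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TimingSketch {

    // Hypothetical stand-in for a remote call: sleeps for delayMillis, then returns the label.
    static CompletableFuture<String> slowFetchAsync(String label, long delayMillis, ExecutorService ex) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return label;
        }, ex);
    }

    // Runs three simulated fetches concurrently on three threads
    // and returns the elapsed wall-clock time in milliseconds.
    static long timeConcurrentFetch() {
        ExecutorService ex = Executors.newFixedThreadPool(3);
        try {
            Instant start = Instant.now();
            CompletableFuture<String> bio = slowFetchAsync("bio", 100, ex);
            CompletableFuture<String> pictures = slowFetchAsync("pictures", 200, ex);
            CompletableFuture<String> documents = slowFetchAsync("documents", 500, ex);
            // allOf completes when all three futures have completed.
            CompletableFuture.allOf(bio, pictures, documents).join();
            return Duration.between(start, Instant.now()).toMillis();
        } finally {
            ex.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Elapsed ms: " + timeConcurrentFetch());
    }
}
```

The sketch also shuts the executor down in a finally block, which lets the JVM exit on its own without a call to System.exit.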