Async Fetch Data from N Sources and Combine In Java

Fetch data from multiple sources asynchronously using a fixed thread pool from the Java executor framework, and combine the results.

Use case

Let’s consider a use case where user profile information comes from multiple sources:

  1. User bio information from an HTTP API endpoint
  2. User profile pictures from an S3 bucket
  3. User documents from an FTP server

We want to use Java multi-threading to fetch the profiles of 5 users from these 3 different sources asynchronously and return the combined result.

Program

We will do the following to achieve this:

  1. We create 3 functions that fetch the bio, pictures and documents asynchronously using CompletableFuture.supplyAsync. Each returns a CompletableFuture, a handle to a result that becomes available in the future. These 3 functions are:
    fetchBioOverHttpAsync
    fetchPicturesFromS3BucketAsync
    fetchDocumentsFromFtpServerAsync
  2. Next, we create a function fetchUserProfileAsync that fetches a single user profile asynchronously. It starts the 3 functions above, waits for their results using CompletableFuture::join, combines them into a Profile, and returns a CompletableFuture of that Profile.
  3. Last, we create a function fetchUserProfiles that fetches multiple user profiles. It starts fetchUserProfileAsync for every user, waits for each result using CompletableFuture::join, and returns the list of user profiles.
  4. We also create a fixed thread pool of size 10 using Executors.newFixedThreadPool(10) and pass it to every supplyAsync call, so that all our asynchronous operations run on its threads.

It is good practice to use the suffix Async in the names of functions that return a CompletableFuture, so that asynchronous functions are easy to identify.
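The "start everything, then join" pattern from steps 1 and 3 can be sketched in isolation as follows. This is a minimal illustration, not the program itself: the class FanOutThenJoinSketch and the helper fetchUpperAsync are hypothetical stand-ins for a slow remote call. The point it shows is that all futures are collected into a list before any join is called; joining inside the same stream pipeline that creates the futures would wait for each task before starting the next one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FanOutThenJoinSketch {

    // Hypothetical stand-in for a slow remote call (not part of the original program).
    static CompletableFuture<String> fetchUpperAsync(String id, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200); // simulate network latency
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return id.toUpperCase();
        }, executor);
    }

    static List<String> fetchAll(List<String> ids) {
        // One thread per id, so every task can start immediately.
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, ids.size()));
        try {
            // Phase 1: submit every task before waiting on any of them.
            List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> fetchUpperAsync(id, executor))
                .collect(Collectors.toList());
            // Phase 2: wait for the results; the list keeps the input order.
            return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // prints [ANDREW, BILLY, CHARLIE]
        System.out.println(fetchAll(Arrays.asList("andrew", "billy", "charlie")));
    }
}
```

Because the three tasks run in parallel on their own threads, the call should take roughly as long as one simulated fetch rather than three.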

package com.example.thread;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FetchProfileDataFromNSourcesAsync {

    // Fixed pool of 10 threads shared by all asynchronous operations
    ExecutorService executor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        Instant start = Instant.now();
        FetchProfileDataFromNSourcesAsync async = new FetchProfileDataFromNSourcesAsync();
        List<Profile> profiles = async.fetchUserProfiles(Arrays.asList("andrew", "billy", "charlie", "david", "emma"));
        Instant finish = Instant.now();
        long timeElapsed = Duration.between(start, finish).toMillis();
        System.out.println("Time elapsed " + timeElapsed);
        System.out.println("Profiles " + profiles);
        System.exit(0);
    }

    List<Profile> fetchUserProfiles(List<String> profileIds){
        List<CompletableFuture<Profile>> future = profileIds.stream()
            .map((s) -> fetchUserProfileAsync(s))
            .collect(Collectors.toList());
        return future.stream().map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    CompletableFuture<Profile> fetchUserProfileAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchUserProfile " + Thread.currentThread().getName());
            List<Object> result = Stream.of(fetchBioOverHttpAsync(profileId), fetchPicturesFromS3BucketAsync(profileId), fetchDocumentsFromFtpServerAsync(profileId))
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
            return new Profile((Bio)result.get(0), (List<String>) result.get(1), (List<String>) result.get(2));
        }, executor);
    }

    CompletableFuture<Bio> fetchBioOverHttpAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchBioOverHttpAsync " + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Thread.sleep to simulate 1 sec to fetch bio
            // Here write code to fetch bio from API
            Random random = new Random();
            List<String> gender = Arrays.asList("male", "female", "na");
            char[] alphabet = "abcdefghijklmnopqrstuvwxyz".toCharArray();
            Collections.shuffle(gender);
            // Pick a random letter from the whole alphabet, including 'a'
            return new Bio(profileId, random.nextInt(100), gender.get(0), "location " + alphabet[random.nextInt(alphabet.length)]);
        }, executor);
    }

    CompletableFuture<List<String>> fetchPicturesFromS3BucketAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchPicturesFromS3BucketAsync " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Thread.sleep to simulate 2 sec to fetch pictures
            // Here write code to fetch pictures from S3 bucket
            return Arrays.asList("picture of " + profileId);
        }, executor);
    }

    CompletableFuture<List<String>> fetchDocumentsFromFtpServerAsync(String profileId) {
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("fetchDocumentsFromFtpServerAsync " + Thread.currentThread().getName());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // Thread.sleep to simulate 5 sec to fetch documents
            // Here write code to fetch documents from FTP server
            return Arrays.asList("document of " + profileId);
        }, executor);
    }
}

class Profile {
    Bio bio;
    List<String> pictures;
    List<String> documents;

    public Profile(Bio bio, List<String> pictures, List<String> documents) {
        this.bio = bio;
        this.pictures = pictures;
        this.documents = documents;
    }

    @Override
    public String toString() {
        return "Profile{" +
            "bio=" + bio +
            ", pictures=" + pictures +
            ", documents=" + documents +
            '}';
    }
}

class Bio {
    String name;
    Integer age;
    String gender;
    String location;

    public Bio(String name, Integer age, String gender, String location) {
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.location = location;
    }

    @Override
    public String toString() {
        return "Bio{" +
            "name='" + name + '\'' +
            ", age=" + age +
            ", gender='" + gender + '\'' +
            ", location='" + location + '\'' +
            '}';
    }
}

Run

/jdk-11.0.10.jdk/Contents/Home/bin/java com.example.thread.FetchProfileDataFromNSourcesAsync
fetchUserProfile pool-1-thread-5
fetchUserProfile pool-1-thread-4
fetchUserProfile pool-1-thread-1
fetchUserProfile pool-1-thread-2
fetchUserProfile pool-1-thread-3
fetchBioOverHttpAsync pool-1-thread-6
fetchBioOverHttpAsync pool-1-thread-9
fetchBioOverHttpAsync pool-1-thread-10
fetchBioOverHttpAsync pool-1-thread-8
fetchBioOverHttpAsync pool-1-thread-7
fetchPicturesFromS3BucketAsync pool-1-thread-9
fetchPicturesFromS3BucketAsync pool-1-thread-6
fetchPicturesFromS3BucketAsync pool-1-thread-8
fetchPicturesFromS3BucketAsync pool-1-thread-10
fetchPicturesFromS3BucketAsync pool-1-thread-7
fetchDocumentsFromFtpServerAsync pool-1-thread-9
fetchDocumentsFromFtpServerAsync pool-1-thread-6
fetchDocumentsFromFtpServerAsync pool-1-thread-10
fetchDocumentsFromFtpServerAsync pool-1-thread-8
fetchDocumentsFromFtpServerAsync pool-1-thread-7
Time elapsed 8058
Profiles [Profile{bio=Bio{name='andrew', age=50, gender='male', location='location c'}, pictures=[picture of andrew], documents=[document of andrew]}, 
    Profile{bio=Bio{name='billy', age=33, gender='female', location='location x'}, pictures=[picture of billy], documents=[document of billy]}, 
    Profile{bio=Bio{name='charlie', age=27, gender='na', location='location n'}, pictures=[picture of charlie], documents=[document of charlie]}, 
    Profile{bio=Bio{name='david', age=38, gender='male', location='location y'}, pictures=[picture of david], documents=[document of david]}, 
    Profile{bio=Bio{name='emma', age=10, gender='male', location='location d'}, pictures=[picture of emma], documents=[document of emma]}]

Process finished with exit code 0

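We simulated fetching the user bio, pictures and documents with delays of 1s, 2s and 5s respectively, so fetching a single user profile sequentially would take 8s, and fetching 5 profiles one after another would take 40s. With the help of asynchronous programming, we were able to fetch all 5 user profiles in about 8s. Note that in this run five of the pool threads are occupied by fetchUserProfileAsync tasks blocked in join, so the fifteen source fetches share the remaining five threads (pool-1-thread-6 to pool-1-thread-10 in the output) and run in three waves of 1s, 2s and 5s. With enough free threads, the total time would approach the 5s taken by the slowest source.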
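As an alternative to blocking a pool thread in join while the three sources are fetched, the results can be combined with CompletableFuture.thenCombine. The following is a rough sketch under stated assumptions, not the program above: the class CombineWithoutBlockingSketch and the helpers delayed and fetchProfileSummaryAsync are hypothetical, the delays are scaled down to 100ms, 200ms and 500ms, and the profile is reduced to a String summary to keep the example short.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CombineWithoutBlockingSketch {

    // Hypothetical helper: completes with the given value after a simulated delay.
    static <T> CompletableFuture<T> delayed(T value, long millis, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return value;
        }, executor);
    }

    // Starts all three fetches, then combines them without calling join in a pool thread.
    static CompletableFuture<String> fetchProfileSummaryAsync(String id, ExecutorService executor) {
        CompletableFuture<String> bio = delayed("bio of " + id, 100, executor);
        CompletableFuture<List<String>> pictures =
            delayed(Arrays.asList("picture of " + id), 200, executor);
        CompletableFuture<List<String>> documents =
            delayed(Arrays.asList("document of " + id), 500, executor);
        // Each thenCombine callback runs only once both of its inputs have completed.
        return bio
            .thenCombine(pictures, (b, p) -> b + ", " + p)
            .thenCombine(documents, (bp, d) -> bp + ", " + d);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            long start = System.currentTimeMillis();
            // Only the calling (main) thread blocks here; the pool threads stay free for fetches.
            String summary = fetchProfileSummaryAsync("andrew", executor).join();
            System.out.println("Time elapsed " + (System.currentTimeMillis() - start));
            System.out.println(summary);
        } finally {
            executor.shutdown();
        }
    }
}
```

With this composition, no pool thread is spent waiting on other tasks, so three threads are enough for the three fetches of one profile to overlap, and the elapsed time should be close to the slowest simulated source rather than the sum of all three.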