Async-Helper is a Java utility (also an OSGi bundle) to invoke/schedule tasks or fetch data asynchronously using tags/flags in a functional way. This internally utilizes ForkJoin pool to submit the tasks.
This contains various helper classes such as AsyncContext, AsyncTask, AsyncSupplier, SchedulingTask and SchedulingSupplier to perform various asynchronous operations.
Please refer to the JavaDocs also.
- Submitting one or more Runnable(s) to run asynchronously.
- Submitting one or more Supplier(s) to fetch some data asynchronously, which can be then obtained by a tags(key made of one or more Objects)
- Wait for some flag in one thread until that flag is notified in another thread.
- Schedule Runnable(s) and Supplier(s) one time or rotating until a flag.
- Some of the above operations also support option to submit/schedule asynchronously and then wait untill all asynchronous tasks are compete.
Please look into the Unit tests for all the use-cases and examples.
Also refer to the Wiki page for some example usages.
This release introduces 22 new enhanced features with comprehensive test coverage and documentation:
- Timeout Support - Operations with configurable timeouts for better control
- Batch Processing - Process collections concurrently with optional concurrency limits
- Resilience Patterns - Retry logic, fallback values, and custom error handlers
- Functional Composition - Chain operations and combine results elegantly
- Competitive Execution - Race conditions and fastest result selection
- Modern Integration - CompletableFuture support for seamless integration
- Monitoring & Control - Check pending status and cancel operations
- Multi-Flag Coordination - Wait for multiple flags with all/any semantics
- Advanced Scheduling - Execute N times with exponential backoff support
- Comprehensive README with 10+ real-world usage examples
- Full unit test coverage for all 22 new methods
- Migration guide documenting backward compatibility
See the New Enhanced Features section below for detailed API documentation and the Advanced Usage Examples section for comprehensive code examples.
-
Async-Helper is an OSGi bundle now :), to use it directly in OSGi applications.
-
Renamed Async helper class to AsyncContext so that there is option to limit the context of Asynchronous operations. The global context can be obtained using
AsyncContext.getDefault(). -
All the existing helper classes and their methods are now converted from static to instances, so that,
Either their default instances can be obtained using their getDefault() methods,
- AsyncContext ==>
AsyncContext.getDefault() - AsyncTask ==>
AsyncTask.getDefault() - AsyncSupplier ==>
AsyncSupplier.getDefault() - SchedulingTask ==>
SchedulingTask.getDefault() - SchedulingSupplier ==>
SchedulingSupplier.getDefault()
Or they can be instantiated with a specific arguments.
- AsyncContext ==>
AsyncContext.newInstance() - AsyncTask ==>
AsyncTask.of(ExecutorService)orAsyncTask.of(ExecutorService, AsyncContext) - AsyncSupplier ==>
AsyncSupplier.of(ExecutorService)orAsyncSupplier.of(ExecutorService, AsyncContext) - SchedulingTask ==>
SchedulingTask.of(ScheduledExecutorService)orSchedulingTask.of(ScheduledExecutorService, AsyncContext) - SchedulingSupplier ==>
SchedulingSupplier.of(ScheduledExecutorService)orSchedulingSupplier.of(ScheduledExecutorService, AsyncContext)
- AsyncContext ==>
-
The default instances of
AsyncTaskandAsyncSupplieruse a commonForkJoinPool. But it is possible to get customized instances of these can be obtained by passing a newExecutorServiceinstance. -
The default instances of
SchedulingTaskandSchedulingSupplieruse a commonScheduledExecutorService. But it is possible to get customized instances of these can be obtained by passing a newScheduledExecutorServiceinstance. -
AsyncTask includes a new static helper method
AsyncTask.submitTaskInNewThread(Runnable)to submit a task by spawning a new thread.
Async-Helper now includes 22 powerful new methods across AsyncSupplier, AsyncContext, SchedulingTask, and SchedulingSupplier classes, providing production-ready features for enterprise applications:
submitAndGetWithTimeout- Execute and retrieve results with a timeout, preventing indefinite waitswaitForFlagWithTimeout- Wait for flags with timeout control in AsyncContextwaitAndGetFromSupplierWithTimeout- Retrieve supplier results with timeout protection
submitAndProcessAll- Process collections in parallel with automatic result aggregationsubmitAndProcessAllWithLimit- Batch process with concurrency limits to prevent resource exhaustion
submitSupplierWithRetry- Automatic retry with configurable attempts and delayssubmitSupplierWithFallback- Graceful fallback values when operations failsubmitSupplierWithErrorHandler- Custom error handling with recovery logic
submitChained- Chain async operations with dependent transformationssubmitAndCombine- Combine multiple async operations into a single result
submitRace- Race multiple operations, return the fastest resultsubmitAndGetFastest- Execute multiple suppliers, get the first successful completion
submitAsCompletableFuture- Bridge to Java 8+ CompletableFuture API for advanced composition
isPending- Check if async operations are still runningcancelSupplier- Cancel running async suppliers
waitForAllFlags- Wait for multiple flags before proceedingwaitForAnyFlag- Proceed when any flag is set
scheduleTaskNTimes- Execute scheduled tasks exactly N timesscheduleTaskWithBackoff- Exponential backoff scheduling for resilient pollingscheduleSupplierNTimes- Schedule suppliers N times with result collection
The following internal improvements have been made to enhance code quality and reliability. These changes are fully backward compatible and require no migration:
- Changed logger calls from eager string concatenation to lazy evaluation using lambda suppliers
- Example:
logger.config(() -> e.getMessage())instead oflogger.config(e.getMessage()) - Impact: Improved performance when logging is disabled, no API changes
- Added
Thread.currentThread().interrupt()after catchingInterruptedException - Ensures interrupted status is properly restored per Java best practices
- Impact: Better thread interrupt propagation, no API changes
- Made internal fields
finalwhere appropriate (DEFAULT_INSTANCE, executor, scheduler, asyncContext, etc.) - Impact: Improved thread safety and code reliability, no API changes
- Fixed class-level Javadoc descriptions for accuracy
- Improved grammar and clarity in documentation
- Impact: Better documentation quality, no API changes
Migration Required: ❌ None - All improvements are internal implementation enhancements that maintain full backward compatibility with existing code.
<dependency>
<groupId>org.vishag</groupId>
<artifactId>async-helper</artifactId>
<version>4.1.0</version>
</dependency>
If it is desired to run a set of method calls or code blocks asynchronously, the Async-Helper library includes an useful helper method AsyncTask.submitTasks as in below snippet.
AsyncTask.getDefault().submitTasks(
() -> getMethodParam1(arg1, arg2),
() -> getMethodParam2(arg2, arg3)
() -> getMethodParam3(arg3, arg4),
() -> {
//Some other code to run asynchronously
}
);
If it is desired to wait till all asynchronous codes are completed running, the AsyncTask.submitTasksAndWait varient can be used.
Also if it is desired to obtain a return value from each of the asynchronous method call or code block, the AsyncSupplier.submitSuppliers can be used so that the result can be then obtained by from the result suppliers array returned by the method. Below is the sample snippet:
Supplier<Object>[] resultSuppliers =
AsyncSupplier.getDefault().submitSuppliers(
() -> getMethodParam1(arg1, arg2),
() -> getMethodParam2(arg3, arg4),
() -> getMethodParam3(arg5, arg6)
);
Object a = resultSuppliers[0].get();
Object b = resultSuppliers[1].get();
Object c = resultSuppliers[2].get();
These result can be then passed to the myBigMethod as below.
myBigMethod(a,b,c);
If the return type of each method differ, use the below kind of snippet.
Supplier<String> aResultSupplier = AsyncSupplier.getDefault().submitSupplier(() -> getMethodParam1(arg1, arg2));
Supplier<Integer> bResultSupplier = AsyncSupplier.getDefault().submitSupplier(() -> getMethodParam2(arg3, arg4));
Supplier<Object> cResultSupplier = AsyncSupplier.getDefault().submitSupplier(() -> getMethodParam3(arg5, arg6));
myBigMethod(aResultSupplier.get(), bResultSupplier.get(), cResultSupplier.get());
The result of the asynchronous method calls/code blocks can also be obtained at a different point of code in the same thread or a different thread as in the below snippet.
AsyncSupplier.getDefault().submitSupplierForSingleAccess(() -> getMethodParam1(arg1, arg2), "a");
AsyncSupplier.getDefault().submitSupplierForSingleAccess(() -> getMethodParam2(arg3, arg4), "b");
AsyncSupplier.getDefault().submitSupplierForSingleAccess(() -> getMethodParam3(arg5, arg6), "c");
//Following can be in the same thread or a different thread
Optional<String> aResult = AsyncSupplier.getDefault().waitAndGetFromSupplier(String.class, "a");
Optional<Integer> bResult = AsyncSupplier.getDefault().waitAndGetFromSupplier(Integer.class, "b");
Optional<Object> cResult = AsyncSupplier.getDefault().waitAndGetFromSupplier(Object.class, "c");
myBigMethod(aResult.get(),bResult.get(),cResult.get());
Prevent indefinite waits with timeout-enabled operations:
// Execute with timeout (returns Optional)
Optional<String> result = AsyncSupplier.getDefault()
.submitAndGetWithTimeout(() -> fetchDataFromAPI(), 5, TimeUnit.SECONDS);
if (result.isPresent()) {
System.out.println("Got result: " + result.get());
} else {
System.out.println("Operation timed out");
}
// Wait for flag with timeout in AsyncContext
boolean flagSet = AsyncContext.getDefault()
.waitForFlagWithTimeout(3, TimeUnit.SECONDS, "operationComplete");Process collections in parallel efficiently:
List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5);
// Process all items in parallel
List<User> users = AsyncSupplier.getDefault()
.submitAndProcessAll(ids, id -> userService.fetchUser(id));
// Process with concurrency limit (max 3 parallel operations)
List<Result> results = AsyncSupplier.getDefault()
.submitAndProcessAllWithLimit(largeList, item -> processItem(item), 3);Build fault-tolerant applications:
// Automatic retry with delays
Supplier<String> resilientOp = AsyncSupplier.getDefault()
.submitSupplierWithRetry(() -> {
return callUnreliableService();
}, 3, 1000); // max 3 retries, 1 second delay
// Fallback value on failure
Supplier<Config> configSupplier = AsyncSupplier.getDefault()
.submitSupplierWithFallback(
() -> loadConfigFromRemote(),
getDefaultConfig() // fallback value
);
// Custom error handling
Supplier<Data> dataSupplier = AsyncSupplier.getDefault()
.submitSupplierWithErrorHandler(
() -> fetchData(),
ex -> {
logger.error("Failed to fetch data", ex);
return getCachedData();
}
);Chain and combine async operations:
// Chain dependent operations
Supplier<String> chained = AsyncSupplier.getDefault()
.submitChained(
() -> fetchUserId(),
userId -> fetchUserProfile(userId)
);
// Combine multiple independent operations
Supplier<Report> report = AsyncSupplier.getDefault()
.submitAndCombine(
results -> generateReport((Data1)results.get(0), (Data2)results.get(1)),
() -> fetchData1(),
() -> fetchData2()
);Get fastest results from multiple sources:
// Race multiple operations
Supplier<String> fastest = AsyncSupplier.getDefault()
.submitRace(
() -> fetchFromCache(),
() -> fetchFromDatabase(),
() -> fetchFromAPI()
);
String result = fastest.get(); // Returns result from fastest source
// Or get fastest directly as Optional
Optional<Data> fastestData = AsyncSupplier.getDefault()
.submitAndGetFastest(
() -> source1.getData(),
() -> source2.getData(),
() -> source3.getData()
);Bridge to modern Java async APIs:
CompletableFuture<String> future = AsyncSupplier.getDefault()
.submitAsCompletableFuture(() -> performOperation());
// Now use CompletableFuture's rich API
future.thenApply(String::toUpperCase)
.thenAccept(System.out::println)
.exceptionally(ex -> {
System.err.println("Error: " + ex.getMessage());
return null;
});Check and control async operations:
// Submit a long-running operation
AsyncSupplier.getDefault()
.submitSupplierForSingleAccess(() -> longRunningTask(), "taskKey");
// Check if still running
if (AsyncSupplier.getDefault().isPending("taskKey")) {
System.out.println("Task is still running...");
// Cancel if needed
boolean cancelled = AsyncSupplier.getDefault().cancelSupplier("taskKey");
}Coordinate multiple async operations:
// Start multiple operations that set flags when complete
AsyncTask.getDefault().submitTask(() -> {
processStep1();
AsyncContext.getDefault().notifyAllFlag("step1");
}, "task1");
AsyncTask.getDefault().submitTask(() -> {
processStep2();
AsyncContext.getDefault().notifyAllFlag("step2");
}, "task2");
// Wait for all steps to complete
AsyncContext.getDefault().waitForAllFlags(
new String[]{"step1"},
new String[]{"step2"}
);
// Or wait for any step to complete
String[] firstCompleted = AsyncContext.getDefault().waitForAnyFlag(
new String[]{"step1"},
new String[]{"step2"}
);Schedule tasks with fine-grained control:
// Execute exactly N times
SchedulingTask.getDefault().scheduleTaskNTimes(
5, // execute 5 times
100, // initial delay 100ms
500, // interval 500ms
TimeUnit.MILLISECONDS,
() -> sendHeartbeat()
);
// Exponential backoff for polling
SchedulingTask.getDefault().scheduleTaskWithBackoff(
100, // initial delay 100ms
10000, // max delay 10 seconds
2.0, // double delay each time
TimeUnit.MILLISECONDS,
() -> checkForUpdates(),
"pollingTask"
);
// Schedule supplier N times and collect results
Stream<Status> statuses = SchedulingSupplier.getDefault()
.scheduleSupplierNTimes(
10, // collect 10 samples
0, // start immediately
1000, // every 1 second
TimeUnit.MILLISECONDS,
() -> getSystemStatus()
);
List<Status> collectedStatuses = statuses.collect(Collectors.toList());public Optional<UserData> fetchUserDataResilient(String userId) {
try {
// Combine retry, timeout, and fallback
Optional<UserData> userData = AsyncSupplier.getDefault()
.submitAndGetWithTimeout(() -> {
Supplier<UserData> resilient = AsyncSupplier.getDefault()
.submitSupplierWithRetry(() -> {
return apiClient.getUserData(userId);
}, 3, 500); // 3 retries, 500ms delay
return resilient.get();
}, 10, TimeUnit.SECONDS); // 10 second total timeout
return userData;
} catch (Exception e) {
logger.error("Failed to fetch user data", e);
return Optional.empty();
}
}