Executors, Futures & Starvation
Blocking Work Inside parallelStream / ForkJoinPool
Blocking Work Inside parallelStream / ForkJoinPool: practice a Java concurrency bug with symptoms such as throughput collapse, underperforming parallel work, ...
- Work-stealing pitfalls
- ForkJoinPool
- parallelStream
- Java
- Intermediate
Production symptoms
- Throughput collapse
- Parallel work underperforms
- Latency spikes
Failure scenario
Code
// Each call blocks a ForkJoin common-pool worker until the remote side answers
List<Response> responses = ids.parallelStream()
        .map(id -> callRemoteLikeThing(id))
        .collect(Collectors.toList());
Prod Symptoms
A request path uses parallelStream for many HTTP, database, filesystem, or rate-limited calls. Under load, the work finishes in slow waves instead of smooth parallel progress.
Key signal: parallelStream is a poor fit for unbounded blocking work on the shared common pool.
- Latency arrives in batches tied to common-pool parallelism
- ForkJoin common-pool workers spend time waiting on external work
- Unrelated code using parallel streams slows down during the same window
- CPU utilization stays low even though every common-pool worker is busy
- Larger inputs add whole extra waves of waiting, so tail latency jumps in steps instead of the work speeding up
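The "slow waves" pattern can be estimated with simple arithmetic. The sketch below assumes every element becomes its own task, every call blocks for about the same latency, and the thread that invokes the stream helps alongside the common-pool workers, so roughly parallelism + 1 calls are in flight at once. The class and method names are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;

public class WaveEstimate {
    // Hypothetical helper: rough elapsed time when `tasks` equal-latency blocking
    // calls share `threads` threads, each thread handling one call per wave.
    public static long estimateMillis(int tasks, int threads, long latencyMillis) {
        if (threads <= 0) {
            throw new IllegalArgumentException("threads must be positive");
        }
        if (tasks <= 0) {
            return 0;
        }
        long waves = (tasks + threads - 1L) / threads; // ceil(tasks / threads)
        return waves * latencyMillis;
    }

    public static void main(String[] args) {
        // Assumption: the calling thread also runs tasks, so about
        // parallelism + 1 blocking calls are in flight at once.
        int threads = ForkJoinPool.getCommonPoolParallelism() + 1;
        for (int tasks : new int[] {threads, threads + 1, threads * 4}) {
            System.out.println(tasks + " tasks -> ~" + estimateMillis(tasks, threads, 500) + " ms");
        }
    }
}
```

Under these assumptions, a common-pool parallelism of 3 (four threads counting the caller) and 16 tasks of 500 ms give an estimate of about 2000 ms, while a 17th task adds a whole extra 500 ms wave.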
Run Locally
- Calls tend to complete in batches whose size tracks common-pool parallelism, so elapsed time grows in steps
- Workers are not doing CPU work; they are sleeping as a stand-in for I/O
- Reducing common-pool parallelism makes the batching easier to see
Inspect hints
- Thread dumps during the sleep show ForkJoinPool.commonPool-worker threads in TIMED_WAITING
- If production has real I/O, inspect network/client waits rather than CPU hotspots
javac BlockingParallelStreamDemo.java
java BlockingParallelStreamDemo
jps
jcmd <pid> Thread.print
java -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 BlockingParallelStreamDemo
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BlockingParallelStreamDemo {
    public static void main(String[] args) {
        // Defaults to availableProcessors() - 1 (minimum 1); override with
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        // Enough tasks that the workers must process them in several waves
        int tasks = Math.max(16, parallelism * 4);
        List<Integer> ids = IntStream.range(0, tasks)
                .boxed()
                .collect(Collectors.toList());

        long start = System.nanoTime();
        // Each element blocks a common-pool worker (or the calling thread) for 500 ms
        List<String> results = ids.parallelStream()
                .map(BlockingParallelStreamDemo::callRemoteLikeThing)
                .collect(Collectors.toList());
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println("common pool parallelism = " + parallelism);
        System.out.println("tasks = " + results.size());
        System.out.println("elapsed ms = " + elapsedMillis);
    }

    // Stand-in for a blocking remote call: occupies the thread without using CPU
    private static String callRemoteLikeThing(int id) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
        }
        return "ok-" + id;
    }
}
Note: Thread.sleep stands in for blocking external I/O. Exact timings depend on common-pool parallelism.
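To see the batching directly rather than only the total, one option is to record when each call finishes. This is a sketch with hypothetical names; it buckets completion times by the per-call latency, which only approximates the number of waves because of scheduling jitter.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CompletionWaves {
    // Runs `tasks` sleeping calls through a parallel stream and returns each
    // call's completion time in ms since the stream started (input order).
    public static List<Long> completionTimes(int tasks, long sleepMillis) {
        long start = System.nanoTime();
        return IntStream.range(0, tasks)
                .parallel()
                .mapToObj(id -> {
                    sleepQuietly(sleepMillis); // stand-in for blocking I/O
                    return (System.nanoTime() - start) / 1_000_000;
                })
                .collect(Collectors.toList());
    }

    // Rough wave count: completion times bucketed by the per-call latency.
    public static long waveCount(List<Long> times, long sleepMillis) {
        return times.stream().map(t -> t / sleepMillis).distinct().count();
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        int tasks = (parallelism + 1) * 4;
        List<Long> times = completionTimes(tasks, 200);
        System.out.println("common pool parallelism = " + parallelism);
        System.out.println("completion ms = " + times);
        System.out.println("approximate waves = " + waveCount(times, 200));
    }
}
```

Running it with -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 should make the clusters of completion times easier to spot.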
Diagnosis and fix
Explanation
ForkJoinPool works best when tasks split CPU work and keep workers available for stealing.
Key signal: Use explicit concurrency control for blocking work instead of borrowing the shared common pool.
- Blocking calls occupy workers without using CPU
- The common pool has limited parallelism: by default, one less than the number of available processors (minimum 1)
- Other parallel streams share that same common pool by default
- Blocking inside the pool can reduce effective parallelism for unrelated work
- The issue is workload mismatch and shared-capacity exhaustion, not incorrect results
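One way to confirm that a blocked worker is occupied but idle is to compare wall-clock time with the thread's CPU time around a blocking call. This sketch uses the standard ThreadMXBean API; the class and method names are hypothetical, and Thread.sleep again stands in for real I/O.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

public class BlockingCpuVsWall {
    // Returns {wallNanos, cpuNanos} for one blocking call on the current thread.
    // cpuNanos is -1 when the JVM cannot measure per-thread CPU time.
    public static long[] measure(long sleepMillis) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        boolean cpuTimed = mx.isCurrentThreadCpuTimeSupported() && mx.isThreadCpuTimeEnabled();
        long cpuStart = cpuTimed ? mx.getCurrentThreadCpuTime() : 0;
        long wallStart = System.nanoTime();
        try {
            Thread.sleep(sleepMillis); // stand-in for a blocking remote call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long wall = System.nanoTime() - wallStart;
        long cpu = cpuTimed ? mx.getCurrentThreadCpuTime() - cpuStart : -1;
        return new long[] {wall, cpu};
    }

    public static void main(String[] args) {
        long[] r = measure(200);
        System.out.println("wall ms = " + r[0] / 1_000_000);
        System.out.println("cpu ms  = " + (r[1] < 0 ? "unsupported" : String.valueOf(r[1] / 1_000_000)));
    }
}
```

A large gap between the two numbers is the same signal as low CPU utilization with a fully busy pool.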
How to Diagnose
Measure elapsed time, common-pool occupancy, and downstream wait time while the work is in progress.
- Look for ForkJoinPool.commonPool-worker threads blocked, waiting, or sleeping
- Check whether the mapped function performs I/O, sleeps, waits on futures, or takes locks
- Compare elapsed time with expected external-call latency and common-pool parallelism
- Look for unrelated parallel streams slowing down during the same window
- Correlate with downstream client metrics such as connection-pool waits, timeouts, and rate limits
- Use load tests because a single local run may not reveal shared common-pool interference
jps
jcmd <pid> Thread.print
jstack <pid>
"ForkJoinPool.commonPool-worker-1" #... TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at BlockingParallelStreamDemo.callRemoteLikeThing(BlockingParallelStreamDemo.java:...)
Note: Sleeping in the demo represents real waits such as HTTP, database, filesystem, or rate-limited APIs.
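The thread-dump check can also be approximated in code, for example in a test or a debug endpoint. This sketch (hypothetical names) counts threads named like the ForkJoinPool.commonPool-worker entries in the dump excerpt above that are in TIMED_WAITING, and also reads ForkJoinPool.getActiveThreadCount(), which the JDK documents as an estimate.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolSampler {
    // Counts common-pool workers in TIMED_WAITING (e.g. inside Thread.sleep).
    public static long sleepingCommonPoolWorkers() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.getName().startsWith("ForkJoinPool.commonPool-worker"))
                .filter(t -> t.getState() == Thread.State.TIMED_WAITING)
                .count();
    }

    // Starts blocking parallel-stream work in the background, samples once after
    // `sampleAfterMillis`, waits for the work to finish, and returns
    // {sleepingWorkers, activeThreadEstimate} as seen at the sample point.
    public static long[] sampleDuringLoad(int tasks, long sleepMillis, long sampleAfterMillis) {
        Thread load = new Thread(() -> IntStream.range(0, tasks)
                .parallel()
                .forEach(i -> sleepQuietly(sleepMillis)), "blocking-load");
        load.start();
        sleepQuietly(sampleAfterMillis);
        long sleeping = sleepingCommonPoolWorkers();
        long active = ForkJoinPool.commonPool().getActiveThreadCount();
        try {
            load.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {sleeping, active};
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        long[] sample = sampleDuringLoad((parallelism + 1) * 4, 500, 250);
        System.out.println("common pool parallelism = " + parallelism);
        System.out.println("sleeping commonPool workers = " + sample[0]);
        System.out.println("active thread estimate = " + sample[1]);
    }
}
```

If the sleeping-worker count sits near the configured parallelism while CPU is low, the pool is saturated by waiting rather than by computation.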
How to Fix
- Use a dedicated ExecutorService, async client, or explicit bulkhead for blocking work
- Size concurrency from expected blocking latency and downstream capacity, not CPU count alone
- Keep parallelStream for bounded CPU work with small, non-blocking operations
- Apply timeouts, cancellation, and backpressure around real external calls
- Do not globally tune ForkJoinPool.common.parallelism as a local fix for one blocking path
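Sizing from latency rather than CPU count can be sketched with Little's law (in-flight work is roughly arrival rate times time in system). The helper name, inputs, and example numbers below are hypothetical and illustrative, not measured; real sizing should come from observed latency and the downstream's documented limits.

```java
public class BlockingPoolSizing {
    // Hypothetical helper: threads needed to sustain `callsPerSecond` when each
    // call blocks for `latencyMillis`, capped by what the downstream can accept.
    public static int threadsFor(long callsPerSecond, long latencyMillis, int downstreamLimit) {
        if (callsPerSecond <= 0 || latencyMillis <= 0 || downstreamLimit <= 0) {
            throw new IllegalArgumentException("inputs must be positive");
        }
        // Little's law with integer rounding up: ceil(rate * latencySeconds)
        long needed = (callsPerSecond * latencyMillis + 999) / 1000;
        return (int) Math.min(needed, downstreamLimit);
    }

    public static void main(String[] args) {
        System.out.println("CPU count = " + Runtime.getRuntime().availableProcessors());
        System.out.println("100 calls/s at 200 ms  -> " + threadsFor(100, 200, 50) + " threads");
        System.out.println("1000 calls/s at 200 ms -> " + threadsFor(1000, 200, 50) + " threads");
    }
}
```

The CPU count is printed only for contrast: it does not appear in the formula, which is the point of sizing blocking work separately from the common pool.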
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class BlockingWorkDedicatedExecutorFixed {
    public static void main(String[] args) {
        List<Integer> ids = IntStream.range(0, 32)
                .boxed()
                .collect(Collectors.toList());

        // Dedicated pool for blocking calls, sized for I/O concurrency rather than
        // CPU count, so the ForkJoin common pool stays free for CPU work.
        // Note: Executors.newFixedThreadPool uses an unbounded queue.
        ExecutorService blockingPool = Executors.newFixedThreadPool(16);
        try {
            long start = System.nanoTime();

            // Submit every call first so up to 16 can block concurrently
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (Integer id : ids) {
                futures.add(CompletableFuture.supplyAsync(
                        () -> callRemoteLikeThing(id),
                        blockingPool));
            }

            // Then wait for all results; 32 tasks on 16 threads should take
            // roughly two 500 ms waves
            List<String> results = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

            System.out.println("tasks = " + results.size());
            System.out.println("elapsed ms = " + elapsedMillis);
        } finally {
            // Let submitted work finish, then release the pool's threads
            blockingPool.shutdown();
        }
    }

    // Same stand-in for blocking I/O as in the broken demo
    private static String callRemoteLikeThing(int id) {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ok-" + id;
    }
}
Note: A dedicated pool makes blocking capacity explicit. In production, pair it with queue limits and downstream protection.
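A minimal sketch of adding queue limits and per-call timeouts, assuming Java 9+ (for CompletableFuture.orTimeout and failedFuture). The pool has a fixed thread count and a bounded ArrayBlockingQueue with AbortPolicy, so once threads and queue are full, new work is rejected instead of piling up. Class, method, and parameter names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BoundedBlockingBulkhead {
    // Fixed thread count plus a bounded queue; AbortPolicy rejects overflow.
    public static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy());
    }

    // Submits a blocking call with a per-call timeout that starts at submission,
    // so it includes time spent waiting in the queue. Rejection becomes a failed
    // future so callers can shed load. orTimeout does not interrupt the worker;
    // the real client should also enforce its own I/O timeout.
    public static <T> CompletableFuture<T> submit(ExecutorService pool, Supplier<T> call,
                                                  long timeoutMillis) {
        try {
            return CompletableFuture.supplyAsync(call, pool)
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    // Stand-in for a blocking remote call
    public static String slowCall(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ok";
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newBoundedPool(1, 1);
        try {
            CompletableFuture<String> running = submit(pool, () -> slowCall(300), 1_000);
            CompletableFuture<String> queued = submit(pool, () -> slowCall(300), 1_000);
            CompletableFuture<String> shed = submit(pool, () -> slowCall(300), 1_000);
            System.out.println("shed rejected = " + shed.isCompletedExceptionally());
            System.out.println("running = " + running.join() + ", queued = " + queued.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

With one thread and a queue of one, the third submission is rejected immediately; AbortPolicy was chosen here so overload is visible to the caller, whereas CallerRunsPolicy would instead push the blocking work back onto the submitting thread.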