Broken AtomicLong Watermark Update
Practice a Java concurrency bug with these symptoms: the watermark moves backward, the atomic field is still wrong, and no exception is thrown.
- Read-modify-write races
- Atomicity
- Atomics
- Java
- Intermediate
Production symptoms
- Watermark moves backward
- Atomic field still wrong
- No exception
Failure scenario
Code
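import java.util.concurrent.atomic.AtomicLong;

class WatermarkTracker {
    private final AtomicLong highestOffset = new AtomicLong();

    void record(long offset) {
        long current = highestOffset.get();   // read the current highest offset
        if (offset > current) {
            highestOffset.set(offset);        // store the new offset because it looked higher
        }
    }
}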
Prod Symptoms
A service tracks the highest processed offset, event timestamp, version, or checkpoint watermark with AtomicLong. Under concurrency, the stored value sometimes regresses even though each individual AtomicLong operation is thread-safe.
- Logs or metrics show a lower watermark written after a higher one
- Lag, replay, or checkpoint state contradicts broker offsets, database rows, or audit records
- The code looks safe because the field is AtomicLong
- The issue appears only when lower and higher values are recorded concurrently
- No exception is thrown and thread dumps usually look normal
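To confirm the first symptom above against real data, a minimal sketch can scan watermark samples (for example, parsed from log lines or a metrics export) and flag every point where the value drops below a higher value written earlier. The class name `WatermarkRegressionScanner` and the sample values are hypothetical, and the sketch assumes the samples are already ordered by write time.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkRegressionScanner {
    // Returns the indexes of samples that are lower than the highest sample written before them.
    // Assumes samples are ordered by write time (hypothetical input: sorted logs or a gauge export).
    static List<Integer> findRegressions(long[] watermarks) {
        List<Integer> regressions = new ArrayList<>();
        long highestSoFar = Long.MIN_VALUE;
        for (int i = 0; i < watermarks.length; i++) {
            if (watermarks[i] < highestSoFar) {
                regressions.add(i);            // a lower value was stored after a higher one
            } else {
                highestSoFar = watermarks[i];
            }
        }
        return regressions;
    }

    public static void main(String[] args) {
        // Hypothetical watermark samples in the order they were written.
        long[] samples = {100, 200, 100, 250};
        System.out.println("regressions at indexes " + findRegressions(samples)); // prints "regressions at indexes [2]"
    }
}
```

Comparing against the highest value seen so far, rather than only the previous sample, also flags later samples that stay below the lost high value until a newer, higher offset arrives.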
Race interleaving
- offset-100 reads the old highest value first
- offset-200 records the newer highest value
- offset-100 resumes and calls set(100) based on its stale check
- The final value is lower than the true highest offset
Inspect hints
- Search for AtomicLong.get() followed by set() inside a conditional update
- Look for max, min, watermark, timestamp, quota, or sequence logic implemented as separate atomic operations
- Thread dumps rarely prove this race after the stale decision has already been applied
Run Locally
javac AtomicLongGetSetRaceDemo.java
java AtomicLongGetSetRaceDemo
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongGetSetRaceDemo {
    private static final AtomicLong highestOffset = new AtomicLong();
    // The latches force the bad interleaving every run instead of waiting for it to happen by chance.
    private static final CountDownLatch lowThreadRead = new CountDownLatch(1);
    private static final CountDownLatch highThreadStored = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        Thread low = new Thread(AtomicLongGetSetRaceDemo::recordOffset100, "offset-100");
        Thread high = new Thread(AtomicLongGetSetRaceDemo::recordOffset200, "offset-200");
        low.start();
        high.start();
        low.join();
        high.join();
        System.out.println("expected highest = 200");
        System.out.println("actual highest = " + highestOffset.get());
    }

    private static void recordOffset100() {
        try {
            long current = highestOffset.get();       // reads 0
            if (100 > current) {
                lowThreadRead.countDown();            // let offset-200 run inside this get/set window
                highThreadStored.await();             // wait until 200 has been stored
                highestOffset.set(100);               // stale decision overwrites 200
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void recordOffset200() {
        try {
            lowThreadRead.await();                    // start only after offset-100 has read
            long current = highestOffset.get();       // still 0
            if (200 > current) {
                highestOffset.set(200);
            }
            highThreadStored.countDown();             // release offset-100
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Note: The lower-offset thread reads first, waits, then overwrites the higher value with a stale decision.
Diagnosis and fix
Explanation
The bug is the gap between get() and set().
Key signal: an atomic variable does not make a multi-step workflow atomic.
- One thread reads the current value and decides its offset should be stored
- Another thread can store a higher value before the first thread calls set()
- The first thread then overwrites the higher value using an old decision
- The AtomicLong did not fail; the algorithm split one logical update into multiple operations
- Use a compound atomic operation when the decision and the write must happen together
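The compound-operation rule above can be sketched with an explicit compareAndSet retry loop. The class `CasMaxWatermark` is hypothetical; it behaves like `accumulateAndGet(offset, Math::max)` in that the write succeeds only if the value is still the one the decision was based on, so a stale decision is retried instead of applied.

```java
import java.util.concurrent.atomic.AtomicLong;

public class CasMaxWatermark {
    private final AtomicLong highestOffset = new AtomicLong();

    // Raises the watermark to offset if it is higher; returns true if this call advanced it.
    boolean record(long offset) {
        while (true) {
            long current = highestOffset.get();
            if (offset <= current) {
                return false;                 // an equal or higher value is already stored
            }
            if (highestOffset.compareAndSet(current, offset)) {
                return true;                  // nobody changed the value between get() and CAS
            }
            // CAS failed: another thread stored a value after our get(); re-read and decide again.
        }
    }

    long get() {
        return highestOffset.get();
    }

    public static void main(String[] args) throws InterruptedException {
        CasMaxWatermark tracker = new CasMaxWatermark();
        Thread low = new Thread(() -> tracker.record(100), "offset-100");
        Thread high = new Thread(() -> tracker.record(200), "offset-200");
        low.start();
        high.start();
        low.join();
        high.join();
        System.out.println("highest = " + tracker.get()); // 200 for every interleaving
    }
}
```

In practice `accumulateAndGet` is the shorter choice for a plain max; the explicit loop is useful when the caller also needs to know whether its own call moved the watermark.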
How to Diagnose
Look for values that should only move forward (monotonic) but move backward or contradict evidence from other systems.
- Compare the in-process watermark with broker offsets, persisted checkpoints, audit records, or event timestamps
- Search for AtomicLong or AtomicInteger code that reads, branches, and later calls set()
- Look for logs where a lower value is stored after a higher value
- Reproduce with stress tests or deterministic latches around the get/set window
- Do not expect thread dumps, JFR, or a profiler to prove the lost interleaving after the fact
Output from AtomicLongGetSetRaceDemo:
expected highest = 200
actual highest = 100
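The stress-test bullet above might look like this minimal, non-deterministic sketch. `WatermarkStressTest`, its helper names, and the round count are illustrative assumptions. It races a low writer (100) and a high writer (200) many times and counts rounds that end below 200.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ObjLongConsumer;

public class WatermarkStressTest {
    // The buggy check-then-act from WatermarkTracker.record.
    static void getThenSet(AtomicLong watermark, long offset) {
        long current = watermark.get();
        if (offset > current) {
            watermark.set(offset);
        }
    }

    // The fixed single atomic update.
    static void atomicMax(AtomicLong watermark, long offset) {
        watermark.accumulateAndGet(offset, Math::max);
    }

    // Races two writers `rounds` times; returns how many rounds ended below the true max of 200.
    static int countRegressions(ObjLongConsumer<AtomicLong> recorder, int rounds) {
        int regressions = 0;
        for (int i = 0; i < rounds; i++) {
            AtomicLong watermark = new AtomicLong();
            CyclicBarrier barrier = new CyclicBarrier(2); // releases both writers together
            Thread low = new Thread(() -> startTogether(barrier, () -> recorder.accept(watermark, 100)));
            Thread high = new Thread(() -> startTogether(barrier, () -> recorder.accept(watermark, 200)));
            low.start();
            high.start();
            joinQuietly(low);
            joinQuietly(high);
            if (watermark.get() != 200) {
                regressions++;
            }
        }
        return regressions;
    }

    private static void startTogether(CyclicBarrier barrier, Runnable action) {
        try {
            barrier.await();
            action.run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int rounds = 10_000;
        System.out.println("get/set regressions:    " + countRegressions(WatermarkStressTest::getThenSet, rounds));
        System.out.println("atomic max regressions: " + countRegressions(WatermarkStressTest::atomicMax, rounds));
    }
}
```

The get/set window is only a few instructions wide, so the buggy variant can report zero regressions on a given machine or run. A zero count does not prove the code is safe, which is why the latch-based demo forces the interleaving. The accumulateAndGet variant should always report zero.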
How to Fix
- Use accumulateAndGet(value, Math::max) for max-style watermarks
- Use updateAndGet(...) when the new value is derived from the current value
- Use compareAndSet in a retry loop for custom conditional updates
- Keep update functions side-effect free because they may be retried
- Do not use get() followed by set() for a conditional update that must be atomic
- For persisted or cross-pod watermarks, use a durable atomic claim or update: conditional SQL update, optimistic locking, Redis script, or a single-writer state store
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongWatermarkFixed {
    private static final AtomicLong highestOffset = new AtomicLong();

    public static void main(String[] args) throws Exception {
        CountDownLatch start = new CountDownLatch(1); // releases both writers at the same time
        Thread low = new Thread(() -> recordAfterStart(start, 100), "offset-100");
        Thread high = new Thread(() -> recordAfterStart(start, 200), "offset-200");
        low.start();
        high.start();
        start.countDown();
        low.join();
        high.join();
        System.out.println("expected highest = 200");
        System.out.println("actual highest = " + highestOffset.get());
    }

    private static void recordAfterStart(CountDownLatch start, long offset) {
        try {
            start.await();
            // Read, compare, and write form one atomic update; it is re-applied internally on contention.
            highestOffset.accumulateAndGet(offset, Math::max);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Note: accumulateAndGet makes the read, comparison, and write one atomic update operation.
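For a conditional update that is more than a plain max, the compareAndSet retry loop from the fix list can carry the whole rule. In this sketch the rule is a hypothetical sanity check: advance only if the offset is higher and at most `maxJump` ahead of the current value. `BoundedWatermark`, `Outcome`, and `maxJump` are illustrative names. The loop only reads and compares, so repeating it is harmless; printing the outcome happens in the caller, after the update has been decided. Offsets are assumed to be non-negative, so `offset - current` cannot overflow.

```java
import java.util.concurrent.atomic.AtomicLong;

public class BoundedWatermark {
    enum Outcome { ADVANCED, STALE, REJECTED_JUMP }

    private final AtomicLong highestOffset = new AtomicLong();
    private final long maxJump; // hypothetical limit on how far one update may move the watermark

    BoundedWatermark(long maxJump) {
        this.maxJump = maxJump;
    }

    // Custom conditional update: no logging or other side effects inside the retry loop.
    Outcome record(long offset) {
        while (true) {
            long current = highestOffset.get();
            if (offset <= current) {
                return Outcome.STALE;
            }
            if (offset - current > maxJump) {
                return Outcome.REJECTED_JUMP;
            }
            if (highestOffset.compareAndSet(current, offset)) {
                return Outcome.ADVANCED;
            }
            // Lost the race: re-read and re-evaluate the whole condition against the new value.
        }
    }

    long get() {
        return highestOffset.get();
    }

    public static void main(String[] args) {
        BoundedWatermark watermark = new BoundedWatermark(1_000);
        for (long offset : new long[] {200, 100, 5_000, 900}) {
            Outcome outcome = watermark.record(offset);
            System.out.println(offset + " -> " + outcome); // side effect after the update, outside the loop
        }
        System.out.println("highest = " + watermark.get()); // prints "highest = 900"
    }
}
```

With `maxJump = 1000`, 200 advances, 100 is stale, 5000 is rejected as too far ahead of 200, and 900 advances. The same rule could be written with updateAndGet, but the explicit loop makes it easy to report which branch was taken.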