JMM, Visibility & Atomicity

Broken AtomicLong Watermark Update

Broken AtomicLong Watermark Update: practice a Java concurrency bug whose symptoms are a watermark that moves backward, an atomic field that still ends up wrong, and no exception.

  • Read-modify-write races
  • Atomicity
  • Atomics
  • Java
  • Intermediate

Production symptoms

  • Watermark moves backward
  • Atomic field still wrong
  • No exception

Failure scenario

Code

Java example
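import java.util.concurrent.atomic.AtomicLong;

class WatermarkTracker {
    private final AtomicLong highestOffset = new AtomicLong();

    // BUG: get() and set() are each atomic, but the check-then-act
    // between them is not. Another thread can store a higher offset
    // after get() returns and before set() runs.
    void record(long offset) {
        long current = highestOffset.get();
        if (offset > current) {
            highestOffset.set(offset);
        }
    }
}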

Prod Symptoms

A service tracks the highest processed offset, event timestamp, version, or checkpoint watermark with AtomicLong. Under concurrency, the stored value sometimes regresses even though each individual AtomicLong operation is thread-safe.

  • Logs or metrics show a lower watermark written after a higher one
  • Lag, replay, or checkpoint state contradicts broker offsets, database rows, or audit records
  • The code looks safe because the field is AtomicLong
  • The issue appears only when lower and higher values are recorded concurrently
  • No exception is thrown and thread dumps usually look normal
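When the symptom is "a lower watermark written after a higher one", a quick check is to scan the stored values in write order and flag every point where the value drops below the highest value seen so far. The sketch below assumes you can extract that sequence from logs or metrics; WatermarkRegressionScanner and findRegressions are hypothetical names, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: scans watermark values in the order they were stored
// and reports every index where the value is below the highest value seen earlier.
class WatermarkRegressionScanner {

    static List<Integer> findRegressions(long[] values) {
        List<Integer> regressions = new ArrayList<>();
        long highestSoFar = Long.MIN_VALUE;
        for (int i = 0; i < values.length; i++) {
            if (values[i] < highestSoFar) {
                regressions.add(i); // stored value moved backward
            }
            highestSoFar = Math.max(highestSoFar, values[i]);
        }
        return regressions;
    }

    public static void main(String[] args) {
        // Series shaped like the demo: 200 is stored, then a stale 100 overwrites it.
        long[] stored = {0, 200, 100, 300};
        System.out.println("regressions at indices " + findRegressions(stored)); // [2]
    }
}
```

Comparing against the running maximum, rather than only the previous value, also flags later values that stay below the true highest after a regression.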

Run Locally

  • offset-100 reads the old highest value first
  • offset-200 records the newer highest value
  • offset-100 resumes and calls set(100) based on its stale check
  • The final value is lower than the true highest offset

Inspect hints

  • Search for AtomicLong.get() followed by set() inside a conditional update
  • Look for max, min, watermark, timestamp, quota, or sequence logic implemented as separate atomic operations
  • Thread dumps rarely prove this race after the stale decision has already been applied
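The same split shape shows up outside max-style watermarks. As an illustration of the quota case from the hints, here is a hypothetical QuotaCounter: the broken method checks with get() and then decrements with a separate addAndGet(), so two threads can both pass the check and drive the quota negative. The fixed method uses AtomicLong.getAndUpdate so the check and the decrement are one atomic update.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical quota counter showing the same split-update bug with a different operation.
class QuotaCounter {
    private final AtomicLong remaining;

    QuotaCounter(long initial) {
        remaining = new AtomicLong(initial);
    }

    // BROKEN: the check and the decrement are two separate atomic operations.
    boolean tryAcquireBroken(long units) {
        if (remaining.get() >= units) {
            remaining.addAndGet(-units);
            return true;
        }
        return false;
    }

    // FIXED: getAndUpdate applies the check and the decrement together and returns
    // the value it replaced. The lambda may be retried, so it has no side effects.
    boolean tryAcquire(long units) {
        long previous = remaining.getAndUpdate(cur -> cur >= units ? cur - units : cur);
        return previous >= units;
    }

    long available() {
        return remaining.get();
    }

    public static void main(String[] args) {
        QuotaCounter quota = new QuotaCounter(5);
        System.out.println(quota.tryAcquire(3)); // true, 2 left
        System.out.println(quota.tryAcquire(3)); // false, still 2 left
        System.out.println("available = " + quota.available()); // available = 2
    }
}
```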
Run
javac AtomicLongGetSetRaceDemo.java
java AtomicLongGetSetRaceDemo
AtomicLongGetSetRaceDemo.java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongGetSetRaceDemo {
    private static final AtomicLong highestOffset = new AtomicLong();
    private static final CountDownLatch lowThreadRead = new CountDownLatch(1);
    private static final CountDownLatch highThreadStored = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        Thread low = new Thread(AtomicLongGetSetRaceDemo::recordOffset100, "offset-100");
        Thread high = new Thread(AtomicLongGetSetRaceDemo::recordOffset200, "offset-200");

        low.start();
        high.start();

        low.join();
        high.join();

        System.out.println("expected highest = 200");
        System.out.println("actual highest   = " + highestOffset.get());
    }

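    private static void recordOffset100() {
        try {
            // Step 1: read the current value (0) and decide 100 should be stored.
            long current = highestOffset.get();
            if (100 > current) {
                // Step 2: let offset-200 run, then wait until it has stored 200.
                lowThreadRead.countDown();
                highThreadStored.await();
                // Step 4: act on the stale decision and overwrite 200 with 100.
                highestOffset.set(100);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static void recordOffset200() {
        try {
            // Wait until offset-100 has finished its read.
            lowThreadRead.await();
            // Step 3: read 0, store 200, then release offset-100.
            long current = highestOffset.get();
            if (200 > current) {
                highestOffset.set(200);
            }
            highThreadStored.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }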
}

Note: The lower-offset thread reads first, waits, then overwrites the higher value with a stale decision.

Diagnosis and fix

Explanation

The bug is the gap between get() and set().

Key signal: Atomic variable does not mean atomic workflow.

  • One thread reads the current value and decides its offset should be stored
  • Another thread can store a higher value before the first thread calls set()
  • The first thread then overwrites the higher value using an old decision
  • The AtomicLong did not fail; the algorithm split one logical update into multiple operations
  • Use a compound atomic operation when the decision and the write must happen together
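One way to see why a compound operation closes the gap is to write the max update as an explicit compareAndSet retry loop. This is a sketch (CasWatermarkTracker is a hypothetical name), roughly the kind of loop that accumulateAndGet saves you from writing by hand: compareAndSet only writes if the value still equals what was read, so a stale decision is detected and re-evaluated instead of applied.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: the max-watermark update as an explicit compareAndSet retry loop.
class CasWatermarkTracker {
    private final AtomicLong highestOffset = new AtomicLong();

    // Returns true if this call raised the watermark.
    boolean record(long offset) {
        while (true) {
            long current = highestOffset.get();
            if (offset <= current) {
                // The value only ever increases, so this offset can never become the maximum.
                return false;
            }
            // Writes only if highestOffset still equals the value we compared against.
            // If another thread changed it in between, CAS fails and we re-read.
            if (highestOffset.compareAndSet(current, offset)) {
                return true;
            }
        }
    }

    long get() {
        return highestOffset.get();
    }

    public static void main(String[] args) {
        CasWatermarkTracker tracker = new CasWatermarkTracker();
        tracker.record(200);
        tracker.record(100); // rejected: 100 <= 200
        System.out.println("highest = " + tracker.get()); // highest = 200
    }
}
```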

How to Diagnose

Look for values that should only move forward but contradict production evidence.

  • Compare the in-process watermark with broker offsets, persisted checkpoints, audit records, or event timestamps
  • Search for AtomicLong or AtomicInteger code that reads, branches, and later calls set()
  • Look for logs where a lower value is stored after a higher value
  • Reproduce with stress tests or deterministic latches around the get/set window
  • Do not expect thread dumps, JFR, or a profiler to prove the lost interleaving after the fact
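A stress test complements the deterministic latch demo. The sketch below (WatermarkStressTest, recordBroken, recordFixed, and run are hypothetical names) releases many writers at once through a start latch and compares the final value with the known maximum. Treat it as a probe, not proof: the broken version only fails when a stale write happens to land last, and a final-value check misses regressions that were later overwritten by higher values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.ObjLongConsumer;

// Hypothetical stress harness: races many writers against one AtomicLong watermark.
class WatermarkStressTest {

    // The split update from the example: get(), compare, then set().
    static void recordBroken(AtomicLong highest, long offset) {
        long current = highest.get();
        if (offset > current) {
            highest.set(offset);
        }
    }

    // The compound update: read, compare, and write happen as one atomic operation.
    static void recordFixed(AtomicLong highest, long offset) {
        highest.accumulateAndGet(offset, Math::max);
    }

    // Worker t records t, t + threads, t + 2 * threads, ...,
    // so the true maximum is threads * perThread - 1.
    static long run(int threads, int perThread, ObjLongConsumer<AtomicLong> record) throws Exception {
        AtomicLong highest = new AtomicLong();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int worker = t;
                futures.add(pool.submit(() -> {
                    start.await(); // release all workers together to maximize overlap
                    for (int i = 0; i < perThread; i++) {
                        record.accept(highest, (long) i * threads + worker);
                    }
                    return null;
                }));
            }
            start.countDown();
            for (Future<?> f : futures) {
                f.get(); // propagate any worker failure
            }
        } finally {
            pool.shutdown();
        }
        return highest.get();
    }

    public static void main(String[] args) throws Exception {
        int threads = 8;
        int perThread = 100_000;
        System.out.println("expected highest = " + ((long) threads * perThread - 1));
        System.out.println("fixed highest    = " + run(threads, perThread, WatermarkStressTest::recordFixed));
        // Timing-dependent: may match expected on one run and be lower on another.
        System.out.println("broken highest   = " + run(threads, perThread, WatermarkStressTest::recordBroken));
    }
}
```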
Typical output
expected highest = 200
actual highest   = 100

How to Fix

  • Use accumulateAndGet(value, Math::max) for max-style watermarks
  • Use updateAndGet(...) when the new value is derived from the current value
  • Use compareAndSet in a retry loop for custom conditional updates
  • Keep update functions side-effect free because they may be retried
  • Do not use get() followed by set() for a conditional update that must be atomic
  • For persisted or cross-pod watermarks, use a durable atomic claim or update: conditional SQL update, optimistic locking, Redis script, or a single-writer state store
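Callers often want a side effect only when their own call moved the watermark, for example logging or scheduling a checkpoint. A minimal sketch, assuming a hypothetical AdvancingWatermark class: AtomicLong.getAndAccumulate returns the value it replaced, so the caller can decide afterwards whether it advanced the watermark, while the function passed in (Math::max) stays side-effect free and safe to retry.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker that reports whether each call advanced the watermark.
class AdvancingWatermark {
    private final AtomicLong highestOffset = new AtomicLong();

    // getAndAccumulate returns the previous value. Math::max has no side effects,
    // so it is safe for the AtomicLong to retry it under contention.
    boolean record(long offset) {
        long previous = highestOffset.getAndAccumulate(offset, Math::max);
        boolean advanced = offset > previous;
        if (advanced) {
            // Side effects run once per call, outside the retried function.
            System.out.println("watermark advanced " + previous + " -> " + offset);
        }
        return advanced;
    }

    long get() {
        return highestOffset.get();
    }

    public static void main(String[] args) {
        AdvancingWatermark watermark = new AdvancingWatermark();
        watermark.record(200); // prints "watermark advanced 0 -> 200"
        watermark.record(100); // prints nothing: 100 does not advance the watermark
        watermark.record(300); // prints "watermark advanced 200 -> 300"
        System.out.println("highest = " + watermark.get()); // highest = 300
    }
}
```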
AtomicLongWatermarkFixed.java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongWatermarkFixed {
    private static final AtomicLong highestOffset = new AtomicLong();

    public static void main(String[] args) throws Exception {
        CountDownLatch start = new CountDownLatch(1);

        Thread low = new Thread(() -> recordAfterStart(start, 100), "offset-100");
        Thread high = new Thread(() -> recordAfterStart(start, 200), "offset-200");

        low.start();
        high.start();
        start.countDown();

        low.join();
        high.join();

        System.out.println("expected highest = 200");
        System.out.println("actual highest   = " + highestOffset.get());
    }

    private static void recordAfterStart(CountDownLatch start, long offset) {
        try {
            start.await();
            highestOffset.accumulateAndGet(offset, Math::max);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Note: accumulateAndGet makes the read, comparison, and write one atomic update operation.