
Idempotent Workers with NATS KV CAS

How compare-and-swap semantics in NATS KV can prevent duplicate event processing across multiple worker instances.

By Victor Robin

When I first deployed multiple instances of our OCR worker on k3s, I noticed something troubling in the logs: the same document was being processed two or three times within seconds. The JetStream consumer was doing its job — delivering at least once — but “at least once” quickly translates to “definitely more than once” during pod restarts and network hiccups. I tried solving it with a database unique constraint, but that added latency and another failure mode. Switching to NATS KV with CAS for deduplication was the turning point: it gave me atomic claim semantics without leaving the NATS ecosystem, and the stuck-event recovery mechanism meant I no longer had to manually clean up after crashed workers.

This article shows how to use NATS KV with compare-and-swap (CAS) semantics to make event processing idempotent when several instances of a background worker consume the same event stream.

The Duplicate Processing Problem

Most messaging systems guarantee at-least-once delivery. In practice, your consumer will occasionally receive the same message more than once: during rebalances, network partitions, or crashes mid-processing. Without deduplication, this leads to:

  • Documents processed twice, wasting compute
  • Duplicate side effects (emails, notifications, writes)
  • Inconsistent state across services

The usual solutions — database unique constraints or Redis locks — add external dependencies. If you already run NATS JetStream, you can use NATS KV as a lightweight distributed lock with CAS semantics.

[Idempotent Receiver Pattern] — Gregor Hohpe , 2023-11-01

Design: A Three-State Machine

Each event is tracked through a simple state machine stored as a NATS KV entry:

┌─────────────┐     ┌──────────────┐     ┌─────────────┐
│ (no entry)  │────▶│ in_progress  │────▶│  completed  │
└─────────────┘     └──────────────┘     └─────────────┘
                        │      ▲
                failure │      │ retry
                        ▼      │
                    ┌──────────────┐
                    │    failed    │
                    └──────────────┘

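Key: {documentId}.{eventType}, for example doc-abc123.ocr. NATS KV keys may contain only letters, digits, and the characters - / _ = . so a colon cannot serve as the separator.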

  • No entry → event never seen before
  • in_progress → a worker claimed it
  • completed → processing finished successfully
  • failed → processing failed, eligible for retry
[Compare-and-Swap] — Wikipedia Contributors , 2024-02-10
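
The transitions above can be written down as a small lookup, which is handy for asserting in tests that an entry never moves backwards. This is only a sketch: DedupState and DedupTransitions are hypothetical names that do not appear in the service shown later, and the Failed to InProgress and InProgress to InProgress edges encode the retry and stuck-reclaim paths this article describes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names for illustration; the article stores these states as plain strings.
public enum DedupState { None, InProgress, Completed, Failed }

public static class DedupTransitions
{
    // Allowed moves: claim, finish, fail, retry after failure, reclaim a stuck claim.
    private static readonly HashSet<(DedupState From, DedupState To)> Allowed = new()
    {
        (DedupState.None, DedupState.InProgress),       // first claim (Create)
        (DedupState.InProgress, DedupState.Completed),  // handler succeeded
        (DedupState.InProgress, DedupState.Failed),     // handler threw
        (DedupState.Failed, DedupState.InProgress),     // retry after failure
        (DedupState.InProgress, DedupState.InProgress), // stuck claim taken over
    };

    public static bool CanMove(DedupState from, DedupState to) =>
        Allowed.Contains((from, to));

    // Maps the stored string value (or null for "no entry") to a state.
    public static DedupState Parse(string? value) => value switch
    {
        null => DedupState.None,
        "in_progress" => DedupState.InProgress,
        "completed" => DedupState.Completed,
        "failed" => DedupState.Failed,
        _ => throw new ArgumentException($"Unknown state '{value}'", nameof(value)),
    };
}
```

Note that completed has no outgoing edge: once an event has finished, every later delivery is treated as a duplicate until the entry expires.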

NATS KV CAS in Practice

The critical operation is Create: it sets a key only if the key does not already exist. The existence check and the write are applied as one operation by the stream that backs the bucket, so two workers racing to claim the same event will have exactly one winner.

EventDeduplicationService.cs
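using NATS.Client.KeyValueStore;

public class NatsKvEventDeduplicationService : IEventDeduplicationService
{
    private const string InProgress = "in_progress";
    private readonly INatsKVStore _store;
    private readonly TimeSpan _stuckTimeout = TimeSpan.FromMinutes(10);

    public NatsKvEventDeduplicationService(INatsKVStore store) => _store = store;

    // NATS KV keys allow only [-/_=.a-zA-Z0-9], so "." is used as the separator
    private static string BuildKey(string eventKey, string eventType) =>
        $"{eventKey}.{eventType}";

    public async Task<bool> TryStartProcessingAsync(
        string eventKey, string eventType, CancellationToken ct)
    {
        var key = BuildKey(eventKey, eventType);
        try
        {
            // Create fails if the key already exists, so only one worker can claim it
            await _store.CreateAsync(key, InProgress, cancellationToken: ct);
            return true;
        }
        catch (NatsKVCreateException)
        {
            // Key exists: take over only if it failed earlier or the claim looks abandoned
            var entry = await _store.GetEntryAsync<string>(key, cancellationToken: ct);
            var stuck = entry.Value == InProgress
                && DateTimeOffset.UtcNow - entry.Created > _stuckTimeout;
            if (entry.Value != "failed" && !stuck)
                return false; // Completed, or still in progress elsewhere

            try
            {
                // Update succeeds only if the revision is unchanged since the read
                await _store.UpdateAsync(key, InProgress, entry.Revision,
                    cancellationToken: ct);
                return true;
            }
            catch (NatsKVWrongLastRevisionException)
            {
                return false; // Another worker took it over first
            }
        }
    }

    public Task MarkCompletedAsync(
        string eventKey, string eventType, CancellationToken ct) =>
        SetStateAsync(BuildKey(eventKey, eventType), "completed", ct);

    public Task MarkFailedAsync(
        string eventKey, string eventType, CancellationToken ct) =>
        SetStateAsync(BuildKey(eventKey, eventType), "failed", ct);

    // Reads the current revision, then writes the new state against that revision
    private async Task SetStateAsync(string key, string state, CancellationToken ct)
    {
        var entry = await _store.GetEntryAsync<string>(key, cancellationToken: ct);
        await _store.UpdateAsync(key, state, entry.Revision, cancellationToken: ct);
    }
}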
[NATS Key/Value Store] — NATS Authors , 2024-03-20

Setting Up the KV Bucket

The bucket is configured once at startup with a TTL that automatically cleans up old entries:

Program.cs
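var kvContext = new NatsKVContext(new NatsJSContext(natsConnection));
var store = await kvContext.CreateStoreAsync(new NatsKVConfig("event-dedup")
{
    MaxAge = TimeSpan.FromHours(24),  // Entries expire 24h after their last write
    Storage = NatsKVStorageType.File, // Survive restarts
    NumberOfReplicas = 3,             // Must not exceed the number of JetStream servers
});

The 24-hour TTL is essential: without it, the bucket grows without bound. The TTL also bounds the deduplication window. Once an entry expires, a redelivered copy of that event is treated as new, so keep the TTL longer than the longest redelivery delay you expect. Events older than 24 hours are assumed to have either completed or need fresh processing anyway.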

Worker Integration Pattern

In your message consumer, wrap every handler in the deduplication guard:

OcrWorker.cs
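public class OcrWorker : BackgroundService
{
    private readonly IEventDeduplicationService _dedup;
    private readonly INatsJSConsumer _consumer;

    public OcrWorker(IEventDeduplicationService dedup, INatsJSConsumer consumer)
    {
        _dedup = dedup;
        _consumer = consumer;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await foreach (var msg in _consumer.ConsumeAsync<OcrEvent>(cancellationToken: ct))
        {
            string? eventKey = null;
            var claimed = false;
            try
            {
                // "." separator: NATS KV keys cannot contain ":".
                // ObjectKey must itself contain only valid key characters.
                eventKey = $"{msg.Data.DocumentId}.{msg.Data.ObjectKey}";

                claimed = await _dedup.TryStartProcessingAsync(eventKey, "ocr", ct);
                if (!claimed)
                {
                    await msg.AckAsync(cancellationToken: ct); // Skip duplicate
                    continue;
                }

                await ProcessOcrAsync(msg.Data, ct);
                await _dedup.MarkCompletedAsync(eventKey, "ocr", ct);
                await msg.AckAsync(cancellationToken: ct);
            }
            catch (Exception) when (!ct.IsCancellationRequested)
            {
                // Mark failed only if this worker actually holds the claim
                if (claimed && eventKey != null)
                    await _dedup.MarkFailedAsync(eventKey, "ocr", ct);
                // NakAsync asks JetStream to redeliver after the delay
                await msg.NakAsync(delay: TimeSpan.FromSeconds(30), cancellationToken: ct);
            }
        }
    }
}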

Handling Stuck Events

Events stuck in in_progress for longer than the timeout (default: 10 minutes) are considered abandoned — the worker probably crashed. The TryStartProcessingAsync method automatically reclaims these using CAS:

  1. Read the current entry and its revision number
  2. Verify the value is in_progress and older than the timeout
  3. Atomically update it using UpdateAsync with the revision — this fails if another worker already reclaimed it

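This gives you distributed lock recovery without a separate reaper process. Reclaim only runs when a message for that event is delivered again, so redelivery has to outlast the stuck timeout: if a redelivered message arrives while the entry is still in_progress and is acked as a duplicate, nothing will trigger the reclaim later. Either set the consumer's AckWait longer than the stuck timeout, or nak rather than ack duplicates whose entry is still in_progress.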
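
To see why the revision check in step 3 is safe, it helps to model the bucket as a map from key to (value, revision), where an update succeeds only if the caller's revision is still current. The sketch below is an in-memory stand-in, not the NATS client: RevisionStore, TryCreate, TryUpdate, and ReclaimDemo are hypothetical names that mimic the create and update-with-revision behaviour described above.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a KV bucket; hypothetical, for illustration only.
public sealed class RevisionStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, (string Value, ulong Revision, DateTimeOffset Created)> _entries = new();
    private ulong _nextRevision;

    // Succeeds only if the key does not exist yet.
    public bool TryCreate(string key, string value, DateTimeOffset now)
    {
        lock (_gate)
        {
            if (_entries.ContainsKey(key)) return false;
            _entries[key] = (value, ++_nextRevision, now);
            return true;
        }
    }

    public (string Value, ulong Revision, DateTimeOffset Created) Get(string key)
    {
        lock (_gate) return _entries[key];
    }

    // Succeeds only if expectedRevision is still the latest revision of the key.
    public bool TryUpdate(string key, string value, ulong expectedRevision, DateTimeOffset now)
    {
        lock (_gate)
        {
            if (!_entries.TryGetValue(key, out var current) || current.Revision != expectedRevision)
                return false;
            _entries[key] = (value, ++_nextRevision, now);
            return true;
        }
    }
}

public static class ReclaimDemo
{
    // Two workers observe the same stuck entry; only the first update wins.
    public static (bool First, bool Second) Run()
    {
        const string key = "example-event";
        var store = new RevisionStore();
        var now = DateTimeOffset.UtcNow;
        store.TryCreate(key, "in_progress", now.AddMinutes(-11)); // abandoned claim

        var seenByA = store.Get(key); // step 1, worker A
        var seenByB = store.Get(key); // step 1, worker B

        var first = store.TryUpdate(key, "in_progress", seenByA.Revision, now);
        var second = store.TryUpdate(key, "in_progress", seenByB.Revision, now);
        return (first, second);
    }
}
```

Both workers read the same revision, but the first successful update bumps it, so the second update is rejected. In the real service that rejection surfaces as an exception from UpdateAsync, which the caller should treat as "someone else reclaimed it" rather than as a processing failure.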

[Distributed Locking] — Martin Kleppmann , 2016-02-08

Testing

For unit tests, swap in an in-memory implementation:

InMemoryEventDeduplicationService.cs
using System.Collections.Concurrent;

public class InMemoryEventDeduplicationService : IEventDeduplicationService
{
    private readonly ConcurrentDictionary<string, string> _states = new();

    private static string BuildKey(string eventKey, string eventType) =>
        $"{eventKey}.{eventType}";

    public Task<bool> TryStartProcessingAsync(
        string eventKey, string eventType, CancellationToken ct)
    {
        var key = BuildKey(eventKey, eventType);
        // Claim a new key, or re-claim one that previously failed (each call is atomic).
        // Stuck-timeout reclaim is not modelled here.
        return Task.FromResult(
            _states.TryAdd(key, "in_progress")
            || _states.TryUpdate(key, "in_progress", "failed"));
    }

    public Task MarkCompletedAsync(
        string eventKey, string eventType, CancellationToken ct)
    {
        _states[BuildKey(eventKey, eventType)] = "completed";
        return Task.CompletedTask;
    }

    public Task MarkFailedAsync(
        string eventKey, string eventType, CancellationToken ct)
    {
        _states[BuildKey(eventKey, eventType)] = "failed";
        return Task.CompletedTask;
    }
}
[Testing Distributed Systems] — Andrey Satarin , 2024-01-20
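
A test worth having for any implementation of the interface is a claim race: start several simulated workers at once and check that exactly one of them wins. The helper below is a minimal sketch under that assumption. ClaimRace and CountWinnersAsync are hypothetical names, and the claim is passed in as a delegate so the same helper can be pointed at the in-memory service or at a NATS-backed one.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical test helper: races several simulated workers against one claim function.
public static class ClaimRace
{
    public static async Task<int> CountWinnersAsync(Func<Task<bool>> tryClaim, int workers)
    {
        // The gate holds every attempt back until all of them have been queued.
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var attempts = Enumerable.Range(0, workers)
            .Select(_ => Task.Run(async () =>
            {
                await gate.Task;
                return await tryClaim();
            }))
            .ToArray();

        gate.SetResult(true); // release all attempts together
        var results = await Task.WhenAll(attempts);
        return results.Count(won => won);
    }
}
```

In a unit test, the delegate would call TryStartProcessingAsync with a fixed event key and event type on a shared service instance, and the assertion is that the returned count equals 1.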

Configuration

{
  "Deduplication": {
    "NatsKv": {
      "Enabled": true,
      "BucketName": "event-dedup",
      "TtlHours": 24,
      "StuckTimeoutMinutes": 10
    }
  }
}

Set Enabled: false in local development to use the in-memory fallback.
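
One way to consume this section is a small options type that converts the raw numbers into TimeSpan values and rejects combinations that cannot work. The sketch below parses the JSON directly with System.Text.Json so that it stays self-contained; NatsKvDedupOptions and its Parse method are hypothetical names, and the rule that the stuck timeout must be shorter than the TTL is an assumption drawn from the behaviour described earlier (an entry that expires before the timeout elapses can never be reclaimed as stuck).

```csharp
using System;
using System.Text.Json;

// Hypothetical options type mirroring the "Deduplication" / "NatsKv" section above.
public sealed record NatsKvDedupOptions(
    bool Enabled, string BucketName, int TtlHours, int StuckTimeoutMinutes)
{
    public TimeSpan Ttl => TimeSpan.FromHours(TtlHours);
    public TimeSpan StuckTimeout => TimeSpan.FromMinutes(StuckTimeoutMinutes);

    // Reads the section from raw JSON and rejects a stuck timeout that could never trigger.
    public static NatsKvDedupOptions Parse(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var section = doc.RootElement.GetProperty("Deduplication").GetProperty("NatsKv");

        var options = new NatsKvDedupOptions(
            section.GetProperty("Enabled").GetBoolean(),
            section.GetProperty("BucketName").GetString()
                ?? throw new FormatException("BucketName is required"),
            section.GetProperty("TtlHours").GetInt32(),
            section.GetProperty("StuckTimeoutMinutes").GetInt32());

        if (options.StuckTimeout >= options.Ttl)
            throw new FormatException("StuckTimeoutMinutes must be shorter than the bucket TTL");

        return options;
    }
}
```

In a hosted application the same shape would more likely be bound from configuration than parsed by hand; the Enabled flag then decides whether the in-memory or the NATS-backed implementation is registered.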

Key Takeaways

  • CAS is your friend: Create for first-claim atomicity, Update with revision for safe reclaim
  • TTL prevents unbounded growth: 24-hour expiry keeps the bucket manageable
  • Stuck detection: 10-minute timeout + CAS reclaim eliminates the need for a reaper
  • Interface abstraction: Swap NATS KV for in-memory during testing

Building idempotent workers changed how I think about reliability in our event-driven pipeline. Before CAS-based deduplication, every deployment and every pod restart carried the risk of duplicate processing, and we accepted it as a cost of distributed systems. After implementing this pattern, duplicate processing dropped to effectively zero, and the operational burden of investigating “why did this document get processed twice” disappeared entirely. The upfront investment in the deduplication layer paid for itself within the first week of production use.

Next Steps

  • Add metrics tracking for deduplication hit rates to understand how often duplicates are actually caught
  • Implement a dead letter inspection tool that queries the KV bucket for events stuck in the failed state
  • Explore NATS JetStream’s built-in message deduplication window as a complementary first-pass filter
  • Build integration tests that simulate concurrent worker claims using multiple NATS connections

Further Reading

  • [NATS KV Documentation] (NATS Authors, 2024)
  • [Idempotent Consumer Pattern] (Enterprise Integration Patterns, 2024)
  • [How to Do Distributed Locking] (Martin Kleppmann, 2024)
  • [NATS JetStream Message Deduplication] (NATS Authors, 2024)