The Transactional Outbox Pattern with NATS
Achieve reliable, at-least-once messaging by committing state changes and events atomically. A deep dive into the Outbox Pattern with EF Core and NATS, using a background relay worker.
Introduction
In distributed systems, the “dual-write problem” is a classic villain. It occurs when a service needs to do two things:
- Update its local database (e.g., “Create Order”).
- Publish an event to a message broker (e.g., “OrderCreated” to NATS).
If you do step 1 and step 2 fails, your system is inconsistent: the order exists, but no other service ever learns about it. If you do step 2 first and step 1 fails, downstream services process a ghost order.
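The first failure mode can be reproduced with a minimal in-memory sketch in plain C#. A `List<Guid>` stands in for the Orders table and a hypothetical `FlakyBroker` class stands in for NATS; neither is EF Core nor a NATS client API, and `DualWrite.CreateOrder` is an illustrative name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for NATS: records published subjects, or throws while "down".
public sealed class FlakyBroker
{
    public bool IsDown { get; set; }
    public List<string> Published { get; } = new();

    public void Publish(string subject)
    {
        if (IsDown) throw new InvalidOperationException("broker unavailable");
        Published.Add(subject);
    }
}

public static class DualWrite
{
    // Naive dual write: step 1 writes to the "database" (a List standing in for the
    // Orders table), step 2 publishes. Nothing ties the two together, so a throw
    // in step 2 leaves the order stored with no event.
    public static void CreateOrder(List<Guid> ordersTable, FlakyBroker broker, Guid orderId)
    {
        ordersTable.Add(orderId);                       // step 1: succeeds
        broker.Publish($"orders.{orderId}.created");    // step 2: may throw
    }
}
```

Calling `DualWrite.CreateOrder` while `IsDown` is true leaves one order in the list and nothing in `Published`: the order exists, but no event was ever sent.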
Distributed transactions spanning the database and the broker (Two-Phase Commit) are heavy and brittle, and NATS does not participate in them. The Transactional Outbox Pattern offers an elegant solution by turning the database into a temporary queue.
Why Use the Outbox Pattern:
- Atomicity: The business data and the event are saved in the same database transaction. They either both succeed or both fail.
- Resilience: Even if the message broker (NATS) is down, the event is safely stored in your database and will be sent later.
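- Ordering: A single relay that reads the outbox in a fixed order publishes events in the order they were recorded. Concurrent transactions and multiple relay instances weaken this guarantee.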
What We’ll Build
We will implement a robust Outbox mechanism in a .NET application.
- Outbox Entity: A database table to store pending messages.
- Atomic Commit: Storing the domain entity and the outbox message in one EF Core SaveChanges() call.
- The Relay Worker: A background service that polls the outbox and publishes to NATS.
Architecture Overview
Instead of publishing directly to NATS during the request, we write to an OutboxMessages table. A background process picks these rows up and publishes them to NATS.
flowchart LR
Api[API Request] -->|1. Transaction| DB[(PostgreSQL)]
subgraph DB_Transaction
Table1[Orders Table]
Table2[Outbox Table]
end
Api -- Writes --> Table1
Api -- Writes --> Table2
Relay[Background Worker] -->|2. Poll| Table2
Relay -->|3. Publish| NATS[NATS JetStream]
NATS -->Consumer[Downstream Service]
classDef primary fill:#7c3aed,color:#fff
classDef secondary fill:#06b6d4,color:#fff
classDef db fill:#f43f5e,color:#fff
class Api,Relay,Consumer primary
class NATS secondary
class DB,Table1,Table2 db
Section 1: The Outbox Entity
First, we define the schema for our stored messages. This table acts as our “durability buffer.”
public class OutboxMessage
{
    public Guid Id { get; set; } = Guid.NewGuid();
    public string Type { get; set; } = string.Empty;     // e.g., "OrderCreatedEvent"
    public string Payload { get; set; } = string.Empty;  // JSON content
    public string Subject { get; set; } = string.Empty;  // NATS subject
    public DateTime OccurredOnUtc { get; set; } = DateTime.UtcNow;
    public DateTime? ProcessedOnUtc { get; set; }        // null until the relay has published it
    public string? Error { get; set; }                   // last publish error, if any
}
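Filling `Type`, `Subject` and `Payload` by hand for every event is easy to get wrong. A small generic factory can keep them consistent; this is a sketch assuming events are plain records serialized with System.Text.Json. `OutboxMessageFactory` and the `OrderCreatedEvent(Guid OrderId)` record shape are hypothetical, and the entity is repeated so the block compiles on its own.

```csharp
using System;
using System.Text.Json;

// Copy of the Section 1 entity so this block compiles on its own.
public class OutboxMessage
{
    public Guid Id { get; set; } = Guid.NewGuid();
    public string Type { get; set; } = string.Empty;
    public string Payload { get; set; } = string.Empty;
    public string Subject { get; set; } = string.Empty;
    public DateTime OccurredOnUtc { get; set; } = DateTime.UtcNow;
    public DateTime? ProcessedOnUtc { get; set; }
    public string? Error { get; set; }
}

// Hypothetical event shape; the article only shows `new OrderCreatedEvent(order.Id)`.
public record OrderCreatedEvent(Guid OrderId);

public static class OutboxMessageFactory
{
    // Builds a pending outbox row for any event: Type comes from the CLR type name,
    // Payload from System.Text.Json. ProcessedOnUtc stays null until the relay runs.
    public static OutboxMessage Create<TEvent>(TEvent evt, string subject) => new()
    {
        Type = typeof(TEvent).Name,
        Subject = subject,
        Payload = JsonSerializer.Serialize(evt)
    };
}
```

Because `Type` is derived from `typeof(TEvent).Name`, it yields the same string as `nameof(OrderCreatedEvent)`, so the relay and consumers see one consistent type name per event.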
Section 2: Atomic Save
In your Command Handler (using MediatR or FastEndpoints), you simply add the outbox message to its DbSet alongside your entity, on the same DbContext.
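using System.Text.Json; // JsonSerializer
public async Task Handle(CreateOrderCommand command, CancellationToken ct)
{
    // 1. Domain logic
    // Assumes Order.Create assigns order.Id client-side (e.g. a Guid). A database-generated
    // identity would not hold its final value yet, and the event would carry the wrong Id.
    var order = Order.Create(command.Product, command.Quantity);
    _dbContext.Orders.Add(order);
    // 2. Prepare the event as an outbox row (nothing is sent to NATS here)
    var evt = new OrderCreatedEvent(order.Id);
    var outboxMsg = new OutboxMessage
    {
        Type = nameof(OrderCreatedEvent),
        Subject = $"orders.{order.Id}.created",
        Payload = JsonSerializer.Serialize(evt)
    };
    _dbContext.OutboxMessages.Add(outboxMsg);
    // 3. Atomic commit: EF Core wraps a single SaveChangesAsync call in one database
    // transaction, so the order and its outbox row are committed together or not at all.
    // An explicit BeginTransactionAsync is only needed when you call SaveChanges more than once.
    await _dbContext.SaveChangesAsync(ct);
}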
At this point, even if the server crashes immediately after the commit, the event is safely persisted in the database, and the relay will publish it when it next runs.
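The all-or-nothing guarantee comes from the database transaction, not from anything NATS does. The toy model below is an in-memory stand-in, not EF Core: `SaveChanges` applies the staged order and outbox rows together, and a simulated commit failure discards both, like a rollback. `InMemoryDb` and `CreateOrderHandler` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Toy stand-in for a DbContext: Add* stages rows, SaveChanges applies all staged
// rows together or, on a simulated failure, discards them like a rollback.
public sealed class InMemoryDb
{
    public List<Guid> Orders { get; } = new();
    public List<string> Outbox { get; } = new();   // outbox rows reduced to their subject

    private readonly List<Guid> _stagedOrders = new();
    private readonly List<string> _stagedOutbox = new();

    public void AddOrder(Guid orderId) => _stagedOrders.Add(orderId);
    public void AddOutboxMessage(string subject) => _stagedOutbox.Add(subject);

    public void SaveChanges(bool simulateFailure = false)
    {
        try
        {
            if (simulateFailure)
                throw new InvalidOperationException("commit failed");
            Orders.AddRange(_stagedOrders);   // both "tables" change together...
            Outbox.AddRange(_stagedOutbox);   // ...or, on failure, neither does
        }
        finally
        {
            _stagedOrders.Clear();
            _stagedOutbox.Clear();
        }
    }
}

public static class CreateOrderHandler
{
    // Mirrors the handler above: stage the order and its event, then commit once.
    // The broker is never called here; publishing is the relay's job.
    public static void Handle(InMemoryDb db, Guid orderId, bool failCommit = false)
    {
        db.AddOrder(orderId);
        db.AddOutboxMessage($"orders.{orderId}.created");
        db.SaveChanges(failCommit);
    }
}
```

A failed commit leaves both lists exactly as they were, which is the property that lets the relay trust every outbox row it finds.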
Section 3: The Relay Worker
A BackgroundService runs continuously to flush the outbox.
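using Microsoft.EntityFrameworkCore;
using NATS.Client.JetStream; // INatsJSContext, NatsJSPubOpts
// BackgroundService, ILogger and CreateScope come from the Microsoft.Extensions.* implicit usings.
public class OutboxRelayWorker : BackgroundService
{
    private readonly IServiceProvider _provider;
    private readonly INatsJSContext _nats;
    private readonly ILogger<OutboxRelayWorker> _logger;
    public OutboxRelayWorker(IServiceProvider provider, INatsJSContext nats, ILogger<OutboxRelayWorker> logger)
    {
        _provider = provider;
        _nats = nats;
        _logger = logger;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await ProcessBatchAsync(stoppingToken);
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A database or broker outage must not kill the worker: log and retry on the next poll.
                _logger.LogError(ex, "Outbox relay batch failed");
            }
            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken); // Poll interval
        }
    }
    private async Task ProcessBatchAsync(CancellationToken ct)
    {
        // AppDbContext is scoped, so resolve it from a fresh scope per batch.
        using var scope = _provider.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        // Fetch pending messages, oldest first. With more than one relay instance,
        // two workers can read the same rows; run one instance or lock the rows
        // (e.g. PostgreSQL FOR UPDATE SKIP LOCKED).
        var messages = await db.OutboxMessages
            .Where(m => m.ProcessedOnUtc == null)
            .OrderBy(m => m.OccurredOnUtc)
            .Take(50)
            .ToListAsync(ct);
        foreach (var msg in messages)
        {
            try
            {
                // Publish to JetStream and wait for the server ack. MsgId lets JetStream
                // discard a re-publish of the same row within its duplicate window.
                var ack = await _nats.PublishAsync(msg.Subject, msg.Payload,
                    opts: new NatsJSPubOpts { MsgId = msg.Id.ToString() },
                    cancellationToken: ct);
                ack.EnsureSuccess();
                // Mark as processed
                msg.ProcessedOnUtc = DateTime.UtcNow;
                msg.Error = null;
            }
            catch (Exception ex)
            {
                // Record the error and stop here so later events are not published
                // ahead of this one; the row stays pending and is retried next poll.
                msg.Error = ex.Message;
                break;
            }
        }
        // If the process dies before this save, rows already published are sent again
        // after a restart: delivery is at-least-once, so consumers must tolerate duplicates.
        await db.SaveChangesAsync(ct);
    }
}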
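The relay's ordering and retry behaviour is easier to check without a database or broker. This BCL-only sketch runs a variant of the batch logic over an in-memory list, assuming a hypothetical `IEventPublisher` abstraction in place of `INatsJSContext`. It publishes pending rows oldest-first and stops at the first failure, so a failed event is retried before anything newer goes out. `OutboxRow` is a trimmed, hypothetical copy of the entity, and `FakePublisher` is a test double, not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Trimmed, hypothetical copy of the outbox entity with only the fields the relay needs.
public sealed class OutboxRow
{
    public Guid Id { get; init; } = Guid.NewGuid();
    public string Subject { get; init; } = string.Empty;
    public DateTime OccurredOnUtc { get; init; }
    public DateTime? ProcessedOnUtc { get; set; }
    public string? Error { get; set; }
}

// Hypothetical abstraction standing in for INatsJSContext.
public interface IEventPublisher
{
    Task PublishAsync(string subject, Guid messageId);
}

public static class OutboxRelay
{
    // Publishes pending rows oldest-first and stops at the first failure, so a newer
    // event never overtakes an older one. Returns how many rows were published.
    public static async Task<int> ProcessBatchAsync(
        List<OutboxRow> table, IEventPublisher publisher, int batchSize = 50)
    {
        var pending = table
            .Where(m => m.ProcessedOnUtc == null)
            .OrderBy(m => m.OccurredOnUtc)
            .Take(batchSize)
            .ToList();

        var published = 0;
        foreach (var msg in pending)
        {
            try
            {
                await publisher.PublishAsync(msg.Subject, msg.Id);
                msg.ProcessedOnUtc = DateTime.UtcNow;
                msg.Error = null;
                published++;
            }
            catch (Exception ex)
            {
                msg.Error = ex.Message;   // row stays pending and is retried on the next poll
                break;
            }
        }
        return published;
    }
}

// Test double: records subjects in order and fails for FailSubject.
public sealed class FakePublisher : IEventPublisher
{
    public List<string> Sent { get; } = new();
    public string? FailSubject { get; set; }

    public Task PublishAsync(string subject, Guid messageId)
    {
        if (subject == FailSubject)
            return Task.FromException(new InvalidOperationException("NATS unavailable"));
        Sent.Add(subject);
        return Task.CompletedTask;
    }
}
```

Stopping at the first failure trades throughput for ordering. If your consumers do not depend on order, replacing `break` with `continue` keeps one bad message from blocking the rest of the batch.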
Conclusion
The Transactional Outbox pattern decouples your database transaction from your message broker availability. While it adds moving parts (the relay worker), it is essential for systems where data integrity cannot be compromised.
In the BlueRobin architecture, we rely on this pattern for all critical events: even if NATS undergoes maintenance or a network partition occurs, committed events wait in the outbox and are published once the broker is reachable again, so none are lost.
Next Steps:
- Explore idempotent consumers to handle duplicate messages, since the relay guarantees at-least-once delivery, not exactly-once.
- Look into CDC (Change Data Capture) tools like Debezium for a configuration-driven outbox relay that reads committed rows from the database's transaction log instead of polling.
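Because the relay can publish the same row more than once (for example, if it crashes after publishing but before saving `ProcessedOnUtc`), consumers should deduplicate. Below is a minimal sketch, assuming the outbox row's `Id` reaches the consumer with each message (for example as a header or a payload field). The hypothetical `IdempotentOrderConsumer` remembers handled message IDs and skips repeats. In a real service the processed-ID set would be a database table updated in the same transaction as the consumer's own state change; the `HashSet` only keeps the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consumer that deduplicates on the outbox message Id.
public sealed class IdempotentOrderConsumer
{
    // In production: a "processed messages" table written in the same transaction
    // as the consumer's own state change. A HashSet keeps the sketch self-contained.
    private readonly HashSet<Guid> _processedMessageIds = new();

    public List<Guid> HandledOrders { get; } = new();

    // Returns false when the message is a redelivery and was skipped.
    public bool Handle(Guid messageId, Guid orderId)
    {
        if (_processedMessageIds.Contains(messageId))
            return false;

        HandledOrders.Add(orderId);              // stand-in for the real side effect
        _processedMessageIds.Add(messageId);     // remember it only after the work is done
        return true;
    }
}
```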