Background Job Processing in .NET
Implement reliable background job processing with hosted services, channels, and distributed task queues in .NET applications.
When I first configured background services in our .NET application, graceful shutdown was the issue that bit me hardest. During local development everything looked fine, but once we deployed to Kubernetes and started rolling updates, we discovered that in-flight jobs were being silently killed mid-execution. Documents would end up in a half-processed state with no record of what went wrong. It took three separate incidents before I properly understood the relationship between CancellationToken, the host shutdown timeout, and the need to catch OperationCanceledException at exactly the right level. This article captures all of those lessons so you can avoid the same pain.
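To make that relationship concrete: on shutdown the host signals the stopping token, then waits a bounded window for each BackgroundService to return before tearing everything down. A minimal sketch of widening that window (the 30-second value is an assumption; it must stay below your orchestrator's termination grace period, e.g. Kubernetes' terminationGracePeriodSeconds):

```csharp
// Program.cs — widen the window BackgroundService gets on SIGTERM.
// StopAsync cancels the stoppingToken, then waits up to ShutdownTimeout
// for ExecuteAsync to complete before the host proceeds with teardown.
builder.Services.Configure<HostOptions>(options =>
{
    options.ShutdownTimeout = TimeSpan.FromSeconds(30);
});
```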
Introduction
Background job processing enables applications to handle long-running tasks asynchronously instead of tying up request threads. This is critical for keeping APIs responsive while performing heavy operations like document analysis, video transcoding, or sending emails.
[Background tasks with hosted services in ASP.NET Core] — Microsoft, 2024-11-15
This guide covers implementing robust background processing patterns in .NET using the native BackgroundService, local channels, and distributed message queues.
What We’ll Build
- Hosted Service: A simple loop for periodic tasks.
- Channel Queue: An in-memory producer-consumer queue.
- Job Handlers: A pattern for decoupling job logic from the execution engine.
Architecture Overview
flowchart LR
API["🚀 API\n(User Request)"] -->|Enqueues| Queue["📥 Job Queue\n(Channel/NATS)"]
subgraph Worker["⚙️ Worker Service"]
Queue -->|Dequeues| Handler["👷 Job Handler"]
Handler -->|Updates| DB[(Database)]
Handler -->|Publishes| Event["📢 Event\n(Completed)"]
end
classDef primary fill:#7c3aed,color:#fff
classDef secondary fill:#06b6d4,color:#fff
classDef db fill:#f43f5e,color:#fff
class API,Worker primary
class Queue,Handler,Event secondary
class DB db
Implementation
Hosted Services
Basic Background Service
// Workers/Services/DocumentProcessingService.cs
public sealed class DocumentProcessingService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<DocumentProcessingService> _logger;
public DocumentProcessingService(
IServiceScopeFactory scopeFactory,
ILogger<DocumentProcessingService> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Document processing service starting");
while (!stoppingToken.IsCancellationRequested)
{
try
{
await using var scope = _scopeFactory.CreateAsyncScope();
var processor = scope.ServiceProvider.GetRequiredService<IDocumentProcessor>();
await processor.ProcessPendingDocumentsAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing documents");
}
try
{
await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
}
catch (OperationCanceledException)
{
// Cancelled during the delay: exit cleanly so the stopping log below runs.
break;
}
}
_logger.LogInformation("Document processing service stopping");
}
}
Channel-Based Queue
Job Queue Implementation
// Application/Jobs/IBackgroundJobQueue.cs
public interface IBackgroundJobQueue<T> where T : class
{
ValueTask QueueAsync(T job, CancellationToken ct = default);
ValueTask<T> DequeueAsync(CancellationToken ct);
}
// Infrastructure/Jobs/BackgroundJobQueue.cs
public sealed class BackgroundJobQueue<T> : IBackgroundJobQueue<T> where T : class
{
private readonly Channel<T> _channel;
private readonly ILogger<BackgroundJobQueue<T>> _logger;
public BackgroundJobQueue(ILogger<BackgroundJobQueue<T>> logger, int capacity = 100)
{
_logger = logger;
_channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false
});
}
public async ValueTask QueueAsync(T job, CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(job);
await _channel.Writer.WriteAsync(job, ct);
_logger.LogDebug("Job queued: {JobType}", typeof(T).Name);
}
public async ValueTask<T> DequeueAsync(CancellationToken ct)
{
var job = await _channel.Reader.ReadAsync(ct);
_logger.LogDebug("Job dequeued: {JobType}", typeof(T).Name);
return job;
}
}
The System.Threading.Channels library provides a high-performance, thread-safe producer-consumer implementation that is ideal for in-process job queues.
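To see those mechanics in isolation, here is a minimal, self-contained console sketch (hypothetical, not part of the queue above) showing bounded writes applying backpressure and a reader draining to completion:

```csharp
using System.Threading.Channels;

// A tiny bounded channel: writers wait asynchronously once 2 items
// are buffered, applying backpressure instead of growing memory.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

var consumed = new List<int>();
var consumer = Task.Run(async () =>
{
    // ReadAllAsync completes once the writer is marked complete and drained.
    await foreach (var item in channel.Reader.ReadAllAsync())
        consumed.Add(item);
});

for (var i = 0; i < 5; i++)
    await channel.Writer.WriteAsync(i); // suspends while the buffer is full

channel.Writer.Complete(); // signal: no more items
await consumer;
Console.WriteLine($"consumed {consumed.Count} items");
```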
Queue Consumer Service
// Workers/Services/QueuedHostedService.cs
public sealed class QueuedHostedService<T> : BackgroundService where T : class
{
private readonly IBackgroundJobQueue<T> _queue;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<QueuedHostedService<T>> _logger;
private readonly int _workerCount;
public QueuedHostedService(
IBackgroundJobQueue<T> queue,
IServiceScopeFactory scopeFactory,
ILogger<QueuedHostedService<T>> logger,
int workerCount = 3)
{
_queue = queue;
_scopeFactory = scopeFactory;
_logger = logger;
_workerCount = workerCount;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting {Count} workers for {Type}", _workerCount, typeof(T).Name);
var workers = Enumerable.Range(0, _workerCount)
.Select(i => ProcessQueueAsync(i, stoppingToken))
.ToArray();
await Task.WhenAll(workers);
}
private async Task ProcessQueueAsync(int workerId, CancellationToken stoppingToken)
{
_logger.LogDebug("Worker {WorkerId} starting", workerId);
while (!stoppingToken.IsCancellationRequested)
{
try
{
var job = await _queue.DequeueAsync(stoppingToken);
await using var scope = _scopeFactory.CreateAsyncScope();
var handler = scope.ServiceProvider.GetRequiredService<IJobHandler<T>>();
await handler.HandleAsync(job, stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Worker {WorkerId} error processing job", workerId);
}
}
_logger.LogDebug("Worker {WorkerId} stopping", workerId);
}
}
Job Handler Pattern
Handler Interface
// Application/Jobs/IJobHandler.cs
public interface IJobHandler<in T> where T : class
{
Task HandleAsync(T job, CancellationToken ct = default);
}
Document Processing Handler
// Application/Jobs/Handlers/DocumentProcessingHandler.cs
public sealed class DocumentProcessingHandler : IJobHandler<DocumentProcessingJob>
{
private readonly IDocumentRepository _repository;
private readonly IOcrService _ocrService;
private readonly IEmbeddingService _embeddingService;
private readonly INatsPublisher _nats;
private readonly ILogger<DocumentProcessingHandler> _logger;
public DocumentProcessingHandler(
IDocumentRepository repository,
IOcrService ocrService,
IEmbeddingService embeddingService,
INatsPublisher nats,
ILogger<DocumentProcessingHandler> logger)
{
_repository = repository;
_ocrService = ocrService;
_embeddingService = embeddingService;
_nats = nats;
_logger = logger;
}
public async Task HandleAsync(DocumentProcessingJob job, CancellationToken ct = default)
{
_logger.LogInformation("Processing document: {DocumentId}", job.DocumentId);
using var activity = ActivitySources.Workers.StartActivity("ProcessDocument");
activity?.SetTag("document.id", job.DocumentId.Value);
try
{
// Perform OCR
var content = await _ocrService.ExtractTextAsync(job.Bucket, job.ObjectKey, ct);
// Generate embeddings
var embeddings = await _embeddingService.GenerateAsync(content, ct);
// Update document (the job may outlive the record, so fail loudly if it is gone)
var document = await _repository.GetByIdAsync(job.DocumentId, ct)
?? throw new InvalidOperationException($"Document not found: {job.DocumentId}");
document.SetContent(content, embeddings);
await _repository.UpdateAsync(document, ct);
// Publish completion event
await _nats.PublishAsync(
$"{job.Environment}.archives.documents.analysis.completed",
new DocumentAnalysisCompleted(job.DocumentId, job.OwnerId),
ct);
_logger.LogInformation("Document processed successfully: {DocumentId}", job.DocumentId);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to process document: {DocumentId}", job.DocumentId);
throw;
}
}
}
Job Model
// Application/Jobs/Models/DocumentProcessingJob.cs
public sealed record DocumentProcessingJob(
DocumentId DocumentId,
CustomId OwnerId,
string Bucket,
string ObjectKey,
string Environment);
Timed Background Jobs
Scheduled Maintenance Service
// Workers/Services/MaintenanceService.cs
public sealed class MaintenanceService : BackgroundService
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<MaintenanceService> _logger;
private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(6);
private readonly TimeSpan _healthCheckInterval = TimeSpan.FromMinutes(5);
public MaintenanceService(
IServiceScopeFactory scopeFactory,
ILogger<MaintenanceService> logger)
{
_scopeFactory = scopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var tasks = new[]
{
RunPeriodicAsync(CleanupOrphanedFilesAsync, _cleanupInterval, stoppingToken),
RunPeriodicAsync(CheckServiceHealthAsync, _healthCheckInterval, stoppingToken)
};
await Task.WhenAll(tasks);
}
private async Task RunPeriodicAsync(
Func<CancellationToken, Task> action,
TimeSpan interval,
CancellationToken stoppingToken)
{
using var timer = new PeriodicTimer(interval);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await action(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Periodic task failed: {Action}", action.Method.Name);
}
try
{
await timer.WaitForNextTickAsync(stoppingToken);
}
catch (OperationCanceledException)
{
break;
}
}
}
private async Task CleanupOrphanedFilesAsync(CancellationToken ct)
{
_logger.LogInformation("Starting orphaned file cleanup");
await using var scope = _scopeFactory.CreateAsyncScope();
var cleanup = scope.ServiceProvider.GetRequiredService<IOrphanedFileCleanup>();
var deletedCount = await cleanup.CleanupAsync(ct);
_logger.LogInformation("Cleanup complete: {Count} files removed", deletedCount);
}
private async Task CheckServiceHealthAsync(CancellationToken ct)
{
await using var scope = _scopeFactory.CreateAsyncScope();
var healthChecker = scope.ServiceProvider.GetRequiredService<IServiceHealthChecker>();
var results = await healthChecker.CheckAllAsync(ct);
foreach (var (service, healthy) in results)
{
if (!healthy)
{
_logger.LogWarning("Service unhealthy: {Service}", service);
}
}
}
}
PeriodicTimer, introduced in .NET 6, was designed for exactly this kind of periodic background work: because it ticks on a fixed schedule rather than measuring a delay after each iteration, it avoids the drift a work-then-Task.Delay loop accumulates.
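A stripped-down sketch of the pattern (hypothetical, outside the services above). Note that WaitForNextTickAsync throws OperationCanceledException when the token fires and returns false once the timer is disposed:

```csharp
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(50));
using var cts = new CancellationTokenSource();
var ticks = 0;

try
{
    while (await timer.WaitForNextTickAsync(cts.Token))
    {
        ticks++;
        // Simulated work shorter than the period; the next tick still
        // lands on the 50 ms grid rather than at "work time + 50 ms".
        await Task.Delay(10, cts.Token);
        if (ticks == 3) cts.Cancel(); // stand-in for host shutdown
    }
}
catch (OperationCanceledException)
{
    // Expected on cancellation; swallow and fall through to cleanup.
}

Console.WriteLine($"ticked {ticks} times");
```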
Retry Policies
Polly Integration
Polly’s resilience pipelines integrate naturally with job handlers, providing exponential backoff and timeout protection for transient failures.
[Polly v8 - Resilience and transient-fault-handling library] — Polly Project, 2024-10-05
// Application/Jobs/Handlers/ResilientJobHandler.cs
public sealed class ResilientJobHandler<T> : IJobHandler<T> where T : class
{
private readonly IJobHandler<T> _inner;
private readonly ILogger<ResilientJobHandler<T>> _logger;
private readonly ResiliencePipeline _pipeline;
public ResilientJobHandler(
IJobHandler<T> inner,
ILogger<ResilientJobHandler<T>> logger)
{
_inner = inner;
_logger = logger;
_pipeline = new ResiliencePipelineBuilder()
.AddRetry(new RetryStrategyOptions
{
MaxRetryAttempts = 3,
Delay = TimeSpan.FromSeconds(2),
BackoffType = DelayBackoffType.Exponential,
ShouldHandle = new PredicateBuilder().Handle<Exception>(ex =>
ex is not InvalidOperationException),
OnRetry = args =>
{
_logger.LogWarning(args.Outcome.Exception,
"Retry attempt {Attempt} for {JobType}",
args.AttemptNumber, typeof(T).Name);
return ValueTask.CompletedTask;
}
})
.AddTimeout(TimeSpan.FromMinutes(5))
.Build();
}
public async Task HandleAsync(T job, CancellationToken ct = default)
{
await _pipeline.ExecuteAsync(
async token => await _inner.HandleAsync(job, token),
ct);
}
}
Registration
Service Configuration
// Program.cs
builder.Services.AddSingleton<IBackgroundJobQueue<DocumentProcessingJob>,
BackgroundJobQueue<DocumentProcessingJob>>();
builder.Services.AddScoped<IJobHandler<DocumentProcessingJob>, DocumentProcessingHandler>();
builder.Services.AddHostedService<QueuedHostedService<DocumentProcessingJob>>();
builder.Services.AddHostedService<MaintenanceService>();
// Decorator pattern for retry
builder.Services.Decorate<IJobHandler<DocumentProcessingJob>,
ResilientJobHandler<DocumentProcessingJob>>();
The Scrutor library provides the Decorate extension method used here, which simplifies applying the decorator pattern in the DI container.
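If you'd rather avoid the extra dependency, the same decoration can be wired by hand with a factory registration (a sketch, assuming the same types registered above):

```csharp
// Register the concrete handler, then hand out the wrapped version
// whenever the interface is resolved.
builder.Services.AddScoped<DocumentProcessingHandler>();
builder.Services.AddScoped<IJobHandler<DocumentProcessingJob>>(sp =>
    new ResilientJobHandler<DocumentProcessingJob>(
        sp.GetRequiredService<DocumentProcessingHandler>(),
        sp.GetRequiredService<ILogger<ResilientJobHandler<DocumentProcessingJob>>>()));
```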
Enqueueing Jobs
From API Endpoint
// Api/Endpoints/Documents/UploadDocumentEndpoint.cs
public sealed class UploadDocumentEndpoint : Endpoint<UploadDocumentRequest, DocumentResponse>
{
private readonly IBackgroundJobQueue<DocumentProcessingJob> _jobQueue;
private readonly IDocumentRepository _repository;
public UploadDocumentEndpoint(
IBackgroundJobQueue<DocumentProcessingJob> jobQueue,
IDocumentRepository repository)
{
_jobQueue = jobQueue;
_repository = repository;
}
public override async Task HandleAsync(UploadDocumentRequest req, CancellationToken ct)
{
// Save document metadata
var document = Document.Create(req.FileName, CurrentUser.CustomId);
await _repository.AddAsync(document, ct);
// Queue background processing
await _jobQueue.QueueAsync(new DocumentProcessingJob(
document.Id,
CurrentUser.CustomId,
$"{Environment}-{CurrentUser.CustomId}",
document.ObjectKey,
Environment), ct);
await SendCreatedAtAsync<GetDocumentEndpoint>(
new { id = document.Id.Value },
document.ToResponse());
}
}
Conclusion
Background processing is one of those areas where the happy path is simple but production resilience requires careful thought. The patterns covered here — BackgroundService for continuous loops, Channel<T> for bounded in-process queues, PeriodicTimer for scheduled work, and the handler-plus-decorator pattern for retry — form a layered approach that has served us well across multiple services. The biggest lesson I took away is that every background job needs an explicit answer to: “What happens if this job is interrupted mid-execution?” If you cannot answer that question for every handler, you are not ready for production.
| Pattern | Use Case | Characteristics |
|---|---|---|
| BackgroundService | Continuous processing | Long-running, singleton |
| Channel Queue | In-process async | Fast, bounded memory |
| Timed Service | Scheduled tasks | Periodic execution |
| Handler + Queue | Decoupled processing | Testable, scalable |
Next Steps
- Error Handling and Resilience Patterns
- Integration Testing with Testcontainers
- Caching Strategies in .NET with Redis