Backend Intermediate 15 min

Background Job Processing in .NET

Implement reliable background job processing with hosted services, channels, and distributed task queues in .NET applications.

By Victor Robin Updated:

When I first configured background services in our .NET application, graceful shutdown was the issue that bit me hardest. During local development everything looked fine, but once we deployed to Kubernetes and started rolling updates, we discovered that in-flight jobs were being silently killed mid-execution. Documents would end up in a half-processed state with no record of what went wrong. It took three separate incidents before I properly understood the relationship between CancellationToken, the host shutdown timeout, and the need to catch OperationCanceledException at exactly the right level. This article captures all of those lessons so you can avoid the same pain.

Introduction

Background job processing enables applications to handle long-running tasks asynchronously without blocking the main request thread. This is critical for keeping APIs responsive while performing heavy operations like document analysis, video transcoding, or sending emails.

[Background tasks with hosted services in ASP.NET Core] — Microsoft , 2024-11-15

This guide covers implementing robust background processing patterns in .NET using native BackgroundService, local channels, and distributed message queues.

What We’ll Build

  1. Hosted Service: A simple loop for periodic tasks.
  2. Channel Queue: An in-memory producer-consumer queue.
  3. Job Handlers: A pattern for decoupling job logic from the execution engine.

Architecture Overview

flowchart LR
    API["🚀 API\n(User Request)"] -->|Enqueues| Queue["📥 Job Queue\n(Channel/NATS)"]

    subgraph Worker["⚙️ Worker Service"]
        Queue -->|Dequeues| Handler["👷 Job Handler"]
        Handler -->|Updates| DB[(Database)]
        Handler -->|Publishes| Event["📢 Event\n(Completed)"]
    end

    classDef primary fill:#7c3aed,color:#fff
    classDef secondary fill:#06b6d4,color:#fff
    classDef db fill:#f43f5e,color:#fff
    classDef warning fill:#fbbf24,color:#000

    class API,Worker primary
    class DB db

Implementation

Hosted Services

Basic Background Service

// Workers/Services/DocumentProcessingService.cs
public sealed class DocumentProcessingService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<DocumentProcessingService> _logger;

    public DocumentProcessingService(
        IServiceScopeFactory scopeFactory,
        ILogger<DocumentProcessingService> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Document processing service starting");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await using var scope = _scopeFactory.CreateAsyncScope();
                var processor = scope.ServiceProvider.GetRequiredService<IDocumentProcessor>();

                await processor.ProcessPendingDocumentsAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing documents");
            }

            await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
        }

        _logger.LogInformation("Document processing service stopping");
    }
}

Channel-Based Queue

Job Queue Implementation

// Application/Jobs/IBackgroundJobQueue.cs
public interface IBackgroundJobQueue<T> where T : class
{
    ValueTask QueueAsync(T job, CancellationToken ct = default);
    ValueTask<T> DequeueAsync(CancellationToken ct);
}

// Infrastructure/Jobs/BackgroundJobQueue.cs
public sealed class BackgroundJobQueue<T> : IBackgroundJobQueue<T> where T : class
{
    private readonly Channel<T> _channel;
    private readonly ILogger<BackgroundJobQueue<T>> _logger;

    public BackgroundJobQueue(ILogger<BackgroundJobQueue<T>> logger, int capacity = 100)
    {
        _logger = logger;
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = false,
            SingleWriter = false
        });
    }

    public async ValueTask QueueAsync(T job, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(job);

        await _channel.Writer.WriteAsync(job, ct);
        _logger.LogDebug("Job queued: {JobType}", typeof(T).Name);
    }

    public async ValueTask<T> DequeueAsync(CancellationToken ct)
    {
        var job = await _channel.Reader.ReadAsync(ct);
        _logger.LogDebug("Job dequeued: {JobType}", typeof(T).Name);
        return job;
    }
}

The System.Threading.Channels library provides a high-performance, thread-safe producer-consumer implementation that is ideal for in-process job queues.

[System.Threading.Channels] — Microsoft , 2024-08-20

Queue Consumer Service

// Workers/Services/QueuedHostedService.cs
public sealed class QueuedHostedService<T> : BackgroundService where T : class
{
    private readonly IBackgroundJobQueue<T> _queue;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<QueuedHostedService<T>> _logger;
    private readonly int _workerCount;

    public QueuedHostedService(
        IBackgroundJobQueue<T> queue,
        IServiceScopeFactory scopeFactory,
        ILogger<QueuedHostedService<T>> logger,
        int workerCount = 3)
    {
        _queue = queue;
        _scopeFactory = scopeFactory;
        _logger = logger;
        _workerCount = workerCount;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting {Count} workers for {Type}", _workerCount, typeof(T).Name);

        var workers = Enumerable.Range(0, _workerCount)
            .Select(i => ProcessQueueAsync(i, stoppingToken))
            .ToArray();

        await Task.WhenAll(workers);
    }

    private async Task ProcessQueueAsync(int workerId, CancellationToken stoppingToken)
    {
        _logger.LogDebug("Worker {WorkerId} starting", workerId);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var job = await _queue.DequeueAsync(stoppingToken);

                await using var scope = _scopeFactory.CreateAsyncScope();
                var handler = scope.ServiceProvider.GetRequiredService<IJobHandler<T>>();

                await handler.HandleAsync(job, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Worker {WorkerId} error processing job", workerId);
            }
        }

        _logger.LogDebug("Worker {WorkerId} stopping", workerId);
    }
}

Job Handler Pattern

Handler Interface

// Application/Jobs/IJobHandler.cs
public interface IJobHandler<in T> where T : class
{
    Task HandleAsync(T job, CancellationToken ct = default);
}

Document Processing Handler

// Application/Jobs/Handlers/DocumentProcessingHandler.cs
public sealed class DocumentProcessingHandler : IJobHandler<DocumentProcessingJob>
{
    private readonly IDocumentRepository _repository;
    private readonly IOcrService _ocrService;
    private readonly IEmbeddingService _embeddingService;
    private readonly INatsPublisher _nats;
    private readonly ILogger<DocumentProcessingHandler> _logger;

    public DocumentProcessingHandler(
        IDocumentRepository repository,
        IOcrService ocrService,
        IEmbeddingService embeddingService,
        INatsPublisher nats,
        ILogger<DocumentProcessingHandler> logger)
    {
        _repository = repository;
        _ocrService = ocrService;
        _embeddingService = embeddingService;
        _nats = nats;
        _logger = logger;
    }

    public async Task HandleAsync(DocumentProcessingJob job, CancellationToken ct = default)
    {
        _logger.LogInformation("Processing document: {DocumentId}", job.DocumentId);

        using var activity = ActivitySources.Workers.StartActivity("ProcessDocument");
        activity?.SetTag("document.id", job.DocumentId.Value);

        try
        {
            // Perform OCR
            var content = await _ocrService.ExtractTextAsync(job.Bucket, job.ObjectKey, ct);

            // Generate embeddings
            var embeddings = await _embeddingService.GenerateAsync(content, ct);

            // Update document
            var document = await _repository.GetByIdAsync(job.DocumentId, ct);
            document!.SetContent(content, embeddings);
            await _repository.UpdateAsync(document, ct);

            // Publish completion event
            await _nats.PublishAsync(
                $"{job.Environment}.archives.documents.analysis.completed",
                new DocumentAnalysisCompleted(job.DocumentId, job.OwnerId),
                ct);

            _logger.LogInformation("Document processed successfully: {DocumentId}", job.DocumentId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process document: {DocumentId}", job.DocumentId);
            throw;
        }
    }
}

Job Model

// Application/Jobs/Models/DocumentProcessingJob.cs
public sealed record DocumentProcessingJob(
    DocumentId DocumentId,
    CustomId OwnerId,
    string Bucket,
    string ObjectKey,
    string Environment);

Timed Background Jobs

Scheduled Maintenance Service

// Workers/Services/MaintenanceService.cs
public sealed class MaintenanceService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<MaintenanceService> _logger;

    private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(6);
    private readonly TimeSpan _healthCheckInterval = TimeSpan.FromMinutes(5);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var tasks = new[]
        {
            RunPeriodicAsync(CleanupOrphanedFilesAsync, _cleanupInterval, stoppingToken),
            RunPeriodicAsync(CheckServiceHealthAsync, _healthCheckInterval, stoppingToken)
        };

        await Task.WhenAll(tasks);
    }

    private async Task RunPeriodicAsync(
        Func<CancellationToken, Task> action,
        TimeSpan interval,
        CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(interval);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await action(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Periodic task failed: {Action}", action.Method.Name);
            }

            try
            {
                await timer.WaitForNextTickAsync(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    private async Task CleanupOrphanedFilesAsync(CancellationToken ct)
    {
        _logger.LogInformation("Starting orphaned file cleanup");

        await using var scope = _scopeFactory.CreateAsyncScope();
        var cleanup = scope.ServiceProvider.GetRequiredService<IOrphanedFileCleanup>();

        var deletedCount = await cleanup.CleanupAsync(ct);
        _logger.LogInformation("Cleanup complete: {Count} files removed", deletedCount);
    }

    private async Task CheckServiceHealthAsync(CancellationToken ct)
    {
        await using var scope = _scopeFactory.CreateAsyncScope();
        var healthChecker = scope.ServiceProvider.GetRequiredService<IServiceHealthChecker>();

        var results = await healthChecker.CheckAllAsync(ct);

        foreach (var (service, healthy) in results)
        {
            if (!healthy)
            {
                _logger.LogWarning("Service unhealthy: {Service}", service);
            }
        }
    }
}

The PeriodicTimer was introduced in .NET 6 specifically for this kind of periodic background work, and it avoids the drift issues inherent in a Task.Delay loop.

[PeriodicTimer Class] — Microsoft , 2024-09-10

Retry Policies

Polly Integration

Polly’s resilience pipelines integrate naturally with job handlers, providing exponential backoff and timeout protection for transient failures.

[Polly v8 - Resilience and transient-fault-handling library] — Polly Project , 2024-10-05
// Application/Jobs/Handlers/ResilientJobHandler.cs
public sealed class ResilientJobHandler<T> : IJobHandler<T> where T : class
{
    private readonly IJobHandler<T> _inner;
    private readonly ILogger<ResilientJobHandler<T>> _logger;
    private readonly ResiliencePipeline _pipeline;

    public ResilientJobHandler(
        IJobHandler<T> inner,
        ILogger<ResilientJobHandler<T>> logger)
    {
        _inner = inner;
        _logger = logger;

        _pipeline = new ResiliencePipelineBuilder()
            .AddRetry(new RetryStrategyOptions
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromSeconds(2),
                BackoffType = DelayBackoffType.Exponential,
                ShouldHandle = new PredicateBuilder().Handle<Exception>(ex =>
                    ex is not InvalidOperationException),
                OnRetry = args =>
                {
                    _logger.LogWarning(args.Outcome.Exception,
                        "Retry attempt {Attempt} for {JobType}",
                        args.AttemptNumber, typeof(T).Name);
                    return ValueTask.CompletedTask;
                }
            })
            .AddTimeout(TimeSpan.FromMinutes(5))
            .Build();
    }

    public async Task HandleAsync(T job, CancellationToken ct = default)
    {
        await _pipeline.ExecuteAsync(
            async token => await _inner.HandleAsync(job, token),
            ct);
    }
}

Registration

Service Configuration

// Program.cs
builder.Services.AddSingleton<IBackgroundJobQueue<DocumentProcessingJob>,
    BackgroundJobQueue<DocumentProcessingJob>>();

builder.Services.AddScoped<IJobHandler<DocumentProcessingJob>, DocumentProcessingHandler>();

builder.Services.AddHostedService<QueuedHostedService<DocumentProcessingJob>>();
builder.Services.AddHostedService<MaintenanceService>();

// Decorator pattern for retry
builder.Services.Decorate<IJobHandler<DocumentProcessingJob>,
    ResilientJobHandler<DocumentProcessingJob>>();

The Scrutor library provides the Decorate extension method used here, which simplifies applying the decorator pattern in the DI container.

[Scrutor - Assembly scanning and decoration for Microsoft.Extensions.DependencyInjection] — Kristian Hellang , 2024-07-22

Enqueueing Jobs

From API Endpoint

// Api/Endpoints/Documents/UploadDocumentEndpoint.cs
public sealed class UploadDocumentEndpoint : Endpoint<UploadDocumentRequest, DocumentResponse>
{
    private readonly IBackgroundJobQueue<DocumentProcessingJob> _jobQueue;
    private readonly IDocumentRepository _repository;

    public override async Task HandleAsync(UploadDocumentRequest req, CancellationToken ct)
    {
        // Save document metadata
        var document = Document.Create(req.FileName, CurrentUser.CustomId);
        await _repository.AddAsync(document, ct);

        // Queue background processing
        await _jobQueue.QueueAsync(new DocumentProcessingJob(
            document.Id,
            CurrentUser.CustomId,
            $"{Environment}-{CurrentUser.CustomId}",
            document.ObjectKey,
            Environment), ct);

        await SendCreatedAtAsync<GetDocumentEndpoint>(
            new { id = document.Id.Value },
            document.ToResponse());
    }
}

Conclusion

Background processing is one of those areas where the happy path is simple but production resilience requires careful thought. The patterns covered here — BackgroundService for continuous loops, Channel<T> for bounded in-process queues, PeriodicTimer for scheduled work, and the handler-plus-decorator pattern for retry — form a layered approach that has served us well across multiple services. The biggest lesson I took away is that every background job needs an explicit answer to: “What happens if this job is interrupted mid-execution?” If you cannot answer that question for every handler, you are not ready for production.

PatternUse CaseCharacteristics
BackgroundServiceContinuous processingLong-running, singleton
Channel QueueIn-process asyncFast, bounded memory
Timed ServiceScheduled tasksPeriodic execution
Handler + QueueDecoupled processingTestable, scalable

Next Steps

Further Reading

[Microsoft: Background tasks with hosted services] — Microsoft , 2024 [System.Threading.Channels deep dive] — Microsoft , 2024 [Polly resilience and transient-fault-handling] — GitHub Community , 2024 [Scrutor for DI decoration patterns] — GitHub Community , 2024 [Background tasks with hosted services in ASP.NET Core] — Microsoft , 2024-11-15