⚡ Backend Intermediate ⏱️ 15 min

Background Job Processing in .NET

Implement reliable background job processing with hosted services, channels, and distributed task queues in .NET applications.

By Victor Robin

Introduction

Background job processing enables applications to handle long-running tasks asynchronously without blocking the main request thread. This is critical for keeping APIs responsive while performing heavy operations like document analysis, video transcoding, or sending emails.

This guide covers implementing robust background processing patterns in .NET using native BackgroundService, local channels, and distributed message queues.

What We’ll Build

  1. Hosted Service: A simple loop for periodic tasks.
  2. Channel Queue: An in-memory producer-consumer queue.
  3. Job Handlers: A pattern for decoupling job logic from the execution engine.

Architecture Overview

flowchart LR
    API["🚀 API\n(User Request)"] -->|Enqueues| Queue["📥 Job Queue\n(Channel/NATS)"]
    
    subgraph Worker["⚙️ Worker Service"]
        Queue -->|Dequeues| Handler["👷 Job Handler"]
        Handler -->|Updates| DB[(Database)]
        Handler -->|Publishes| Event["📢 Event\n(Completed)"]
    end

    classDef primary fill:#7c3aed,color:#fff
    classDef secondary fill:#06b6d4,color:#fff
    classDef db fill:#f43f5e,color:#fff
    classDef warning fill:#fbbf24,color:#000

    class API,Worker primary
    class DB db

Implementation

Hosted Services

Basic Background Service

// Workers/Services/DocumentProcessingService.cs
public sealed class DocumentProcessingService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<DocumentProcessingService> _logger;

    public DocumentProcessingService(
        IServiceScopeFactory scopeFactory,
        ILogger<DocumentProcessingService> logger)
    {
        _scopeFactory = scopeFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Document processing service starting");

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await using var scope = _scopeFactory.CreateAsyncScope();
                var processor = scope.ServiceProvider.GetRequiredService<IDocumentProcessor>();
                
                await processor.ProcessPendingDocumentsAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing documents");
            }

            await Task.Delay(TimeSpan.FromSeconds(10), stoppingToken);
        }

        _logger.LogInformation("Document processing service stopping");
    }
}

Channel-Based Queue

Job Queue Implementation

// Application/Jobs/IBackgroundJobQueue.cs
public interface IBackgroundJobQueue<T> where T : class
{
    ValueTask QueueAsync(T job, CancellationToken ct = default);
    ValueTask<T> DequeueAsync(CancellationToken ct);
}

// Infrastructure/Jobs/BackgroundJobQueue.cs
public sealed class BackgroundJobQueue<T> : IBackgroundJobQueue<T> where T : class
{
    private readonly Channel<T> _channel;
    private readonly ILogger<BackgroundJobQueue<T>> _logger;

    public BackgroundJobQueue(ILogger<BackgroundJobQueue<T>> logger, int capacity = 100)
    {
        _logger = logger;
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = false,
            SingleWriter = false
        });
    }

    public async ValueTask QueueAsync(T job, CancellationToken ct = default)
    {
        ArgumentNullException.ThrowIfNull(job);
        
        await _channel.Writer.WriteAsync(job, ct);
        _logger.LogDebug("Job queued: {JobType}", typeof(T).Name);
    }

    public async ValueTask<T> DequeueAsync(CancellationToken ct)
    {
        var job = await _channel.Reader.ReadAsync(ct);
        _logger.LogDebug("Job dequeued: {JobType}", typeof(T).Name);
        return job;
    }
}

Queue Consumer Service

// Workers/Services/QueuedHostedService.cs
public sealed class QueuedHostedService<T> : BackgroundService where T : class
{
    private readonly IBackgroundJobQueue<T> _queue;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<QueuedHostedService<T>> _logger;
    private readonly int _workerCount;

    public QueuedHostedService(
        IBackgroundJobQueue<T> queue,
        IServiceScopeFactory scopeFactory,
        ILogger<QueuedHostedService<T>> logger,
        int workerCount = 3)
    {
        _queue = queue;
        _scopeFactory = scopeFactory;
        _logger = logger;
        _workerCount = workerCount;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting {Count} workers for {Type}", _workerCount, typeof(T).Name);

        var workers = Enumerable.Range(0, _workerCount)
            .Select(i => ProcessQueueAsync(i, stoppingToken))
            .ToArray();

        await Task.WhenAll(workers);
    }

    private async Task ProcessQueueAsync(int workerId, CancellationToken stoppingToken)
    {
        _logger.LogDebug("Worker {WorkerId} starting", workerId);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var job = await _queue.DequeueAsync(stoppingToken);
                
                await using var scope = _scopeFactory.CreateAsyncScope();
                var handler = scope.ServiceProvider.GetRequiredService<IJobHandler<T>>();
                
                await handler.HandleAsync(job, stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Worker {WorkerId} error processing job", workerId);
            }
        }

        _logger.LogDebug("Worker {WorkerId} stopping", workerId);
    }
}

Job Handler Pattern

Handler Interface

// Application/Jobs/IJobHandler.cs
public interface IJobHandler<in T> where T : class
{
    Task HandleAsync(T job, CancellationToken ct = default);
}

Document Processing Handler

// Application/Jobs/Handlers/DocumentProcessingHandler.cs
public sealed class DocumentProcessingHandler : IJobHandler<DocumentProcessingJob>
{
    private readonly IDocumentRepository _repository;
    private readonly IOcrService _ocrService;
    private readonly IEmbeddingService _embeddingService;
    private readonly INatsPublisher _nats;
    private readonly ILogger<DocumentProcessingHandler> _logger;

    public DocumentProcessingHandler(
        IDocumentRepository repository,
        IOcrService ocrService,
        IEmbeddingService embeddingService,
        INatsPublisher nats,
        ILogger<DocumentProcessingHandler> logger)
    {
        _repository = repository;
        _ocrService = ocrService;
        _embeddingService = embeddingService;
        _nats = nats;
        _logger = logger;
    }

    public async Task HandleAsync(DocumentProcessingJob job, CancellationToken ct = default)
    {
        _logger.LogInformation("Processing document: {DocumentId}", job.DocumentId);
        
        using var activity = ActivitySources.Workers.StartActivity("ProcessDocument");
        activity?.SetTag("document.id", job.DocumentId.Value);

        try
        {
            // Perform OCR
            var content = await _ocrService.ExtractTextAsync(job.Bucket, job.ObjectKey, ct);
            
            // Generate embeddings
            var embeddings = await _embeddingService.GenerateAsync(content, ct);
            
            // Update document
            var document = await _repository.GetByIdAsync(job.DocumentId, ct);
            document!.SetContent(content, embeddings);
            await _repository.UpdateAsync(document, ct);
            
            // Publish completion event
            await _nats.PublishAsync(
                $"{job.Environment}.archives.documents.analysis.completed",
                new DocumentAnalysisCompleted(job.DocumentId, job.OwnerId),
                ct);

            _logger.LogInformation("Document processed successfully: {DocumentId}", job.DocumentId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to process document: {DocumentId}", job.DocumentId);
            throw;
        }
    }
}

Job Model

// Application/Jobs/Models/DocumentProcessingJob.cs
public sealed record DocumentProcessingJob(
    DocumentId DocumentId,
    BlueRobinId OwnerId,
    string Bucket,
    string ObjectKey,
    string Environment);

Timed Background Jobs

Scheduled Maintenance Service

// Workers/Services/MaintenanceService.cs
public sealed class MaintenanceService : BackgroundService
{
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly ILogger<MaintenanceService> _logger;
    
    private readonly TimeSpan _cleanupInterval = TimeSpan.FromHours(6);
    private readonly TimeSpan _healthCheckInterval = TimeSpan.FromMinutes(5);

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var tasks = new[]
        {
            RunPeriodicAsync(CleanupOrphanedFilesAsync, _cleanupInterval, stoppingToken),
            RunPeriodicAsync(CheckServiceHealthAsync, _healthCheckInterval, stoppingToken)
        };

        await Task.WhenAll(tasks);
    }

    private async Task RunPeriodicAsync(
        Func<CancellationToken, Task> action,
        TimeSpan interval,
        CancellationToken stoppingToken)
    {
        using var timer = new PeriodicTimer(interval);
        
        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                await action(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Periodic task failed: {Action}", action.Method.Name);
            }

            try
            {
                await timer.WaitForNextTickAsync(stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    private async Task CleanupOrphanedFilesAsync(CancellationToken ct)
    {
        _logger.LogInformation("Starting orphaned file cleanup");
        
        await using var scope = _scopeFactory.CreateAsyncScope();
        var cleanup = scope.ServiceProvider.GetRequiredService<IOrphanedFileCleanup>();
        
        var deletedCount = await cleanup.CleanupAsync(ct);
        _logger.LogInformation("Cleanup complete: {Count} files removed", deletedCount);
    }

    private async Task CheckServiceHealthAsync(CancellationToken ct)
    {
        await using var scope = _scopeFactory.CreateAsyncScope();
        var healthChecker = scope.ServiceProvider.GetRequiredService<IServiceHealthChecker>();
        
        var results = await healthChecker.CheckAllAsync(ct);
        
        foreach (var (service, healthy) in results)
        {
            if (!healthy)
            {
                _logger.LogWarning("Service unhealthy: {Service}", service);
            }
        }
    }
}

Retry Policies

Polly Integration

// Application/Jobs/Handlers/ResilientJobHandler.cs
public sealed class ResilientJobHandler<T> : IJobHandler<T> where T : class
{
    private readonly IJobHandler<T> _inner;
    private readonly ILogger<ResilientJobHandler<T>> _logger;
    private readonly ResiliencePipeline _pipeline;

    public ResilientJobHandler(
        IJobHandler<T> inner,
        ILogger<ResilientJobHandler<T>> logger)
    {
        _inner = inner;
        _logger = logger;
        
        _pipeline = new ResiliencePipelineBuilder()
            .AddRetry(new RetryStrategyOptions
            {
                MaxRetryAttempts = 3,
                Delay = TimeSpan.FromSeconds(2),
                BackoffType = DelayBackoffType.Exponential,
                ShouldHandle = new PredicateBuilder().Handle<Exception>(ex => 
                    ex is not InvalidOperationException),
                OnRetry = args =>
                {
                    _logger.LogWarning(args.Outcome.Exception,
                        "Retry attempt {Attempt} for {JobType}",
                        args.AttemptNumber, typeof(T).Name);
                    return ValueTask.CompletedTask;
                }
            })
            .AddTimeout(TimeSpan.FromMinutes(5))
            .Build();
    }

    public async Task HandleAsync(T job, CancellationToken ct = default)
    {
        await _pipeline.ExecuteAsync(
            async token => await _inner.HandleAsync(job, token),
            ct);
    }
}

Registration

Service Configuration

// Program.cs
builder.Services.AddSingleton<IBackgroundJobQueue<DocumentProcessingJob>, 
    BackgroundJobQueue<DocumentProcessingJob>>();

builder.Services.AddScoped<IJobHandler<DocumentProcessingJob>, DocumentProcessingHandler>();

builder.Services.AddHostedService<QueuedHostedService<DocumentProcessingJob>>();
builder.Services.AddHostedService<MaintenanceService>();

// Decorator pattern for retry
builder.Services.Decorate<IJobHandler<DocumentProcessingJob>, 
    ResilientJobHandler<DocumentProcessingJob>>();

Enqueueing Jobs

From API Endpoint

// Api/Endpoints/Documents/UploadDocumentEndpoint.cs
public sealed class UploadDocumentEndpoint : Endpoint<UploadDocumentRequest, DocumentResponse>
{
    private readonly IBackgroundJobQueue<DocumentProcessingJob> _jobQueue;
    private readonly IDocumentRepository _repository;

    public override async Task HandleAsync(UploadDocumentRequest req, CancellationToken ct)
    {
        // Save document metadata
        var document = Document.Create(req.FileName, CurrentUser.BlueRobinId);
        await _repository.AddAsync(document, ct);
        
        // Queue background processing
        await _jobQueue.QueueAsync(new DocumentProcessingJob(
            document.Id,
            CurrentUser.BlueRobinId,
            $"{Environment}-{CurrentUser.BlueRobinId}",
            document.ObjectKey,
            Environment), ct);
        
        await SendCreatedAtAsync<GetDocumentEndpoint>(
            new { id = document.Id.Value },
            document.ToResponse());
    }
}

Conclusion

Background processing patterns:

PatternUse CaseCharacteristics
BackgroundServiceContinuous processingLong-running, singleton
Channel QueueIn-process asyncFast, bounded memory
Timed ServiceScheduled tasksPeriodic execution
Handler + QueueDecoupled processingTestable, scalable

Proper background job processing ensures responsive APIs while handling complex, long-running operations reliably.

[Background tasks with hosted services in ASP.NET Core] — Microsoft