
NATS-Powered Telegram Notification System

Building a real-time notification pipeline from document processing events to Telegram messages using NATS Core, environment-prefixed subjects, and the publisher-subscriber pattern.

By Victor Robin

2026 Upgrade Note

This article was upgraded on 2026-02-26 to reflect the renaming of the event contract from upload-based wording to OCR-requested semantics. The examples and narrative now match the document lifecycle subjects currently used across the API, workers, and Telegram notification flows.

Introduction

When a user uploads a document to BlueRobin, it goes through OCR, classification, entity extraction, chunking, and embedding generation. This can take anywhere from 30 seconds to several minutes. Users shouldn’t have to refresh the page to find out when processing completes; they should get a notification directly in Telegram.

This article covers how we built the notification pipeline using NATS messaging:

API → NATS Subject → Telegram Bot → Telegram API → User

Architecture Overview

flowchart TB
    subgraph API["BlueRobin API"]
        Service[TelegramNotificationService]
        Provider[IEventSubjectProvider]
    end
    
    subgraph NATS["NATS Server (data-layer)"]
        Subject["staging.notifications.telegram"]
    end
    
    subgraph Bot["Telegram Bot Service"]
        Worker[TelegramNotificationWorker]
        Client[TelegramBotClient]
    end
    
    subgraph Telegram["Telegram"]
        Server[Telegram API]
        App[User's Telegram]
    end
    
    Service -->|1. Get prefixed subject| Provider
    Provider -->|"staging.notifications.telegram"| Service
    Service -->|2. Publish message| Subject
    Subject -->|3. Deliver| Worker
    Worker -->|4. Format message| Client
    Client -->|5. Send| Server
    Server -->|6. Push| App
    
    classDef api fill:#7c3aed,color:#fff
    classDef nats fill:#06b6d4,color:#fff
    classDef bot fill:#22c55e,color:#fff
    classDef telegram fill:#0088cc,color:#fff
    
    class Service,Provider api
    class Subject nats
    class Worker,Client bot
    class Server,App telegram

Environment-Prefixed Subjects

A critical design decision: all NATS subjects are environment-prefixed. This enables:

  • Running staging and production on the same NATS cluster
  • Preventing cross-environment message leakage
  • Supporting local development against shared infrastructure
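
The subject provider interface and its implementation expose every subject as a pre-prefixed property:
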
public interface IEventSubjectProvider
{
    string EnvironmentPrefix { get; }
    
    // Document lifecycle subjects
    string OcrRequested { get; }
    string OcrCompleted { get; }
    string EmbeddingsCompleted { get; }
    
    // Notification subjects
    string NotificationTelegram { get; }
    string NotificationEmail { get; }
    string NotificationWebhook { get; }
    
    // Telegram link subjects (NEW)
    string TelegramLinkTokenValidate { get; }
    string TelegramPushLinkRequest { get; }
    
    string ApplyPrefix(string subject);
}

public class EventSubjectProvider : IEventSubjectProvider
{
    private readonly EnvironmentOptions _options;

    public EventSubjectProvider(IOptions<EnvironmentOptions> options)
    {
        _options = options.Value;

        // Pre-compute all static subjects for performance
        OcrRequested = _options.ApplySubjectPrefix("archives.documents.ocr.requested");
        OcrCompleted = _options.ApplySubjectPrefix("archives.documents.ocr.completed");
        EmbeddingsCompleted = _options.ApplySubjectPrefix("archives.documents.embeddings.completed");
        
        // Notification subjects
        NotificationTelegram = _options.ApplySubjectPrefix("notifications.telegram");
        NotificationEmail = _options.ApplySubjectPrefix("notifications.email");
        NotificationWebhook = _options.ApplySubjectPrefix("notifications.webhook");
        
        // Telegram link subjects
        TelegramLinkTokenValidate = _options.ApplySubjectPrefix("telegram.link-token.validate");
        TelegramPushLinkRequest = _options.ApplySubjectPrefix("telegram.push-link.request");
    }

    public string EnvironmentPrefix => _options.Prefix;
    
    public string ApplyPrefix(string subject) => _options.ApplySubjectPrefix(subject);
    
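    // Cached properties, assigned once in the constructor
    public string OcrRequested { get; }
    public string OcrCompleted { get; }
    public string EmbeddingsCompleted { get; }
    public string NotificationTelegram { get; }
    public string NotificationEmail { get; }
    public string NotificationWebhook { get; }
    public string TelegramLinkTokenValidate { get; }
    public string TelegramPushLinkRequest { get; }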
}

The EnvironmentOptions.ApplySubjectPrefix method handles the prefixing logic:

public class EnvironmentOptions
{
    public const string SectionName = "Environment";
    
    /// <summary>
    /// Environment prefix for NATS subjects and MinIO buckets.
    /// Empty string for production, "staging" or "dev" for other environments.
    /// </summary>
    public string Prefix { get; set; } = "";

    public string ApplySubjectPrefix(string subject)
    {
        return string.IsNullOrEmpty(Prefix) ? subject : $"{Prefix}.{subject}";
    }
}

Result:

  • Production: notifications.telegram, telegram.push-link.request
  • Staging: staging.notifications.telegram, staging.telegram.push-link.request
  • Dev: dev.notifications.telegram, dev.telegram.push-link.request
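
To make the isolation claim concrete, here is a minimal sketch of how prefixed subjects interact with NATS wildcard subscriptions. `Subjects.Matches` is a hypothetical helper written for illustration only (it is not part of BlueRobin or of any NATS client); it mirrors the documented wildcard rules, where `*` matches exactly one token and `>` matches one or more trailing tokens.

```csharp
using System;

string stagingSubject = Subjects.ApplyPrefix("staging", "notifications.telegram");
string prodSubject = Subjects.ApplyPrefix("", "notifications.telegram");

Console.WriteLine(stagingSubject); // staging.notifications.telegram
Console.WriteLine(prodSubject);    // notifications.telegram

// A staging-scoped wildcard only sees staging traffic...
Console.WriteLine(Subjects.Matches("staging.>", stagingSubject)); // True
Console.WriteLine(Subjects.Matches("staging.>", prodSubject));    // False

// ...but a bare ">" subscription sees every environment.
Console.WriteLine(Subjects.Matches(">", stagingSubject));         // True

public static class Subjects
{
    // Same rule as EnvironmentOptions.ApplySubjectPrefix in the article.
    public static string ApplyPrefix(string prefix, string subject) =>
        string.IsNullOrEmpty(prefix) ? subject : $"{prefix}.{subject}";

    // Hypothetical helper: token-by-token match of a subject against a pattern.
    public static bool Matches(string pattern, string subject)
    {
        string[] p = pattern.Split('.');
        string[] s = subject.Split('.');
        for (int i = 0; i < p.Length; i++)
        {
            // ">" must be the last pattern token and needs at least one token to consume.
            if (p[i] == ">") return i == p.Length - 1 && s.Length > i;
            if (i >= s.Length) return false;
            if (p[i] != "*" && p[i] != s[i]) return false;
        }
        return p.Length == s.Length;
    }
}
```

The last line is the caveat: prefixing is a naming convention, so it only isolates environments as long as no subscriber uses a wildcard broad enough to span them.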

The Publisher: TelegramNotificationService

The API uses TelegramNotificationService to publish notifications. This service is injected wherever notifications need to be sent:

public class TelegramNotificationService : INotificationService
{
    private readonly IMessagingConnection _messaging;
    private readonly IUserRepository _userRepository;
    private readonly IEventSubjectProvider _subjects;
    private readonly ILogger<TelegramNotificationService> _logger;

    public TelegramNotificationService(
        IUserRepository userRepository,
        IMessagingConnection messaging,
        IEventSubjectProvider subjects,
        ILogger<TelegramNotificationService> logger)
    {
        _userRepository = userRepository;
        _messaging = messaging;
        _subjects = subjects;
        _logger = logger;
    }

    public async Task NotifyDocumentProcessedAsync(
        BlueRobinId userId,
        BlueRobinId documentId,
        string documentName,
        bool success,
        string? errorMessage = null,
        CancellationToken cancellationToken = default)
    {
        var user = await _userRepository.GetByIdAsync(userId, cancellationToken);
        if (user == null) return;

        // Check notification preferences
        if (success && user.Preferences?.NotifyOnDocumentProcessed != true) return;
        if (!success && user.Preferences?.NotifyOnDocumentFailed != true) return;

        var chatId = user.GetTelegramChatId();
        if (!chatId.HasValue) return;

        var notification = new DocumentProcessedNotification
        {
            MessageId = Guid.NewGuid().ToString("N"),
            ChatId = chatId.Value,
            Title = success ? "✅ Document Processed" : "❌ Processing Failed",
            Body = success
                ? $"Your document *{documentName}* has been processed successfully."
                : $"Failed to process *{documentName}*.\n\nError: {errorMessage ?? "Unknown"}",
            UserId = userId.Value,
            DocumentId = documentId.Value,
            DocumentName = documentName,
            Success = success,
            ErrorMessage = errorMessage,
            Priority = success ? NotificationPriority.Normal : NotificationPriority.High,
            ParseMarkdown = true
        };

        await PublishTelegramNotificationAsync(notification, cancellationToken);
    }

    private async Task PublishTelegramNotificationAsync(
        TelegramNotificationMessage notification,
        CancellationToken cancellationToken)
    {
        try
        {
            // Use the environment-prefixed subject
            await _messaging.PublishAsync(
                _subjects.NotificationTelegram, 
                notification, 
                cancellationToken);
                
            _logger.LogDebug(
                "Published notification {MessageId} to {Subject} for chat {ChatId}",
                notification.MessageId, 
                _subjects.NotificationTelegram, 
                notification.ChatId);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to publish notification {MessageId}", notification.MessageId);
            throw;
        }
    }
}
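
The preference checks in NotifyDocumentProcessedAsync are opt-in: a user whose Preferences object is null, or whose flag is not explicitly true, receives nothing. A minimal sketch of that gate as a pure function follows. The `NotificationPreferences` record and the `NotificationGate` class are hypothetical stand-ins, since the real preference type is not shown in this article.

```csharp
using System;

var optedIn = new NotificationPreferences(
    NotifyOnDocumentProcessed: true,
    NotifyOnDocumentFailed: false);

Console.WriteLine(NotificationGate.ShouldNotify(optedIn, success: true));  // True
Console.WriteLine(NotificationGate.ShouldNotify(optedIn, success: false)); // False
Console.WriteLine(NotificationGate.ShouldNotify(null, success: true));     // False

// Hypothetical preference shape; only the two flags used by the publisher.
public record NotificationPreferences(
    bool NotifyOnDocumentProcessed,
    bool NotifyOnDocumentFailed);

public static class NotificationGate
{
    // Equivalent to the two early returns in the publisher:
    // success needs NotifyOnDocumentProcessed, failure needs NotifyOnDocumentFailed,
    // and missing preferences always mean "do not notify".
    public static bool ShouldNotify(NotificationPreferences? prefs, bool success) =>
        success
            ? prefs?.NotifyOnDocumentProcessed == true
            : prefs?.NotifyOnDocumentFailed == true;
}
```

Extracting the decision this way would also make the opt-in default easy to unit test without a user repository or a NATS connection.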

The Subscriber: TelegramNotificationWorker

The Telegram Bot service runs a BackgroundService that subscribes to the notification subject and sends messages via the Telegram Bot API:

/// <summary>
/// Background service that listens to NATS Core for outbound Telegram notifications
/// and sends them via the Telegram Bot API.
/// 
/// Uses Core NATS subscription to match the publisher (TelegramNotificationService)
/// which publishes via IMessagingConnection.PublishAsync().
/// Uses IEventSubjectProvider for environment-prefixed subjects.
/// </summary>
public class TelegramNotificationWorker : BackgroundService
{
    private readonly INatsConnection _natsConnection;
    private readonly TelegramBotOptions _options;
    private readonly IEventSubjectProvider _subjects;
    private readonly ILogger<TelegramNotificationWorker> _logger;
    private TelegramBotClient? _botClient;

    public TelegramNotificationWorker(
        INatsConnection natsConnection,
        IOptions<TelegramBotOptions> options,
        IEventSubjectProvider subjects,
        ILogger<TelegramNotificationWorker> logger)
    {
        _natsConnection = natsConnection;
        _options = options.Value;
        _subjects = subjects;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        if (string.IsNullOrWhiteSpace(_options.BotToken))
        {
            _logger.LogError("Telegram bot token not configured");
            return;
        }

        _botClient = new TelegramBotClient(_options.BotToken);

        // Use environment-prefixed notification subject
        var subject = _subjects.NotificationTelegram;
        
        _logger.LogInformation(
            "Telegram notification worker starting, subscribing to {Subject}", 
            subject);

        try
        {
            // Subscribe using Core NATS (matching publisher pattern)
            await foreach (var msg in _natsConnection.SubscribeAsync<TelegramNotificationMessage>(
                subject, cancellationToken: stoppingToken))
            {
                try
                {
                    if (msg.Data != null)
                    {
                        await ProcessNotificationAsync(msg.Data, stoppingToken);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error processing notification");
                    // Continue processing other messages
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Notification worker stopping...");
        }
    }

    private async Task ProcessNotificationAsync(
        TelegramNotificationMessage notification, 
        CancellationToken cancellationToken)
    {
        if (_botClient == null) return;

        _logger.LogDebug(
            "Sending notification to chat {ChatId}: {Title}",
            notification.ChatId, notification.Title);

        var message = string.IsNullOrEmpty(notification.Title)
            ? notification.Body
            : $"*{notification.Title}*\n\n{notification.Body}";

        await _botClient.SendMessage(
            chatId: notification.ChatId,
            text: message,
            parseMode: notification.ParseMarkdown ? ParseMode.Markdown : ParseMode.None,
            cancellationToken: cancellationToken);

        _logger.LogInformation(
            "Sent notification {MessageId} to chat {ChatId}",
            notification.MessageId, notification.ChatId);
    }
}
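
One thing the worker above does not show is what happens with more than one bot replica. A plain Core NATS subscription delivers each message to every subscriber, so two replicas would each send the same Telegram message. A possible mitigation, sketched here under assumptions, is a queue group, which asks NATS to deliver each message to only one member of the group. The server URL and the group name `telegram-bot` are illustrative, and the payload is read as a raw string to keep the sketch independent of any serializer setup.

```csharp
using System;
using System.Threading;
using NATS.Client.Core;

// Assumes the NATS.Net package and a NATS server reachable at this (illustrative) URL.
await using var nats = new NatsConnection(new NatsOpts { Url = "nats://localhost:4222" });

// Stop cleanly on Ctrl+C.
using var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

try
{
    // All subscribers sharing the same queue group split the messages between them.
    await foreach (var msg in nats.SubscribeAsync<string>(
        "staging.notifications.telegram",
        queueGroup: "telegram-bot", // hypothetical group name
        cancellationToken: cts.Token))
    {
        Console.WriteLine($"Received: {msg.Data}");
    }
}
catch (OperationCanceledException)
{
    // Expected when the token is cancelled.
}
```

Whether this is needed depends on how the bot is deployed; with a single replica the subscription in the worker behaves the same.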

Why Core NATS, Not JetStream?

You might wonder why we use Core NATS instead of JetStream for notifications. The key considerations:

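| Feature     | Core NATS    | JetStream     |
|-------------|--------------|---------------|
| Delivery    | At-most-once | At-least-once |
| Persistence | No           | Yes           |
| Replay      | No           | Yes           |
| Latency     | Lower        | Higher        |
| Complexity  | Simple       | More complex  |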

For notifications, we chose Core NATS because:

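  1. Notifications are ephemeral - With Core NATS, a notification published while the bot is down is dropped rather than queued. We accept that: users would rather miss one and hear about their next document than receive a burst of stale or duplicate notifications when the bot comes back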
  2. Latency matters - Users expect near-instant notification
  3. Idempotency is hard - De-duplicating “Document X processed” notifications adds complexity
  4. Simple debugging - No consumer state to troubleshoot

For document processing events (OCR, embeddings), we use JetStream because guaranteed delivery is critical.

Message Schema

The notification message is a simple DTO:

public record TelegramNotificationMessage
{
    public required string MessageId { get; init; }
    public required long ChatId { get; init; }
    public string Title { get; init; } = "";
    public required string Body { get; init; }
    public string? UserId { get; init; }
    public NotificationPriority Priority { get; init; } = NotificationPriority.Normal;
    public bool ParseMarkdown { get; init; } = true;
}

public record DocumentProcessedNotification : TelegramNotificationMessage
{
    public required string DocumentId { get; init; }
    public required string DocumentName { get; init; }
    public required bool Success { get; init; }
    public string? ErrorMessage { get; init; }
    public string NotificationType { get; init; } = "document_processed";
}
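
Because DocumentProcessedNotification derives from TelegramNotificationMessage, it is worth checking which fields actually reach the wire. The sketch below uses System.Text.Json directly; how IMessagingConnection serializes messages is not shown in this article, so treat this as an illustration of the default System.Text.Json behavior rather than of BlueRobin's publisher. The `NotificationPriority` enum values are assumed.

```csharp
using System;
using System.Text.Json;

TelegramNotificationMessage msg = new DocumentProcessedNotification
{
    MessageId = "abc123",
    ChatId = 123456789,
    Body = "Your document *invoice.pdf* has been processed successfully.",
    DocumentId = "doc-1",
    DocumentName = "invoice.pdf",
    Success = true
};

// Serializing through the declared base type drops the derived properties.
string asDeclared = JsonSerializer.Serialize(msg);
// Serializing through the runtime type keeps them.
string asRuntime = JsonSerializer.Serialize(msg, msg.GetType());

Console.WriteLine(asDeclared.Contains("DocumentId")); // False
Console.WriteLine(asRuntime.Contains("DocumentId"));  // True

// The minimal payload accepted by the base record: the three required members.
var parsed = JsonSerializer.Deserialize<TelegramNotificationMessage>(
    """{"MessageId":"test","ChatId":123456789,"Body":"Test notification"}""");
Console.WriteLine(parsed!.ParseMarkdown); // True (default value)

public enum NotificationPriority { Low, Normal, High } // assumed values

public record TelegramNotificationMessage
{
    public required string MessageId { get; init; }
    public required long ChatId { get; init; }
    public string Title { get; init; } = "";
    public required string Body { get; init; }
    public string? UserId { get; init; }
    public NotificationPriority Priority { get; init; } = NotificationPriority.Normal;
    public bool ParseMarkdown { get; init; } = true;
}

public record DocumentProcessedNotification : TelegramNotificationMessage
{
    public required string DocumentId { get; init; }
    public required string DocumentName { get; init; }
    public required bool Success { get; init; }
    public string? ErrorMessage { get; init; }
    public string NotificationType { get; init; } = "document_processed";
}
```

The worker only reads the base-type fields, so losing the derived ones is harmless for Telegram delivery, but any other consumer of the subject that expects DocumentId would need the runtime type (or configured polymorphism) on the publishing side.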

Deployment Configuration

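Both services need matching environment configuration, otherwise the publisher and the subscriber end up on different subjects. The staging overlay for the Telegram bot looks like this: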

apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization

namespace: archives-staging

resources:
  - ../../base

patches:
  - target:
      kind: ConfigMap
      name: archives-telegram-bot-config
    patch: |-
      - op: replace
        path: /data/Environment__Prefix
        value: "staging"
      - op: replace
        path: /data/Telegram__MiniAppUrl
        value: "https://web-staging.bluerobin.local/telegram/setup-passkey"

For production, Environment__Prefix is empty (or absent), resulting in unprefixed subjects.
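
The double underscore in Environment__Prefix matters: the .NET environment-variable configuration provider translates `__` into the `:` section separator, so the value lands under `Environment:Prefix`, which matches EnvironmentOptions.SectionName. The sketch below assumes the ConfigMap entries are injected into the container as environment variables (the Deployment manifest is not shown here) and that the Microsoft.Extensions.Configuration.EnvironmentVariables package is referenced.

```csharp
using System;
using Microsoft.Extensions.Configuration;

// Simulate what the ConfigMap would inject into the container.
Environment.SetEnvironmentVariable("Environment__Prefix", "staging");

IConfiguration config = new ConfigurationBuilder()
    .AddEnvironmentVariables()
    .Build();

// "__" in the variable name is exposed as ":" in the configuration key.
string prefix = config["Environment:Prefix"] ?? "";

// Same rule as EnvironmentOptions.ApplySubjectPrefix.
string subject = string.IsNullOrEmpty(prefix)
    ? "notifications.telegram"
    : $"{prefix}.notifications.telegram";

Console.WriteLine(subject); // staging.notifications.telegram
```

If the variable is unset, `prefix` falls back to the empty string and the subject stays unprefixed, which is the production behavior described above.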

End-to-End Flow

Let’s trace a notification from document upload to Telegram:

sequenceDiagram
    participant User
    participant API as BlueRobin API
    participant Worker as OCR Worker
    participant NATS
    participant Bot as Telegram Bot
    participant TG as Telegram
    
    User->>API: Upload document
    API->>NATS: staging.archives.documents.ocr.requested
    API-->>User: 202 Accepted
    
    Note over Worker: Processing begins...
    Worker->>NATS: Subscribe staging.archives.documents.>
    NATS->>Worker: OCR requested event
    Worker->>Worker: OCR, classify, extract...
    
    alt Success
        Worker->>API: Mark complete
        API->>NATS: staging.notifications.telegram
        NATS->>Bot: Notification message
        Bot->>TG: SendMessage API
        TG->>User: "✅ Document Processed"
    else Failure
        Worker->>API: Mark failed
        API->>NATS: staging.notifications.telegram
        NATS->>Bot: Notification message
        Bot->>TG: SendMessage API
        TG->>User: "❌ Processing Failed"
    end

Testing the Pipeline

To verify notifications are flowing:

  1. Check NATS subscription:
     nats sub "staging.notifications.telegram" --server nats://nats.data-layer:4222
  2. Publish a test message:
     nats pub "staging.notifications.telegram" '{"MessageId":"test","ChatId":123456789,"Body":"Test notification"}' --server nats://nats.data-layer:4222
  3. Check bot logs:
     kubectl logs -n archives-staging deployment/archives-telegram-bot -f

Conclusion

The NATS-powered notification system provides:

  • Real-time delivery - Sub-second latency from event to Telegram
  • Environment isolation - Staging, dev, and production can safely share one NATS cluster
  • Loose coupling - API and Bot deploy independently
  • Observable - Full tracing through OpenTelemetry

Key Patterns:

  1. Use IEventSubjectProvider for all NATS subjects
  2. Match publisher and subscriber messaging patterns (Core vs JetStream)
  3. Pre-compute subjects at startup for performance
  4. Check user preferences before sending notifications

Next in Series: Complete Telegram Bot Deployment Guide - Kubernetes manifests, secrets management, and Flux GitOps configuration.