Chapter 19: Message Queues & Async Processing

Asynchronous Communication in Distributed Systems

A Message Queue is a form of asynchronous service-to-service communication where messages are stored in a queue until processed and removed by a consumer.

Synchronous vs Asynchronous Communication
========================================
SYNCHRONOUS (Blocking):
┌─────────────────────────────────────────────────────────┐
│ Client │
│ │ │
│ │ 1. Send Request │
│ ▼ │
│ ┌──────────┐ │
│ │ Server │◀── Wait for response │
│ │ │──▶ Response │
│ └──────────┘ │
│ │ │
│ ▼ │
│ Client gets response (blocking!) │
│ │
│ Problems: │
│ • Slow if service is slow │
│ • If server down, client fails │
│ • Can't scale independently │
└─────────────────────────────────────────────────────────┘
ASYNCHRONOUS (Non-blocking):
┌─────────────────────────────────────────────────────────┐
│ Client │
│ │ │
│ │ 1. Send Message to Queue │
│ ▼ │
│ ┌──────────┐ │
│ │ Queue │──▶ Message stored │
│ │(Pending) │ (immediately returns) │
│ └──────────┘ │
│ │ │
│ │ 2. (Later) Worker picks up message │
│ ▼ │
│ ┌──────────┐ │
│ │ Worker │──▶ Process async │
│ └──────────┘ │
│ │
│ Benefits: │
│ • Instant response │
│ • Decoupled components │
│ • Resilient to failures │
│ • Scalable workers │
└─────────────────────────────────────────────────────────┘
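The async hand-off above can be sketched in-process with Python's standard library: the producer enqueues and returns immediately, while a worker drains the queue at its own pace. This is a toy stand-in for a real broker, not production code.

```python
import queue
import threading

task_queue = queue.Queue()
results = []

def worker():
    # Worker drains the queue at its own pace
    while True:
        msg = task_queue.get()
        if msg is None:          # sentinel: shut down
            break
        results.append(f"processed:{msg}")

t = threading.Thread(target=worker)
t.start()

# Producer: enqueue and return immediately (non-blocking)
for i in range(3):
    task_queue.put(f"msg{i}")

task_queue.put(None)             # signal shutdown
t.join()
print(results)                   # ['processed:msg0', 'processed:msg1', 'processed:msg2']
```

If the worker crashes, the remaining messages stay in the queue, which is the resilience property the diagram illustrates.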
Use Case             Example                          Why Queue?
-------------------  -------------------------------  -----------------------
Email notifications  Send welcome email after signup  Slow, not time-critical
Image processing     Resize uploaded images           CPU intensive
Order processing     Process payments, inventory      Long-running workflow
Report generation    Monthly analytics                Heavy computation
Webhook delivery     Notify external services         Unreliable recipients

Message Queue Benefits
=====================
┌─────────────────────────────────────────────────────────────┐
│ 1. DECOUPLING │
│ ───────────────── │
│ Producer doesn't know or care about consumers │
│ │
│ Service A (Producer) Service B (Consumer) │
│ │ │ │
│ ▼ │ │
│ ┌─────────┐ │ │
│ │ Queue │────────────────────────┘ │
│ └─────────┘ │
│ │
│ Add Service C, D anytime without changing A! │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 2. RELIABILITY │
│ ───────────────── │
│ Messages persisted until processed │
│ │
│ Worker crashes? Message stays in queue! │
│ Restart worker, it picks up where left off │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 3. SCALABILITY │
│ ───────────────── │
│ Scale consumers independently │
│ │
│ Traffic spike: Add more workers │
│ Low traffic: Fewer workers (or zero!) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 4. RESILIENCE (Traffic Burst Handling) │
│ ───────────────── │
│ │
│ 10,000 requests/sec │
│ │ │
│ ▼ │
│ ┌─────────┐ Workers process ┌─────────┐ │
│ │ Queue │──────at their pace────▶│ 100 msg │ │
│ │ 9000 msg│ (e.g., 100/sec) │in queue │ │
│ └─────────┘ └─────────┘ │
│ │
│ Queue absorbs the burst! No 503 errors! │
└─────────────────────────────────────────────────────────────┘
Approach          Pros                 Cons        Best For
----------------  -------------------  ----------  ---------------------
Message Queue     Reliable, flexible   Complexity  Most async use cases
REST API Calls    Simple, familiar     Blocking    Tightly coupled
Webhook/Callback  Real-time            Unreliable  Event notifications
Database Polling  Simple               Wasteful    Rarely used
gRPC Streaming    Fast, bidirectional  Complex     Service-to-service

Message Queue Architecture
=========================
┌─────────────────────────────────────────────────────────────────┐
│ PRODUCERS │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ API │ │ Order │ │ Payment │ │
│ │ Service │ │ Service │ │ Service │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
└───────┼──────────────┼──────────────┼──────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ MESSAGE QUEUE │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Queue: orders.created │ │
│ │ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ ┌────┐ │ │
│ │ │msg1│ │msg2│ │msg3│ │msg4│ │msg5│ │msg6│ ... │ │
│ │ └────┘ └────┘ └────┘ └────┘ └────┘ └────┘ │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Dead Letter Queue (DLQ) │ │
│ │ Failed messages after N retries │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ CONSUMERS (Workers) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Email │ │ Inventory│ │ Analytics│ │
│ │ Worker │ │ Worker │ │ Worker │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Each worker processes messages independently │
└─────────────────────────────────────────────────────────────────┘

Point-to-Point Pattern
=====================
Each message is delivered to exactly one consumer (workers compete for messages).
Producer ──▶ [Queue: orders] ──▶ Consumer 1
─────────────────────────────────────────
Use Cases:
• Order processing (one system handles each order)
• Task distribution (one worker picks up each task)
• Background jobs (single processor)
─────────────────────────────────────────
How it works:
Time →
┌────┐       ┌────────┐
│msg1│──────▶│Worker 1│  msg1 processed, then removed from queue
└────┘       └────────┘
┌────┐       ┌────────┐
│msg2│──────▶│Worker 1│  msg2 processed, then removed from queue
└────┘       └────────┘
Each message is processed once, by exactly one consumer.
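The competing-consumers behavior can be sketched with Python's standard library: two workers race to pull from one queue, and each message still ends up processed exactly once (a toy in-process illustration, not a real broker).

```python
import queue
import threading

q = queue.Queue()
lock = threading.Lock()
processed = []                   # (worker_id, message) pairs

def worker(worker_id):
    while True:
        msg = q.get()
        if msg is None:          # shutdown sentinel
            break
        with lock:
            processed.append((worker_id, msg))

# Enqueue six messages, then one sentinel per worker (sentinels sit at the tail)
for i in range(6):
    q.put(f"msg{i}")

workers = [threading.Thread(target=worker, args=(w,)) for w in range(2)]
for t in workers:
    t.start()
for _ in workers:
    q.put(None)
for t in workers:
    t.join()

# Which worker got which message varies run to run,
# but every message was handled exactly once
print(sorted(m for _, m in processed))  # ['msg0', 'msg1', 'msg2', 'msg3', 'msg4', 'msg5']
```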
Publish-Subscribe Pattern
========================
One producer, multiple independent consumers.
Producer ──▶ [Topic: user.signup] ──▶ Consumer 1
                     ├──────────────▶ Consumer 2
                     └──────────────▶ Consumer 3
─────────────────────────────────────────
Use Cases:
• Event broadcasting (notify all interested services)
• Microservices event-driven communication
• Real-time updates (multiple components need same data)
─────────────────────────────────────────
How it works:
Producer publishes: UserSignedUp {userId: 123}
┌─────────────────────────────────────────────────────────┐
│ Topic: user.signup │
│ msg: UserSignedUp │
│ │
│ Subscribers (each gets copy): │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ SendWelcome│ │ CreateS3 │ │ Update │ │
│ │ Email │ │ Profile │ │ Analytics │ │
│ └────────────┘ └────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────┘
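The fan-out above can be sketched as a minimal in-memory pub/sub: every subscriber registered on a topic receives its own copy of each published message (function and variable names here are illustrative, not from any library).

```python
subscribers = {}

def subscribe(topic, handler):
    subscribers.setdefault(topic, []).append(handler)

def publish(topic, message):
    for handler in subscribers.get(topic, []):
        handler(dict(message))   # each handler gets an independent copy

received = {"email": [], "profile": [], "analytics": []}
subscribe("user.signup", lambda m: received["email"].append(m))
subscribe("user.signup", lambda m: received["profile"].append(m))
subscribe("user.signup", lambda m: received["analytics"].append(m))

publish("user.signup", {"event": "UserSignedUp", "userId": 123})

# All three subscribers received the event
print([len(v) for v in received.values()])  # [1, 1, 1]
```

Contrast with point-to-point, where the same message would have gone to only one of the three handlers.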
Aspect            Point-to-Point       Publish-Subscribe
----------------  -------------------  -----------------------
Message delivery  One consumer         All consumers
Consumer groups   Single consumer      Independent subscribers
Order             Preserved per queue  Best-effort
Use case          Task processing      Event broadcasting

Message Broker Comparison
=========================
Broker          Type         Throughput  Latency   Managed       Best For
--------------  -----------  ----------  --------  ------------  -------------------
RabbitMQ        Traditional  Medium      Low       Self-hosted   Flexible routing
Apache Kafka    Stream       Very High   Medium    Self/Managed  High-volume streams
AWS SQS         Queue        High        Low       ✓ AWS         Simple, serverless
AWS SNS         Pub/Sub      High        Low       ✓ AWS         Push notifications
Google Pub/Sub  Queue/Pub    High        Low       ✓ GCP         GCP integration
Redis Streams   Both         High        Very Low  Self/Managed  Low-latency
RabbitMQ Architecture
====================
Producer ──▶ Exchange ──▶ Queue ──▶ Consumer
├── Direct (exact match)
├── Topic (wildcards)
├── Fanout (broadcast)
└── Headers (attribute-based)
─────────────────────────────────────────
Features:
• Multiple message patterns
• Flexible routing
• Reliability (publisher confirms, consumer acks)
• Plugins available
# RabbitMQ Producer Example
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare queue (idempotent: creates it only if missing)
channel.queue_declare(queue='orders')

# Publish message
channel.basic_publish(
    exchange='',
    routing_key='orders',
    body='{"orderId": "123", "amount": 99.99}'
)
connection.close()
Kafka Architecture
=================
┌─────────────────────────────────────────────────────────────┐
│ Topics (Partitioned) │
│ │
│ orders │
│ ┌─────────┬─────────┬─────────┐ │
│ │Part 0 │Part 1 │Part 2 │ │
│ │[0,1,2] │[3,4,5] │[6,7,8] │ │
│ └────┬────┘└────┬────┘└────┬────┘ │
│ │ │ │ │
│ └─────────┼─────────┘ │
│ ▼ │
│ Consumer Groups │
│ ┌─────────────┐ │
│ │ Group A │ (load balanced) │
│ │ Consumer 1 │ │
│ │ Consumer 2 │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
Key Concepts:
• Partitioned for parallelism
• Offset management (at-least-once)
• Log retention (days/weeks)
• Replicated for durability
# Kafka Producer Example
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

producer.send('orders', {
    'orderId': '123',
    'amount': 99.99,
    'timestamp': 1699999999
})
producer.flush()
producer.close()
AWS SQS Features
================
Standard Queue:
• Unlimited throughput
• At-least-once delivery
• Best-effort ordering
• 14-day retention
FIFO Queue:
• Exactly-once processing
• First-in-first-out ordering
• 300 msg/sec (3,000/sec with batching)
• Deduplication (5-minute window)
─────────────────────────────────────────
Visibility Timeout:
┌────────────────────────────────────────────────────┐
│ Message received by Worker 1 │
│ (Hidden for 30 seconds) │
│ │ │
│ │ Worker 1 processes (takes 10 sec) │
│ │ │
│ ▼ After 30 sec, message visible again │
│ If no delete, message reappears (retry!) │
└────────────────────────────────────────────────────┘
Dead Letter Queue:
• Failed messages after maxReceiveCount
• Inspect without blocking main queue
# AWS SQS Example (boto3)
import boto3

sqs = boto3.client('sqs')

# Send message
response = sqs.send_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123/orders',
    MessageBody='{"orderId": "123"}',
    DelaySeconds=0
)

# Receive message (long polling: waits up to 20s)
response = sqs.receive_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123/orders',
    MaxNumberOfMessages=1,
    WaitTimeSeconds=20
)

messages = response.get('Messages', [])  # may be empty, even with long polling
if messages:
    message = messages[0]
    # Process message...
    # Delete when done, or it reappears after the visibility timeout
    sqs.delete_message(
        QueueUrl='...',
        ReceiptHandle=message['ReceiptHandle']
    )

Command: "Do Something"
======================
Imperative - requesting an action to be performed
Examples:
─────────────────────────────────────────
CreateOrder { productId, quantity, address }
SendEmail { to, subject, body }
ProcessPayment { orderId, amount }
ReserveInventory { productId, quantity }
CancelSubscription { userId }
─────────────────────────────────────────
Characteristics:
• Expects action to be taken
• Usually one consumer
• May return result
• Can fail or succeed
─────────────────────────────────────────
Naming: Verb + Noun (imperative)
CreateUser, UpdateProfile, DeleteOrder
Event: "Something Happened"
==========================
Descriptive - notification that something occurred
Examples:
─────────────────────────────────────────
OrderCreated { orderId, timestamp, total }
UserSignedUp { userId, email, referrer }
PaymentProcessed { paymentId, status }
InventoryReserved { productId, quantity }
─────────────────────────────────────────
Characteristics:
• Past tense naming
• Multiple consumers possible
• No response expected
• Immutable record
─────────────────────────────────────────
Naming: Past tense (event)
OrderCreated, UserSignedUp, PaymentCompleted
Command Flow                          Event Flow
============                          ==========
API ──▶ Command ──▶ Handler           Event Bus
                       │                 ├─▶ Consumer 1
                       ▼                 ├─▶ Consumer 2
                  ┌─────────┐            └─▶ Consumer 3
                  │ Process │
                  │ Action  │          Async, decoupled
                  └────┬────┘          Multiple independent handlers
                       ▼               No response expected
                   Response
               (sync or async)
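The naming conventions can be made concrete with plain dataclasses (illustrative types, not from any framework): a command is an imperative request, while an event is a past-tense, immutable record of a fact.

```python
from dataclasses import dataclass

@dataclass
class CreateOrder:               # Command: imperative, "do something"
    product_id: str
    quantity: int

@dataclass(frozen=True)          # Event: immutable record of a fact
class OrderCreated:              # Past-tense naming
    order_id: str
    total: float

cmd = CreateOrder(product_id="p-1", quantity=2)
evt = OrderCreated(order_id="o-123", total=99.99)

# Events are immutable; attempting to change one raises an error
try:
    evt.total = 0.0
except Exception as e:
    print(type(e).__name__)      # FrozenInstanceError
```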

Dead Letter Queue Pattern
=========================
┌─────────────────────────────────────────────────────────────┐
│ Processing Flow │
│ │
│ ┌─────────┐ │
│ │ Message │ │
│ │Received │ │
│ └────┬────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ Success ┌──────────┐ │
│ │ Process │──────────────▶│ Complete │ │
│ │ Message │ │ │ │
│ └────┬────┘ └──────────┘ │
│ │ Fail │
│ ▼ │
│ ┌─────────┐ Fail again │
│ │ Retry? │──────────┐ │
│ │ count++ │ │ │
│ └────┬────┘ │ │
│ │ < 3 │ │
│ ▼ │ │
│ ┌─────────┐ │ │
│ │ Back to │ │ │
│ │ Queue │──────────┘ │
│ └─────────┘ │
│ │ │
│ │ >= 3 │
│ ▼ │
│ ┌─────────┐ │
│ │ DLQ │ (Manual intervention required) │
│ └─────────┘ │
└─────────────────────────────────────────────────────────────┘
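The flow above can be sketched in a few lines — a simplified in-memory version where the retry loop and DLQ are plain Python structures (a real broker tracks the receive count for you):

```python
MAX_RETRIES = 3
dlq = []                         # dead letter queue
completed = []

def handle(message, process):
    """Try up to MAX_RETRIES times, then route to the DLQ."""
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            process(message)
            completed.append(message)
            return
        except Exception:
            pass                 # in production: log and back off here

    dlq.append(message)          # poison message: manual intervention needed

def flaky(msg):
    if msg == "bad":
        raise ValueError("cannot process")

handle("ok", flaky)
handle("bad", flaky)
print(completed, dlq)            # ['ok'] ['bad']
```

A message that always fails ends up in the DLQ instead of blocking the main queue forever.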
Retry Strategies
================
1. Immediate Retry (Not recommended)
─────────────────────────────────
Fail → Retry immediately → Fail again
Problem: Same error, wasted resources
2. Fixed Delay Retry
─────────────────────────────────
Fail → Wait 1 sec → Retry → Fail → Wait 1 sec → Retry
Better, but can overwhelm downstream
3. Exponential Backoff
─────────────────────────────────
Fail → Wait 1s → Retry
Fail → Wait 2s → Retry
Fail → Wait 4s → Retry
Fail → Wait 8s → Retry
...
Best practice! Delays grow exponentially
4. Exponential Backoff with Jitter
─────────────────────────────────
Random delay added to prevent thundering herd
Wait = min(cap, base * 2^attempt + random(0, jitter))
# Exponential Backoff with Jitter
import random
import time

def retry_with_backoff(max_retries=5, base_delay=1, cap=60, jitter=True):
    for attempt in range(max_retries):
        try:
            # Your logic here
            return do_work()
        except Exception:
            if attempt == max_retries - 1:
                raise            # out of retries
            delay = min(cap, base_delay * (2 ** attempt))
            if jitter:
                delay = delay * (0.5 + random.random())  # 50-150%
            print(f"Retry {attempt + 1} in {delay:.2f}s")
            time.sleep(delay)
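With jitter disabled, the schedule produced by that formula is deterministic. For example, with a 1-second base and a 60-second cap:

```python
base, cap = 1, 60
delays = [min(cap, base * 2 ** attempt) for attempt in range(7)]
print(delays)  # [1, 2, 4, 8, 16, 32, 60]
```

Note how the cap kicks in at the seventh attempt (2^6 = 64 would exceed 60), bounding the worst-case wait.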
Idempotency: Handle Duplicate Messages
=======================================
Why duplicates happen:
• Producer retries after timeout
• Consumer crashes before acknowledging
• Queue redelivers "in-flight" messages
Solution: Make operations idempotent!
─────────────────────────────────────────
Non-Idempotent (BAD):
┌─────────────────────────────────────────┐
│ Message: IncrementCounter { counterId }│
│ │
│ Received twice: │
│ Counter = 10 → 11 → 12 (wrong!) │
└─────────────────────────────────────────┘
Idempotent (GOOD):
┌─────────────────────────────────────────┐
│ Message: SetCounter { counterId, value }│
│ │
│ Received twice: │
│ Counter = 10 → 11 → 11 (correct!) │
└─────────────────────────────────────────┘
─────────────────────────────────────────
Techniques:
• Use unique message IDs (check if already processed)
• Use database unique constraints
• Store processed message IDs in cache
# Idempotent Message Processing
import redis

r = redis.Redis()

def process_message(message):
    msg_id = message['id']
    # Check if already processed
    # (note: check-then-set is not atomic; use SET NX for strict guarantees)
    if r.exists(f"processed:{msg_id}"):
        print(f"Skipping duplicate: {msg_id}")
        return
    # Process the message
    do_work(message)
    # Mark as processed (with TTL)
    r.setex(f"processed:{msg_id}", 86400, "1")  # 24-hour TTL
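The same dedupe-by-message-ID idea without a Redis dependency, using an in-memory set (fine for a single process; a shared store is needed once you have multiple workers):

```python
processed_ids = set()            # in production: Redis or a DB, with a TTL
side_effects = []

def process_once(message):
    msg_id = message["id"]
    if msg_id in processed_ids:  # duplicate: skip
        return False
    side_effects.append(message["payload"])  # the actual work
    processed_ids.add(msg_id)                # mark as processed
    return True

process_once({"id": "m1", "payload": "charge card"})
process_once({"id": "m1", "payload": "charge card"})  # queue redelivered it

print(side_effects)              # ['charge card'] - applied exactly once
```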

Message Queue Best Practices
============================
✓ Use meaningful message format (JSON/Protobuf)
✓ Include unique message ID
✓ Add timestamps
✓ Version your messages
✓ Keep message size small (<1MB)
✓ Implement idempotency
✓ Set appropriate timeouts
✓ Monitor queue depth
✓ Set up DLQ for failed messages
✓ Handle poison messages
✓ Consider message ordering if needed
✓ Test failure scenarios
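Several of these practices combine into one message envelope: a unique ID, a timestamp, a version, and a small JSON payload. The field names below are illustrative, not a standard.

```python
import json
import time
import uuid

def make_envelope(msg_type, payload, version="1.0"):
    # Unique ID + timestamp + version, serialized as JSON
    return json.dumps({
        "id": str(uuid.uuid4()),
        "type": msg_type,
        "version": version,
        "timestamp": int(time.time()),
        "payload": payload,
    })

raw = make_envelope("OrderCreated", {"orderId": "123", "amount": 99.99})
msg = json.loads(raw)
print(msg["type"], msg["version"])  # OrderCreated 1.0
```

The ID enables idempotency checks, the timestamp helps debugging and ordering, and the version lets consumers handle schema changes gracefully.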
Message Queue Security
======================
1. Encryption in transit (TLS/SSL)
2. Encryption at rest
3. Access control (IAM policies)
4. VPC/private subnets
5. Audit logging
6. Input validation
7. Sanitize message content
AWS SQS Example:
─────────────────────────────────────────
• Use SSE (Server-Side Encryption)
• IAM policies for queue access
• VPC endpoints for private access

Key Takeaways
=============
  1. Message Queues - Async communication for decoupled services
  2. Point-to-Point - One consumer per message
  3. Pub/Sub - Multiple independent consumers
  4. Brokers - RabbitMQ, Kafka, SQS, SNS
  5. Commands vs Events - Do something vs Something happened
  6. Dead Letter Queue - Handle poison messages
  7. Exponential Backoff - Graceful retry strategy
  8. Idempotency - Handle duplicate messages safely

Next: Chapter 20: CDN & Content Delivery