Chapter 19: Message Queues & Async Processing
Asynchronous Communication in Distributed Systems
19.1 What is a Message Queue?
A message queue is a form of asynchronous service-to-service communication: messages are stored in a queue until a consumer processes and removes them.
Synchronous vs Asynchronous Communication

SYNCHRONOUS (blocking):
  1. Client sends a request.
  2. Client waits (blocks) until the server responds.

  Problems:
  • Slow if the service is slow
  • If the server is down, the client fails
  • Components can't scale independently

ASYNCHRONOUS (non-blocking):
  1. Client sends a message to the queue; the message is stored and
     the call returns immediately.
  2. (Later) A worker picks up the message and processes it asynchronously.

  Benefits:
  • Instant response
  • Decoupled components
  • Resilient to failures
  • Scalable workers

When to Use Message Queues
| Use Case | Example | Why Queue? |
|---|---|---|
| Email notifications | Send welcome email after signup | Slow, not time-critical |
| Image processing | Resize uploaded images | CPU intensive |
| Order processing | Process payments, inventory | Long-running workflow |
| Report generation | Monthly analytics | Heavy computation |
| Webhook delivery | Notify external services | Unreliable recipients |
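The first row of this table can be sketched in-process with Python's standard-library queue: the "request handler" enqueues a task and returns immediately, while a background worker thread drains the queue. The `send_welcome_email` function and the task shape are illustrative assumptions, not part of any framework.

```python
# Minimal sketch: hand slow work to a background worker via a queue.
import queue
import threading

tasks = queue.Queue()
sent = []   # Record of processed messages (for demonstration)

def send_welcome_email(email):
    # Stand-in for a slow operation (SMTP call, template render, ...)
    return f"welcome email sent to {email}"

def worker():
    while True:
        task = tasks.get()
        if task is None:              # Sentinel: stop the worker
            break
        sent.append(send_welcome_email(task["email"]))
        tasks.task_done()             # Ack: message fully processed

threading.Thread(target=worker, daemon=True).start()

# The "request handler" enqueues and returns immediately:
tasks.put({"email": "alice@example.com"})
tasks.put({"email": "bob@example.com"})

tasks.join()                          # Block until every task is acked
tasks.put(None)                       # Shut the worker down
```

A real broker replaces the in-process `Queue` with durable storage, but the producer/consumer split is the same.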
19.2 Why Use Message Queues?
Core Benefits
Message Queue Benefits

1. DECOUPLING
   The producer doesn't know or care about its consumers.
   Service A publishes to the queue; Service B consumes from it.
   Add Service C or D at any time without changing A!

2. RELIABILITY
   Messages are persisted until processed.
   Worker crashes? The message stays in the queue. Restart the
   worker and it picks up where it left off.

3. SCALABILITY
   Scale consumers independently.
   Traffic spike: add more workers. Low traffic: run fewer workers
   (or zero!).

4. RESILIENCE (traffic burst handling)
   A burst of 10,000 requests/sec lands in the queue while workers
   drain it at their own pace (e.g., 100/sec). The queue absorbs
   the burst; no 503 errors!

Comparison with Alternatives
| Approach | Pros | Cons | Best For |
|---|---|---|---|
| Message Queue | Reliable, flexible | Complexity | Most async use cases |
| REST API Calls | Simple, familiar | Blocking | Tightly coupled |
| Webhook/Callback | Real-time | Unreliable | Event notifications |
| Database Polling | Simple | Wasteful | Rarely used |
| gRPC Streaming | Fast, bidirectional | Complex | Service-to-service |
19.3 Components Architecture
Message Queue Architecture
Producers:
  API Service, Order Service, Payment Service
        │
        ▼
Message Queue:
  Queue: orders.created
    [msg1] [msg2] [msg3] [msg4] [msg5] [msg6] ...
  Dead Letter Queue (DLQ):
    failed messages after N retries
        │
        ▼
Consumers (Workers):
  Email Worker, Inventory Worker, Analytics Worker
  Each worker processes messages independently.

19.4 Message Patterns
19.4.1 Point-to-Point (P2P)
Point-to-Point Pattern
One producer sends to one consumer.
Producer ──▶ [Queue: orders] ──▶ Consumer 1
─────────────────────────────────────────
Use Cases:
• Order processing (one system handles each order)
• Task distribution (one worker picks up each task)
• Background jobs (single processor)
─────────────────────────────────────────
How it works:
msg1 ──▶ [Queue] ──▶ Worker 1 (processes msg1)
msg2 ──▶ [Queue] ──▶ Worker 1 (processes msg2)

Each message is processed once, by exactly one consumer.

19.4.2 Publish-Subscribe (Pub/Sub)
Publish-Subscribe Pattern
One producer, multiple independent consumers.
Producer ──▶ [Topic: user.signup] ──▶ Consumer 1
                        ├────────────▶ Consumer 2
                        └────────────▶ Consumer 3
─────────────────────────────────────────
Use Cases:
• Event broadcasting (notify all interested services)
• Event-driven communication between microservices
• Real-time updates (multiple components need the same data)
─────────────────────────────────────────
How it works:
Producer publishes: UserSignedUp {userId: 123}
Topic: user.signup (msg: UserSignedUp)

Subscribers (each gets its own copy):
• SendWelcomeEmail
• CreateS3Profile
• UpdateAnalytics

Comparison
| Aspect | Point-to-Point | Publish-Subscribe |
|---|---|---|
| Message delivery | One consumer | All consumers |
| Consumer groups | Single consumer | Independent subscribers |
| Order | Preserved per queue | Best-effort |
| Use case | Task processing | Event broadcasting |
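The two delivery models in this table can be simulated in a few lines of plain Python: point-to-point hands each message to exactly one consumer, while pub/sub gives every subscriber its own copy. The worker and subscriber names are illustrative.

```python
# Point-to-point vs publish-subscribe, simulated in-process.
from itertools import cycle

messages = ["msg1", "msg2", "msg3", "msg4"]

# Point-to-point: each message goes to exactly one consumer
# (round-robin stands in for competing consumers here).
workers = {"worker-1": [], "worker-2": []}
assignment = cycle(workers)
for msg in messages:
    workers[next(assignment)].append(msg)

# Publish-subscribe: every subscriber receives a copy of every message.
subscribers = {"email": [], "profile": [], "analytics": []}
for msg in messages:
    for inbox in subscribers.values():
        inbox.append(msg)

total_p2p = sum(len(v) for v in workers.values())         # 4: each once
total_pubsub = sum(len(v) for v in subscribers.values())  # 12: 3 copies each
```

Four messages yield four deliveries point-to-point but twelve deliveries pub/sub, which is exactly the "one consumer" vs "all consumers" row above.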
19.5 Popular Message Brokers
Comparison Table
| Broker | Type | Throughput | Latency | Managed | Best For |
|---|---|---|---|---|---|
| RabbitMQ | Traditional | Medium | Low | Self-hosted | Flexible routing |
| Apache Kafka | Stream | Very High | Medium | Self/Managed | High-volume streams |
| AWS SQS | Queue | High | Low | ✓ AWS | Simple, serverless |
| AWS SNS | Pub/Sub | High | Low | ✓ AWS | Push notifications |
| Google Pub/Sub | Queue/Pub | High | Low | ✓ GCP | GCP integration |
| Redis Streams | Both | High | Very Low | Self/Managed | Low-latency |
19.5.1 RabbitMQ
RabbitMQ Architecture
Producer ──▶ Exchange ──▶ Queue ──▶ Consumer
                │
                ├── Direct  (exact match)
                ├── Topic   (wildcards)
                ├── Fanout  (broadcast)
                └── Headers (attribute-based)
─────────────────────────────────────────
Features:
• Multiple message patterns
• Flexible routing
• Reliability (publisher confirms, consumer acks)
• Plugins available

```python
# RabbitMQ producer example
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare queue
channel.queue_declare(queue='orders')

# Publish message
channel.basic_publish(
    exchange='',
    routing_key='orders',
    body='{"orderId": "123", "amount": 99.99}')

connection.close()
```

19.5.2 Apache Kafka
Kafka Architecture
Topic: orders (partitioned)

  Partition 0   Partition 1   Partition 2
  [0, 1, 2]     [3, 4, 5]     [6, 7, 8]
       │             │             │
       └─────────────┼─────────────┘
                     ▼
  Consumer Group A (load balanced):
    Consumer 1, Consumer 2
Key Concepts:
• Partitioned for parallelism
• Offset management (at-least-once delivery)
• Log retention (days/weeks)
• Replicated for durability

```python
# Kafka producer example
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

producer.send('orders', {
    'orderId': '123',
    'amount': 99.99,
    'timestamp': 1699999999})

producer.flush()
producer.close()
```

19.5.3 AWS SQS
AWS SQS Features
Standard Queue:
• Nearly unlimited throughput
• At-least-once delivery
• Best-effort ordering
• Up to 14-day retention

FIFO Queue:
• Exactly-once processing
• First-in, first-out ordering
• 300 msg/sec limit (higher with batching)
• Deduplication
─────────────────────────────────────────
Visibility Timeout:
1. Worker 1 receives a message; it is hidden for 30 seconds.
2. Worker 1 processes it (takes 10 sec) and deletes it.
3. If the worker never deletes the message, it becomes visible again
   after 30 seconds and is redelivered (retry!).
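The visibility-timeout behavior can be modeled without AWS: a received message is hidden until a deadline, and if it isn't deleted by then it becomes receivable again. This toy queue with a simulated clock is an illustrative sketch of the mechanism, not the SQS API.

```python
# In-memory queue with an SQS-style visibility timeout (sketch).
class ToyQueue:
    def __init__(self, visibility_timeout=30):
        self.visibility_timeout = visibility_timeout
        self.messages = {}      # msg_id -> (body, visible_at)
        self.clock = 0          # Simulated time in seconds

    def send(self, msg_id, body):
        self.messages[msg_id] = (body, 0)   # Visible immediately

    def receive(self):
        # Return the first message whose visibility window has expired.
        for msg_id, (body, visible_at) in self.messages.items():
            if visible_at <= self.clock:
                # Hide it until the visibility timeout elapses.
                self.messages[msg_id] = (
                    body, self.clock + self.visibility_timeout)
                return msg_id, body
        return None

    def delete(self, msg_id):
        self.messages.pop(msg_id, None)     # Ack: done with the message

q = ToyQueue(visibility_timeout=30)
q.send("m1", '{"orderId": "123"}')

first = q.receive()      # Worker 1 gets m1; m1 is now hidden for 30s
missed = q.receive()     # Nothing else is visible right now -> None

q.clock += 31            # Worker crashed; 31 simulated seconds pass
retry = q.receive()      # m1 is visible again: automatic redelivery
q.delete("m1")           # Processed successfully this time
```

Deleting only after successful processing is what makes the retry safety net work; deleting on receive would lose messages when a worker crashes.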
Dead Letter Queue:
• Receives messages that have failed maxReceiveCount times
• Inspect failures without blocking the main queue

```python
# AWS SQS example (boto3)
import boto3

sqs = boto3.client('sqs')

# Send message
response = sqs.send_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123/orders',
    MessageBody='{"orderId": "123"}',
    DelaySeconds=0)

# Receive message (long polling)
response = sqs.receive_message(
    QueueUrl='https://sqs.us-east-1.amazonaws.com/123/orders',
    MaxNumberOfMessages=1,
    WaitTimeSeconds=20)

message = response['Messages'][0]
# Process message...

# Delete when done
sqs.delete_message(
    QueueUrl='...',
    ReceiptHandle=message['ReceiptHandle'])
```

19.6 Message Types: Commands vs Events
Commands
Command: "Do Something"
Imperative - requesting an action to be performed
Examples: ─────────────────────────────────────────
CreateOrder        { productId, quantity, address }
SendEmail          { to, subject, body }
ProcessPayment     { orderId, amount }
ReserveInventory   { productId, quantity }
CancelSubscription { userId }
─────────────────────────────────────────
Characteristics:
• Expects an action to be taken
• Usually one consumer
• May return a result
• Can fail or succeed
─────────────────────────────────────────
Naming: Verb + Noun (imperative)
  CreateUser, UpdateProfile, DeleteOrder

Events
Event: "Something Happened"
Descriptive - notification that something occurred
Examples: ─────────────────────────────────────────
OrderCreated      { orderId, timestamp, total }
UserSignedUp      { userId, email, referrer }
PaymentProcessed  { paymentId, status }
InventoryReserved { productId, quantity }
─────────────────────────────────────────
Characteristics:
• Past-tense naming
• Multiple consumers possible
• No response expected
• Immutable record
─────────────────────────────────────────
Naming: Past tense (event)
  OrderCreated, UserSignedUp, PaymentCompleted

Command vs Event Flow
Section titled “Command vs Event Flow” Command Flow Event Flow ============ ==========
API ──▶ Command ──▶ Handler Event Bus │ │ │ │ │ │ ▼ ├─▶ Consumer 1 │ │ ┌─────────┐ ├─▶ Consumer 2 │ │ │ Process │ ├─▶ Consumer 3 │ │ │ Action │ │ │ └─────────┘ │ │ Async, decoupled ▼ ▼ Response Multiple independent handlers (sync or async) No response expected19.7 Handling Failures
19.7.1 Dead Letter Queue (DLQ)
Dead Letter Queue Pattern
Processing Flow:
1. Message received.
2. Process the message.
   • Success → complete (delete/ack the message).
   • Fail    → increment the retry count.
3. Retry count < 3  → message goes back to the queue and is retried.
   Retry count >= 3 → message moves to the DLQ
   (manual intervention required).

19.7.2 Retry Strategies
Retry Strategies
1. Immediate Retry (not recommended)
   Fail → retry immediately → fail again
Problem: Same error, wasted resources
2. Fixed Delay Retry
   Fail → wait 1 sec → retry → fail → wait 1 sec → retry
Better, but can overwhelm downstream
3. Exponential Backoff
   Fail → wait 1s → retry
   Fail → wait 2s → retry
   Fail → wait 4s → retry
   Fail → wait 8s → retry
   ...
Best practice! Delays grow exponentially
4. Exponential Backoff with Jitter
   Random delay added to prevent a thundering herd:

   wait = min(cap, base * 2^attempt + random(0, jitter))

```python
# Exponential backoff with jitter
import random
import time

def retry_with_backoff(max_retries=5, base_delay=1, cap=60, jitter=True):
    for attempt in range(max_retries):
        try:
            # Your logic here
            return do_work()
        except Exception:
            if attempt == max_retries - 1:
                raise

            delay = min(cap, base_delay * (2 ** attempt))
            if jitter:
                delay = delay * (0.5 + random.random())  # 50-150% of delay

            print(f"Retry {attempt + 1} in {delay:.2f}s")
            time.sleep(delay)
```

19.7.3 Idempotency
Idempotency: Handle Duplicate Messages
Why duplicates happen:
• Producer retries after a timeout
• Consumer crashes before acknowledging
• Queue redelivers "in-flight" messages
Solution: Make operations idempotent!
─────────────────────────────────────────
Non-Idempotent (BAD):
  Message: IncrementCounter { counterId }
  Received twice: Counter = 10 → 11 → 12 (wrong!)

Idempotent (GOOD):
  Message: SetCounter { counterId, value: 11 }
  Received twice: Counter = 10 → 11 → 11 (correct!)
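The contrast between the two message designs can be checked directly: replay the same message twice and compare the end state. The `counters` dict and message shapes are illustrative.

```python
# Replaying a non-idempotent vs an idempotent message.
counters = {"c1": 10}

def handle_increment(msg):
    # Non-idempotent: every duplicate changes state again.
    counters[msg["counterId"]] += 1

def handle_set(msg):
    # Idempotent: replaying the message leaves the same end state.
    counters[msg["counterId"]] = msg["value"]

# Duplicate delivery of an increment command:
counters["c1"] = 10
for _ in range(2):
    handle_increment({"counterId": "c1"})
after_increment = counters["c1"]   # 12: the duplicate was counted

# Duplicate delivery of a set command:
counters["c1"] = 10
for _ in range(2):
    handle_set({"counterId": "c1", "value": 11})
after_set = counters["c1"]         # 11: the duplicate was harmless
```

When the operation itself can't be made idempotent, the dedup-by-message-ID technique described next achieves the same effect from the outside.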
─────────────────────────────────────────
Techniques:
• Use unique message IDs (check if already processed)
• Use database unique constraints
• Store processed message IDs in a cache

```python
# Idempotent message processing
import redis

r = redis.Redis()

def process_message(message):
    msg_id = message['id']

    # Check if already processed
    if r.exists(f"processed:{msg_id}"):
        print(f"Skipping duplicate: {msg_id}")
        return

    # Process the message
    do_work(message)

    # Mark as processed (with TTL)
    r.setex(f"processed:{msg_id}", 86400, "1")  # 24-hour TTL
```

19.8 Message Queue Best Practices
Design Checklist
Message Queue Best Practices

✓ Use a meaningful message format (JSON/Protobuf)
✓ Include a unique message ID
✓ Add timestamps
✓ Version your messages
✓ Keep message size small (<1MB)
✓ Implement idempotency
✓ Set appropriate timeouts
✓ Monitor queue depth
✓ Set up a DLQ for failed messages
✓ Handle poison messages
✓ Consider message ordering if needed
✓ Test failure scenarios

Security
Message Queue Security
1. Encryption in transit (TLS/SSL)
2. Encryption at rest
3. Access control (IAM policies)
4. VPC/private subnets
5. Audit logging
6. Input validation
7. Sanitize message content

AWS SQS Example:
• Use SSE (Server-Side Encryption)
• IAM policies for queue access
• VPC endpoints for private access

Summary
- Message Queues - Async communication for decoupled services
- Point-to-Point - One consumer per message
- Pub/Sub - Multiple independent consumers
- Brokers - RabbitMQ, Kafka, SQS, SNS
- Commands vs Events - Do something vs Something happened
- Dead Letter Queue - Handle poison messages
- Exponential Backoff - Graceful retry strategy
- Idempotency - Handle duplicate messages safely