Traditional CRUD architectures struggle when read and write requirements diverge. Reports need complex joins, while writes need simple transactions. Optimize reads with denormalization and every write has to keep redundant copies consistent. Optimize writes with normalization and reads become painfully slow. CQRS (Command Query Responsibility Segregation) addresses this by giving reads and writes separate models.
This guide covers production CQRS implementations that scale from monoliths to distributed systems. We’ll build systems where reads and writes can evolve independently, each optimized for its specific use case.
Why CQRS Transforms Complex Systems
The Problem with Traditional CRUD
Single-model architectures force compromises:
- Competing optimization goals: Normalized for writes vs denormalized for reads
- Query complexity: Joins across 10+ tables for simple reports
- Performance bottlenecks: Complex reads slow down simple writes
- Scalability limits: Can’t scale reads and writes independently
- Domain complexity: Business logic mixed with query logic
- Stale cache management: Invalidation is hard
CQRS Benefits
- Independent optimization: Optimize reads and writes separately
- Simplified queries: Pre-computed read models eliminate joins
- Better scalability: Scale read and write sides independently
- Clear boundaries: Commands change state, queries don’t
- Event sourcing synergy: Natural fit for event-driven architectures
- Performance: Read models can be cached aggressively
Pattern 1: Basic CQRS (Single Database)
Separate Read and Write Models
# commands.py - Write side
from dataclasses import dataclass
from datetime import datetime

# Order and OrderItem are assumed to be ORM models for the normalized
# write tables; OrderCreatedEvent is defined in events.py (Pattern 2).

@dataclass
class CreateOrderCommand:
    customer_id: int
    items: list[dict]
    total_amount: float

@dataclass
class CancelOrderCommand:
    order_id: int
    reason: str

class OrderCommandHandler:
    def __init__(self, db_session, event_bus):
        self.db = db_session
        self.event_bus = event_bus

    def handle_create_order(self, command: CreateOrderCommand):
        # Write to normalized tables
        order = Order(
            customer_id=command.customer_id,
            status='pending',
            total=command.total_amount,
            created_at=datetime.now()
        )
        self.db.add(order)
        # Flush so the database assigns order.id before the items use it
        # (assumes a SQLAlchemy-style session)
        self.db.flush()
        for item in command.items:
            order_item = OrderItem(
                order_id=order.id,
                product_id=item['product_id'],
                quantity=item['quantity'],
                price=item['price']
            )
            self.db.add(order_item)
        self.db.commit()
        # Publish event for read model update
        self.event_bus.publish(OrderCreatedEvent(
            order_id=order.id,
            customer_id=command.customer_id,
            total=command.total_amount,
            created_at=order.created_at
        ))
        return order.id
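The command handler above publishes to an `event_bus` that is never defined. A minimal in-process sketch, assuming synchronous delivery and hypothetical names (`InMemoryEventBus`, `subscribe`), might look like the following. The event and the dict-based read model are simplified stand-ins, and a production system would more likely use a broker, as in Pattern 4.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


class InMemoryEventBus:
    """Hypothetical in-process bus: delivers events synchronously by type."""

    def __init__(self):
        self._subscribers: dict[type, list[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._subscribers[event_type].append(handler)

    def publish(self, event) -> None:
        # Look up handlers by the event's exact class
        for handler in self._subscribers[type(event)]:
            handler(event)


@dataclass
class OrderCreatedEvent:  # simplified stand-in for the real event
    order_id: int
    total: float


read_model: dict[int, dict] = {}  # stand-in for the order_read_model table


def project(event: OrderCreatedEvent) -> None:
    read_model[event.order_id] = {"status": "pending", "total": event.total}


bus = InMemoryEventBus()
bus.subscribe(OrderCreatedEvent, project)
bus.publish(OrderCreatedEvent(order_id=1, total=49.5))
print(read_model)
```

Because delivery here is synchronous, the read model is updated before `publish` returns. That is convenient for tests, but it does not exhibit the eventual consistency the later patterns deal with.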
# queries.py - Read side
from dataclasses import dataclass

@dataclass
class OrderDetailsQuery:
    order_id: int

@dataclass
class CustomerOrdersQuery:
    customer_id: int
    page: int = 1
    page_size: int = 20

class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db

    def get_order_details(self, query: OrderDetailsQuery):
        # Query denormalized read model
        return self.read_db.query("""
            SELECT
                order_id,
                customer_name,
                customer_email,
                status,
                total_amount,
                item_count,
                created_at,
                items_json
            FROM order_read_model
            WHERE order_id = %s
        """, (query.order_id,))

    def get_customer_orders(self, query: CustomerOrdersQuery):
        offset = (query.page - 1) * query.page_size
        return self.read_db.query("""
            SELECT
                order_id,
                status,
                total_amount,
                created_at
            FROM order_read_model
            WHERE customer_id = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """, (query.customer_id, query.page_size, offset))
Pattern 2: Event-Driven Read Model Updates
Projections from Domain Events
# events.py
from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrderCreatedEvent:
    order_id: int
    customer_id: int
    total: float
    created_at: datetime

@dataclass
class OrderShippedEvent:
    order_id: int
    tracking_number: str
    shipped_at: datetime

# read_model_projector.py
class OrderReadModelProjector:
    """Updates denormalized read models from events"""

    def __init__(self, read_db):
        self.read_db = read_db

    def project_order_created(self, event: OrderCreatedEvent):
        """Create read model entry when order is created"""
        # Fetch customer details for denormalization
        customer = self.read_db.query(
            "SELECT name, email FROM customers WHERE id = %s",
            (event.customer_id,)
        )[0]
        # Insert into denormalized read model
        self.read_db.execute("""
            INSERT INTO order_read_model (
                order_id,
                customer_id,
                customer_name,
                customer_email,
                status,
                total_amount,
                created_at
            ) VALUES (%s, %s, %s, %s, %s, %s, %s)
        """, (
            event.order_id,
            event.customer_id,
            customer['name'],
            customer['email'],
            'pending',
            event.total,
            event.created_at
        ))

    def project_order_shipped(self, event: OrderShippedEvent):
        """Update read model when order ships"""
        self.read_db.execute("""
            UPDATE order_read_model
            SET status = 'shipped',
                tracking_number = %s,
                shipped_at = %s
            WHERE order_id = %s
        """, (event.tracking_number, event.shipped_at, event.order_id))

# event_handler.py
class EventHandler:
    def __init__(self, projector: OrderReadModelProjector):
        self.projector = projector

    def handle(self, event):
        if isinstance(event, OrderCreatedEvent):
            self.projector.project_order_created(event)
        elif isinstance(event, OrderShippedEvent):
            self.projector.project_order_shipped(event)
        # ... handle other events
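The `isinstance` chain in `EventHandler` grows with every new event type. One alternative, sketched here with hypothetical names (`ProjectionDispatcher`, `register`), is a dictionary keyed by event class. In this sketch, events with no registered projection are skipped rather than raising, which is a design assumption rather than a requirement.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class OrderCreatedEvent:  # trimmed versions of the events above
    order_id: int


@dataclass
class OrderShippedEvent:
    order_id: int
    tracking_number: str


class ProjectionDispatcher:
    """Hypothetical replacement for the isinstance chain in EventHandler."""

    def __init__(self):
        self._handlers: dict[type, Callable] = {}

    def register(self, event_type: type, handler: Callable) -> None:
        self._handlers[event_type] = handler

    def handle(self, event) -> bool:
        handler = self._handlers.get(type(event))
        if handler is None:
            return False  # no projection registered for this event type
        handler(event)
        return True


statuses: dict[int, str] = {}  # stand-in for the read model's status column

dispatcher = ProjectionDispatcher()
dispatcher.register(OrderCreatedEvent, lambda e: statuses.update({e.order_id: "pending"}))
dispatcher.register(OrderShippedEvent, lambda e: statuses.update({e.order_id: "shipped"}))

dispatcher.handle(OrderCreatedEvent(order_id=7))
dispatcher.handle(OrderShippedEvent(order_id=7, tracking_number="TRACK-1"))
print(statuses)  # {7: 'shipped'}
```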
Pattern 3: CQRS with Separate Databases
Read and Write Database Separation
# config.py
DATABASE_CONFIG = {
    'write': {
        'host': 'postgres-primary.example.com',
        'database': 'orders_write',
        'user': 'write_user',
        'pool_size': 10  # Smaller pool for writes
    },
    'read': {
        'host': 'postgres-replica.example.com',  # Read replica
        'database': 'orders_read',
        'user': 'read_user',
        'pool_size': 50  # Larger pool for reads
    }
}

# dual_database_handler.py
class OrderService:
    def __init__(self, write_db, read_db, event_bus):
        self.write_db = write_db
        self.read_db = read_db
        self.event_bus = event_bus

    # Commands go to write database
    def create_order(self, command: CreateOrderCommand):
        with self.write_db.transaction() as tx:
            order = self._persist_order(tx, command)
            tx.commit()
        # Publish only after the commit succeeds, so consumers never see
        # an event for an order that was rolled back
        self.event_bus.publish(OrderCreatedEvent(
            order_id=order.id,
            customer_id=command.customer_id,
            total=command.total_amount,
            created_at=order.created_at
        ))
        return order.id

    # Queries go to read database
    def get_order(self, order_id: int):
        # Read from optimized read model
        return self.read_db.query_one(
            "SELECT * FROM order_summary_view WHERE order_id = %s",
            (order_id,)
        )

    def get_customer_orders(self, customer_id: int):
        # Complex aggregations pre-computed in read model
        return self.read_db.query(
            "SELECT * FROM customer_order_summary WHERE customer_id = %s",
            (customer_id,)
        )
Pattern 4: Async Read Model Updates
Message Queue for Eventual Consistency
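# event_publisher.py - Using RabbitMQ for async updates
import json
from datetime import datetime, timezone

import pika

def _json_default(value):
    # datetime fields on events are not JSON-serializable by default
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")

class EventPublisher:
    def __init__(self, rabbitmq_url):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='domain_events', durable=True)

    def publish(self, event):
        message = json.dumps({
            'event_type': type(event).__name__,
            'data': event.__dict__,
            'timestamp': datetime.now(timezone.utc).isoformat()
        }, default=_json_default)
        self.channel.basic_publish(
            exchange='',
            routing_key='domain_events',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
            )
        )

# event_consumer.py
import json

import pika

class EventConsumer:
    def __init__(self, rabbitmq_url, projector):
        # projector must expose handle(event), e.g. the EventHandler from Pattern 2
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        # Declaring is idempotent; it covers the case where the consumer
        # starts before any publisher has created the queue
        self.channel.queue_declare(queue='domain_events', durable=True)
        self.projector = projector

    def start_consuming(self):
        self.channel.basic_qos(prefetch_count=10)
        self.channel.basic_consume(
            queue='domain_events',
            on_message_callback=self.on_message
        )
        print("Started consuming events...")
        self.channel.start_consuming()

    def on_message(self, ch, method, properties, body):
        try:
            message = json.loads(body)
            # Maps message['event_type'] back to an event class
            event = self._deserialize_event(message)
            # Update read model
            self.projector.handle(event)
            # Acknowledge
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Error processing event: {e}")
            # Reject and requeue. A message that always fails will be
            # redelivered indefinitely; consider a dead-letter queue.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)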
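The consumer calls `_deserialize_event`, whose body is not given. A minimal sketch follows, assuming the message envelope produced by `EventPublisher` (an `event_type` name plus a `data` dict) and datetime fields that arrive as ISO 8601 strings. The `EVENT_TYPES` registry and the free-standing `deserialize_event` function are illustrative choices, not part of the original code.

```python
import json
from dataclasses import dataclass, fields
from datetime import datetime


@dataclass
class OrderShippedEvent:  # same fields as in events.py
    order_id: int
    tracking_number: str
    shipped_at: datetime


# Hypothetical registry: event_type string -> event class
EVENT_TYPES = {cls.__name__: cls for cls in (OrderShippedEvent,)}


def deserialize_event(message: dict):
    """Rebuild an event dataclass from a decoded queue message."""
    try:
        event_cls = EVENT_TYPES[message["event_type"]]
    except KeyError:
        raise ValueError(f"Unknown event type: {message.get('event_type')!r}")
    data = dict(message["data"])
    for field in fields(event_cls):
        # JSON has no datetime type, so convert ISO strings back
        if field.type is datetime and isinstance(data.get(field.name), str):
            data[field.name] = datetime.fromisoformat(data[field.name])
    return event_cls(**data)


body = json.dumps({
    "event_type": "OrderShippedEvent",
    "data": {
        "order_id": 42,
        "tracking_number": "TRACK-1",
        "shipped_at": "2024-05-01T12:30:00",
    },
})
event = deserialize_event(json.loads(body))
print(event)
```

Raising `ValueError` for unknown event types lets the consumer decide whether to reject the message or skip it, instead of failing later inside a projection.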
Pattern 5: Multiple Read Models
Purpose-Built Views
# Different read models for different use cases
class MultipleReadModelProjector:
    def __init__(self, postgres_db, redis_client, elasticsearch_client):
        self.postgres = postgres_db
        self.redis = redis_client
        self.elasticsearch = elasticsearch_client

    def project_order_created(self, event: OrderCreatedEvent):
        # 1. PostgreSQL: Detailed relational read model
        self.postgres.execute("""
            INSERT INTO order_details_view (...)
            VALUES (...)
        """)
        # 2. Redis: Fast lookup cache
        order_key = f"order:{event.order_id}"
        self.redis.hset(order_key, mapping={
            'customer_id': event.customer_id,
            'total': event.total,
            'status': 'pending'
        })
        self.redis.expire(order_key, 3600)  # 1 hour TTL
        # 3. Elasticsearch: Full-text search
        self.elasticsearch.index(
            index='orders',
            id=event.order_id,
            body={
                'order_id': event.order_id,
                'customer_id': event.customer_id,
                'total': event.total,
                'created_at': event.created_at.isoformat(),
                'status': 'pending'
            }
        )

# Query handlers use appropriate read model
class OrderQueries:
    def __init__(self, postgres_db, redis_client, elasticsearch_client):
        # Same three stores the projector writes to
        self.postgres = postgres_db
        self.redis = redis_client
        self.elasticsearch = elasticsearch_client

    def get_order_details(self, order_id):
        # Use PostgreSQL for detailed view
        return self.postgres.query_one(
            "SELECT * FROM order_details_view WHERE order_id = %s",
            (order_id,)
        )

    def get_order_quick(self, order_id):
        # Use Redis for fast lookup
        return self.redis.hgetall(f"order:{order_id}")

    def search_orders(self, query):
        # Use Elasticsearch for search
        return self.elasticsearch.search(
            index='orders',
            body={'query': {'query_string': {'query': query}}}
        )
Pattern 6: Handling Eventual Consistency
Read-Your-Writes Consistency
class OrderService:
    def __init__(self, write_db, read_db, cache, event_bus):
        self.write_db = write_db
        self.read_db = read_db
        self.cache = cache
        self.event_bus = event_bus  # used by create_order below

    def create_order(self, command: CreateOrderCommand):
        # Write to database
        order_id = self._write_order(command)
        # Cache write for immediate read consistency
        cache_key = f"order:{order_id}"
        self.cache.set(cache_key, {
            'order_id': order_id,
            'customer_id': command.customer_id,
            'status': 'pending',
            'total': command.total_amount,
            '_source': 'write_cache'  # Flag as write cache
        }, ttl=60)  # Short TTL
        # Publish event for eventual read model update
        self.event_bus.publish(OrderCreatedEvent(...))
        return order_id

    def get_order(self, order_id: int):
        # Check write cache first (read-your-writes)
        cache_key = f"order:{order_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        # Fall back to read model
        return self.read_db.query_one(
            "SELECT * FROM order_read_model WHERE order_id = %s",
            (order_id,)
        )
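To make the lookup order concrete, here is a self-contained sketch with in-memory stand-ins: a hypothetical `TTLCache` class and a plain dict playing the read model. It only illustrates the read path (write cache first, then read model) and is not a substitute for a real cache such as Redis.

```python
import time


class TTLCache:
    """Hypothetical in-memory cache with per-key expiry."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._entries: dict[str, tuple[float, dict]] = {}

    def set(self, key: str, value: dict, ttl: float) -> None:
        self._entries[key] = (self._clock() + ttl, value)

    def get(self, key: str):
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: drop and report a miss
            return None
        return value


read_model: dict[int, dict] = {}  # the projector has not caught up yet
cache = TTLCache()


def get_order(order_id: int):
    # Write cache first (read-your-writes), then the read model
    return cache.get(f"order:{order_id}") or read_model.get(order_id)


cache.set("order:1", {"order_id": 1, "status": "pending"}, ttl=60)
print(get_order(1))  # served from the write cache
print(get_order(2))  # None: present in neither store
```

One caveat: a cached entry can shadow a newer read model row (for example after a status change) until its TTL expires, so the TTL should stay short, or the entry should be deleted once the projection lands.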
Pattern 7: CQRS in Microservices
Service-Level Separation
# order_command_service.py
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Order Command Service")
# command_handler is an OrderCommandHandler wired up at startup (not shown)

@app.post("/orders")
async def create_order(command: CreateOrderCommand):
    try:
        order_id = command_handler.handle_create_order(command)
        return {"order_id": order_id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/orders/{order_id}/cancel")
async def cancel_order(order_id: int, command: CancelOrderCommand):
    # The path and the body both carry an order ID; reject mismatches
    if command.order_id != order_id:
        raise HTTPException(status_code=400, detail="order_id mismatch")
    command_handler.handle_cancel_order(command)
    return {"status": "cancelled"}

# order_query_service.py
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Order Query Service")
# query_handler is an OrderQueryHandler wired up at startup (not shown)

@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    order = query_handler.get_order_details(
        OrderDetailsQuery(order_id=order_id)
    )
    if not order:
        raise HTTPException(status_code=404)
    return order

@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
    customer_id: int,
    page: int = 1,
    page_size: int = 20
):
    return query_handler.get_customer_orders(
        CustomerOrdersQuery(
            customer_id=customer_id,
            page=page,
            page_size=page_size
        )
    )
Real-World Example: E-commerce Order System
# Complete CQRS implementation
class EcommerceOrderSystem:
    def __init__(self):
        # Infrastructure (created first: the command handler needs the bus)
        self.event_bus = RabbitMQEventBus()

        # Write side
        self.write_db = PostgresConnection(WRITE_DB_CONFIG)
        self.command_handler = OrderCommandHandler(self.write_db, self.event_bus)

        # Read side
        self.read_db = PostgresConnection(READ_DB_CONFIG)
        self.query_handler = OrderQueryHandler(self.read_db)
        self.projector = OrderReadModelProjector(self.read_db)

        # Start event consumer in background. start() is assumed to run the
        # consume loop on its own thread so the constructor does not block.
        self.event_consumer = EventConsumer(
            self.event_bus,
            self.projector
        )
        self.event_consumer.start()

    # Commands (write operations)
    def place_order(self, customer_id, items):
        command = CreateOrderCommand(
            customer_id=customer_id,
            items=items,
            # 'quantity' matches the item key the command handler reads
            total_amount=sum(item['price'] * item['quantity'] for item in items)
        )
        return self.command_handler.handle_create_order(command)

    def ship_order(self, order_id, tracking_number):
        # ShipOrderCommand and handle_ship_order are assumed to follow the
        # same shape as the create and cancel commands in Pattern 1
        command = ShipOrderCommand(
            order_id=order_id,
            tracking_number=tracking_number
        )
        self.command_handler.handle_ship_order(command)

    # Queries (read operations); these query methods are assumed to exist
    # on the query handler alongside the ones shown in Pattern 1
    def get_order_summary(self, order_id):
        return self.query_handler.get_order_summary(order_id)

    def get_order_history(self, customer_id):
        return self.query_handler.get_customer_order_history(customer_id)

    def get_sales_dashboard(self, date_range):
        return self.query_handler.get_sales_analytics(date_range)
Best Practices
- Start simple: Begin with single database, separate models
- Commands are imperative: CreateOrder, CancelOrder (actions)
- Queries are descriptive: GetOrderDetails, GetCustomerOrders (questions)
- Immutable events: Events represent facts, never modify them
- Idempotent projections: Handle duplicate events gracefully
- Monitor lag: Track time between write and read model update
- Version events: Plan for event schema evolution
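As an illustration of an idempotent projection, the sketch below relies on a primary key plus `ON CONFLICT ... DO NOTHING`, so a redelivered order-created event neither raises nor creates a second row. SQLite (version 3.24 or later) and the trimmed table are assumptions made for the sake of a runnable example; PostgreSQL supports an equivalent `ON CONFLICT` clause.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE order_read_model (
        order_id INTEGER PRIMARY KEY,
        status TEXT NOT NULL,
        total_amount REAL NOT NULL
    )
""")


def project_order_created(order_id: int, total: float) -> None:
    # The primary key makes a replayed event a no-op instead of an error
    conn.execute(
        """
        INSERT INTO order_read_model (order_id, status, total_amount)
        VALUES (?, 'pending', ?)
        ON CONFLICT (order_id) DO NOTHING
        """,
        (order_id, total),
    )
    conn.commit()


project_order_created(1, 99.0)
project_order_created(1, 99.0)  # duplicate delivery of the same event

row_count = conn.execute("SELECT COUNT(*) FROM order_read_model").fetchone()[0]
print(row_count)  # 1
```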
Common Pitfalls
- Over-engineering: Don’t use CQRS for simple CRUD apps
- Ignoring consistency: Plan for eventual consistency edge cases
- Complex event versioning: Events are forever, plan schema carefully
- No monitoring: Must monitor event processing lag
- Synchronous projections: Update read models asynchronously
- Forgetting idempotency: Events may be delivered multiple times
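Projection lag can be estimated by comparing the timestamp recorded when an event was published (the `timestamp` field in the Pattern 4 message envelope) with the time the projector finished. A rough sketch, assuming timestamps are recorded in UTC; the function name and the 5-second threshold are arbitrary choices for illustration:

```python
from datetime import datetime, timedelta, timezone

LAG_ALERT_THRESHOLD = timedelta(seconds=5)  # arbitrary example threshold


def projection_lag(published_at: str, processed_at: datetime) -> timedelta:
    """Time between an event being published and its projection completing."""
    published = datetime.fromisoformat(published_at)
    if published.tzinfo is None:
        # Assumption: naive timestamps are treated as UTC
        published = published.replace(tzinfo=timezone.utc)
    return processed_at - published


lag = projection_lag(
    "2024-05-01T12:00:00+00:00",
    datetime(2024, 5, 1, 12, 0, 8, tzinfo=timezone.utc),
)
print(lag.total_seconds())        # 8.0
print(lag > LAG_ALERT_THRESHOLD)  # True
```

In practice the value would be exported to whatever metrics system is already in place rather than printed.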
When to Use CQRS
✅ Use CQRS when:
- Read and write patterns are dramatically different
- Need to scale reads and writes independently
- Complex domain with event sourcing
- Multiple read models for different use cases
- High read-to-write ratio (10:1 or more)
❌ Avoid CQRS when:
- Simple CRUD application
- Team lacks distributed systems experience
- Strong consistency is required
- Read and write patterns are similar
Key Takeaways
- CQRS separates reads and writes for independent optimization
- Start with single database, separate models before going distributed
- Use events to update read models asynchronously
- Multiple read models serve different query needs
- Eventual consistency is a feature, not a bug: plan for it
- Commands change state, queries never do: enforce strictly
- Monitor event processing lag in production
- CQRS + Event Sourcing is powerful but adds complexity
CQRS is not for every system, but when your reads and writes have different needs, it’s transformative. By
separating concerns, you gain the freedom to optimize each side independently, resulting in systems that are
faster, more scalable, and easier to reason about.