Tips and Tricks – Use CQRS for Complex Domain Logic

Traditional CRUD architectures struggle when read and write
requirements diverge. Reports need complex joins across many tables, while writes need small, simple transactions.
Denormalize to speed up reads and you complicate keeping writes consistent; keep writes normalized and reads become
painfully slow. CQRS (Command Query Responsibility Segregation) resolves this tension by separating reads and writes
completely.

This guide covers production CQRS implementations that scale
from monoliths to distributed systems. We’ll build systems where reads and writes can evolve independently, each
optimized for its specific use case.

Why CQRS Transforms Complex Systems

The Problem with Traditional CRUD

Single-model architectures force compromises:

  • Competing optimization goals: Normalized for writes vs denormalized for reads
  • Query complexity: Joins across 10+ tables for simple reports
  • Performance bottlenecks: Complex reads slow down simple writes
  • Scalability limits: Can’t scale reads and writes independently
  • Domain complexity: Business logic mixed with query logic
  • Stale cache management: Invalidation is hard

CQRS Benefits

  • Independent optimization: Optimize reads and writes separately
  • Simplified queries: Pre-computed read models eliminate joins
  • Better scalability: Scale read and write sides independently
  • Clear boundaries: Commands change state, queries don’t
  • Event sourcing synergy: Natural fit for event-driven architectures
  • Performance: Read models can be cached aggressively

Pattern 1: Basic CQRS (Single Database)

Separate Read and Write Models

# commands.py - Write side
from dataclasses import dataclass
from datetime import datetime

@dataclass
class CreateOrderCommand:
    customer_id: int
    items: list[dict]
    total_amount: float

@dataclass
class CancelOrderCommand:
    order_id: int
    reason: str

class OrderCommandHandler:
    def __init__(self, db_session, event_bus):
        self.db = db_session
        self.event_bus = event_bus
    
    def handle_create_order(self, command: CreateOrderCommand):
        # Write to normalized tables
        order = Order(
            customer_id=command.customer_id,
            status='pending',
            total=command.total_amount,
            created_at=datetime.now()
        )
        self.db.add(order)
        self.db.flush()  # flush so order.id is populated before the items reference it
        
        for item in command.items:
            order_item = OrderItem(
                order_id=order.id,
                product_id=item['product_id'],
                quantity=item['quantity'],
                price=item['price']
            )
            self.db.add(order_item)
        
        self.db.commit()
        
        # Publish event for read model update
        self.event_bus.publish(OrderCreatedEvent(
            order_id=order.id,
            customer_id=command.customer_id,
            total=command.total_amount,
            created_at=order.created_at
        ))
        
        return order.id

# queries.py - Read side
@dataclass
class OrderDetailsQuery:
    order_id: int

@dataclass
class CustomerOrdersQuery:
    customer_id: int
    page: int = 1
    page_size: int = 20

class OrderQueryHandler:
    def __init__(self, read_db):
        self.read_db = read_db
    
    def get_order_details(self, query: OrderDetailsQuery):
        # Query denormalized read model
        return self.read_db.query("""
            SELECT 
                order_id,
                customer_name,
                customer_email,
                status,
                total_amount,
                item_count,
                created_at,
                items_json
            FROM order_read_model
            WHERE order_id = %s
        """, (query.order_id,))
    
    def get_customer_orders(self, query: CustomerOrdersQuery):
        offset = (query.page - 1) * query.page_size
        return self.read_db.query("""
            SELECT 
                order_id,
                status,
                total_amount,
                created_at
            FROM order_read_model
            WHERE customer_id = %s
            ORDER BY created_at DESC
            LIMIT %s OFFSET %s
        """, (query.customer_id, query.page_size, offset))

Pattern 2: Event-Driven Read Model Updates

Projections from Domain Events

# events.py
from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrderCreatedEvent:
    order_id: int
    customer_id: int
    total: float
    created_at: datetime

@dataclass
class OrderShippedEvent:
    order_id: int
    tracking_number: str
    shipped_at: datetime

# read_model_projector.py
class OrderReadModelProjector:
    """Updates denormalized read models from events"""
    
    def __init__(self, read_db):
        self.read_db = read_db
    
    def project_order_created(self, event: OrderCreatedEvent):
        """Create read model entry when order is created"""
        # Fetch customer details for denormalization
        customer = self.read_db.query(
            "SELECT name, email FROM customers WHERE id = %s",
            (event.customer_id,)
        )[0]
        
        # Insert into denormalized read model
        self.read_db.execute("""
            INSERT INTO order_read_model (
                order_id, 
                customer_id,
                customer_name,
                customer_email,
                status,
                total_amount,
                created_at
            ) VALUES (%s, %s, %s, %s, %s, %s, %s)
        """, (
            event.order_id,
            event.customer_id,
            customer['name'],
            customer['email'],
            'pending',
            event.total,
            event.created_at
        ))
    
    def project_order_shipped(self, event: OrderShippedEvent):
        """Update read model when order ships"""
        self.read_db.execute("""
            UPDATE order_read_model
            SET status = 'shipped',
                tracking_number = %s,
                shipped_at = %s
            WHERE order_id = %s
        """, (event.tracking_number, event.shipped_at, event.order_id))

# event_handler.py
class EventHandler:
    def __init__(self, projector: OrderReadModelProjector):
        self.projector = projector
    
    def handle(self, event):
        if isinstance(event, OrderCreatedEvent):
            self.projector.project_order_created(event)
        elif isinstance(event, OrderShippedEvent):
            self.projector.project_order_shipped(event)
        # ... handle other events
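
The event_bus that OrderCommandHandler publishes to is never defined in these examples. For the single-database
patterns, a minimal synchronous, in-process bus is enough to wire the projector in; the sketch below assumes exactly
that, and Pattern 4 shows a RabbitMQ-backed alternative:

# event_bus.py - minimal synchronous, in-process event bus (sketch)
from collections import defaultdict
from typing import Callable

class InProcessEventBus:
    def __init__(self):
        # event type -> list of handler callables
        self._subscribers: dict[type, list[Callable]] = defaultdict(list)
    
    def subscribe(self, event_type: type, handler: Callable):
        self._subscribers[event_type].append(handler)
    
    def publish(self, event):
        # Dispatch synchronously to every handler registered for this event type
        for handler in self._subscribers[type(event)]:
            handler(event)

# Illustrative wiring:
# bus = InProcessEventBus()
# projector = OrderReadModelProjector(read_db)
# bus.subscribe(OrderCreatedEvent, projector.project_order_created)
# bus.subscribe(OrderShippedEvent, projector.project_order_shipped)
# command_handler = OrderCommandHandler(db_session, bus)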

Pattern 3: CQRS with Separate Databases

Read and Write Database Separation

# config.py
DATABASE_CONFIG = {
    'write': {
        'host': 'postgres-primary.example.com',
        'database': 'orders_write',
        'user': 'write_user',
        'pool_size': 10  # Smaller pool for writes
    },
    'read': {
        'host': 'postgres-replica.example.com',  # Read replica
        'database': 'orders_read',
        'user': 'read_user',
        'pool_size': 50  # Larger pool for reads
    }
}

# dual_database_handler.py
class OrderService:
    def __init__(self, write_db, read_db, event_bus):
        self.write_db = write_db
        self.read_db = read_db
        self.event_bus = event_bus
    
    # Commands go to write database
    def create_order(self, command: CreateOrderCommand):
        with self.write_db.transaction() as tx:
            order = self._persist_order(tx, command)
            tx.commit()
        
        # Publish only after the commit so consumers never read uncommitted data
        self.event_bus.publish(OrderCreatedEvent(
            order_id=order.id,
            customer_id=command.customer_id,
            total=command.total_amount,
            created_at=order.created_at
        ))
        
        return order.id
    
    # Queries go to read database
    def get_order(self, order_id: int):
        # Read from optimized read model
        return self.read_db.query_one(
            "SELECT * FROM order_summary_view WHERE order_id = %s",
            (order_id,)
        )
    
    def get_customer_orders(self, customer_id: int):
        # Complex aggregations pre-computed in read model
        return self.read_db.query(
            "SELECT * FROM customer_order_summary WHERE customer_id = %s",
            (customer_id,)
        )

Pattern 4: Async Read Model Updates

Message Queue for Eventual Consistency

# Using RabbitMQ/Redis for async updates
import pika
import json
from datetime import datetime

class EventPublisher:
    def __init__(self, rabbitmq_url):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='domain_events', durable=True)
    
    def publish(self, event):
        message = json.dumps({
            'event_type': type(event).__name__,
            'data': event.__dict__,
            'timestamp': datetime.now().isoformat()
        }, default=str)  # default=str so datetime fields in events serialize cleanly
        
        self.channel.basic_publish(
            exchange='',
            routing_key='domain_events',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # Persistent
            )
        )

# event_consumer.py
class EventConsumer:
    def __init__(self, rabbitmq_url, projector):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.projector = projector
    
    def start_consuming(self):
        self.channel.basic_qos(prefetch_count=10)
        self.channel.basic_consume(
            queue='domain_events',
            on_message_callback=self.on_message
        )
        
        print("Started consuming events...")
        self.channel.start_consuming()
    
    def on_message(self, ch, method, properties, body):
        try:
            message = json.loads(body)
            event = self._deserialize_event(message)
            
            # Update read model
            self.projector.handle(event)
            
            # Acknowledge
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            print(f"Error processing event: {e}")
            # Requeue for retry; in production cap retries or use a dead-letter queue
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
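
The consumer relies on a _deserialize_event helper that is not shown. One possible implementation, assuming the
JSON envelope produced by EventPublisher above and that the Pattern 2 event dataclasses and datetime are importable
in this module:

    # A possible _deserialize_event for EventConsumer (an assumption; not shown in the original)
    EVENT_TYPES = {
        'OrderCreatedEvent': OrderCreatedEvent,
        'OrderShippedEvent': OrderShippedEvent,
    }
    
    def _deserialize_event(self, message: dict):
        event_cls = self.EVENT_TYPES[message['event_type']]
        data = dict(message['data'])
        # datetime fields were stringified by the publisher (default=str); parse them back
        for key in ('created_at', 'shipped_at'):
            if key in data and isinstance(data[key], str):
                data[key] = datetime.fromisoformat(data[key])
        return event_cls(**data)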

Pattern 5: Multiple Read Models

Purpose-Built Views

# Different read models for different use cases
class MultipleReadModelProjector:
    def __init__(self, postgres_db, redis_client, elasticsearch_client):
        self.postgres = postgres_db
        self.redis = redis_client
        self.elasticsearch = elasticsearch_client
    
    def project_order_created(self, event: OrderCreatedEvent):
        # 1. PostgreSQL: Detailed relational read model
        self.postgres.execute("""
            INSERT INTO order_details_view (...)
            VALUES (...)
        """)
        
        # 2. Redis: Fast lookup cache
        order_key = f"order:{event.order_id}"
        self.redis.hset(order_key, mapping={
            'customer_id': event.customer_id,
            'total': event.total,
            'status': 'pending'
        })
        self.redis.expire(order_key, 3600)  # 1 hour TTL
        
        # 3. Elasticsearch: Full-text search
        self.elasticsearch.index(
            index='orders',
            id=event.order_id,
            body={
                'order_id': event.order_id,
                'customer_id': event.customer_id,
                'total': event.total,
                'created_at': event.created_at.isoformat(),
                'status': 'pending'
            }
        )

# Query handlers use appropriate read model
class OrderQueries:
    def __init__(self, postgres_db, redis_client, elasticsearch_client):
        self.postgres = postgres_db
        self.redis = redis_client
        self.elasticsearch = elasticsearch_client
    
    def get_order_details(self, order_id):
        # Use PostgreSQL for detailed view
        return self.postgres.query_one(
            "SELECT * FROM order_details_view WHERE order_id = %s",
            (order_id,)
        )
    
    def get_order_quick(self, order_id):
        # Use Redis for fast lookup
        return self.redis.hgetall(f"order:{order_id}")
    
    def search_orders(self, query):
        # Use Elasticsearch for search
        return self.elasticsearch.search(
            index='orders',
            body={'query': {'query_string': {'query': query}}}
        )

Pattern 6: Handling Eventual Consistency

Read-Your-Writes Consistency

class OrderService:
    def __init__(self, write_db, read_db, cache, event_bus):
        self.write_db = write_db
        self.read_db = read_db
        self.cache = cache
        self.event_bus = event_bus
    
    def create_order(self, command: CreateOrderCommand):
        # Write to database
        order_id = self._write_order(command)
        
        # Cache write for immediate read consistency
        cache_key = f"order:{order_id}"
        self.cache.set(cache_key, {
            'order_id': order_id,
            'customer_id': command.customer_id,
            'status': 'pending',
            'total': command.total_amount,
            '_source': 'write_cache'  # Flag as write cache
        }, ttl=60)  # Short TTL
        
        # Publish event for eventual read model update
        self.event_bus.publish(OrderCreatedEvent(...))
        
        return order_id
    
    def get_order(self, order_id: int):
        # Check write cache first (read-your-writes)
        cache_key = f"order:{order_id}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        
        # Fall back to read model
        return self.read_db.query_one(
            "SELECT * FROM order_read_model WHERE order_id = %s",
            (order_id,)
        )

Pattern 7: CQRS in Microservices

Service-Level Separation

# order_command_service.py
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Order Command Service")

@app.post("/orders")
async def create_order(command: CreateOrderCommand):
    try:
        order_id = command_handler.handle_create_order(command)
        return {"order_id": order_id}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/orders/{order_id}/cancel")
async def cancel_order(order_id: int, command: CancelOrderCommand):
    command_handler.handle_cancel_order(command)
    return {"status": "cancelled"}

# order_query_service.py
from fastapi import FastAPI, HTTPException

app = FastAPI(title="Order Query Service")

@app.get("/orders/{order_id}")
async def get_order(order_id: int):
    order = query_handler.get_order_details(
        OrderDetailsQuery(order_id=order_id)
    )
    if not order:
        raise HTTPException(status_code=404)
    return order

@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
    customer_id: int,
    page: int = 1,
    page_size: int = 20
):
    return query_handler.get_customer_orders(
        CustomerOrdersQuery(
            customer_id=customer_id,
            page=page,
            page_size=page_size
        )
    )

Real-World Example: E-commerce Order System

# Complete CQRS implementation
import threading

class EcommerceOrderSystem:
    def __init__(self):
        # Infrastructure
        self.event_bus = RabbitMQEventBus()
        
        # Write side
        self.write_db = PostgresConnection(WRITE_DB_CONFIG)
        self.command_handler = OrderCommandHandler(self.write_db, self.event_bus)
        
        # Read side
        self.read_db = PostgresConnection(READ_DB_CONFIG)
        self.query_handler = OrderQueryHandler(self.read_db)
        self.projector = OrderReadModelProjector(self.read_db)
        
        # Consume events in a background thread (start_consuming blocks)
        self.event_consumer = EventConsumer(RABBITMQ_URL, self.projector)
        threading.Thread(
            target=self.event_consumer.start_consuming,
            daemon=True
        ).start()
    
    # Commands (write operations)
    def place_order(self, customer_id, items):
        command = CreateOrderCommand(
            customer_id=customer_id,
            items=items,
            total_amount=sum(item['price'] * item['qty'] for item in items)
        )
        return self.command_handler.handle_create_order(command)
    
    def ship_order(self, order_id, tracking_number):
        command = ShipOrderCommand(
            order_id=order_id,
            tracking_number=tracking_number
        )
        self.command_handler.handle_ship_order(command)
    
    # Queries (read operations)
    def get_order_summary(self, order_id):
        return self.query_handler.get_order_summary(order_id)
    
    def get_order_history(self, customer_id):
        return self.query_handler.get_customer_order_history(customer_id)
    
    def get_sales_dashboard(self, date_range):
        return self.query_handler.get_sales_analytics(date_range)

Best Practices

  • Start simple: Begin with single database, separate models
  • Commands are imperative: CreateOrder, CancelOrder (actions)
  • Queries are descriptive: GetOrderDetails, GetCustomerOrders (questions)
  • Immutable events: Events represent facts, never modify them
  • Idempotent projections: Handle duplicate events gracefully (see the sketch after this list)
  • Monitor lag: Track time between write and read model update
  • Version events: Plan for event schema evolution
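
In practice, idempotent projections usually mean turning inserts into upserts so a redelivered event is harmless. A
PostgreSQL-flavored sketch based on the Pattern 2 projector, assuming order_id is the read model's primary key:

# Idempotent variant of the Pattern 2 projection (sketch)
class IdempotentOrderProjector(OrderReadModelProjector):
    def project_order_created(self, event: OrderCreatedEvent):
        customer = self.read_db.query(
            "SELECT name, email FROM customers WHERE id = %s",
            (event.customer_id,)
        )[0]
        
        # Upsert: replaying the same OrderCreatedEvent leaves the row unchanged
        self.read_db.execute("""
            INSERT INTO order_read_model (
                order_id, customer_id, customer_name, customer_email,
                status, total_amount, created_at
            ) VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (order_id) DO NOTHING
        """, (
            event.order_id, event.customer_id, customer['name'],
            customer['email'], 'pending', event.total, event.created_at
        ))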

Common Pitfalls

  • Over-engineering: Don’t use CQRS for simple CRUD apps
  • Ignoring consistency: Plan for eventual consistency edge cases
  • Complex event versioning: Events are forever, plan schema carefully (a versioning sketch follows this list)
  • No monitoring: Must monitor event processing lag
  • Synchronous projections: Update read models asynchronously
  • Forgetting idempotency: Events may be delivered multiple times
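
One common way to keep event versioning manageable is to stamp each event with a version and upcast old payloads when
they are read back. A sketch follows; the version field, the currency example, and the upcaster are assumptions, not
part of the code above:

# versioned_events.py - event versioning with an upcaster (sketch)
from dataclasses import dataclass
from datetime import datetime

@dataclass
class OrderCreatedEventV2:
    order_id: int
    customer_id: int
    total: float
    created_at: datetime
    currency: str = 'USD'   # added in v2; defaulted so v1 payloads still load
    version: int = 2

def upcast_order_created(payload: dict) -> dict:
    """Bring a v1 OrderCreated payload up to the v2 shape before constructing the event."""
    if payload.get('version', 1) == 1:
        payload = {**payload, 'currency': 'USD', 'version': 2}
    return payload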

When to Use CQRS

✅ Use CQRS when:

  • Read and write patterns are dramatically different
  • Need to scale reads and writes independently
  • Complex domain with event sourcing
  • Multiple read models for different use cases
  • High read-to-write ratio (10:1 or more)

❌ Avoid CQRS when:

  • Simple CRUD application
  • Team lacks distributed systems experience
  • Strong consistency is required
  • Read and write patterns are similar

Key Takeaways

  • CQRS separates reads and writes for independent optimization
  • Start with single database, separate models before going distributed
  • Use events to update read models asynchronously
  • Multiple read models serve different query needs
  • Eventual consistency is a feature, not a bug—plan for it
  • Commands change state, queries never do—enforce strictly
  • Monitor event processing lag in production
  • CQRS + Event Sourcing is powerful but adds complexity

CQRS is not for every system, but when your reads and writes have different needs, it’s transformative. By
separating concerns, you gain the freedom to optimize each side independently, resulting in systems that are
faster, more scalable, and easier to reason about.

