Tips and Tricks – Use Generators for Memory-Efficient Data Processing

After processing terabytes of data with Python, I’ve learned that generators are the difference between code that works and code that dies with a MemoryError (or gets killed by the OS out-of-memory killer). This guide shows how to use Python generators for memory-efficient data processing in production.

1. The Memory Problem

Without generators, loading a large dataset all at once can crash the process:

# BAD: Loads the entire file into memory (can crash on a 10GB file)
def read_large_file(filename):
    with open(filename) as f:
        lines = f.readlines()  # Loads ALL lines into RAM
    return [process(line) for line in lines]

# GOOD: Generator processes one line at a time
def read_large_file_generator(filename):
    with open(filename) as f:
        for line in f:  # Yields one line at a time
            yield process(line)

2. Generator Basics

# Function that returns a generator
def count_up_to(n):
    count = 1
    while count <= n:
        yield count
        count += 1

# Generator expression (like list comprehension)
squares = (x**2 for x in range(1000000))  # Lazy evaluation

# Using generators
for num in count_up_to(5):
    print(num)  # Prints 1, 2, 3, 4, 5

# Memory usage: O(1) for the generator vs O(n) for a list
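
To make that concrete, here is a quick check you can run in a REPL. sys.getsizeof reports only the container itself (the list's size excludes the int objects it keeps alive), so the numbers below are rough, but the gap is the point:

import sys

squares_list = [x**2 for x in range(1_000_000)]
squares_gen = (x**2 for x in range(1_000_000))

print(sys.getsizeof(squares_list))  # on the order of 8 MB just for the list's references
print(sys.getsizeof(squares_gen))   # a couple hundred bytes: only the generator's frame state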

3. Production Patterns

3.1 Processing Large CSV Files

# Process a 100GB CSV without loading it all into memory
import csv

def process_large_csv(filename):
    with open(filename, 'r', newline='') as f:
        reader = csv.DictReader(f)
        for row in reader:
            # Process one row at a time; csv gives strings, so convert before comparing
            amount = float(row['amount'])
            if amount > 1000:
                yield {
                    'transaction_id': row['id'],
                    'amount': amount,
                    'timestamp': row['timestamp']
                }

# Use in pipeline
for transaction in process_large_csv('transactions.csv'):
    save_to_database(transaction)

3.2 Batch Processing with Generators

# Process data in batches
def batch_generator(iterable, batch_size=1000):
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    
    # Yield remaining items
    if batch:
        yield batch

# Usage: Process millions of records in batches
for batch in batch_generator(process_large_csv('data.csv'), batch_size=10000):
    bulk_insert_to_db(batch)  # Insert 10K records at a time

3.3 Pipeline Pattern

# Chain generators for data pipeline
def read_logs(filename):
    with open(filename) as f:
        for line in f:
            yield line.strip()

from datetime import datetime, timedelta

def parse_logs(lines):
    for line in lines:
        if 'ERROR' in line:
            # Assumes each line starts with an ISO timestamp,
            # e.g. "2024-05-01T10:32:00 ERROR something broke"
            timestamp = datetime.fromisoformat(line.split(' ', 1)[0])
            yield {'level': 'ERROR', 'timestamp': timestamp, 'message': line}

def filter_recent(logs, hours=24):
    cutoff = datetime.now() - timedelta(hours=hours)

    for log in logs:
        if log['timestamp'] > cutoff:
            yield log

# Compose pipeline (memory efficient!)
logs = read_logs('app.log')
errors = parse_logs(logs)
recent_errors = filter_recent(errors, hours=1)

for error in recent_errors:
    alert(error)  # Process one at a time
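
A small extension of the same pattern: when one pipeline stage needs to consume several sources, yield from delegates to a sub-generator without buffering anything. A minimal sketch reusing read_logs from above (the filenames are just examples):

def read_many_logs(filenames):
    for filename in filenames:
        yield from read_logs(filename)  # delegate line-by-line to the generator above

# Drop-in replacement for read_logs in the pipeline above
recent_errors = filter_recent(parse_logs(read_many_logs(['app.log', 'worker.log'])), hours=1)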

4. Real-World Example: ETL Pipeline

# Memory-efficient ETL with generators
# (parse_timestamp and db below are placeholders for your own date parser and DB client)
import requests

def extract_from_api(url, page_size=100):
    page = 1
    while True:
        response = requests.get(url, params={'page': page, 'size': page_size})
        data = response.json()
        
        if not data:
            break
        
        for record in data:
            yield record
        
        page += 1

def transform_record(records):
    for record in records:
        yield {
            'id': record['user_id'],
            'name': f"{record['first_name']} {record['last_name']}",
            'email': record['email'].lower(),
            'created_at': parse_timestamp(record['created'])
        }

def load_to_db(records, batch_size=1000):
    batch = []
    for record in records:
        batch.append(record)
        if len(batch) >= batch_size:
            db.bulk_insert(batch)
            batch = []
    
    if batch:
        db.bulk_insert(batch)

# Execute ETL pipeline
records = extract_from_api('https://api.example.com/users')
transformed = transform_record(records)
load_to_db(transformed)

# Processes millions of records with constant memory usage

5. Performance Comparison

# Benchmark: Process 10M records
import time

def process(x):
    return x * 2  # stand-in for real per-record work

# List comprehension (materializes every result in memory)
start = time.time()
data = [process(x) for x in range(10_000_000)]  # memory grows linearly with the number of items
result = sum(data)
print(f"List: {time.time() - start:.2f}s, {len(data)} items")

# Generator expression (constant memory)
start = time.time()
data = (process(x) for x in range(10_000_000))  # memory stays essentially constant
result = sum(data)
print(f"Generator: {time.time() - start:.2f}s")

# The generator version is typically a bit slower per item, but its memory
# footprint stays flat instead of growing with the dataset
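
Rather than trust rough numbers, you can measure peak allocations yourself with tracemalloc from the standard library. A minimal sketch (reset_peak needs Python 3.9+):

import tracemalloc

tracemalloc.start()

total = sum(x * 2 for x in range(1_000_000))    # generator: peak allocation stays small
_, peak_gen = tracemalloc.get_traced_memory()
tracemalloc.reset_peak()

total = sum([x * 2 for x in range(1_000_000)])  # list: peak grows with the input size
_, peak_list = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"generator peak: {peak_gen / 1e6:.1f} MB, list peak: {peak_list / 1e6:.1f} MB")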

6. Best Practices

  • Use for one-time iteration: Generators are exhausted after a single pass and can’t be reused (see the sketch after this list)
  • Chain generators: Build data pipelines one small step at a time
  • itertools module: Powerful generator utilities such as islice, chain, and takewhile
  • Generator expressions: More readable than generator functions for simple cases
  • Don’t convert to list: Calling list(generator) materializes everything and defeats the purpose
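
Two of those points in code: a generator is empty after one pass, and itertools gives you lazy building blocks that compose with your own generators. A short sketch reusing count_up_to from section 2 (list() appears here only to display the small outputs):

from itertools import chain, islice

gen = (x**2 for x in range(5))
print(list(gen))  # [0, 1, 4, 9, 16]
print(list(gen))  # [] -- already exhausted, build a fresh generator to iterate again

# Lazily take the first three items from two chained generators
first_three = islice(chain(count_up_to(10), count_up_to(10)), 3)
print(list(first_three))  # [1, 2, 3]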

7. Conclusion

Python generators let you process arbitrarily large data streams with constant memory. They are essential for big data pipelines, log processing, and any scenario where the data doesn’t fit in RAM.

Written for data engineers and Python developers building scalable data processing systems.

