AI Engineering · Data & Databases · Updated May 2025
12 stages covering relational databases, vector stores, caching, pipelines, analytical engines, and data quality — the complete storage layer behind production AI systems.
What this section covers
Atomicity, Consistency, Isolation, Durability; READ COMMITTED vs REPEATABLE READ vs SERIALIZABLE; open transactions blocking autovacuum.
ACID is the foundation of relational correctness. Atomicity: every statement in a transaction commits or none does, so a partially written model evaluation result never appears. Consistency: constraints and triggers are always enforced, so inserting a chunk that references a deleted document fails the FK check instead of silently succeeding. Isolation: concurrent transactions never see each other's uncommitted writes (PostgreSQL does not permit dirty reads at any level; READ COMMITTED is the default). Durability: a committed row survives a crash because the WAL is flushed to disk before the commit returns (with the default synchronous_commit = on).

PostgreSQL accepts the four SQL-standard isolation levels, but READ UNCOMMITTED behaves exactly like READ COMMITTED, so there are three distinct behaviours. READ COMMITTED is the default and the right choice for most AI logging and model inference logging pipelines: each statement sees a fresh snapshot, it prevents dirty reads, and it gives the best concurrency. REPEATABLE READ gives the whole transaction one snapshot, which is useful when a batch scoring job must read consistent data across many queries. SERIALIZABLE additionally detects read/write dependency cycles and aborts one of the conflicting transactions with a serialization failure; use it for experiment slot reservation or credit deduction, where a concurrent insert or update would break correctness, and be ready to retry the aborted transaction. Always commit or roll back explicitly: an idle-in-transaction connection can hold back the xmin horizon, which stops vacuum from reclaiming dead tuples in every table of the database, not only the ones it touched.
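Atomicity can be demonstrated without a running Postgres server. The sketch below swaps in Python's built-in sqlite3 module as a stand-in (an assumption for portability; SQLite's transaction semantics differ from PostgreSQL's in many details), with hypothetical eval_runs and eval_scores tables modelled on the ones used in this section. A NULL score makes a later INSERT fail, and the run row written earlier in the same transaction disappears with it.

```python
import sqlite3

def record_eval(conn, run_id, model_id, scores):
    """Insert one eval run and all of its scores as a single atomic transaction."""
    # `with conn:` commits if the block succeeds and rolls back if it raises.
    with conn:
        conn.execute(
            "INSERT INTO eval_runs (run_id, model_id) VALUES (?, ?)",
            (run_id, model_id),
        )
        for metric, value in scores.items():
            conn.execute(
                "INSERT INTO eval_scores (run_id, metric, value) VALUES (?, ?, ?)",
                (run_id, metric, value),
            )

conn = sqlite3.connect(":memory:")  # stand-in database, not PostgreSQL
conn.execute("CREATE TABLE eval_runs (run_id TEXT PRIMARY KEY, model_id TEXT NOT NULL)")
conn.execute(
    "CREATE TABLE eval_scores (run_id TEXT NOT NULL, metric TEXT NOT NULL, value REAL NOT NULL)"
)

record_eval(conn, "run-1", "model-a", {"f1": 0.847, "recall": 0.91})

try:
    # The NULL score violates NOT NULL, so the eval_runs row inserted
    # earlier in the same transaction is rolled back as well.
    record_eval(conn, "run-2", "model-a", {"f1": 0.85, "recall": None})
except sqlite3.IntegrityError:
    pass

print(conn.execute("SELECT run_id FROM eval_runs").fetchall())  # [('run-1',)]
```

The same all-or-nothing behaviour is what the psycopg2 transaction helper in this section relies on in PostgreSQL.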
B-tree for equality/range/sort; GIN for JSONB/full-text/arrays; BRIN for append-only time-series; partial and covering indexes.
Index selection is the highest-leverage query optimisation in PostgreSQL. B-tree is the default and covers equality, range, ORDER BY, BETWEEN, IS NULL checks, and LIKE 'prefix%' (the last only under the C collation or with a text_pattern_ops operator class). Add a B-tree index on every foreign key column: PostgreSQL indexes the referenced primary key but never the referencing column, so without one, joins and lookups by document_id, as well as ON DELETE CASCADE checks, fall back to sequential scans. GIN (Generalised Inverted Index) is designed for values containing multiple searchable components: JSONB keys and values, arrays, full-text tsvectors, and pg_trgm trigrams. A GIN index on a jsonb metadata column lets @> (containment) and ? (key existence) use an index lookup, typically in milliseconds, instead of a full sequential scan. BRIN (Block Range Index) stores only the min/max value per range of disk pages, which makes it orders of magnitude smaller than a B-tree on the same column, even on a billion-row table. It works well only when column values are physically correlated with storage order, such as an auto-increment ID or a timestamp column on an append-only log table. Partial indexes carry a WHERE clause, reducing index size and write overhead: a partial index with WHERE deleted_at IS NULL indexes only active chunks. Covering indexes use INCLUDE to add non-key columns so the engine can answer a query from the index alone without touching the heap (an Index Only Scan).
JSONB storage and GIN indexing for dynamic metadata; PgBouncer transaction mode for AI API fan-out; connection limits in serverless environments.
JSONB is the correct column type for dynamic, semi-structured data: LLM response metadata, model configuration snapshots, per-tenant feature flags, or A/B test assignments. Unlike the text-based json type, JSONB is parsed once on write and stored in a binary format that supports GIN indexing and efficient key access. Key operators: ->> extracts a value by key as text, #>> extracts the value at a JSON path as text, @> tests containment (the column's document contains the given sub-document), and ? checks whether a top-level key exists. A GIN index on the JSONB column lets @> and ? run as index lookups, typically in milliseconds, instead of sequential scans.

Connection pooling is non-negotiable for AI services. An LLM inference handler that spawns 200 concurrent threads will attempt 200 Postgres connections at once, while PostgreSQL's default max_connections is 100 (managed services often size it to the instance). PgBouncer in transaction mode is the standard solution: the server connection returns to the pool as soon as each transaction commits or rolls back, so 1000 application connections can share 20 server connections. Session mode is needed only for session-scoped features such as session-level SET, session-level advisory locks, LISTEN/NOTIFY, or server-side prepared statements (PgBouncer 1.21+ can track protocol-level prepared statements in transaction mode when max_prepared_statements is set). For AWS Lambda and other serverless functions, use RDS Proxy as a managed connection pooler.
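To make the containment rule concrete, here is a simplified pure-Python model of how @> decides whether one JSON document contains another. It is an illustration, not PostgreSQL's implementation: it follows the documented rules (objects match key by key, recursively; every element of a right-hand array must match some left-hand element, regardless of order; scalars must be equal and of the same JSON type) but omits the special case that lets an array contain a bare primitive value. The function name jsonb_contains is hypothetical.

```python
def jsonb_contains(left, right):
    """Simplified model of PostgreSQL's `left @> right` for parsed JSON values."""
    if isinstance(right, dict):
        # Every key on the right must exist on the left with a contained value.
        return isinstance(left, dict) and all(
            key in left and jsonb_contains(left[key], value)
            for key, value in right.items()
        )
    if isinstance(right, list):
        # Every right-hand element must match some left-hand element; order is ignored.
        return isinstance(left, list) and all(
            any(jsonb_contains(candidate, item) for candidate in left)
            for item in right
        )
    if isinstance(left, (dict, list)):
        return False  # a container never equals a scalar (array special case omitted)
    if isinstance(left, bool) or isinstance(right, bool):
        return type(left) is type(right) and left == right  # JSON true is not 1
    return left == right

metadata = {
    "model": "claude-sonnet-4-6",
    "tokens": {"input": 420, "output": 180},
    "retrieved_chunk_ids": ["c1", "c7", "c9"],
}
print(jsonb_contains(metadata, {"model": "claude-sonnet-4-6"}))          # True
print(jsonb_contains(metadata, {"tokens": {"output": 180}}))            # True
print(jsonb_contains(metadata, {"retrieved_chunk_ids": ["c9", "c1"]}))  # True
print(jsonb_contains(metadata, {"tokens": {"output": "180"}}))          # False
```

Because containment is partial matching on nested structure, one GIN index serves every such filter on the column, whereas a B-tree expression index serves only the single extracted key it was built on.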
Most slow or wrong queries in a data-intensive AI system trace back to a missing index, stale statistics, or the wrong isolation level. Getting these right up front prevents a class of incidents that only surface in production.
import psycopg2
import psycopg2.extensions
from contextlib import contextmanager

@contextmanager
def transaction(conn):
    """Yield a cursor; commit on success, roll back on any exception."""
    try:
        with conn.cursor() as cur:
            yield cur
        conn.commit()
    except Exception:
        conn.rollback()
        raise

conn = psycopg2.connect("postgresql://app:pass@pgbouncer:5432/aidb")

# Atomic: eval_run + scores committed together or not at all
with transaction(conn) as cur:
    cur.execute(
        "INSERT INTO eval_runs (run_id, model_id, started_at) VALUES (%s, %s, NOW())",
        (run_id, model_id),
    )
    cur.execute(
        "INSERT INTO eval_scores (run_id, metric, value) VALUES (%s, %s, %s)",
        (run_id, "f1", 0.847),
    )

# Use SERIALIZABLE only for write-conflict-sensitive operations. The level applies to
# every later transaction on this connection, so reset it afterwards. A serialization
# failure (SQLSTATE 40001) means the whole transaction must be retried.
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
try:
    with transaction(conn) as cur:
        cur.execute("SELECT slots_remaining FROM experiment_slots WHERE exp_id = %s FOR UPDATE", (exp_id,))
        row = cur.fetchone()
        if row and row[0] > 0:
            cur.execute("UPDATE experiment_slots SET slots_remaining = slots_remaining - 1 WHERE exp_id = %s", (exp_id,))
finally:
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)

-- B-tree: equality + range + FK
CREATE INDEX idx_chunks_doc_id ON chunks(document_id);
CREATE INDEX idx_chunks_created ON chunks(created_at DESC);
-- GIN: JSONB containment + full-text search
CREATE INDEX idx_chunks_meta ON chunks USING GIN (metadata);
CREATE INDEX idx_docs_fts ON documents USING GIN (to_tsvector('english', content));
-- BRIN: append-only inference event log
CREATE INDEX idx_events_brin ON inference_events USING BRIN (occurred_at)
WITH (pages_per_range = 128);
-- Partial: index only active (non-deleted) chunks
CREATE INDEX idx_chunks_active ON chunks(document_id, embedding_model)
WHERE deleted_at IS NULL;
-- Covering: index-only scan for chunk listing (no heap fetch)
CREATE INDEX idx_chunks_cover ON chunks(document_id)
INCLUDE (chunk_id, content_hash, created_at)
WHERE deleted_at IS NULL;
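-- Audit: find unused indexes wasting write overhead (counts since the last stats reset;
-- unique and primary-key indexes are excluded because they enforce constraints)
SELECT s.indexrelname,
       s.idx_scan,
       pg_size_pretty(pg_relation_size(s.indexrelid)) AS index_size
FROM pg_stat_user_indexes s
JOIN pg_index i ON i.indexrelid = s.indexrelid
WHERE s.idx_scan = 0
  AND s.schemaname = 'public'
  AND NOT i.indisunique
ORDER BY pg_relation_size(s.indexrelid) DESC;

import psycopg2, json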
conn = psycopg2.connect("postgresql://app:pass@pgbouncer:5432/aidb")
cur = conn.cursor()
# Store LLM response + metadata in one column
cur.execute("""
INSERT INTO llm_responses (request_id, response, metadata)
VALUES (%s, %s, %s)
""", (
req_id,
answer_text,
json.dumps({
"model": "claude-sonnet-4-6",
"tokens": {"input": 420, "output": 180},
"latency_ms": 1240,
"retrieved_chunk_ids": chunk_ids,
}),
))
conn.commit()
# GIN-indexed containment search — fast even on millions of rows
cur.execute("""
SELECT request_id, metadata->>'latency_ms' AS latency
FROM llm_responses
WHERE metadata @> %s::jsonb
AND created_at > NOW() - INTERVAL '7 days'
ORDER BY created_at DESC
LIMIT 200
""", (json.dumps({"model": "claude-sonnet-4-6"}),))
rows = cur.fetchall()
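# Expression (B-tree) index for frequent equality lookups on a single key:
# CREATE INDEX idx_resp_model ON llm_responses ((metadata->>'model'));

In the SQL standard, REPEATABLE READ prevents dirty and non-repeatable reads but still permits phantom reads. PostgreSQL's implementation is stricter: the whole transaction reads from one snapshot, so rows inserted by a concurrent transaction never appear in a re-executed query. What REPEATABLE READ still allows is serialization anomalies such as write skew, where two transactions read the same data and each writes based on what it read. SERIALIZABLE closes that gap using Serializable Snapshot Isolation (SSI): it tracks read/write dependencies between concurrent transactions and aborts one of them with a serialization failure (SQLSTATE 40001), which the application must retry. SERIALIZABLE has higher overhead; use it only when such anomalies would break correctness, such as slot reservation or financial deduction.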
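Because SERIALIZABLE resolves conflicts by aborting a transaction, client code needs a retry loop. A minimal sketch, assuming the transaction is wrapped in a function that opens, commits, or rolls back its own transaction (for example with psycopg2's `with conn:` block). With psycopg2 the exception to retry would be psycopg2.errors.SerializationFailure; the demo uses a hypothetical stand-in exception so it runs without a database, and run_with_retry is an illustrative name.

```python
import random
import time

def run_with_retry(txn_fn, retryable, max_attempts=5, base_delay=0.05):
    """Call txn_fn() until it succeeds, re-running it when it raises `retryable`.

    txn_fn must run one complete transaction (BEGIN ... COMMIT) and roll back on
    failure, because a serialization failure invalidates the whole transaction.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return txn_fn()
        except retryable:
            if attempt == max_attempts:
                raise
            # Exponential backoff with jitter so colliding clients spread out.
            time.sleep(base_delay * 2 ** (attempt - 1) * random.uniform(0.5, 1.5))

class FakeSerializationFailure(Exception):
    """Hypothetical stand-in for psycopg2.errors.SerializationFailure (SQLSTATE 40001)."""

calls = []

def reserve_slot():
    calls.append(1)
    if len(calls) < 3:  # fail twice, then succeed
        raise FakeSerializationFailure("could not serialize access")
    return "reserved"

print(run_with_retry(reserve_slot, FakeSerializationFailure, base_delay=0.001), len(calls))
# reserved 3
```

Retrying the whole function, rather than the failed statement, matters: the reads that led to the write must be repeated against the new snapshot.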
Open transactions hold back the xmin horizon. Under PostgreSQL's MVCC model, vacuum may only remove row versions that no running transaction could still see, so while the oldest transaction stays open, autovacuum cannot reclaim dead tuples created after it started. Over hours this causes steadily growing table bloat, slower sequential scans, and wasted disk. Checking pg_stat_activity for sessions in the 'idle in transaction' state is a standard production health check, and setting idle_in_transaction_session_timeout makes the server terminate such sessions automatically.
Use GIN when the column contains multiple searchable elements: JSONB objects (for the @>, ?, ?| and ?& operators), full-text tsvector columns, array columns, or pg_trgm trigram similarity. B-tree indexes the whole column value as a unit and cannot search inside JSONB or arrays. A GIN index on a jsonb metadata column lets `WHERE metadata @> '{"model":"claude-3-5-sonnet"}'` run as an index lookup, typically in milliseconds, instead of a sequential scan of millions of rows.
BRIN stores min/max values per range of consecutive disk pages rather than per row, which makes it orders of magnitude smaller than a B-tree on the same column. It is only effective when column values are physically correlated with disk order: an auto-increment ID, a monotonically increasing timestamp, or a date-partitioned log table. For randomly ordered data, almost every page range overlaps the query predicate, so BRIN ends up reading nearly the whole table and is no better than a sequential scan.
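The effect of physical ordering on BRIN can be simulated in a few lines. This sketch keeps one (min, max) summary per block of rows, a simplification of BRIN's per-page-range summaries (real ranges are pages_per_range heap pages, 128 by default), and counts how many ranges a narrow range query cannot rule out. The data and sizes are made up for illustration.

```python
import random

def block_summaries(values, rows_per_range):
    """One (min, max) pair per consecutive block of rows, like a BRIN minmax summary."""
    return [
        (min(values[i:i + rows_per_range]), max(values[i:i + rows_per_range]))
        for i in range(0, len(values), rows_per_range)
    ]

def ranges_to_scan(summaries, lo, hi):
    """Ranges whose [min, max] overlaps [lo, hi]; only these must be read from the heap."""
    return sum(1 for mn, mx in summaries if mx >= lo and mn <= hi)

N, ROWS_PER_RANGE = 100_000, 1_000
appended = list(range(N))            # e.g. timestamps written in order to an append-only log
shuffled = appended[:]
random.Random(42).shuffle(shuffled)  # same values, random physical placement

window = (50_000, 50_999)            # a narrow time-range query
print(ranges_to_scan(block_summaries(appended, ROWS_PER_RANGE), *window))  # 1 of 100
print(ranges_to_scan(block_summaries(shuffled, ROWS_PER_RANGE), *window))  # nearly all 100
```

With in-order data the query touches one range out of 100; with the same values shuffled, almost every range's min/max span covers the window, so the summaries prune nothing.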
An Index Only Scan reads all needed column values from the index itself, avoiding heap (table) page reads. It requires that every column the query references, in SELECT, WHERE, and ORDER BY, appears in the index. Use CREATE INDEX ... INCLUDE (col1, col2) to add non-key columns for this purpose. The heap is still visited for pages not marked all-visible in the visibility map, so keep the table well vacuumed. Check with EXPLAIN ANALYZE that the plan shows "Index Only Scan" rather than "Index Scan", and that "Heap Fetches" stays low.
Almost never. JSONB is stored in binary form, supports GIN indexing, and evaluates operators efficiently. The json type stores the input text verbatim, so choose it only when you must preserve exact key order, whitespace, or duplicate keys, which matters only for applications that diff or re-emit the raw JSON string. For all AI/ML use cases involving query, filter, or aggregation, use JSONB.
In transaction mode the server connection returns to the pool after each COMMIT or ROLLBACK, giving maximum concurrency. What breaks is anything that relies on session state: session-level SET (use SET LOCAL inside the transaction instead, which is discarded at COMMIT), session-level advisory locks (pg_advisory_xact_lock is transaction-scoped and safe), LISTEN/NOTIFY, and server-side prepared statements unless PgBouncer 1.21+ is configured with max_prepared_statements (otherwise disable prepared statements in the driver). Features that truly need session state require session mode. For most AI API services none of them are needed, and transaction mode is correct.
Reading node types (Seq Scan, Index Scan, Hash Join), actual vs estimated rows, buffer hit/miss, and fixing statistics staleness.
EXPLAIN (ANALYZE, BUFFERS) is the definitive source of truth for query performance. It runs the query and shows the actual execution plan: estimated versus actual row counts, loop counts, per-node cost and timing, and, with BUFFERS, the number of shared_buffers hits and blocks read from disk. Key node types: Seq Scan reads the entire table (acceptable only on small tables or when most rows are returned). Index Scan uses an index but fetches each matching heap row individually. Index Only Scan answers from the index alone, skipping the heap for all-visible pages (usually the fastest). Bitmap Heap Scan collects matching row locations from one or more indexes, then fetches the heap pages in physical order (a good fit when a predicate matches a small fraction of the table, very roughly 1–5%). Nested Loop is efficient for small outer inputs with an indexed inner side. Hash Join builds a hash table on the smaller input; it suits medium-sized inputs but spills to disk in batches when the table exceeds work_mem. Merge Join suits large inputs already sorted on the join key. The most actionable signal in EXPLAIN output is the gap between the estimated rows= and the actual rows=. Note that actual rows= is a per-loop average, so compare it with the estimate per loop (or multiply by loops= for the total). A large discrepancy means the planner chose a join strategy optimised for the wrong input size. Fix it with ANALYZE table_name to refresh column statistics; for persistently bad estimates, raise the column's statistics target with ALTER TABLE ... ALTER COLUMN ... SET STATISTICS (default_statistics_target sets it globally).
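Checking the estimate-versus-actual gap by eye gets tedious on large plans. Below is a sketch that walks EXPLAIN (ANALYZE, FORMAT JSON) output and flags nodes whose estimate is off by more than 10×. The threshold and the function name are assumptions, and the sample plan is hypothetical. It relies on the JSON plan keys Node Type, Plan Rows, Actual Rows, Actual Loops, Relation Name, and Plans.

```python
import json

def misestimated_nodes(plan, ratio=10.0):
    """Return (node type, relation, estimated rows, actual rows) for every plan node
    whose estimate is off by more than `ratio`x in either direction.

    `plan` is EXPLAIN (ANALYZE, FORMAT JSON) output, as a JSON string or already parsed.
    "Plan Rows" and "Actual Rows" are both per-loop figures, so they compare directly.
    """
    if isinstance(plan, str):
        plan = json.loads(plan)
    flagged = []

    def walk(node):
        if node.get("Actual Loops", 0) > 0:  # nodes that never ran have no actuals
            est, act = node["Plan Rows"], node["Actual Rows"]
            # +1 on both sides avoids dividing by zero when a node returns no rows
            factor = max(est + 1, act + 1) / min(est + 1, act + 1)
            if factor > ratio:
                flagged.append((node["Node Type"], node.get("Relation Name"), est, act))
        for child in node.get("Plans", []):
            walk(child)

    walk(plan[0]["Plan"])
    return flagged

# Hypothetical plan: the planner expected 3 matching documents and got 4017.
sample_plan = json.dumps([{"Plan": {
    "Node Type": "Nested Loop", "Plan Rows": 12, "Actual Rows": 48210, "Actual Loops": 1,
    "Plans": [
        {"Node Type": "Seq Scan", "Relation Name": "documents",
         "Plan Rows": 3, "Actual Rows": 4017, "Actual Loops": 1},
        {"Node Type": "Index Scan", "Relation Name": "chunks",
         "Plan Rows": 4, "Actual Rows": 12, "Actual Loops": 4017},
    ],
}}])
for row in misestimated_nodes(sample_plan):
    print(row)
# ('Nested Loop', None, 12, 48210)
# ('Seq Scan', 'documents', 3, 4017)
```

With psycopg2, fetching the single column returned by EXPLAIN (ANALYZE, FORMAT JSON) typically yields an already-parsed list, which the function accepts directly. ANALYZE really executes the statement, so wrap data-modifying queries in a transaction you roll back.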
Detecting N+1 query loops, batch fetching with ANY, pg_stat_statements for cumulative impact analysis, and slow-query logging.
The N+1 problem is when code queries N items and then issues one additional query per item, producing N+1 database calls instead of 1 or 2. In AI systems it appears most often when loading all chunks for a list of documents (one SELECT per document_id in a Python loop) or fetching conversation history for each user in a batch scoring pipeline. N+1 is rarely noticeable in development (10 docs = 11 queries). In production, 10,000 docs means 10,001 queries, about 5 minutes at 30 ms per round trip, and it causes timeout cascades. pg_stat_statements is a PostgreSQL extension that normalises query text and accumulates call count, total execution time, mean time, and rows since its statistics were last reset. Enable it by adding it to shared_preload_libraries (a server restart is required) and running CREATE EXTENSION pg_stat_statements. Sort by total_exec_time to find the most expensive query patterns in aggregate: a query taking 0.5 ms called 2 million times (1000 seconds total) matters far more than a 10-second query called once. Set log_min_duration_statement = 1000 (milliseconds) to also catch individual slow queries in the Postgres log. Fix N+1 by batching IDs with WHERE id = ANY(%s) or by rewriting the access as a single JOIN.
INSERT ON CONFLICT DO UPDATE for idempotent writes; RETURNING to eliminate second round trips; autovacuum tuning and table bloat prevention.
INSERT ... ON CONFLICT DO UPDATE (UPSERT) is the idempotent write primitive for tables that can receive the same record multiple times: embedding caches, deduplication tables, idempotency key stores, and feature store snapshots. For DO UPDATE, the ON CONFLICT clause must name the conflict target, either ON CONSTRAINT constraint_name or a column list matching a unique index. EXCLUDED refers to the row that was proposed for insertion; use it to reference the new values in the SET clause. RETURNING retrieves column values from the affected rows in the same statement, eliminating a second SELECT and closing the race window between write and read. Use RETURNING on INSERT whenever you need a server-generated primary key or default timestamp. VACUUM makes space held by dead tuples (left behind by UPDATE and DELETE under MVCC) reusable. Autovacuum runs automatically but with conservative defaults: a table is vacuumed only once its dead tuples exceed autovacuum_vacuum_threshold (default 50) plus autovacuum_vacuum_scale_factor (default 0.2) times its row count, i.e. about 20% of the table. On large write-heavy tables (inference logs, embedding caches), bloat can accumulate for days before cleanup. Lower autovacuum_vacuum_scale_factor to 0.01 for these tables, monitor n_dead_tup in pg_stat_user_tables, and alert when it exceeds 5% of n_live_tup.
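The trigger condition is simple arithmetic, so it is easy to check what a scale factor means for a given table. Below is a sketch of the documented formula (autovacuum_vacuum_threshold + autovacuum_vacuum_scale_factor × reltuples). It ignores the separate insert-driven threshold added in PostgreSQL 13, and the 50-million-row table is an illustrative assumption.

```python
def autovacuum_vacuum_trigger(reltuples, scale_factor=0.2, base_threshold=50):
    """Dead tuples a table must accumulate before autovacuum vacuums it:
    autovacuum_vacuum_threshold + autovacuum_vacuum_scale_factor * reltuples."""
    return round(base_threshold + scale_factor * reltuples)

rows = 50_000_000  # hypothetical inference_events table
print(autovacuum_vacuum_trigger(rows))                     # 10000050 (defaults)
print(autovacuum_vacuum_trigger(rows, scale_factor=0.01))  # 500050 (tuned)
```

At the default scale factor, ten million dead tuples pile up before the first vacuum. The per-table ALTER TABLE ... SET (autovacuum_vacuum_scale_factor = 0.01) override brings that down by 20×.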
EXPLAIN ANALYZE is not a diagnostic tool you reach for when something breaks — it is a verification step you run before shipping any query that touches more than a thousand rows.
-- Always run with ANALYZE and BUFFERS in production investigations
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT c.chunk_id, c.content, d.title
FROM chunks c
JOIN documents d ON c.document_id = d.id
WHERE c.embedding_model = 'text-embedding-3-small'
AND c.deleted_at IS NULL
AND d.tenant_id = 'acme'
ORDER BY c.created_at DESC
LIMIT 50;
-- Refresh statistics after bulk insert or ETL load
ANALYZE chunks;
ANALYZE documents;
-- Raise the statistics target for a column whose estimates stay wrong after ANALYZE
ALTER TABLE chunks
ALTER COLUMN embedding_model SET STATISTICS 500;
ANALYZE chunks;
-- Check when each table was last analyzed and dead-tuple accumulation
SELECT relname,
last_analyze,
last_autoanalyze,
n_live_tup,
n_dead_tup,
round(n_dead_tup::numeric / NULLIF(n_live_tup,0) * 100, 1) AS dead_pct
FROM pg_stat_user_tables
WHERE relname IN ('chunks','documents')
ORDER BY n_dead_tup DESC;

import psycopg2

# ❌ N+1 ANTIPATTERN: one query per doc_id
def get_chunks_bad(doc_ids, conn):
    cur = conn.cursor()
    result = {}
    for doc_id in doc_ids:  # 1000 iterations = 1000 queries
        cur.execute(
            "SELECT chunk_id, content FROM chunks WHERE document_id = %s AND deleted_at IS NULL",
            (doc_id,),
        )
        result[str(doc_id)] = cur.fetchall()
    return result

# ✅ BATCH FETCH: one query for all doc_ids
def get_chunks(doc_ids, conn):
    cur = conn.cursor()
    cur.execute("""
        SELECT document_id, chunk_id, content
        FROM chunks
        WHERE document_id = ANY(%s::uuid[])
          AND deleted_at IS NULL
        ORDER BY document_id, chunk_index
    """, ([str(d) for d in doc_ids],))
    # Pre-seed every requested doc so documents without chunks map to []
    result = {str(doc_id): [] for doc_id in doc_ids}
    for doc_id, chunk_id, content in cur.fetchall():
        result[str(doc_id)].append((chunk_id, content))
    return result

# Find the most expensive query patterns cumulatively
# SELECT query, calls, total_exec_time, mean_exec_time, rows
# FROM pg_stat_statements
# ORDER BY total_exec_time DESC LIMIT 20;

-- Idempotent embedding cache upsert
INSERT INTO embedding_cache (content_hash, embedding_model, embedding, computed_at)
VALUES (%(hash)s, %(model)s, %(vec)s, NOW())
ON CONFLICT (content_hash, embedding_model)
DO UPDATE SET
embedding = EXCLUDED.embedding,
computed_at = EXCLUDED.computed_at
RETURNING cache_id, (xmax = 0) AS was_new_insert;
-- xmax = 0 → new row; xmax != 0 → updated existing row (relies on MVCC internals, not a documented API)
-- RETURNING avoids a second SELECT to get the generated PK
INSERT INTO chunks (document_id, content, chunk_index, embedding_model)
VALUES (%(doc_id)s, %(content)s, %(idx)s, %(model)s)
RETURNING chunk_id, created_at;
-- Tune autovacuum for a high-write inference log table
ALTER TABLE inference_events SET (
autovacuum_vacuum_scale_factor = 0.01, -- vacuum at 1% dead tuples
autovacuum_analyze_scale_factor = 0.005, -- analyze at 0.5%
autovacuum_vacuum_cost_delay = 2 -- ms; already the default on PG 12+ (20 ms on PG 11 and older)
);
-- Check table bloat
SELECT relname,
       pg_size_pretty(pg_total_relation_size(relid)) AS total_size,
       n_dead_tup,
       n_live_tup,
       round(n_dead_tup::numeric / NULLIF(n_live_tup, 0) * 100, 1) AS dead_pct
FROM pg_stat_user_tables
ORDER BY n_dead_tup DESC
LIMIT 10;

Cost is a unitless estimate from the planner's cost model, written cost=startup..total. Its units are arbitrary but anchored by convention to seq_page_cost = 1.0, the cost of one sequential page read. Startup cost is the estimated work before the first row can be returned (e.g., building a hash table). Total cost is the estimated work to return all rows. It is not wall-clock time: the planner uses it only to compare alternative plans for the same query. A higher cost means the planner expects more work, not necessarily a slower query in wall time.
Run ANALYZE table_name to refresh column statistics. If the estimate is still wrong after ANALYZE, the column likely has a skewed distribution that the default statistics target of 100 (up to 100 most-common values and 100 histogram buckets) cannot capture. Increase the target: ALTER TABLE t ALTER COLUMN c SET STATISTICS 500, then ANALYZE. For correlated multi-column predicates, create extended statistics: CREATE STATISTICS s ON col_a, col_b FROM t, then ANALYZE t. This helps when the columns are correlated (for example tenant_id and embedding_model): without it, the planner multiplies their selectivities as if they were independent and underestimates the row count.
N+1 occurs when code issues 1 query to get a list of N IDs, then 1 query per ID, totalling N+1. Detect it by: enabling SQLAlchemy echo=True or Django DEBUG=True to log all queries in dev, counting queries per request in tests, or sorting pg_stat_statements by calls DESC and looking for query patterns with very high call counts per minute relative to their complexity.
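Counting queries per request in tests can be as simple as hooking the driver's statement trace. The sketch below uses Python's built-in sqlite3 and its set_trace_callback hook as a stand-in for Postgres (an assumption for portability; SQLite has no ANY(), so the batched version uses an IN list). It runs against a hypothetical chunks table of 100 documents × 3 chunks. An N+1 regression then shows up as a statement count that grows with the data.

```python
import sqlite3

def count_statements(conn, fn):
    """Run fn(conn) and return (result, number of SQL statements executed)."""
    statements = []
    conn.set_trace_callback(statements.append)
    try:
        result = fn(conn)
    finally:
        conn.set_trace_callback(None)
    return result, len(statements)

def list_doc_ids(conn):
    return [r[0] for r in conn.execute("SELECT DISTINCT document_id FROM chunks ORDER BY document_id")]

def chunks_n_plus_one(conn):
    return {
        d: [r[0] for r in conn.execute(
            "SELECT chunk_id FROM chunks WHERE document_id = ? ORDER BY chunk_id", (d,))]
        for d in list_doc_ids(conn)  # one extra query per document
    }

def chunks_batched(conn):
    doc_ids = list_doc_ids(conn)
    placeholders = ",".join("?" * len(doc_ids))
    rows = conn.execute(
        f"SELECT document_id, chunk_id FROM chunks WHERE document_id IN ({placeholders}) "
        "ORDER BY document_id, chunk_id",
        doc_ids,
    ).fetchall()
    result = {d: [] for d in doc_ids}
    for doc_id, chunk_id in rows:
        result[doc_id].append(chunk_id)
    return result

conn = sqlite3.connect(":memory:")  # stand-in database, not PostgreSQL
conn.execute("CREATE TABLE chunks (chunk_id INTEGER PRIMARY KEY, document_id INTEGER NOT NULL)")
conn.executemany("INSERT INTO chunks (document_id) VALUES (?)",
                 [(d,) for d in range(100) for _ in range(3)])
conn.commit()

slow, slow_count = count_statements(conn, chunks_n_plus_one)
fast, fast_count = count_statements(conn, chunks_batched)
print(slow_count, fast_count)  # 101 2
```

Asserting an upper bound on the count in a test turns an invisible N+1 into a failing build long before production data makes it slow.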
SELECT query, calls, total_exec_time, mean_exec_time FROM pg_stat_statements ORDER BY total_exec_time DESC. Focus on total_exec_time (cumulative impact) not mean_exec_time (per-call cost). A query called 1 million times at 0.5ms each = 500 seconds of cumulative DB time and is more important to optimise than a 10-second query called once a day. After optimising, call pg_stat_statements_reset() and re-measure.
RETURNING fetches column values from the rows affected by INSERT, UPDATE, or DELETE in the same statement. It eliminates a follow-up SELECT to get the generated primary key or server-assigned timestamp, saving a network round trip and eliminating the race condition where another transaction could modify the row between the write and the read. RETURNING id, created_at is the correct pattern after every INSERT with a generated primary key.
PostgreSQL MVCC creates a new row version on every UPDATE rather than overwriting the old one, and DELETE only marks rows dead. The old version becomes a dead tuple: invisible to new transactions but still occupying disk space until VACUUM marks it reusable. Autovacuum handles this automatically. For write-heavy tables, lower the trigger threshold with autovacuum_vacuum_scale_factor = 0.01. For log tables with millions of rows where old data is deleted for retention, use time-based table partitioning and DROP expired partitions instead of deleting rows and vacuuming them.
HNSW graph navigation vs IVFFlat inverted file clustering; M, ef_construction, ef_search vs nlist, nprobe; recall vs latency vs build cost tradeoffs.
Approximate Nearest Neighbour (ANN) indexes return the k most similar vectors without scanning every row; the approximation is controlled and measurable via recall@k. IVFFlat partitions the vector space into nlist Voronoi cells (the lists parameter in pgvector) using k-means clustering at index build time. A query searches the nprobe nearest cells (ivfflat.probes in pgvector) and skips the rest. Build time is fast (minutes on 1M vectors), but recall degrades sharply if nprobe is too low for the chosen nlist. Because IVFFlat learns its centroids from the rows present at build time, build it only after the majority of vectors are loaded. HNSW (Hierarchical Navigable Small World) constructs a multi-layer proximity graph in which each node links to a bounded number of neighbours controlled by M. Queries start at an entry point on the top layer and greedily follow edges toward the query vector, descending layer by layer. HNSW typically achieves higher recall at a given latency than IVFFlat and supports incremental inserts without retraining. The trade-offs: HNSW uses more memory (the stored vectors plus roughly M × 8–16 bytes of graph links per vector) and builds more slowly. For production RAG, start with pgvector's defaults, M=16 and ef_construction=64, then raise hnsw.ef_search from its default of 40 to about 100 for standard recall, or 200 for high-recall requirements. Benchmark recall@10 against exact brute-force search on a representative sample of queries (for example 10K) before locking in parameters.
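pgvector's README suggests starting points for IVFFlat: lists ≈ rows / 1000 for up to 1M rows and sqrt(rows) beyond that, with probes ≈ sqrt(lists). Below is a small helper encoding those heuristics; the function name is hypothetical. Treat the output as a starting point to tune against measured recall, not a final setting.

```python
import math

def ivfflat_starting_params(rows):
    """(lists, probes) starting points per the pgvector README heuristics."""
    lists = rows // 1000 if rows <= 1_000_000 else int(math.sqrt(rows))
    lists = max(lists, 1)                    # tiny tables still need one list
    probes = max(1, round(math.sqrt(lists)))
    return lists, probes

print(ivfflat_starting_params(500_000))     # (500, 22)
print(ivfflat_starting_params(10_000_000))  # (3162, 56)
```

The lists value goes into CREATE INDEX ... WITH (lists = ...), and probes into SET ivfflat.probes for the session or transaction. Raising probes buys recall at the cost of latency.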
CREATE EXTENSION vector; column types and dimension limits; <->, <=>, <#> operators; index operator classes; normalisation and partial indexes.
pgvector adds a native vector(N) column type, HNSW and IVFFlat index types, and three distance operators to PostgreSQL. Install it with CREATE EXTENSION vector. The column type is vector(N), where N is the embedding dimension: 1536 for text-embedding-3-small, 3072 for text-embedding-3-large, and 768 for base-size open-source models such as bge-base and e5-base. The vector type can store up to 16,000 dimensions, but HNSW and IVFFlat indexes support at most 2,000; for higher dimensions, index a half-precision (halfvec, up to 4,000 dimensions) or binary (bit) representation. Three distance operators: <-> Euclidean (L2) distance, <=> cosine distance (1 − cos θ), and <#> negative inner product (−a·b). For unit-normalised embeddings, cosine distance and negative inner product produce identical rankings. The index operator class (vector_cosine_ops, vector_l2_ops, or vector_ip_ops) declares which distance the index supports, and the query must ORDER BY the matching operator. Ordering by a different operator cannot use the index and falls back to an exact scan of all rows. Partial HNSW indexes with WHERE deleted_at IS NULL keep the index small and fast as documents are soft-deleted over time.
Collections, points, payloads; HNSW config; pre-filtering with payload indexes; scalar and binary quantization; comparison with pgvector and Weaviate.
Qdrant is a vector database written in Rust, optimised for high-throughput production search. Core concepts: Collection (roughly a table), Point (a vector with an unsigned-integer or UUID ID and an optional JSON payload), and Payload (arbitrary metadata for filtering). Qdrant's key advantage over pgvector is filtering during search: payload conditions are applied while the HNSW graph is traversed rather than after it, so filtering on tenant_id, category, or date range does not starve the result set; you get k results from the filtered set. Create a payload index on every field you filter by. Without one, Qdrant must load and check each candidate's payload, which is much slower on large collections. Quantization: scalar INT8 reduces vector memory 4×, typically with around 1–2% recall loss. Binary (1-bit) reduces it 32× but needs rescoring with the original vectors (and usually oversampling) to maintain quality. Use scalar quantization as the default when memory is constrained. Comparison: pgvector for SQL-native apps needing JOINs and transactions. Qdrant for high-QPS vector-first workloads with rich metadata filtering. Weaviate for built-in vectorizer modules and a GraphQL API. Pinecone for zero-ops serverless with less control over indexing.
Approximate nearest neighbour means you trade a small, measurable recall loss for orders-of-magnitude latency gain. The index parameters are the lever between those two ends — benchmark recall against ground truth before committing to production parameters.
-- HNSW: best for production RAG (incremental inserts, high recall)
CREATE INDEX idx_chunks_hnsw ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Set ef_search per query for quality/speed tradeoff
SET hnsw.ef_search = 100; -- standard; increase to 200 for high-recall use cases
-- Search: returns cosine similarity (1 - distance)
SELECT chunk_id,
content,
1 - (embedding <=> $1::vector) AS cosine_sim
FROM chunks
WHERE deleted_at IS NULL
ORDER BY embedding <=> $1::vector
LIMIT 20;
-- IVFFlat: use for bulk re-indexing after full data load
-- Build only after inserting the majority of rows
CREATE INDEX idx_chunks_ivf ON chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 200); -- pgvector README: start near rows/1000 up to 1M rows, sqrt(rows) beyond
SET ivfflat.probes = 10; -- check 10/200 = 5% of cells
-- Benchmark recall: same operator, with and without the index
SELECT chunk_id FROM chunks ORDER BY embedding <=> $1 LIMIT 10; -- ANN (uses the index)
BEGIN;
SET LOCAL enable_indexscan = off; -- force an exact scan for ground truth
SELECT chunk_id FROM chunks ORDER BY embedding <=> $1 LIMIT 10; -- exact cosine
COMMIT;
-- Compare the two result sets to measure recall@10

CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE chunks (
chunk_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
content TEXT NOT NULL,
content_hash CHAR(64) NOT NULL,
embedding vector(1536), -- NULL until background job embeds
embedding_model TEXT, -- 'text-embedding-3-small'
embedding_dim INT, -- 1536
chunk_index INT NOT NULL,
chunk_total INT NOT NULL,
chunking_ver TEXT NOT NULL DEFAULT 'v1',
tenant_id UUID NOT NULL,
source_url TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ
);
-- Partial HNSW: only active rows, cosine distance
CREATE INDEX idx_chunks_hnsw ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
WHERE deleted_at IS NULL;
-- Similarity query: cosine similarity = 1 - cosine distance
SELECT chunk_id,
content,
1 - (embedding <=> $1::vector) AS similarity
FROM chunks
WHERE deleted_at IS NULL
AND tenant_id = $2
ORDER BY embedding <=> $1::vector
LIMIT 20;

from qdrant_client import QdrantClient
from qdrant_client.models import (
    VectorParams, Distance, PointStruct,
    Filter, FieldCondition, MatchValue,
    PayloadSchemaType, HnswConfigDiff,
    ScalarQuantization, ScalarQuantizationConfig, ScalarType,
)

client = QdrantClient(host="localhost", port=6333)

# Create collection with HNSW + scalar quantization
# (recreate_collection is deprecated in recent qdrant-client releases)
if client.collection_exists("chunks"):
    client.delete_collection("chunks")
client.create_collection(
    collection_name="chunks",
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    hnsw_config=HnswConfigDiff(m=16, ef_construct=64, full_scan_threshold=10000),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(
            type=ScalarType.INT8, quantile=0.99, always_ram=True
        )
    ),
)
# Payload indexes — required for fast pre-filtering
client.create_payload_index("chunks", "tenant_id", PayloadSchemaType.KEYWORD)
client.create_payload_index("chunks", "doc_type", PayloadSchemaType.KEYWORD)
client.create_payload_index("chunks", "created_ts", PayloadSchemaType.INTEGER)
# Upsert chunks with metadata
client.upsert("chunks", points=[
PointStruct(
id=str(chunk_id),
vector=embedding.tolist(),
payload={"tenant_id": "acme", "document_id": str(doc_id), "content": text},
)
for chunk_id, doc_id, text, embedding in batch
])
# Pre-filtered search: ANN only over acme's chunks
results = client.query_points(
    collection_name="chunks",
    query=query_embedding,
    query_filter=Filter(must=[
        FieldCondition(key="tenant_id", match=MatchValue(value="acme"))
    ]),
    limit=20,
    with_payload=["document_id", "content"],
).points

Recall@k is the fraction of the true k nearest neighbours (found by exact brute-force search) that the ANN index also returns. Recall@10 = 0.95 means that, averaged over the query sample, 9.5 of the true top-10 results appear in the ANN result set. Measure it by running both exact and ANN search on a ground-truth query sample and comparing the result sets. A recall@10 of at least 0.95 is typically acceptable for RAG; compliance or legal search may need 0.99.
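Recall@k is straightforward to compute once exact and ANN result lists have been collected for the same queries. Here is a minimal sketch with hypothetical chunk IDs; the function names are illustrative.

```python
def recall_at_k(exact_ids, ann_ids, k=10):
    """Share of the exact top-k that also appears in the ANN top-k."""
    truth = set(exact_ids[:k])
    return len(truth & set(ann_ids[:k])) / len(truth)

def mean_recall_at_k(pairs, k=10):
    """Average recall@k over (exact_ids, ann_ids) pairs, one pair per query."""
    return sum(recall_at_k(exact, ann, k) for exact, ann in pairs) / len(pairs)

# Hypothetical results for two queries: the ANN index misses one true neighbour on the first.
exact_q1 = [f"c{i}" for i in range(10)]
ann_q1 = exact_q1[:9] + ["c42"]
exact_q2 = [f"d{i}" for i in range(10)]
ann_q2 = list(reversed(exact_q2))  # order within the top-k does not matter

print(recall_at_k(exact_q1, ann_q1))  # 0.9
print(mean_recall_at_k([(exact_q1, ann_q1), (exact_q2, ann_q2)]))
```

Averaging per-query recall, rather than pooling all IDs, keeps one query with many misses from hiding behind many easy ones. It is also worth tracking the minimum per-query recall alongside the mean.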
HNSW for most production RAG systems: a better recall/latency trade-off, incremental inserts without retraining, and no separate training phase. Use IVFFlat when you bulk re-index millions of vectors (faster build) or memory is tightly constrained (the IVFFlat index is smaller). Both let you trade recall for latency per query, via ivfflat.probes or hnsw.ef_search. HNSW has been available since pgvector 0.5.0 and is the usual default there.
<-> is Euclidean (L2) distance: sqrt(Σ(a_i − b_i)²). <=> is cosine distance: 1 − (a·b)/(|a|·|b|), which depends only on the angle between the vectors. <#> is negative inner product: −(a·b), negated so that an ascending ORDER BY returns the most similar rows first. For unit-normalised embeddings all three produce the same ranking, and <#> is marginally cheaper because it skips the norm computation. For unnormalised embeddings, use <=> when you want similarity independent of vector length. OpenAI embeddings are normalised to length 1; other providers and open-source models vary, so check the model documentation or normalise vectors before storing them.
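A quick way to see why normalisation matters is to implement the three distances directly. This pure-Python sketch mirrors the formulas above (it is not pgvector's code) and ranks a few made-up 3-dimensional vectors the way ORDER BY embedding <op> query would.

```python
import math

def l2_distance(a, b):        # pgvector <->
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def cosine_distance(a, b):    # pgvector <=>
    dot = sum(x * y for x, y in zip(a, b))
    return 1 - dot / (math.hypot(*a) * math.hypot(*b))

def neg_inner_product(a, b):  # pgvector <#>
    return -sum(x * y for x, y in zip(a, b))

def normalise(v):
    norm = math.hypot(*v)
    return [x / norm for x in v]

def rank(docs, query, distance):
    """Doc names nearest-first, like ORDER BY embedding <op> query."""
    return sorted(docs, key=lambda name: distance(docs[name], query))

# Hypothetical 3-d "embeddings"; "d" is a much longer vector than the others.
raw_docs = {"a": [0.1, 1.0, 0.0], "b": [1.0, 0.2, 0.3], "c": [0.3, 0.7, 0.6], "d": [3.0, 3.0, 3.0]}
raw_query = [0.2, 0.9, 0.1]

unit_docs = {name: normalise(v) for name, v in raw_docs.items()}
unit_query = normalise(raw_query)
for dist in (l2_distance, cosine_distance, neg_inner_product):
    print(dist.__name__, rank(unit_docs, unit_query, dist))  # all three: ['a', 'c', 'd', 'b']

print(rank(raw_docs, raw_query, cosine_distance))    # ['a', 'c', 'd', 'b']
print(rank(raw_docs, raw_query, neg_inner_product))  # ['d', 'a', 'c', 'b']: length dominates
```

On unit vectors the three operators agree. On raw vectors the long vector "d" jumps to first place under inner product even though its direction is a worse match than "a" or "c".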
pgvector handles millions of vectors comfortably as long as the index fits in RAM. Budget roughly dim × 4 bytes per vector for the stored float32 values (dim × 2 for halfvec), plus about M × 8 bytes of graph links. As a rough guide, move to Qdrant or a similar engine when you need more than about 50M vectors, multi-tenancy with payload pre-filtering at high QPS, scalar or binary quantization to cut memory, or a distributed deployment. The same applies when vector search throughput (over 1000 QPS at under 10 ms) is the primary workload. Start with pgvector for its simplicity; migrate when benchmarks show it cannot meet your SLA.
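Here is a back-of-the-envelope estimator for that sizing rule. It assumes float32 (or halfvec) storage plus about M × 8 bytes of links per vector, and ignores PostgreSQL page and tuple overhead and the upper HNSW layers, so treat the result as a lower bound. The function name and the 10M × 1536 example are illustrative.

```python
def hnsw_memory_gib(n_vectors, dim, m=16, bytes_per_component=4):
    """Rough lower bound: stored vector components plus ~M * 8 bytes of links per vector.
    bytes_per_component is 4 for vector (float32) and 2 for halfvec."""
    per_vector = dim * bytes_per_component + m * 8
    return n_vectors * per_vector / 2**30

print(round(hnsw_memory_gib(10_000_000, 1536), 1))                         # 58.4
print(round(hnsw_memory_gib(10_000_000, 1536, bytes_per_component=2), 1))  # 29.8
```

For 1536-dimensional embeddings the vectors themselves dominate (6 KB each versus 128 bytes of links at M=16). This is why halfvec or quantization moves the needle far more than tuning M.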
Post-filtering: run ANN to get top-k across all vectors, then discard results outside the tenant's scope. If the tenant owns 2% of vectors, the top-k candidates are overwhelmingly from other tenants and the filtered set has far fewer than k results (requiring a much larger initial k). Pre-filtering with a payload index: Qdrant intersects the HNSW graph traversal with the matching payload set, returning exactly k results from the filtered space. Pre-filtering maintains result quality regardless of how selective the filter is.
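The post-filtering failure mode is easy to reproduce. In this sketch, exact brute-force search stands in for the ANN index, so the only variable is where the filter is applied. A hypothetical tenant "acme" owns 2% of 5,000 random 8-dimensional points.

```python
import random

def top_k(candidates, query, k):
    """Exact k nearest neighbours by squared L2 distance (stand-in for an ANN index)."""
    def sq_dist(point):
        return sum((x - q) ** 2 for x, q in zip(point["vec"], query))
    return sorted(candidates, key=sq_dist)[:k]

rng = random.Random(7)
points = [
    {"id": i, "tenant": "acme" if i % 50 == 0 else "other",  # acme owns 2% of points
     "vec": [rng.random() for _ in range(8)]}
    for i in range(5000)
]
query = [rng.random() for _ in range(8)]
k = 20

# Post-filter: search everything, then drop other tenants' hits.
post = [p for p in top_k(points, query, k) if p["tenant"] == "acme"]
# Post-filter with a 10x candidate pool: better, but still usually short of k.
post_10x = [p for p in top_k(points, query, 10 * k) if p["tenant"] == "acme"][:k]
# Pre-filter: restrict to acme's points first, then search.
pre = top_k([p for p in points if p["tenant"] == "acme"], query, k)

print(len(post), len(post_10x), len(pre))  # pre is always 20
```

With a 2% tenant, the expected number of acme hits in a top-20 is 0.4, and even a 200-candidate pool yields only about 4. Pre-filtering returns the full 20.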
Scalar quantization compresses each 32-bit float component to an 8-bit integer, reducing in-memory vector storage 4× (a 6 GB float index becomes about 1.5 GB in INT8). The recall loss is typically around 1–2%, and it shrinks further when candidates are rescored with the original float vectors. Enable it when RAM is the constraint: quantization_config=ScalarQuantization(...). Binary quantization (1-bit) compresses 32×, but it maintains reasonable recall only with rescoring against the original vectors plus oversampling, set at search time via QuantizationSearchParams(rescore=True, oversampling=...).
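Below is a sketch of quantile-clipped scalar quantization in NumPy, in the spirit of the quantile=0.99 setting in the collection config above. It is a generic 8-bit linear quantizer, not necessarily how Qdrant implements it internally, and it runs on random unit vectors rather than real embeddings. It shows the 4× storage reduction and checks that nearest-neighbour lookups survive the rounding.

```python
import numpy as np

def fit_scalar_quantizer(vectors, quantile=0.99):
    """Clipping range covering the central `quantile` share of all components."""
    tail = (1.0 - quantile) / 2
    lo, hi = np.quantile(vectors, [tail, 1.0 - tail])
    return float(lo), float(hi)

def quantize(vectors, lo, hi):
    """Clip to [lo, hi] and map linearly onto 256 levels stored as uint8 (1 byte each)."""
    scaled = (np.clip(vectors, lo, hi) - lo) / (hi - lo) * 255.0
    return np.round(scaled).astype(np.uint8)

def dequantize(codes, lo, hi):
    """Approximate float32 values reconstructed from the 8-bit codes."""
    return (codes.astype(np.float32) / 255.0 * (hi - lo) + lo).astype(np.float32)

rng = np.random.default_rng(0)
vecs = rng.normal(size=(1000, 1536)).astype(np.float32)
vecs /= np.linalg.norm(vecs, axis=1, keepdims=True)  # unit-normalised, like many embeddings

lo, hi = fit_scalar_quantizer(vecs)
codes = quantize(vecs, lo, hi)
print(vecs.nbytes // codes.nbytes)  # 4

# Queries are slightly perturbed copies of the first 20 stored vectors, so the
# correct nearest neighbour of query i is stored vector i.
queries = vecs[:20] + rng.normal(scale=0.01, size=(20, 1536)).astype(np.float32)
approx = dequantize(codes, lo, hi)
top1 = np.argmax(queries @ approx.T, axis=1)  # search against quantized vectors
print((top1 == np.arange(20)).mean())         # fraction of queries still correct
```

Clipping at a quantile rather than the absolute min/max spends the 256 levels on the range where most values actually fall, at the cost of slightly distorting the rare outliers.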
Required columns: content_hash, embedding_model, chunking_ver; delta detection on document update; parallel index strategy for model migration.
A production chunk schema must support four operations that many teams only consider after launch: delta re-embedding (re-embed only the chunks whose content changed), model migration (switch embedding models without downtime), chunking strategy updates (change overlap or size without a full re-parse), and audit trails (which version of which content produced each embedding). Mandatory columns: chunk_id (UUID PK), document_id (FK with ON DELETE CASCADE), content (the text shown to users), content_hash (SHA-256 of content, computed at write), embedding (vector(N)), embedding_model (e.g. 'text-embedding-3-small'), embedding_dim, chunk_index, chunk_total, chunking_ver (version of the chunking logic), tenant_id, created_at, updated_at, deleted_at. For model migration: old and new embeddings live in different vector spaces, so cosine similarity between an ada-002 embedding and a text-embedding-3-small embedding is meaningless even though both have 1536 dimensions. Strategy: (1) add a partial HNSW index WHERE embedding_model = 'new-model'; (2) run a background job that writes new-model embeddings as new chunk rows tagged with the new embedding_model, leaving the old rows in place so both indexes stay complete; (3) A/B test retrieval quality on both indexes; (4) swap routing to the new index; (5) soft-delete the old-model rows and drop the old partial index.
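Delta detection reduces to comparing content hashes. The sketch below assumes the stored (content_hash, embedding_model) pairs for a document have already been fetched; plan_reembedding is a hypothetical helper. Matching by hash rather than chunk_index is a design choice, so that inserting a chunk mid-document does not invalidate the chunks after it.

```python
import hashlib

def content_hash(text):
    """SHA-256 hex digest: 64 characters, matching a CHAR(64) content_hash column."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def plan_reembedding(stored_rows, new_chunks, model):
    """Decide which chunks of a re-chunked document need new embeddings.

    stored_rows: (content_hash, embedding_model) pairs already stored for the document.
    new_chunks:  chunk texts from the new parse, in order.
    Returns (indexes of new_chunks to embed, hashes whose rows can be soft-deleted).
    """
    reusable = set(stored_rows)
    new_hashes = [content_hash(text) for text in new_chunks]
    to_embed = [i for i, h in enumerate(new_hashes) if (h, model) not in reusable]
    stale = {h for h, _ in reusable} - set(new_hashes)
    return to_embed, stale

MODEL = "text-embedding-3-small"
old = ["Intro to the API.", "Pricing: $10/month.", "FAQ: contact support."]
stored = [(content_hash(t), MODEL) for t in old]
new = ["Intro to the API.", "Pricing: $12/month.", "FAQ: contact support."]

to_embed, stale = plan_reembedding(stored, new, MODEL)
print(to_embed)                                        # [1]
print(stale == {content_hash("Pricing: $10/month.")})  # True
print(plan_reembedding(stored, new, "new-model")[0])   # [0, 1, 2]
```

Only the edited chunk is sent to the embedding API. Switching the model argument marks every chunk for re-embedding, which is exactly the model-migration case.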
Pre-filter vs post-filter ANN; PostgreSQL RLS policies for tenant isolation; connecting as non-owner role; Qdrant mandatory payload filters.
Multi-tenant access control must be enforced at the database layer, not the application layer. PostgreSQL Row-Level Security (RLS) provides this. Once RLS is enabled, every SELECT, UPDATE, and DELETE automatically has the policy predicate applied as an extra WHERE condition, regardless of how the SQL was written. New rows written by INSERT or UPDATE are checked against the policy's WITH CHECK clause, which defaults to the USING clause when only USING is given. Even a raw psql connection or an ORM that constructs arbitrary queries cannot return another tenant's rows, provided it connects as a role that is subject to RLS.

Enable RLS with ALTER TABLE chunks ENABLE ROW LEVEL SECURITY. Write a policy: CREATE POLICY tenant_isolation ON chunks USING (tenant_id = current_setting('app.current_tenant_id')::UUID). The application sets the parameter per request, e.g. SET LOCAL app.current_tenant_id = '...' at the start of each transaction. A plain session-level SET can leak to the next client when connections are pooled in transaction mode.

Critical: superusers and roles with BYPASSRLS always bypass RLS, and the table owner bypasses it unless FORCE ROW LEVEL SECURITY is set. Create a dedicated application role with DML privileges only, and never connect as postgres from application code.

For metadata filtering performance, partial indexes per tenant give the best throughput for single-tenant queries because each search only traverses a small index. They are practical only for a modest number of large tenants. In Qdrant, create a payload index on tenant_id and include a mandatory filter on every search call. For post-filtering (ANN then filter), size the candidate pool at roughly k divided by the fraction of vectors that pass the filter, so that enough results survive. That means 5–10× k when a tenant holds 10–20% of the data, and about 50× k at 2%.
Sparse (BM25) + dense (embedding) retrieval; Reciprocal Rank Fusion; SPLADE sparse vectors; result deduplication; reranking after fusion.
Dense embedding search captures semantic similarity but fails on exact terms: a query for "invoice INV-2026-1049" should return that exact document, not semantically similar invoices. Sparse BM25 search captures exact matches but misses paraphrases and synonyms. Hybrid retrieval combines both and typically outperforms either alone on diverse production query distributions.

The standard fusion method is Reciprocal Rank Fusion (RRF). For each candidate document: rrf_score = Σ 1/(k + rank_i) across all result lists, where k = 60 is a smoothing constant. RRF is robust because it works on ranks, not raw scores, and so sidesteps the incompatible scales of BM25 and cosine similarity. The key implementation detail: accumulate scores by chunk_id in a dictionary. A chunk appearing in both result lists receives contributions from both ranks and typically ranks above chunks found by only one retriever.

After RRF fusion, a cross-encoder reranker (Cohere Rerank, BGE Reranker) can further improve top-k precision. Rerankers score each query-chunk pair jointly rather than embedding the two separately, which captures token-level interactions between query and chunk. Always give the reranker a larger candidate pool than you return: for example, rerank the top 20–40 fused candidates and pass only the top 5–8 to the LLM.
The retrieval data model is more important than the embedding model. Bad schema choices compound over every re-index, every model migration, and every ACL audit — and they are expensive to fix retrospectively.
CREATE TABLE chunks (
chunk_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
content TEXT NOT NULL,
content_hash CHAR(64) NOT NULL, -- SHA-256 hex of content
embedding vector(1536), -- NULL until embedded
embedding_model TEXT, -- 'text-embedding-3-small'
embedding_dim INT, -- 1536
chunk_index INT NOT NULL,
chunk_total INT NOT NULL,
chunking_ver TEXT NOT NULL DEFAULT 'v1',
tenant_id UUID NOT NULL,
source_url TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
deleted_at TIMESTAMPTZ,
UNIQUE (document_id, chunk_index, chunking_ver)
);
-- Partial index per model for safe migration
CREATE INDEX idx_chunks_v1 ON chunks
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64)
WHERE embedding_model = 'text-embedding-3-small' AND deleted_at IS NULL;
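-- Detect chunks needing re-embedding. Changed chunks are caught when the document
-- is re-chunked: a new content_hash that differs from the stored one is written
-- together with embedding = NULL, so those rows match the last condition below.
-- (The original comparison of documents.content_hash with chunks.content_hash
-- could never match: one hashes the whole document, the other a single chunk.)
SELECT c.chunk_id, c.document_id
FROM chunks c
WHERE (c.embedding_model IS DISTINCT FROM 'text-embedding-3-small' -- model migration (NULL-safe)
OR c.embedding IS NULL) -- never embedded, or content changed
AND c.deleted_at IS NULL;
-- Step 1: Enable RLS on the chunks table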
ALTER TABLE chunks ENABLE ROW LEVEL SECURITY;
ALTER TABLE chunks FORCE ROW LEVEL SECURITY; -- applies to table owner too
-- Step 2: Create isolation policy
CREATE POLICY tenant_isolation ON chunks
AS PERMISSIVE
FOR ALL
TO app_role -- only for the application role
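USING (tenant_id = NULLIF(current_setting('app.current_tenant_id', true), '')::UUID); -- unset or empty: no rows visible
-- Step 3: Application sets context at the start of each transaction
-- SET LOCAL app.current_tenant_id = 'a3f2c81b-4d23-...';
-- (SET LOCAL, or set_config(..., true), is scoped to the transaction, so the value
-- cannot leak to another client when PgBouncer runs in transaction pooling mode)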
-- Step 4: Queries automatically filtered — no WHERE needed
SELECT chunk_id, content, 1 - (embedding <=> $1) AS sim
FROM chunks
WHERE deleted_at IS NULL
ORDER BY embedding <=> $1
LIMIT 20;
-- Verify RLS is active (pg_tables exposes rowsecurity only; pg_class has both flags)
SELECT relname, relrowsecurity, relforcerowsecurity
FROM pg_class
WHERE relname = 'chunks';
-- Check the tenant context visible to the current session/transaction
SELECT current_setting('app.current_tenant_id', true);
from collections import defaultdict
def reciprocal_rank_fusion(sparse_hits, dense_hits, k=60):
    """
    Fuse BM25 and vector search results using Reciprocal Rank Fusion.
    Args:
        sparse_hits: [(chunk_id, payload), ...] ordered by BM25 score desc
        dense_hits: [(chunk_id, payload), ...] ordered by cosine sim desc
        k: smoothing constant (60 from original RRF paper)
    Returns:
        [(chunk_id, payload, rrf_score), ...] sorted by score desc
    """
    scores = defaultdict(float)
    payloads = {}
    for rank, (chunk_id, payload) in enumerate(sparse_hits, start=1):
        scores[chunk_id] += 1.0 / (k + rank)
        payloads[chunk_id] = payload
    for rank, (chunk_id, payload) in enumerate(dense_hits, start=1):
        scores[chunk_id] += 1.0 / (k + rank)
        payloads.setdefault(chunk_id, payload)
    ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [(cid, payloads[cid], score) for cid, score in ranked]
# Usage: embed, bm25_search, vector_search and cohere_rerank are placeholders
# for your own embedding, BM25, vector-store and reranker clients.
query_text = "invoice INV-2026-1049 payment terms"
query_embedding = embed(query_text)
sparse_results = bm25_search(query_text, top_k=40) # [(chunk_id, payload)]
dense_results = vector_search(query_embedding, top_k=40) # [(chunk_id, payload)]
fused = reciprocal_rank_fusion(sparse_results, dense_results)[:20]
reranked = cohere_rerank(query_text, [p["content"] for _, p, _ in fused])[:8]

chunk_id (PK), document_id (FK), content (displayed text), content_hash (SHA-256 for delta detection), embedding (vector), embedding_model (version string), chunk_index, tenant_id, created_at, deleted_at. The most commonly missing columns, and the most expensive to add later, are content_hash (needed for delta re-embedding) and embedding_model (needed for model migration). Without them, every upgrade requires a full re-index from scratch.
Use a parallel index strategy:
1. Keep the existing partial HNSW index on the old model.
2. Start a background job that re-embeds chunks and writes the new vectors and embedding_model values.
3. Create a second partial HNSW index on the new model as it fills in.
4. A/B test both indexes on real queries using recall@k against a ground-truth set.
5. Update the query routing to use the new index.
6. Drop the old index after a stabilisation window.

The old index keeps serving queries throughout, so there is no downtime. This holds only if the old vectors are kept until cutover (for example, in a separate column) rather than overwritten in place.
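Step 4 needs a recall@k number per index. Here is a minimal sketch, assuming a small ground-truth map from query ID to relevant chunk_ids and the top-k results each index returned for those queries. All data below is made up for illustration.

```python
def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the relevant chunks that appear in the top-k results."""
    if not relevant:
        raise ValueError("ground truth must list at least one relevant chunk")
    return len(set(retrieved[:k]) & relevant) / len(relevant)

def mean_recall(results: dict[str, list[str]], truth: dict[str, set[str]], k: int) -> float:
    """Average recall@k over every query in the ground-truth set."""
    return sum(recall_at_k(results[q], truth[q], k) for q in truth) / len(truth)

truth = {"q1": {"c1", "c2"}, "q2": {"c9"}}
old_index = {"q1": ["c1", "c5", "c7"], "q2": ["c3", "c4", "c8"]}
new_index = {"q1": ["c2", "c1", "c6"], "q2": ["c9", "c3", "c4"]}
print(mean_recall(old_index, truth, k=3), mean_recall(new_index, truth, k=3))  # 0.25 1.0
```

With only two queries the comparison is meaningless statistically. In practice, run a few hundred representative queries before switching the routing.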
RLS attaches a security policy to a table. After ALTER TABLE t ENABLE ROW LEVEL SECURITY, every SELECT, UPDATE, and DELETE automatically has the policy's USING clause added as a filter in the query plan. The application cannot opt out: even raw SQL or an ORM generating arbitrary queries is bound by the policy. Application-level filtering can be bypassed by a query that forgets the WHERE clause, an admin tool, or a security bug. RLS cannot be bypassed as long as all three of the following hold:
- The connecting role is not a superuser.
- The role does not have BYPASSRLS.
- The role is not the table owner, or FORCE ROW LEVEL SECURITY is set on the table.
Post-filter (ANN then filter): fails when the filter is highly selective (most results are discarded), returning fewer than k results or requiring a very large initial candidate pool. Pre-filter (filter then ANN): fails when the indexed filter set is too small for the HNSW graph to navigate efficiently, increasing recall error. Pre-filter with a payload index is the correct default for multi-tenant RAG. Post-filter is acceptable only when the filter is weakly selective (<50% removal) and exact recall@k is not required.
RRF is a rank-based fusion method: for each document, sum 1/(k + rank_i) across all result lists. It uses ranks instead of raw scores because BM25 and cosine similarity are on incompatible scales. BM25 scores are unbounded and depend on document length, IDF, and term frequency, while cosine similarity is bounded to [-1, 1]. Rank-based fusion eliminates the need to normalise or calibrate these scores against each other. The k = 60 constant flattens the gap between adjacent ranks (1/61 vs 1/62 instead of 1/1 vs 1/2), so a single top-1 result cannot dominate the fusion.
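The following worked check shows what k does. It compares a document ranked first by only one retriever against a document ranked third by both. `rrf_contribution` is an illustrative helper for a single list's term.

```python
def rrf_contribution(rank: int, k: int) -> float:
    """One result list's RRF contribution for a document at 1-based `rank`."""
    return 1.0 / (k + rank)

for k in (0, 60):
    top_in_one_list = rrf_contribution(1, k)                         # rank 1 in BM25 only
    third_in_both = rrf_contribution(3, k) + rrf_contribution(3, k)  # rank 3 in both lists
    print(k, round(top_in_one_list, 4), round(third_in_both, 4))
# 0 1.0 0.6667
# 60 0.0164 0.0317
```

With k = 0 the single-list winner dominates. With k = 60 the document both retrievers agree on comes out ahead, which is the smoothing the constant is there to provide.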
Hybrid consistently outperforms dense-only for queries containing: exact identifiers (invoice numbers, product codes, error strings like "NullPointerException"), proper nouns underrepresented in the embedding model's training data, technical acronyms (HIPAA, SOC-2, RFC 7231), very short queries (<3 words), or code snippets. Dense-only is sufficient for conversational paraphrase queries. Hybrid is the safe default for production RAG because it is almost never worse than either component alone on a diverse query distribution.
Strings, Hashes, Lists, Sets, Sorted Sets, HyperLogLog, Streams — right structure for each AI/ML use case.
Redis provides several distinct data structures, each optimised for a specific access pattern. Seven of them cover most AI/ML use cases:
- Strings (SET/GET/INCR) handle simple caches, counters, and distributed locks (SET with NX).
- Hashes (HSET/HGET/HMGET) store field-value collections. They are ideal for conversation memory, user session state, and model configuration snapshots where individual field updates are common.
- Lists (LPUSH/RPOP/LRANGE) are ordered sequences, useful for conversation history queues and simple task queues (though Streams are better for durability).
- Sets (SADD/SISMEMBER/SMEMBERS) handle unique membership, such as tracking which documents have been embedded or deduplicating user IDs.
- Sorted Sets (ZADD/ZRANGEBYSCORE/ZRANK) store members with a float score. They are the right structure for rate limiting (sliding window with timestamps as scores), recommendation ranking (item scores), and leaderboards.
- HyperLogLog (PFADD/PFCOUNT) estimates the cardinality of large sets in at most 12 KB regardless of set size. Use it for unique daily active users or distinct query counting without storing every ID.
- Streams (XADD/XREAD/XGROUP) are append-only logs with consumer groups. They are the correct structure for async LLM inference pipelines where message durability and at-least-once processing matter.
Cache-aside, write-through, write-behind, read-through; cache stampede / thundering herd prevention; TTL jitter.
Cache-aside (lazy loading) is the most common pattern for AI systems. The application checks the cache on read. On a miss it fetches from the database and populates the cache; on a write it updates the database and invalidates or updates the cache entry. Cache-aside is simple to implement and resilient, since a cache miss is slower but not broken. It is prone to cache stampede, however: when a popular key expires, many concurrent requests find a miss at the same moment and all hit the database at once. Read-through is the same flow with the loading logic moved into the cache layer: the application only talks to the cache, which fetches from the database on a miss.

Write-through sends every write to both the cache and the database synchronously, so the cache stays consistent with the database as long as both writes succeed. It is simpler to reason about but adds a cache write to every write's latency and wastes cache space on write-heavy keys that are never read.

Write-behind (write-back) sends writes to the cache immediately, and the cache flushes them to the database asynchronously. It has the lowest write latency but risks data loss if Redis crashes before the flush. Only use write-behind for non-critical data (analytics counters, view counts) where loss is acceptable.

Cache stampede prevention:
1. Lock pattern: SET key:lock NX PX 5000 acquires a distributed lock. The winning request populates the cache while the others wait.
2. Probabilistic Early Recomputation (PER): each reader may regenerate the value before its TTL expires, with a probability that rises as expiry approaches and as the recomputation cost grows.
3. TTL jitter: add random variance (e.g., up to 10%) to TTLs so keys that were populated together don't all expire at the same moment.
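One common formulation of probabilistic early recomputation is the XFetch rule: recompute when now - delta · beta · ln(u) ≥ expiry, where u is uniform in (0, 1] and delta is how long the last recomputation took. Here is a minimal sketch, assuming the caller stores delta and the absolute expiry time next to the cached value. `should_recompute_early` is a hypothetical helper name.

```python
import math
import random

def should_recompute_early(now: float, expiry: float, delta: float,
                           beta: float = 1.0, rand=random.random) -> bool:
    """XFetch-style probabilistic early recomputation.

    now, expiry: timestamps in seconds; delta: seconds the last recomputation took;
    beta > 1 makes early refreshes more likely.
    """
    u = 1.0 - rand()  # uniform in (0, 1], so log(u) is defined and <= 0
    # -log(u) is an Exponential(1) draw: the look-ahead is usually around delta * beta
    # and occasionally much longer, so concurrent readers rarely refresh together.
    return now - delta * beta * math.log(u) >= expiry
```

On a cache hit, a reader calls this with the stored delta and expiry. If it returns True, the reader recomputes and rewrites the value (with a fresh delta) while everyone else keeps serving the still-valid cached copy. Expensive values (large delta) therefore start refreshing earlier.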
EXPIRE, EXPIREAT, TTL; allkeys-lru vs allkeys-lfu vs volatile-lru; maxmemory-policy; keyspace notifications; MEMORY USAGE.
TTL (time-to-live) controls when a key is automatically deleted. The relevant commands:
- SET key value EX seconds sets a TTL on write.
- EXPIRE key seconds adds or updates the TTL on an existing key.
- TTL key returns the remaining seconds (-1 = no TTL, -2 = key does not exist).
- EXPIREAT sets an absolute Unix-timestamp expiry.

Typical TTLs for AI caches: embedding caches (hours to days), model prediction caches (minutes to hours depending on staleness tolerance), rate-limit windows (seconds to minutes).

Eviction policies control what Redis deletes when it reaches maxmemory:
- noeviction (the default) rejects new writes with an error when memory is full, which is never correct for a cache.
- allkeys-lru evicts the least-recently-used key across all keys. It is the standard choice for general-purpose caches where a recent access is the best predictor of the next one.
- allkeys-lfu evicts the least-frequently-used key. It beats LRU for workloads with a stable hot set (popular embedding models, frequent system prompts) because frequently accessed keys survive longer.
- volatile-lru and volatile-lfu apply only to keys with a TTL set; keys without TTLs are never evicted. volatile-* is appropriate when Redis holds both a cache (with TTLs) and durable data (without TTLs) in the same instance, though separating them into different Redis instances is cleaner.

Redis implements LRU and LFU approximately, by sampling maxmemory-samples keys per eviction. Set maxmemory as a fraction of available RAM (leave 20–30% headroom for overhead). Use MEMORY USAGE key to check per-key memory, and monitor used_memory_rss and mem_fragmentation_ratio in INFO memory.
Redis is not just a cache — it is a data structure server. Choosing the right structure cuts memory and improves throughput more than tuning eviction policies ever will.
import redis, time, uuid
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
# Hash: conversation memory (field per message)
r.hset('conv:session_42', mapping={
    'system': 'You are a helpful assistant.',
    'last_user_msg': 'What is HNSW?',
    'last_model_resp': 'HNSW is a graph-based ANN index...',
    'turn_count': 7,
})
r.expire('conv:session_42', 3600) # 1 hour TTL
# Sorted Set: sliding-window rate limiter
def is_rate_limited(user_id, limit=60, window_s=60):
    key = f'rl:{user_id}'
    now = time.time()
    pipe = r.pipeline() # MULTI/EXEC by default, so the four commands run atomically
    pipe.zremrangebyscore(key, '-inf', now - window_s) # drop entries outside the window
    pipe.zadd(key, {f'{now}:{uuid.uuid4().hex}': now}) # unique member per request
    pipe.zcard(key) # count in window (includes this request; rejected ones count too)
    pipe.expire(key, window_s)
    results = pipe.execute()
    return results[2] > limit # True = limited
# Stream: async inference request queue
r.xadd('inference:queue', {
    'request_id': 'req-abc123',
    'prompt': 'Summarise this document...',
    'model': 'claude-sonnet-4-6',
    'tenant_id': 'acme',
})
# HyperLogLog: unique queries per hour (no exact list needed)
user_query = 'what is hnsw' # placeholder for the incoming query text
r.pfadd('unique_queries:2026-05-19T14', user_query)
approx_unique = r.pfcount('unique_queries:2026-05-19T14')

import redis, json, hashlib, time
import random, uuid
r = redis.Redis(decode_responses=True)
# Compare-and-delete: release the lock only if this worker still owns it
RELEASE_LOCK = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
end
return 0
"""
def get_embedding(text: str, model: str, embed_fn) -> list[float]:
    """Cache-aside with distributed lock to prevent stampede."""
    cache_key = f'embed:{model}:{hashlib.sha256(text.encode()).hexdigest()[:16]}'
    lock_key = f'{cache_key}:lock'
    # 1. Cache hit
    cached = r.get(cache_key)
    if cached is not None:
        return json.loads(cached)
    # 2. Acquire lock (NX = only if not exists, PX = expire in ms)
    token = uuid.uuid4().hex
    acquired = r.set(lock_key, token, nx=True, px=5000)
    if acquired:
        try:
            embedding = embed_fn(text, model)
            # TTL jitter: 3600s ± 10% to prevent mass expiry
            ttl = int(3600 * random.uniform(0.9, 1.1))
            r.set(cache_key, json.dumps(embedding), ex=ttl)
            return embedding
        finally:
            # If embed_fn outlived the 5 s lock, another worker may own it by now
            r.eval(RELEASE_LOCK, 1, lock_key, token)
    else:
        # 3. Wait (up to ~5 s) for the lock holder to populate the cache
        for _ in range(20):
            time.sleep(0.25)
            cached = r.get(cache_key)
            if cached is not None:
                return json.loads(cached)
        # Fallback: compute without caching
        return embed_fn(text, model)

import redis
r = redis.Redis(decode_responses=True)
# Placeholder values for the examples below
vector_json = '[0.12, -0.03, 0.88]'
result_json = '{"label": "positive"}'
stale_keys = ['embedding:abc123']
# Set TTL at write time
r.set('embedding:abc123', vector_json, ex=86400) # 24h TTL
r.set('prediction:req456', result_json, ex=300) # 5min TTL
# Update TTL on an existing key (cache hit: refresh expiry)
r.expire('embedding:abc123', 86400)
# Inspect key memory usage and TTL
print(r.memory_usage('embedding:abc123')) # bytes
print(r.ttl('embedding:abc123')) # seconds remaining
# Check eviction policy at runtime
info = r.info('memory')
print(info['maxmemory_policy']) # e.g. 'allkeys-lfu'
# Pipeline: batch TTL updates efficiently
pipe = r.pipeline()
for key in stale_keys:
    pipe.expire(key, 3600)
pipe.execute()
# --- redis.conf settings ---
# maxmemory 4gb
# maxmemory-policy allkeys-lfu # LFU for stable hot-set workloads
# maxmemory-samples 10 # keys sampled per eviction (default 5); higher = closer to exact LRU/LFU
# hz 20 # background task frequency (default 10)

Use Hash when you need to read or update individual fields without loading the entire object. HGET field is O(1) and returns only the field you need. With a JSON String, reading one field means a GET plus parsing the whole document, and updating one field means rewriting the whole document with SET. Hash is ideal for session state, conversation memory, and config objects where fields are updated independently. Use String + JSON only when the object is always read and written as a whole.
HyperLogLog is a probabilistic data structure that estimates the cardinality (distinct count) of a set using at most 12 KB of memory, with a standard error of ~0.81%. PFADD adds elements, and PFCOUNT returns the estimated unique count. Use it when you need to count distinct users, queries, or events at scale without storing every element. It cannot tell you which elements are in the set, only approximately how many distinct ones there are. For exact membership or enumeration, use a Set. For approximate membership tests in bounded memory (with false positives but no false negatives), use a Bloom filter.
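Both numbers fall out of the register layout. With 16,384 registers of 6 bits each, memory is 12,288 bytes (12 KB), and the textbook standard error is 1.04/√16384 ≈ 0.81%. What follows is a compact pure-Python sketch of the textbook estimator behind PFADD/PFCOUNT, for illustration only. Redis's own implementation uses a different encoding (including a sparse form) and a different bias correction.

```python
import hashlib
import math

P = 14                            # 2**14 registers, the size Redis uses
M = 1 << P                        # 16,384 registers
ALPHA = 0.7213 / (1 + 1.079 / M)  # bias-correction constant for M >= 128

def _hash64(item: str) -> int:
    """64-bit hash taken from the first 8 bytes of SHA-1."""
    return int.from_bytes(hashlib.sha1(item.encode("utf-8")).digest()[:8], "big")

class HyperLogLog:
    def __init__(self) -> None:
        self.registers = [0] * M  # Redis packs these as 6-bit values: 12 KB

    def add(self, item: str) -> None:
        h = _hash64(item)
        idx = h >> (64 - P)                       # top P bits choose a register
        rest = h & ((1 << (64 - P)) - 1)          # remaining 50 bits
        rank = (64 - P) - rest.bit_length() + 1   # position of the first 1-bit
        if rank > self.registers[idx]:
            self.registers[idx] = rank            # keep the longest zero run seen

    def count(self) -> int:
        raw = ALPHA * M * M / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if raw <= 2.5 * M and zeros:              # small cardinalities: linear counting
            return round(M * math.log(M / zeros))
        return round(raw)

hll = HyperLogLog()
for i in range(100_000):
    hll.add(f"user-{i}")
    hll.add(f"user-{i}")  # duplicates hash identically, so no register changes
print(hll.count())        # close to 100000
```

The sketch also shows why HyperLogLog cannot enumerate members: only per-register maxima survive, not the items themselves.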
Write-through keeps the cache consistent with the database on every write (barring a failure between the two writes). This is useful when reads immediately follow writes from different processes, e.g. a model evaluation result written by one service and immediately read by another. Cache-aside is simpler and more flexible: the cache is populated lazily on reads, tolerating a stale or cold cache. For most AI API read paths (embedding lookup, document metadata), cache-aside is correct. For write-then-read consistency within a tight window, write-through is safer.
Cache stampede (thundering herd) occurs when a popular cached key expires and many concurrent requests simultaneously find a miss, all hitting the origin database or API at once. Prevention strategies: (1) Distributed lock — first requester acquires a lock, populates cache; others wait for the cache to be populated. (2) TTL jitter — add random variance to TTLs so keys that were populated together don't all expire simultaneously. (3) Probabilistic Early Recomputation (PER) — proactively recompute a value slightly before expiry based on recompute cost and remaining TTL.
LRU evicts the key that was least recently accessed — it works well when access patterns shift over time (yesterday's popular queries become less relevant). LFU evicts the key accessed least frequently — it works better when there is a stable hot set of keys accessed repeatedly (popular embedding model names, system prompt templates, frequent user queries). For AI caches with a stable popular-query distribution, LFU keeps the hot set in memory better. For caches with rapidly shifting access patterns, LRU is more responsive.
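The difference shows up clearly when the same access trace is replayed through both policies. This sketch assumes exact LRU and exact LFU with no counter decay, and `evictions` is an illustrative helper. Redis approximates both policies by sampling and decays LFU counters over time (lfu-decay-time), which softens the second effect below.

```python
from collections import Counter, OrderedDict

def evictions(policy: str, capacity: int, accesses: list[str]) -> list[str]:
    """Replay an access trace through an exact LRU or LFU cache; return evicted keys."""
    cache = OrderedDict()  # iteration order = recency, least recent first
    hits = Counter()       # access count per key while it is resident
    evicted = []
    for key in accesses:
        hits[key] += 1
        if key in cache:
            cache.move_to_end(key)
            continue
        if len(cache) >= capacity:
            if policy == "lru":
                victim = next(iter(cache))
            else:  # "lfu": fewest accesses; ties go to the least recent key
                victim = min(cache, key=lambda k: hits[k])
            del cache[victim]
            del hits[victim]
            evicted.append(victim)
        cache[key] = True
    return evicted

scan = ["sys_prompt"] * 3 + ["q1", "q2", "q3"]  # stable hot key plus one-off queries
shift = ["old"] * 3 + ["x", "y"] * 3            # yesterday's hot key, new working set
print(evictions("lru", 2, scan), evictions("lfu", 2, scan))
# ['sys_prompt', 'q1'] ['q1', 'q2']
print(evictions("lru", 2, shift), evictions("lfu", 2, shift))
# ['old'] ['x', 'y', 'x', 'y', 'x']
```

LFU protects the hot system prompt from a burst of one-off queries. Under a shifting workload, however, the stale "old" key's count pins it in memory while the new working set thrashes.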
With maxmemory-policy noeviction, Redis returns an OOM (Out Of Memory) error for write commands that could increase memory use: SET, LPUSH, ZADD, HSET, etc. Read commands (GET, HGET) and DEL continue to work. If the application does not handle OOM errors gracefully, it will raise exceptions and may crash. Set maxmemory-policy to allkeys-lru or allkeys-lfu for cache instances so Redis evicts old keys instead of rejecting writes.
Throughput, latency, durability, and replay tradeoffs; consumer groups; when each tool fits AI/ML workloads.
Redis Streams are an append-only log with consumer groups built into Redis:
- Sub-millisecond publish latency.
- Consumer groups let multiple workers share a stream without duplicate delivery.
- Persistent with AOF/RDB.
- Use for: async LLM inference queues at moderate volume (<500K msg/day), background embedding jobs, real-time event forwarding between microservices.
- Limitation: a single stream lives on one node and in memory; there is no multi-partition horizontal scaling.

Apache Kafka is a distributed commit log:
- Scales to millions of messages per second across a cluster.
- Configurable retention (hours to weeks) and replay from any offset.
- Rich consumer group semantics, plus Kafka Connect for CDC and database integration.
- Kafka Streams and ksqlDB handle stream processing.
- Operational complexity is high (ZooKeeper or KRaft, broker management, partition tuning).
- Use for: ML training data event pipelines, CDC from Postgres to the feature store, high-volume inference logging that must be replayed for model debugging.

AWS SQS is a managed message queue with zero operational overhead:
- Standard queue: at-least-once delivery, best-effort ordering.
- FIFO queue: ordered, exactly-once processing within a 5-minute deduplication window. Throughput is up to 300 API calls per second per action, or 3,000 messages per second with batching (more in high-throughput mode).
- Built-in dead-letter queue.
- 256 KB message limit (store large payloads in S3 and pass the S3 key in SQS).
- Use for: asynchronous model invocations in AWS-native architectures, decoupling services without operating a broker.
At-most-once, at-least-once, exactly-once semantics; Kafka manual commit strategies; idempotency keys; consumer rebalancing.
Delivery guarantees describe what happens when a consumer crashes between receiving a message and acknowledging it:
- At-most-once: the message is acknowledged before processing. If the consumer crashes after the ACK but before completing the work, the message is lost. Suitable for metrics and analytics where occasional loss is acceptable.
- At-least-once: the message is acknowledged after processing. If the consumer crashes after processing but before the ACK, the message is redelivered and may be processed twice, so the consumer must be idempotent.
- Exactly-once: idempotent producers plus transactional consumers ensure each message is processed exactly once. This is complex and expensive.

In practice, make consumers idempotent (same message processed twice = same outcome) and use at-least-once delivery. The result is effectively exactly-once at lower cost.

Kafka commit strategies:
- Auto-commit (enable.auto.commit=true) commits offsets on a timer regardless of processing status. This risks message loss if the consumer crashes after a commit but before completing the work.
- Manual synchronous commit: commitSync() after each batch is safe but slower.
- Manual async commit: commitAsync() improves throughput, but failed commits are not retried (a late retry could overwrite a newer offset), so a crash can cause some reprocessing.

The correct production pattern: process a batch, then call commitSync once all messages in the batch are successfully written to the destination. An idempotency key is a unique identifier per message that the consumer stores in a processed-IDs set before committing. On redelivery, the consumer checks the set and skips the message if it was already processed.
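The following toy illustrates why idempotency turns at-least-once into effectively-once. It assumes an in-memory store and a processed-IDs set; `FeatureWriter` is a hypothetical class. In production, the write and the processed-ID record would ideally commit together, e.g. in one database transaction.

```python
from dataclasses import dataclass, field

@dataclass
class FeatureWriter:
    """Toy idempotent consumer: a redelivered message leaves the store unchanged."""
    rows: dict = field(default_factory=dict)
    processed_ids: set = field(default_factory=set)
    side_effects: int = 0

    def handle(self, message: dict) -> None:
        if message["event_id"] in self.processed_ids:  # idempotency key seen: skip
            return
        # Upsert keyed on (user_id, ts): even without the ID check, a replay
        # would overwrite the row with identical data.
        self.rows[(message["user_id"], message["ts"])] = message["value"]
        self.side_effects += 1
        self.processed_ids.add(message["event_id"])  # record only after the write

m1 = {"event_id": "e1", "user_id": "u1", "ts": 1, "value": 0.4}
m2 = {"event_id": "e2", "user_id": "u2", "ts": 1, "value": 0.9}
writer = FeatureWriter()
for msg in [m1, m2, m2, m1]:  # at-least-once: e2 and e1 are each delivered twice
    writer.handle(msg)
print(len(writer.rows), writer.side_effects)  # 2 2
```

Recording the ID after the write means a crash in between causes one harmless replay, never a lost update.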
DLQ for poisoned messages; retry with exponential backoff and jitter; consumer lag monitoring; flow control patterns.
A dead-letter queue (DLQ) receives messages that have failed processing N times. Without a DLQ, a poison message (malformed payload, unexpected data that always raises an exception) blocks the queue permanently as the consumer retries it in an infinite loop. How to route to a DLQ:
- AWS SQS: set maxReceiveCount in the source queue's redrive policy to route a message to the DLQ after N failed receives.
- Kafka: use a dead-letter topic. After N retries, produce the message to that topic with error metadata.
- Redis Streams: move the message to a separate dead-letter stream once its delivery count (reported by XPENDING) exceeds a threshold.

Retry strategy: use exponential backoff with jitter to prevent retry storms. The delay is base × 2^attempt plus random jitter, capped at a maximum (e.g., 5 minutes). Jitter spreads retries across time so workers that failed together don't all retry simultaneously.

Backpressure: the producer is signalled to slow down when the consumer cannot keep up.
- Kafka: consumer lag (the difference between the latest offset and the committed offset) can drive autoscaling, e.g. adding consumer pods when lag is high, up to the topic's partition count.
- SQS: no native backpressure. Implement it at the API layer by checking ApproximateNumberOfMessages before accepting new requests.
- Redis Streams: XINFO GROUPS reports each group's lag (Redis 7+). XLEN gives the stream length, which reflects queue depth only if acknowledged entries are trimmed. A producer can check either and block or rate-limit when depth exceeds a threshold.
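The retry rule above (base × 2^attempt plus jitter, capped) fits in a few lines. This is a minimal sketch with an illustrative `backoff_delay` helper and assumed defaults of a 1-second base and a 300-second cap.

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 300.0,
                  rng=random.random) -> float:
    """Seconds to wait before retry `attempt` (0-based).

    Exponential part: base * 2**attempt. Jitter: up to one extra `base` interval,
    drawn independently per worker. The result never exceeds `cap`.
    """
    return min(base * 2 ** attempt + rng() * base, cap)

# Five workers that all failed their third attempt at the same moment now
# wake up at different times inside [4.0, 5.0) instead of in one burst.
delays = [backoff_delay(2) for _ in range(5)]
print(sorted(round(d, 2) for d in delays))
```

Passing `rng` in makes the schedule testable. In production the default `random.random` gives each worker its own draw, which is what breaks up the synchronized waves.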
The gap between at-least-once and exactly-once is idempotency — if your consumer is idempotent, at-least-once delivery is effectively exactly-once and far cheaper to implement.
import redis, json
r = redis.Redis(decode_responses=True)
STREAM = 'inference:queue'
GROUP = 'inference-workers'
CONSUMER = 'worker-1'
# Create group (idempotent: ignore if it already exists)
try:
    r.xgroup_create(STREAM, GROUP, id='0', mkstream=True)
except redis.ResponseError:
    pass # group already exists (BUSYGROUP)
def process_message(msg_id, fields):
    # call_llm and store_result are placeholders for your model client and result store
    payload = json.loads(fields['payload'])
    result = call_llm(payload['prompt'], payload['model'])
    store_result(payload['request_id'], result)
    return True
def handle(msg_id, fields):
    try:
        process_message(msg_id, fields)
        r.xack(STREAM, GROUP, msg_id) # ACK on success
    except Exception:
        pass # stays pending; picked up again by the reclaim step below
# Main consumer loop
while True:
    # Read up to 10 new messages, block 2s if stream is empty
    messages = r.xreadgroup(GROUP, CONSUMER, {STREAM: '>'}, count=10, block=2000)
    for stream_name, entries in (messages or []):
        for msg_id, fields in entries:
            handle(msg_id, fields)
    # Reclaim messages pending > 30s (crashed worker recovery) and process them here:
    # reading with '>' only returns new messages, never claimed ones
    pending = r.xpending_range(STREAM, GROUP, min='-', max='+', count=50)
    stale = [p['message_id'] for p in pending if p['time_since_delivered'] > 30_000]
    if stale:
        for msg_id, fields in r.xclaim(STREAM, GROUP, CONSUMER, 30_000, stale):
            handle(msg_id, fields)

from confluent_kafka import Consumer, KafkaError
import json
import redis
from confluent_kafka import KafkaException
redis_client = redis.Redis(decode_responses=True)
consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'feature-store-writer',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False, # manual commit for safety
})
consumer.subscribe(['user-events'])
DEDUP_TTL_S = 86400 * 7 # 7-day deduplication window
def already_processed(event_id: str) -> bool:
    return redis_client.exists(f'processed:{event_id}') == 1
def mark_processed(event_id: str) -> None:
    # One key per event with its own TTL, so old IDs really expire
    # (refreshing the TTL of one shared set would keep it alive forever)
    redis_client.set(f'processed:{event_id}', 1, ex=DEDUP_TTL_S)
batch = []
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() != KafkaError._PARTITION_EOF:
            raise KafkaException(msg.error())
        continue
    event = json.loads(msg.value())
    event_id = event['event_id']
    if not already_processed(event_id):
        write_to_feature_store(event) # placeholder for your feature-store write
        mark_processed(event_id)
    batch.append(msg)
    if len(batch) >= 100:
        consumer.commit(asynchronous=False) # commit only after the whole batch is written
        batch.clear()

import time, random, json
import redis
r = redis.Redis(decode_responses=True)
STREAM = 'inference:queue'
DLQ = 'inference:dlq'
GROUP = 'workers'
MAX_RETRY = 3
class TransientError(Exception): # placeholder: timeouts, 429s, 5xx responses
    pass
class PoisonError(Exception): # placeholder: payloads that can never succeed
    pass
def process_with_retry(msg_id: str, fields: dict) -> None:
    delivery_count = int(fields.get('delivery_count', 0))
    if delivery_count >= MAX_RETRY:
        # Move to DLQ with error context
        r.xadd(DLQ, {
            **fields,
            'failed_msg_id': msg_id,
            'error': 'max_retries_exceeded',
            'failed_at': str(time.time()),
        })
        r.xack(STREAM, GROUP, msg_id)
        return
    try:
        # call_llm and store_result are placeholders for your model client and result store
        result = call_llm(json.loads(fields['payload']))
        store_result(fields['request_id'], result)
        r.xack(STREAM, GROUP, msg_id)
    except TransientError:
        # Exponential backoff with jitter (capped at 300 s) before re-enqueueing.
        # Sleeping blocks this worker; a delayed-retry queue avoids that at scale.
        delay = min(2 ** delivery_count + random.uniform(0, 1), 300)
        time.sleep(delay)
        r.xadd(STREAM, {**fields, 'delivery_count': delivery_count + 1})
        r.xack(STREAM, GROUP, msg_id) # ack original; new entry queued
    except PoisonError:
        r.xadd(DLQ, {**fields, 'error': 'poison_message', 'failed_at': str(time.time())})
        r.xack(STREAM, GROUP, msg_id)
def check_backpressure(max_depth: int = 10_000) -> bool:
    # XLEN counts every entry still in the stream, acknowledged or not, so it is a
    # depth proxy only if acknowledged entries are trimmed (e.g. XADD with MAXLEN)
    depth = r.xlen(STREAM)
    return depth > max_depth # True = apply backpressure

Use Kafka when:
1. A single Redis node can no longer hold the stream's throughput or retained backlog (a Redis stream lives in one node's memory).
2. You need long retention and replay (re-process training data events from 30 days ago).
3. Multiple independent consumer groups need the same stream (analytics + feature store + model training from one topic).
4. You need CDC via Kafka Connect to sync Postgres changes to a feature store.
5. You need exactly-once semantics across a Kafka Streams topology.

Use Redis Streams for lower-volume, lower-latency, operationally simpler queues where Redis is already in your stack.
A consumer group lets multiple workers share a stream without processing the same message twice. When a consumer calls XREADGROUP with ">" (new messages), Redis delivers each new message to exactly one consumer in the group and marks it as pending (delivered but not yet acknowledged). After successful processing, the consumer calls XACK, which removes the message from the pending list. If a consumer crashes, the message stays pending, and another worker can reclaim it with XCLAIM (or XAUTOCLAIM) after an idle timeout. This provides at-least-once delivery; combined with idempotent processing, the outcome is effectively exactly-once.
At-least-once is sufficient whenever the consumer is idempotent, i.e. processing the same message twice produces the same result as processing it once. Examples:
- Writing a feature value keyed on user_id + timestamp (a second write of the same value changes nothing).
- Incrementing a counter with a deduplication check.
- Storing an embedding keyed on content_hash (same hash, same result, so re-embedding is skipped).

Stronger guarantees are needed only when the side effect is non-idempotent and cannot be made so: charging a credit card, sending an email, deducting inventory. Kafka transactions with idempotent producers give exactly-once only for read-process-write flows that stay inside Kafka. An external side effect such as a payment still needs its own deduplication, for example an idempotency key passed to the payment API.
A rebalance occurs when the consumer group's membership or its partitions change:
- a new consumer joins;
- a consumer leaves or misses its session timeout (no heartbeats);
- a consumer exceeds max.poll.interval.ms between polls;
- the subscribed topic's partition count changes.

During a rebalance, Kafka redistributes partition assignments across the consumers in the group. Any message that was fetched but not committed by the previous owner will be redelivered to the new owner. If the original consumer was mid-processing when it lost its partition, the new owner processes the same message again, causing duplicates. Idempotent consumers handle this transparently; non-idempotent consumers need exactly-once semantics or external deduplication.
A DLQ receives messages that have exhausted their retry budget. Without it, a poison message (one that always fails processing) permanently blocks the queue as the consumer retries it indefinitely, starving all subsequent messages. The DLQ separates failed messages for investigation and replay without blocking healthy message flow. Monitor DLQ depth as a critical alert — a non-empty DLQ means real failures are happening that need investigation.
Without jitter, all workers that failed at the same time (e.g., during a transient database outage) retry at the same exponential intervals and hit the recovering system in synchronized waves. Jitter adds random variance to the delay (e.g., delay = 2^attempt × random(0.5, 1.5)) so retries are spread out over time. This turns a synchronized retry storm into smoothly distributed retries, giving the downstream system time to recover without being overwhelmed again.
Transform-before-load (ETL) vs load-then-transform (ELT); dbt as the SQL transformation layer; model materializations; lineage and tests.
ETL (Extract-Transform-Load) transforms data before loading it into the destination. The transform step runs in custom code such as Python, Spark, or Glue jobs. Advantages: sensitive data can be masked or filtered before landing in the warehouse. Disadvantages: business logic is scattered across scripts, hard to audit, and expensive to reprocess when logic changes. ETL is appropriate when you cannot store raw data due to PII or cost.

ELT (Extract-Load-Transform) loads raw data first, then transforms it inside the analytical warehouse or lakehouse using SQL. It is the modern default: cloud storage is cheap, BigQuery and Snowflake can process petabytes with SQL, and transformation logic in version-controlled SQL is auditable, testable, and reproducible.

dbt (data build tool) is the ELT transformation layer. SQL SELECT statements become tables or views, with built-in tests, documentation, and model-level lineage (column-level lineage is a dbt Cloud feature). Materializations:
- view: re-runs the query on every read, always fresh.
- table: a snapshot with fast reads, stale until refreshed.
- incremental: appends or merges only new rows, which is critical for AI/ML feature pipelines that grow daily.
- ephemeral: inlined into dependent models as a CTE, with no storage.

For ML feature engineering, use incremental materializations with a unique_key for idempotent upserts. Restrict each run to new rows with an {% if is_incremental() %} filter on updated_at.
DAG structure, operators, sensors; execution_date for idempotency; XCom for small results vs S3 for large artifacts; backfill strategy.
Apache Airflow orchestrates pipeline workflows as Directed Acyclic Graphs (DAGs). Each node is a Task with a specific operator: PythonOperator (run a Python function), BashOperator (run shell commands), SQLExecuteQueryOperator (run SQL), KubernetesPodOperator (run a container), S3KeySensor (wait for an S3 object to appear). Dependencies are set with >> or set_downstream. Idempotency is the critical design constraint: a task must produce the same output when run multiple times on the same input. The most common mistake is using NOW() or datetime.utcnow() inside a task: backfills then silently compute values for today instead of the historical logical date. Always use context['logical_date'] (formerly execution_date) for time-bounded queries and file paths. Example: a daily feature extraction task should query WHERE event_date = '{{ ds }}' (the DAG's logical date), not WHERE event_date = CURRENT_DATE. XCom (cross-communication) passes small values between tasks via the metadata database. For large data (>100 KB), pass S3/GCS paths as XCom values instead. Never pass DataFrames through XCom: the metadata database is not designed for large binary objects and performance will degrade. Pass file paths and let downstream tasks read directly from S3.
Postgres logical replication; Debezium + Kafka Connect; op codes (c/u/d); WAL slot management; schema registry; backfill strategy.
Change Data Capture (CDC) captures every insert, update, and delete from a source database in real time, without modifying application code. The canonical stack: PostgreSQL logical replication → Debezium Kafka Connect source connector → Kafka topic → downstream consumers (feature store, search index, analytics warehouse). Postgres CDC requires wal_level = logical in postgresql.conf, a replication slot, and a publication. Each CDC event carries op (c = create, u = update, d = delete, r = read during the initial snapshot), before (previous row state), after (new row state), and source (LSN, table, timestamp). before is null for inserts and after is null for deletes. How much of before you get for updates and deletes depends on the table's REPLICA IDENTITY: with the default it carries at most the primary key columns, and REPLICA IDENTITY FULL is needed for the complete previous row. Replication slot management is critical: a replication slot retains all WAL since the last consumer-acknowledged LSN. If the Debezium connector stops for hours, WAL accumulates indefinitely, the Postgres disk fills, and the database can crash. Monitor pg_replication_slots.confirmed_flush_lsn and alert on slots that are falling behind. Drop the slot if the connector is permanently stopped. A schema registry (Confluent, AWS Glue) stores the Avro or Protobuf schema of each CDC event; every message carries a schema ID that consumers use to deserialise it. Without a registry, adding a column to a source table can break every consumer immediately.
Idempotency is the single most important property in a data pipeline. A task that cannot be safely re-run will fail in production and take hours to debug. Design for re-runs from the start.
-- models/user_features.sql
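-- Incremental materialization: recompute features only for users with new/updated sessions
{{
config(
materialized = 'incremental',
unique_key = 'user_id',
incremental_strategy = 'merge'
)
}}
SELECT
user_id,
COUNT(*) AS total_sessions,
AVG(session_duration_s) AS avg_session_s,
SUM(CASE WHEN converted THEN 1 ELSE 0 END) AS conversions,
MAX(created_at) AS last_active_at,
MAX(updated_at) AS source_updated_at, -- high-water mark for the next incremental run
CURRENT_TIMESTAMP AS feature_computed_at
FROM {{ source('raw', 'sessions') }}
WHERE 1=1
{% if is_incremental() %}
-- Re-aggregate ALL sessions of affected users: aggregating only the new rows
-- would overwrite lifetime totals with partial counts on MERGE
AND user_id IN (
SELECT user_id
FROM {{ source('raw', 'sessions') }}
WHERE updated_at > (SELECT MAX(source_updated_at) FROM {{ this }})
)
{% endif %}
GROUP BY user_id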
-- schema.yml (a separate file; tests run in CI)
# models:
# - name: user_features
# columns:
# - name: user_id
# tests: [not_null, unique]
# - name: total_sessions
# tests: [not_null, {dbt_utils.accepted_range: {min_value: 0}}]
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
from datetime import datetime, timedelta
def extract_features(logical_date, **context):
"""Idempotent: uses logical_date, not NOW()."""
ds = logical_date.strftime('%Y-%m-%d')
query = f"""
SELECT user_id, COUNT(*) AS events, SUM(value) AS total_value
FROM events
WHERE event_date = '{ds}' -- logical_date, NOT CURRENT_DATE
GROUP BY user_id
"""
df = run_query(query) # run_query: hypothetical helper wrapping your warehouse client
# Write to S3 with date partition — idempotent (overwrite same key)
s3_path = f"s3://features/daily/date={ds}/features.parquet"
df.to_parquet(s3_path)
return s3_path # pass S3 path via XCom, not the data
with DAG(
dag_id='daily_feature_extraction',
start_date=datetime(2025, 1, 1),
schedule='@daily', # replaces schedule_interval (deprecated since Airflow 2.4)
catchup=True, # enables backfill
default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
) as dag:
extract = PythonOperator(
task_id='extract_features',
python_callable=extract_features,
)
# SqlToS3Operator uses {{ ds }} templating for idempotency
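load = SqlToS3Operator(
task_id='load_to_warehouse',
sql_conn_id='warehouse_db', # hypothetical Airflow connection ID
query="SELECT * FROM staging WHERE load_date = '{{ ds }}'",
s3_bucket='warehouse-bucket', # hypothetical bucket name
s3_key='warehouse/{{ ds }}/data.csv',
file_format='csv',
replace=True, # overwrite the same key on re-runs (idempotent)
)
extract >> load
# Debezium Postgres connector config (Kafka Connect REST)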
connector_config = {
"name": "postgres-chunks-cdc",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replication_user",
"database.password": "secret",
"database.dbname": "aidb",
"topic.prefix": "aidb", # Debezium 2.x; replaces database.server.name from 1.x
"table.include.list": "public.chunks,public.documents",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_pub",
# Kafka Connect uses converters (not serializers); each needs its registry URL
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
}
}
# CDC event consumer — sync chunks table changes to vector store
def handle_cdc_event(event: dict) -> None:
op = event['op'] # 'c' create, 'u' update, 'd' delete, 'r' snapshot read
after = event.get('after')
before = event.get('before')
if op in ('c', 'u', 'r') and after:
# Upsert chunk into vector store (upsert_vector: placeholder for your vector DB client)
upsert_vector(after['chunk_id'], after['embedding'], after)
elif op == 'd' and before:
# Hard delete: remove the chunk's vector (assumes chunk_id is the primary key,
# which Postgres includes in delete events)
delete_vector(before['chunk_id'])
# Monitor replication slot lag (run as a scheduled check)
# SELECT slot_name, confirmed_flush_lsn,
# pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
# FROM pg_replication_slots;
ELT is the modern default. Load raw data first (preserving full fidelity and enabling reprocessing from source), then transform inside BigQuery, Snowflake, or DuckDB using dbt SQL models. ELT with dbt gives you version-controlled transformations, column-level lineage, built-in tests, and the ability to reprocess historical data without re-extracting. Use ETL only when you must mask PII or sensitive data before it lands in the warehouse, or when the transformation requires non-SQL logic (custom ML preprocessing, computer vision, audio processing).
An incremental model only processes rows that are new or updated since the last run, using a WHERE predicate on a timestamp (such as updated_at) inside an is_incremental() block. This is critical for ML feature pipelines that process billions of historical events: instead of recomputing from scratch on every run (hours), only new events are processed (minutes). The unique_key config enables upsert semantics: if a row already exists with the same key, it is updated rather than duplicated. With a unique_key and a merge strategy, re-running an incremental model over the same time window produces the same result, which is what makes it idempotent; without a unique_key, new rows are simply inserted and a re-run duplicates them.
An idempotent pipeline task produces the same output when run multiple times for the same logical date. Achieve it by: (1) using logical_date ({{ ds }}) instead of NOW() for time-bounded queries, (2) writing outputs to deterministic paths keyed by date (s3://bucket/date=2026-05-19/data.parquet), (3) using INSERT OVERWRITE or MERGE instead of a plain INSERT (append) in SQL tasks, (4) making downstream steps upsert (not insert-duplicate) by design. Idempotent tasks enable safe backfills, retries, and incident recovery.
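Points (2) and (3) can be sketched on a local filesystem: the output path depends only on the logical date, and the file is swapped in atomically, so re-running the same date replaces the previous output instead of appending to it. This is an illustrative sketch: `write_partition`, the `date=YYYY-MM-DD` layout, and JSON Lines instead of Parquet are assumptions chosen to keep it standard-library only; on S3 the equivalent is overwriting the same object key.

```python
import json
import os
import tempfile
from datetime import date
from pathlib import Path


def write_partition(root: Path, logical_date: date, rows: list[dict]) -> Path:
    """Write one logical date's rows to a deterministic path, replacing earlier output."""
    # (2) The path depends only on the logical date, never on wall-clock time
    part_dir = root / f"date={logical_date.isoformat()}"
    part_dir.mkdir(parents=True, exist_ok=True)
    target = part_dir / "data.jsonl"
    # (3) Overwrite, don't append: write a temp file, then atomically replace the target,
    # so a retry never duplicates rows and readers never see a half-written file
    fd, tmp_path = tempfile.mkstemp(dir=part_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        for row in rows:
            f.write(json.dumps(row, sort_keys=True) + "\n")
    os.replace(tmp_path, target)
    return target
```

Running it twice for the same date leaves exactly one `data.jsonl` with identical contents, which is the property a backfill or retry relies on.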
Backfill runs a DAG for a range of historical logical dates, for example `airflow dags backfill -s 2026-01-01 -e 2026-05-01 my_dag` (Airflow 2 CLI syntax). Separately, catchup=True makes the scheduler automatically create runs for every missed interval between start_date and now. Either way, tasks must be idempotent (same logical_date → same output regardless of when the task runs) for the results to be correct. How many of these runs execute at once is bounded by the DAG's max_active_runs. If tasks are not idempotent, a backfill overwrites current data with wrong historical data; this is the most common and most damaging Airflow bug in production.
CDC captures every row-level change from the source database in real time (latency <1s) versus batch ETL which runs on a schedule (latency = batch interval, often 1 hour to 1 day). Use CDC when: (1) features must be fresh for real-time model serving (fraud detection, recommendation), (2) you need to sync a search index or vector store continuously with the source database, (3) training data must reflect the exact state at a specific point in time for point-in-time correctness. Use batch ETL when latency is acceptable and the simpler operational model is worth it.
A replication slot is a durable marker that tells Postgres the minimum WAL position it must retain for a specific consumer (like Debezium). Postgres keeps all WAL since the slot's last acknowledged position. If the consumer stops, WAL accumulates indefinitely because the slot is never advanced. This can fill the Postgres data volume and crash the database. Always monitor pg_replication_slots for lag, set a disk-usage alert, and immediately drop orphaned slots.
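The replication-slot lag query in the CDC example above returns bytes of retained WAL; a scheduled check then has to turn those rows into alerts. A minimal sketch, assuming the job has already fetched `(slot_name, active, lag_bytes)` tuples from `pg_replication_slots`; the 1 GiB / 10 GiB thresholds and all function names are hypothetical. `lsn_to_int` shows how a `pg_lsn` text value such as `16/B374D848` maps to a byte position, which is the arithmetic `pg_wal_lsn_diff` performs.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert pg_lsn text 'HI/LO' (two hex numbers) into an absolute WAL byte position."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Same quantity as pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)."""
    return lsn_to_int(current_wal_lsn) - lsn_to_int(confirmed_flush_lsn)


def slot_alerts(slots: list[tuple[str, bool, int]],
                warn_bytes: int = 1 * 1024**3,
                page_bytes: int = 10 * 1024**3) -> list[str]:
    """Turn (slot_name, active, lag_bytes) rows into alert messages (hypothetical thresholds)."""
    alerts = []
    for name, active, lag in slots:
        if lag >= page_bytes:
            level = "PAGE"
        elif lag >= warn_bytes or not active:
            # An inactive slot still pins WAL: it is either lagging or orphaned
            level = "WARN"
        else:
            continue
        state = "active" if active else "inactive (orphaned?)"
        alerts.append(f"{level}: slot {name} is {state}, retaining {lag / 1024**3:.1f} GiB of WAL")
    return alerts
```

Flagging inactive slots even at low lag is a deliberate choice here: an orphaned slot costs nothing today but will pin WAL indefinitely once writes continue.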
Redis for low-latency online serving; S3/DuckDB/BigQuery for offline training; Feast architecture; shared computation code.
A feature store separates feature computation from model training and serving, ensuring the same feature values are available in real time (online store) and in historical batch form (offline store). The online store is optimised for low-latency lookups (1–10ms) during model inference: Redis Hash or DynamoDB, keyed by entity ID (user_id, item_id, session_id). The offline store is optimised for high-throughput historical retrieval during training data generation: S3 + Parquet, BigQuery, Redshift, or DuckDB. The critical constraint: both stores must use the same feature computation logic. If the online store computes "average purchase value in the last 30 days" using one code path and the offline store uses another, the model trains on different features than it serves. This is training-serving skew. Feast is the dominant open-source feature store framework. Core concepts: FeatureView (how to compute a feature from a data source, plus TTL), Entity (the primary key — user_id, item_id), FeatureStore (the client orchestrating reads/writes). Materialisation pushes computed features from the offline source into the online store. get_historical_features() for training data, get_online_features() for inference.
Point-in-time join; target leakage from future features; as_of_timestamp in entity_df; temporal partitioning of feature tables.
Point-in-time correctness means that when building a training dataset, each feature value is the value that was available at the time of the label event — not the current value, not a value from after the event. Violating this causes target leakage: the model trains on information it would not have had access to in production, producing inflated eval metrics that collapse in production. Example: a fraud detection model's label is "transaction flagged as fraud on 2026-03-15". The feature "user_account_balance_7d" should be the balance in the 7 days before 2026-03-15, not the current balance. If the feature table is joined without a timestamp constraint, and the account balance was updated after the fraud event (e.g., account was frozen, balance went to 0), the model trains on the post-fraud balance — perfect predictive signal that does not exist at inference time. The offline AUC is 0.98; production AUC is 0.62. Implementation: entity_df must include an event_timestamp column per row. The feature store performs an as-of join: for each (entity, timestamp) row, find the most recent feature value with feature_timestamp ≤ event_timestamp. In Feast, this is get_historical_features(entity_df, features) where entity_df has event_timestamp.
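The as-of rule can be made concrete in plain Python: for each label row, binary-search the entity's sorted feature history for the last timestamp at or before the label's event_timestamp, and optionally discard it if it is older than a TTL (mirroring a FeatureView ttl). This is an illustrative sketch with hypothetical names, not Feast's implementation; with pandas, `pd.merge_asof(..., by='user_id', direction='backward')` performs the same join on sorted frames.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Optional


def as_of_join(labels: list[tuple[str, datetime]],
               features: list[tuple[str, datetime, Any]],
               ttl: Optional[timedelta] = None) -> list[tuple[str, datetime, Any]]:
    """For each (entity, event_ts) label, attach the latest feature value with
    feature_ts <= event_ts; None if there is none (or it is older than ttl)."""
    history: dict[str, list[tuple[datetime, Any]]] = defaultdict(list)
    for entity, feature_ts, value in features:
        history[entity].append((feature_ts, value))
    # Sort each entity's history once so every lookup is a binary search
    timestamps: dict[str, list[datetime]] = {}
    for entity, rows in history.items():
        rows.sort(key=lambda r: r[0])
        timestamps[entity] = [ts for ts, _ in rows]

    joined = []
    for entity, event_ts in labels:
        ts_list = timestamps.get(entity, [])
        i = bisect_right(ts_list, event_ts)  # ts_list[:i] are all <= event_ts
        value = None
        if i > 0:
            feature_ts, candidate = history[entity][i - 1]
            if ttl is None or event_ts - feature_ts <= ttl:
                value = candidate
        joined.append((entity, event_ts, value))
    return joined
```

In the fraud example, a balance recorded on 2026-03-20 (after the account was frozen) is never attached to a label dated 2026-03-15; the join falls back to the last balance recorded before the event.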
Root causes of skew; shared preprocessing functions; feature distribution monitoring; KL divergence and PSI alerts.
Training-serving skew is any difference between the feature distribution the model was trained on and the distribution it receives at serving time. It is the most common and most insidious cause of silent model degradation. Root causes: (1) Different code paths: Python preprocessing in the training notebook vs SQL in the serving pipeline; (2) Different data sources: training on a DB snapshot, serving from a live API; (3) Online store staleness: the materialisation job falls behind, so serving sees stale features; (4) Feature drift: the real-world input distribution shifts over time (data drift); (5) Null handling differences: Python returns 0 for missing values, SQL returns NULL, and the model predicts differently. Prevention: define all feature transformations in a single reusable function or dbt model. Both the training pipeline and the serving pipeline import and call the same function. Log feature values at serving time, compute distribution statistics (mean, std, quantiles), and compare them to the training distribution using Population Stability Index (PSI) or Kullback-Leibler divergence. Alert when PSI > 0.2 for any feature. Monitor not just the model outputs but the model inputs.
Training-serving skew is invisible until a model fails in production. The only cure is using the same feature computation path at training time and serving time — not similar code, the same code.
from feast import FeatureStore, Entity, FeatureView, Field, FileSource
from feast.types import Float64, Int64, String
from datetime import datetime, timedelta, timezone
import pandas as pd
# Define entity and data source
user = Entity(name='user_id', join_keys=['user_id'])
user_stats_source = FileSource(
path='s3://features/user_stats/*.parquet',
timestamp_field='event_timestamp',
)
# FeatureView: shared definition for both training and serving
user_features_view = FeatureView(
name='user_features',
entities=[user],
ttl=timedelta(days=7), # features expire after 7 days
schema=[
Field(name='total_sessions', dtype=Int64),
Field(name='avg_session_s', dtype=Float64),
Field(name='conversion_rate', dtype=Float64),
Field(name='last_active_at', dtype=String),
],
source=user_stats_source,
)
store = FeatureStore(repo_path='.')
# Register the definitions (equivalent to running `feast apply` in the repo)
store.apply([user, user_features_view])
# Training: get historical features with point-in-time join
entity_df = pd.DataFrame({
'user_id': ['u1', 'u2', 'u3'],
'event_timestamp': pd.to_datetime(['2026-01-15', '2026-02-20', '2026-03-10']),
})
training_df = store.get_historical_features(
entity_df=entity_df,
features=['user_features:total_sessions', 'user_features:conversion_rate'],
).to_df()
# Load the latest feature values into the online store before serving
store.materialize_incremental(end_date=datetime.now(timezone.utc))
# Serving: get online features at inference time (< 10ms)
online_features = store.get_online_features(
features=['user_features:total_sessions', 'user_features:conversion_rate'],
entity_rows=[{'user_id': 'u1'}],
).to_dict()
import pandas as pd
from feast import FeatureStore
store = FeatureStore(repo_path='.')
# Label dataset: user_id + time of the label event
labels = pd.DataFrame({
'user_id': ['u1', 'u2', 'u3'],
'label': [1, 0, 1],
# CRITICAL: event_timestamp = when the label was generated
'event_timestamp': pd.to_datetime(['2026-01-15', '2026-02-20', '2026-03-10']),
})
# ✅ CORRECT: point-in-time join — features as-of the label timestamp
training_df = store.get_historical_features(
entity_df=labels,
features=[
'user_features:total_sessions', # sessions BEFORE the label date
'user_features:conversion_rate', # conversion rate BEFORE the label date
],
).to_df()
# ❌ WRONG: naive join without timestamp — joins current feature values
# (uses features from AFTER the label event = target leakage)
current_features = pd.read_parquet('s3://features/user_features/latest/')
leaky_df = labels.merge(current_features, on='user_id') # DO NOT DO THIS
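# Sanity checks: Feast returns one row per label row and only joins feature rows with
# timestamp <= event_timestamp (and within the FeatureView ttl). Feature timestamps are
# not included in training_df, so a label with no eligible feature value gets NULLs.
assert len(training_df) == len(labels), "Point-in-time join changed the row count!"
print(training_df[['total_sessions', 'conversion_rate']].isna().mean())
import numpy as np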
# ✅ Shared feature computation — used by both training and serving
def compute_user_features(session_count: int | None,
total_value: float | None,
days_since_last: int | None) -> dict:
"""Single source of truth for user feature engineering."""
return {
'session_count': session_count or 0,
'avg_daily_sessions': (session_count or 0) / max(days_since_last or 1, 1),
'log_total_value': np.log1p(total_value or 0),
'is_recent_user': int((999 if days_since_last is None else days_since_last) < 7), # `or 999` would treat 0 (active today) as missing
}
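# Training pipeline (train_raw: hypothetical iterable of dicts with the three raw fields)
train_features = [compute_user_features(**row) for row in train_raw]
# Serving pipeline: same function. redis-py returns bytes (or None), so parse first;
# redis_client and user_id are placeholders for your serving context.
sessions, total_value, days_since_last = redis_client.hmget(
f'user:{user_id}', ['sessions', 'total_value', 'days_since_last'])
serving_features = compute_user_features(
session_count=int(sessions) if sessions is not None else None,
total_value=float(total_value) if total_value is not None else None,
days_since_last=int(days_since_last) if days_since_last is not None else None,
)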
def population_stability_index(expected: np.ndarray,
actual: np.ndarray,
n_bins: int = 10) -> float:
"""PSI < 0.1 = minimal shift, 0.1-0.25 = moderate (investigate), > 0.25 = significant (alert)."""
# Bin edges come from the training (expected) distribution; np.unique drops duplicate edges from ties
breaks = np.unique(np.percentile(expected, np.linspace(0, 100, n_bins + 1)))
# Clip serving values into the training range so out-of-range values land in the edge bins
actual = np.clip(actual, breaks[0], breaks[-1])
exp_pct = np.histogram(expected, bins=breaks)[0] / len(expected) + 1e-8
act_pct = np.histogram(actual, bins=breaks)[0] / len(actual) + 1e-8
psi = np.sum((act_pct - exp_pct) * np.log(act_pct / exp_pct))
return float(psi)
Redis stores only the current feature value per entity (latest snapshot). Training a model requires point-in-time historical feature values: what were user_id u1's features on 2026-01-15, 3 months before the label event? Redis cannot answer this. The offline store (S3 + Parquet with event_timestamp column) preserves the full history and supports point-in-time joins. Redis handles real-time serving at inference time (1–10ms). Both are needed for correct model training and correct serving; the feature store ensures they use the same computation.
Feast (Feature Store) is an open-source framework for defining, storing, and serving ML features. It solves three problems: (1) Training-serving skew — one FeatureView definition is shared by both historical retrieval (training) and online retrieval (serving), ensuring the same computation. (2) Point-in-time correctness — get_historical_features() joins feature values to label events using the exact timestamp, preventing future data leakage. (3) Feature reuse — multiple models can share the same FeatureViews without duplicating computation logic.
Target leakage occurs when a model trains on feature values that contain information about the target that would not be available at inference time — typically because future data is accidentally included. Leakage manifests as: offline eval metrics (AUC, F1, accuracy) significantly higher than production metrics (a gap of 0.10 or more is suspicious), models that appear to "overfit" but actually learned the leaked signal, and performance degradation that happens immediately at deployment rather than gradually over time.
A point-in-time join finds, for each (entity, label_timestamp) pair in the training set, the most recent feature value where feature_timestamp ≤ label_timestamp. It answers: "what was the feature value just before the label event?" Feast implements this in get_historical_features(): it takes an entity_df with an event_timestamp column and performs a time-bounded join against the offline feature store, using the event_timestamp as the as-of time for each row. Without this, a naive join picks up the current or latest feature value, regardless of when it was computed.
The five main causes: (1) Different code paths for the same feature in training vs serving (Python vs SQL, different libraries). (2) Different data sources (training from DB dump, serving from live API). (3) Online store staleness — materialisation lag means serving stale values. (4) Feature drift — the real-world distribution shifts after training. (5) Preprocessing differences — null handling, scaling, encoding applied differently. Prevention: single shared feature computation function, feature store materialisation from the same source, and input feature monitoring in production.
PSI measures how much a distribution has shifted between two periods: PSI = Σ (actual% - expected%) × ln(actual%/expected%) across bins. Interpretation: PSI < 0.1 = minimal change, 0.1–0.25 = moderate shift (investigate), > 0.25 = significant shift (alert and investigate model). Compute PSI for each model input feature weekly by comparing the serving distribution to the training distribution. PSI > 0.2 on a critical feature is a leading indicator of model performance degradation, detectable before AUC or CTR metrics degrade.
In-process OLAP, no server; reading Parquet/CSV/S3 natively; vectorised columnar execution; Python API; ML feature computation.
DuckDB is an embedded, in-process OLAP database — no server, no separate process, no cluster. It runs inside Python, R, or a CLI and can query Parquet files on S3, local CSV, Delta Lake, and Iceberg tables without loading them into memory first. The execution model is columnar and vectorised: queries read only the columns needed, apply SIMD operations on batches of values, and leverage file-level and row-group statistics for aggressive predicate pushdown. The result is analytical query performance competitive with Spark for single-node workloads. For AI/ML feature engineering: DuckDB can read all Parquet files in an S3 prefix with a wildcard path, join them, apply window functions, and write the result to a new Parquet file — in minutes, from a notebook, with full SQL. This replaces entire Spark jobs for teams without dedicated data infrastructure. Key capabilities: full SQL (window functions, CTEs, ASOF JOIN for point-in-time joins), Parquet metadata pushdown (skip row groups by min/max statistics), parallel execution on all CPU cores, and a Python API that integrates directly with pandas and Arrow.
Slot-based vs credit pricing; date partitioning; clustering keys; materialized views; query result caching; cost optimisation.
BigQuery and Snowflake are the two dominant cloud data warehouses for AI/ML teams operating at scale. BigQuery pricing: on-demand queries are billed per TiB of data scanned ($6.25/TiB in the US multi-region at the time of writing; rates vary by region), while capacity-based reservations (slots) provide predictable spend. Cost optimisation levers: partition tables by DATE or TIMESTAMP (and require a partition filter in queries to prevent full scans), cluster by high-cardinality filter columns (clustering reduces bytes scanned by sorting data within partitions), use materialized views for recurring aggregations (automatically refreshed), and rely on the query result cache (an identical query re-run within about 24 hours returns cached results at no cost, provided the underlying tables have not changed). Snowflake pricing: virtual warehouses (compute) are billed per second while running, with a 60-second minimum each time a warehouse resumes; storage is billed separately, so compute scales independently of storage. Zero-copy cloning creates instant snapshots of tables/schemas without duplicating data, which is ideal for creating isolated training datasets or experiment branches. Time travel (1 day of retention by default, configurable up to 90 days on Enterprise Edition) lets you query historical data states: SELECT * FROM table AT(TIMESTAMP => '2026-01-01'::TIMESTAMP_LTZ). Snowpark runs Python/Scala/Java directly inside Snowflake's compute engine, which is useful for model inference close to the data. For ML feature pipelines: partition feature tables by feature_date in BigQuery and cluster by entity_id. Use materialized views for pre-aggregated features that are expensive to recompute. Monitor slot utilization and query costs in BigQuery's INFORMATION_SCHEMA.JOBS views.
Column pruning, predicate pushdown, RLE and dictionary encoding; Parquet row groups and page statistics; ORC vs Parquet; small-file problem.
Columnar storage stores all values for a column together on disk, rather than row-by-row. This enables two key optimisations: column pruning (a query on 3 of 50 columns reads ~6% of the data, assuming similarly sized columns) and high compression (consecutive similar values in a column, such as dates, user IDs, and status enums, compress far better than interleaved row data). Common encoding schemes: Run-Length Encoding (RLE) stores "value repeated N times" rather than N copies, which is great for low-cardinality columns (status = 'active' × 10 000). Dictionary encoding replaces repeated values with small integer codes plus a dictionary of the distinct values, often shrinking low-cardinality string columns by 80–90%. Delta encoding suits timestamps (store differences rather than absolute values). Parquet file structure: file → row groups (128 MB is a common writer default, e.g. in Spark) → column chunks → pages. Each column chunk records min/max statistics and can optionally carry a Bloom filter. Query engines read the footer first (a small metadata read), use the statistics to skip entire row groups, then read only the required column chunks. This is predicate pushdown. ORC vs Parquet: ORC is optimised for Hive/Hadoop/Spark writes and has better ACID support (row-level inserts/updates in Hive ORC transactions). Parquet is the standard for the Arrow/Iceberg/Delta/BigQuery ecosystem; use Parquet as the default for all new AI/ML data lake work. Small-file problem: many tiny Parquet files (< 10 MB) incur high S3 LIST/GET overhead and compress poorly. Target files of roughly 128–512 MB, each holding large row groups.
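A minimal pure-Python sketch of the two encodings applied to a single column. Parquet's real on-disk formats (an RLE/bit-packing hybrid and separate dictionary pages) are more involved; the function names here are illustrative.

```python
from itertools import groupby


def rle_encode(column: list) -> list[tuple]:
    """Run-length encode: consecutive equal values become (value, run_length) pairs."""
    return [(value, sum(1 for _ in run)) for value, run in groupby(column)]


def rle_decode(runs: list[tuple]) -> list:
    return [value for value, count in runs for _ in range(count)]


def dict_encode(column: list) -> tuple[list, list[int]]:
    """Dictionary encode: store each distinct value once, the column as integer codes."""
    dictionary: dict = {}
    # setdefault assigns the next code (current dictionary size) to unseen values
    codes = [dictionary.setdefault(v, len(dictionary)) for v in column]
    return list(dictionary), codes


def dict_decode(dictionary: list, codes: list[int]) -> list:
    return [dictionary[c] for c in codes]
```

A status column of 10,000 'active' values followed by two 'churned' values collapses to two RLE runs, and a country column stores each country name once plus one small integer per row.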
DuckDB changed local analytics: a single process can query terabytes of Parquet at columnar speed without a cluster, making feature engineering viable for any team regardless of infrastructure budget.
import duckdb
con = duckdb.connect()
# Install and load S3 extension
con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute("""
SET s3_region='us-east-1';
SET s3_access_key_id='...';
SET s3_secret_access_key='...';
""")
# Query all Parquet files in an S3 prefix (no data loading required)
features = con.execute("""
WITH base AS (
SELECT
user_id,
DATE_TRUNC('day', event_ts) AS event_date,
SUM(value) AS daily_value,
COUNT(*) AS daily_events,
MAX(event_ts) AS last_event_ts
FROM read_parquet('s3://data-lake/events/year=2026/month=05/*.parquet')
WHERE event_type = 'purchase'
GROUP BY user_id, event_date
)
SELECT
user_id,
event_date,
daily_value,
daily_events,
AVG(daily_value) OVER (
PARTITION BY user_id
ORDER BY event_date
-- 7 calendar days ending on this date (days without purchases contribute no rows)
RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW
) AS rolling_7d_avg_value
FROM base
ORDER BY user_id, event_date
""").arrow() # Apache Arrow table; converts cheaply to pandas/polars
# Register the Arrow table so SQL can refer to it by name
con.register('features', features)
# Write directly to Parquet for offline feature store
con.execute("""
COPY (SELECT * FROM features)
TO 's3://features/user_daily/date=2026-05-20/features.parquet'
(FORMAT PARQUET, COMPRESSION ZSTD)
""")
-- BigQuery: partitioned by date, clustered by user_id
CREATE TABLE features.user_daily_features
PARTITION BY feature_date
CLUSTER BY user_id
OPTIONS (
require_partition_filter = TRUE, -- prevent accidental full-table scans
partition_expiration_days = 365
)
AS
SELECT
user_id,
DATE(event_ts) AS feature_date,
COUNT(*) AS event_count,
SUM(purchase_value) AS total_value,
MAX(event_ts) AS last_active
FROM `raw.events`
WHERE DATE(event_ts) = CURRENT_DATE()
GROUP BY user_id, DATE(event_ts);
-- Query: partition filter required (won't scan full table)
SELECT user_id, total_value, event_count
FROM features.user_daily_features
WHERE feature_date BETWEEN '2026-04-20' AND '2026-05-20'
AND user_id IN UNNEST(@user_ids)
ORDER BY total_value DESC;
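-- 30-day rolling features as a logical view: BigQuery materialized views cannot use
-- non-deterministic functions such as CURRENT_DATE(), so a rolling window like this
-- belongs in a regular view (or a scheduled query that writes a table)
CREATE OR REPLACE VIEW features.user_30d_features AS
SELECT
user_id,
SUM(total_value) AS value_30d,
SUM(event_count) AS events_30d
FROM features.user_daily_features
WHERE feature_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY user_id;
import pyarrow as pa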
import pyarrow.parquet as pq
import pyarrow.dataset as ds
# Write Parquet with compression, statistics, and partitioning
table = pa.Table.from_pandas(df)
pq.write_to_dataset(
table,
root_path='s3://features/user_daily/',
partition_cols=['feature_year', 'feature_month'],
basename_template='part-{i}.parquet',
existing_data_behavior='delete_matching', # replace the partitions being written, so re-runs stay idempotent
# Parquet writer options are passed straight through as keyword arguments
compression='zstd', # good ratio/speed trade-off for analytical data
use_dictionary=True, # dictionary encoding for string columns
write_statistics=True, # enable row-group min/max statistics
row_group_size=500_000, # max rows per row group; bytes per group depend on row width
data_page_size=1_024 * 1_024, # 1 MB pages
)
# Read with column pruning (reads only 2 of many columns);
# partitioning='hive' exposes feature_year/feature_month from the key=value directories
dataset = ds.dataset('s3://features/user_daily/', format='parquet', partitioning='hive')
result = dataset.to_table(
columns=['user_id', 'total_value'], # column pruning
filter=ds.field('feature_month') == 5, # partition pruning: other month directories are skipped
)
DuckDB uses columnar vectorised execution: it reads data column by column (skipping unreferenced columns), processes batches of values with SIMD CPU instructions, and applies predicate pushdown from file-level and row-group statistics to skip large data ranges without reading them. It parallelises across all CPU cores automatically. For Parquet files, it reads only the columns and row groups needed by the query: a SELECT on 3 columns of a 100-column file reads roughly 3% of the data. This combination achieves competitive performance with distributed systems like Spark on single-node datasets.
DuckDB: datasets that fit in or can be streamed from a single node (<1 TB processed per query), local development and exploratory analysis, teams without GCP infrastructure, cost-sensitive workloads (DuckDB is free), and workflows where Python-native integration (Arrow, pandas) is important. BigQuery: datasets >1 TB, queries requiring massive parallelism, team collaboration on shared query results, scheduled production queries with Airflow integration, or when the data already lives in BigQuery. For most teams, DuckDB is the correct default for feature engineering; migrate to BigQuery when data volume or collaboration requirements exceed DuckDB's single-node capacity.
BigQuery bills per bytes scanned. A date-partitioned table stores data in separate physical partitions per date. When a query includes a WHERE clause on the partition column (feature_date = '2026-05-20'), BigQuery only reads the matching partition, skipping all others. A query on 1 day of a 3-year table reads 1/1095th of the data — reducing cost proportionally. Setting require_partition_filter = TRUE prevents accidental full-table scans by requiring all queries to include the partition filter.
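The pruning arithmetic can be written as a small estimator. The per-TiB price is a parameter rather than a constant because on-demand rates vary by region and change over time. The function is hypothetical and deliberately rough: it assumes evenly sized partitions and ignores column pruning and BigQuery's minimum billed bytes per table.

```python
def on_demand_cost(table_bytes: int,
                   partitions_total: int,
                   partitions_scanned: int,
                   usd_per_tib: float) -> float:
    """Approximate on-demand cost of a query that reads only some date partitions."""
    # Assumes every partition holds the same number of bytes
    scanned_bytes = table_bytes * partitions_scanned / partitions_total
    return scanned_bytes / 1024**4 * usd_per_tib
```

For example, a 2 TiB table with 1,095 daily partitions at an assumed $6.25/TiB costs about $12.50 per full scan and roughly a cent for a one-day query.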
Zero-copy cloning creates an instant snapshot of a table, schema, or database without physically duplicating the data — the clone references the same underlying storage files as the original, and only diverges when writes occur (copy-on-write). For ML: create an isolated training dataset snapshot without the cost or time of a full data copy, experiment on a cloned schema without affecting production, or create time-bounded training windows. CREATE TABLE train_data_2026q1 CLONE raw.events creates the snapshot in seconds regardless of data size.
Row storage stores all columns for one row together (good for OLTP: read/write one full record). Columnar storage groups all values for one column together (good for OLAP: read 3 of 50 columns, process only 6% of disk data). Benefits: column pruning (skip unreferenced columns entirely), compression (repeated or similar values in a column compress 5–20×), vectorised processing (SIMD operations on batches of the same type), and predicate pushdown (min/max statistics per row group let the engine skip sections of the file before reading).
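Predicate pushdown from row-group statistics reduces to an interval-overlap test: a reader only decodes row groups whose [min, max] range could contain a matching value. A minimal sketch with an illustrative `RowGroupStats` type (not pyarrow's metadata API), using dates encoded as integers.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RowGroupStats:
    index: int
    min_value: int
    max_value: int


def groups_to_read(stats: list[RowGroupStats], lo: int, hi: int) -> list[int]:
    """Row groups that may hold values in [lo, hi]; every other group is skipped unread."""
    return [s.index for s in stats if s.max_value >= lo and s.min_value <= hi]
```

With one row group per month of 2026, a filter on 10 to 15 February touches only the February group; a filter that straddles a month boundary reads two groups. Pruning works this well only when the data is sorted or clustered on the filter column, since otherwise every group's min/max range spans the whole domain.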
Small files (< 10 MB) cause two problems: (1) S3/HDFS metadata overhead: listing, opening, and reading file metadata for thousands of files takes seconds to minutes before any data is processed; (2) poor compression: Parquet's columnar statistics and dictionary encoding work best on large row groups. Fix by writing larger files (128 MB–1 GB each, with large row groups), running compaction jobs (DuckDB COPY, Spark coalesce, or Delta Lake OPTIMIZE) to merge small files periodically, and partitioning at a coarser granularity (daily instead of hourly per-user).
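Before a compaction job rewrites anything, it has to decide which small files to merge. A minimal sketch of that planning step: files under the small-file threshold are grouped greedily, in path order, into batches of roughly `target_bytes`. The 256 MB default and the function name are assumptions; the rewrite itself would be done with DuckDB COPY, Spark, or Delta Lake OPTIMIZE.

```python
def plan_compaction(files: list[tuple[str, int]],
                    target_bytes: int = 256 * 1024**2,
                    small_bytes: int = 10 * 1024**2) -> list[list[str]]:
    """Group small files (< small_bytes) into batches of at most ~target_bytes each.

    Files already at or above small_bytes are left alone. Single-file batches
    are dropped because rewriting one file on its own gains nothing.
    """
    small = sorted(f for f in files if f[1] < small_bytes)  # path order, deterministic
    batches: list[list[str]] = []
    current: list[str] = []
    current_size = 0
    for path, size in small:
        if current and current_size + size > target_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(path)
        current_size += size
    if current:
        batches.append(current)
    return [batch for batch in batches if len(batch) > 1]
```

Planning deterministically (sorted paths) means a re-run after a crash produces the same batches, which keeps the compaction job itself idempotent.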
Transaction log for ACID on object storage; MERGE INTO for upserts; optimistic concurrency; Delta vs Iceberg feature comparison.
Delta Lake (created by Databricks, now open source) adds a transaction log (_delta_log) to a directory of Parquet files, giving ACID semantics on S3 or ADLS. Every write is an atomic transaction recorded as a JSON log entry. Reads use the log to determine which Parquet files make up the current table state. ACID operations: INSERT OVERWRITE (idempotent partition replace), MERGE INTO (upsert: insert new rows, update existing rows by key, and delete removed rows in one atomic operation), DELETE WHERE (row-level deletes), and UPDATE SET (row-level updates). Concurrent writers use optimistic concurrency: each writer reads the current log version, writes its Parquet files, and attempts to commit the next log entry; if another writer has already committed that version, the commit fails and the writer checks for conflicts, then retries or aborts. Apache Iceberg (originally developed at Netflix, now an Apache top-level project) is an open table format specification: any engine that implements it (Spark, Flink, Trino, BigQuery, DuckDB, Athena, and others) can read, and in most cases write, the same table. Hidden partitioning: Iceberg records partition transforms (for example days(event_ts)) in table metadata, so queries filter on the source column and never need to know the partition scheme; the query planner prunes partitions automatically. Both formats support time travel, schema evolution, and partition evolution. Delta is tightly integrated with Databricks; Iceberg is the better choice for multi-engine open environments.
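The optimistic-concurrency protocol can be modelled in a few lines. This is a simplified sketch: the TransactionLog class is hypothetical, a lock stands in for the object store's atomic put-if-absent, and real engines also check whether the winning commit logically conflicts with the loser's changes before retrying.

```python
import threading


class CommitConflict(Exception):
    pass


class TransactionLog:
    """Toy _delta_log: each version exists at most once; a commit is put-if-absent of N+1."""

    def __init__(self) -> None:
        self._entries: dict[int, dict] = {0: {"add": []}}
        self._lock = threading.Lock()  # stands in for the store's atomic put-if-absent

    def latest_version(self) -> int:
        return max(self._entries)

    def put_if_absent(self, version: int, entry: dict) -> None:
        with self._lock:
            if version in self._entries:
                raise CommitConflict(f"version {version} already committed")
            self._entries[version] = entry


def write_with_retry(log: TransactionLog, files: list[str], max_attempts: int = 3) -> int:
    """Read the current version, then try to commit the next one; retry if another writer won."""
    for _ in range(max_attempts):
        read_version = log.latest_version()
        # ... new Parquet files would be written here; they stay invisible until the commit ...
        try:
            log.put_if_absent(read_version + 1, {"add": files})
            return read_version + 1
        except CommitConflict:
            continue  # another writer committed first: re-read the log and try again
    raise CommitConflict("gave up after retries")
```

A writer that read version 1 but tries to commit version 2 after someone else already did gets CommitConflict; calling write_with_retry again re-reads the log and commits version 3.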
VERSION AS OF / TIMESTAMP AS OF; safe vs unsafe schema changes; partition evolution in Iceberg; schema registry vs lake schema management.
Time travel allows querying a table as it existed at a previous point in time. Delta Lake: SELECT * FROM delta.`s3://path/` VERSION AS OF 5 or TIMESTAMP AS OF '2026-01-01'. delta-rs: DeltaTable(...).load_as_version(5), which reloads the table object in place. Iceberg (Spark SQL): SELECT * FROM catalog.table FOR SYSTEM_VERSION AS OF 12345 or FOR SYSTEM_TIME AS OF '2026-01-01'. Both keep old Parquet files only for the configured retention period (Delta: VACUUM deletes unreferenced files older than 7 days by default; Iceberg: files are deleted when their snapshots are expired, e.g. with the expire_snapshots procedure). Use cases: audit a model's training data as of a specific run, restore data accidentally deleted by a bug, compare two historical states, and reproduce a specific experiment's feature values. Schema evolution enables adding, renaming, or (carefully) dropping columns without rewriting the table. Safe changes: add a nullable column (old Parquet files return NULL for the new column, so the change is fully backward-compatible) or widen a type (INT32 → INT64, FLOAT → DOUBLE). Unsafe changes that break existing readers: rename a column (old files have the old name, so readers expecting the new name get NULL), drop a non-nullable column (old files still have it, causing a Parquet schema mismatch), or narrow a type. Iceberg handles column renaming transparently via column IDs: every column has a numeric field ID that is stored in table metadata and written into the Parquet files, and readers resolve columns by ID rather than by name, so renames are metadata-only operations. Delta Lake resolves columns by name unless column mapping is enabled (delta.columnMapping.mode = 'name'), so without it renames require explicit schema migration steps.
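Two dictionaries standing in for data files can show why a rename is metadata-only under ID-based resolution but returns NULLs under name-based resolution. The field ID 7 and the column names are illustrative assumptions.

```python
# The same two values of one column, as stored in a data file written before a rename
file_by_name = {"uid": [1, 2]}  # name-keyed file (name-based resolution)
file_by_id = {7: [1, 2]}        # the column's field ID is 7 (ID-based, Iceberg-style)

# Current table schema after renaming uid -> user_id: the field ID stays 7
current_schema = {"user_id": 7}


def read_by_name(data_file: dict, column: str, num_rows: int) -> list:
    # The old file has no column called 'user_id', so the reader fills NULLs
    return data_file.get(column, [None] * num_rows)


def read_by_id(data_file: dict, column: str, schema: dict[str, int], num_rows: int) -> list:
    # Map the current name to its field ID, then look that ID up in the file
    return data_file.get(schema[column], [None] * num_rows)
```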
Bronze/Silver/Gold layer design; partition strategy per layer; idempotent partition overwrite; when to skip layers; feature versioning in Gold.
The medallion architecture organises a lakehouse into three quality tiers: Bronze (raw), Silver (validated and cleaned), and Gold (business-ready and ML-ready). Bronze is an exact, append-only replica of the source — raw JSON, CSV, CDC events, API responses. No transformations, no business logic, maximum fidelity. Bronze enables reprocessing from source at any time without re-ingestion. Silver applies schema enforcement, deduplication, null handling, type casting, and light normalisation. Gold applies business logic: joins, aggregations, ML features, denormalised fact tables. Each layer has a stronger quality guarantee than the previous. Partition strategy: Bronze by ingestion_date (when it arrived), Silver by event_date (business date of the event), Gold by feature_window (the computation window relevant to model training). Write strategy: INSERT OVERWRITE or MERGE for idempotency — writing the same Bronze partition twice produces the same result. Landing zone: a transient area (S3 prefix, GCS bucket) where raw files are dropped before Bronze ingestion. Bronze ingestion is triggered when the landing zone has new files. Landing zone is never queried directly. For ML feature pipelines: Gold tables should be versioned (feature_version column or partition) so models trained on Gold v1 can be traced back to the exact computation logic that produced them.
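The idempotency requirement is easiest to see by running a job twice. In this sketch a hypothetical table is a dict from partition key to rows; a plain append duplicates the partition on re-run, while a partition overwrite converges to the same result and leaves other partitions untouched.

```python
def append_partition(table: dict[str, list], partition: str, rows: list) -> None:
    # Plain append: every re-run adds the rows again
    table.setdefault(partition, []).extend(rows)


def overwrite_partition(table: dict[str, list], partition: str, rows: list) -> None:
    # Partition overwrite: replaces only this partition, so re-runs converge
    table[partition] = list(rows)


batch = [{"event_id": "e1"}, {"event_id": "e2"}]
appended: dict[str, list] = {}
overwritten: dict[str, list] = {"2026-05-19": [{"event_id": "e0"}]}
for _ in range(2):  # simulate a retried job
    append_partition(appended, "2026-05-20", batch)
    overwrite_partition(overwritten, "2026-05-20", batch)
```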
The lakehouse adds ACID, time travel, and schema evolution to cheap object storage — eliminating the need to choose between a data lake and a data warehouse. Design your medallion layers from the start; retrofitting them is expensive.
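from datetime import datetime, timezone
from deltalake import DeltaTable, write_deltalake
import pyarrow as pa

TABLE_URI = 's3://datalake/silver/users/'
STORAGE = {'AWS_REGION': 'us-east-1'}  # credentials are read from the environment

# Illustrative inputs: pyarrow Tables with the users schema
now = datetime.now(timezone.utc)
new_data_table = pa.table({
    'user_id': ['u1', 'u2'],
    'email': ['a@example.com', 'b@example.com'],
    'created_at': [now, now],
    'updated_at': [now, now],
})
updates_arrow_table = pa.table({
    'user_id': ['u2', 'u3'],  # u2 exists (update), u3 is new (insert)
    'email': ['b2@example.com', 'c@example.com'],
    'created_at': [now, now],
    'updated_at': [now, now],
})

# Initial write: mode='append' creates the Delta table if it does not exist yet
write_deltalake(TABLE_URI, new_data_table, mode='append', storage_options=STORAGE)

# MERGE INTO: upsert (update matched rows, insert unmatched rows) in one commit
dt = DeltaTable(TABLE_URI, storage_options=STORAGE)
dt.merge(
    source=updates_arrow_table,
    predicate='target.user_id = source.user_id',
    source_alias='source',
    target_alias='target',
).when_matched_update(updates={
    'email': 'source.email',
    'updated_at': 'source.updated_at',
}).when_not_matched_insert(updates={
    'user_id': 'source.user_id',
    'email': 'source.email',
    'created_at': 'source.created_at',
    'updated_at': 'source.updated_at',
}).execute()

# OPTIMIZE (e.g. daily): compact small files. z_order also rewrites files,
# so in practice you usually schedule one or the other.
dt.optimize.compact()
dt.optimize.z_order(columns=['user_id'])  # co-locate rows by user_id for faster lookups

# VACUUM: delete unreferenced Parquet files older than the retention window.
# delta-rs defaults to dry_run=True (it only lists files), so pass dry_run=False.
dt.vacuum(retention_hours=168, dry_run=False)  # keep 7 days for time travel

from deltalake import DeltaTable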
from datetime import datetime, timezone
from deltalake import write_deltalake

TABLE_URI = 's3://datalake/silver/users/'

# Time travel: read the table as of a specific version.
# load_as_version() reloads the DeltaTable in place (it returns None), then read it.
dt_v5 = DeltaTable(TABLE_URI)
dt_v5.load_as_version(5)
df_v5 = dt_v5.to_pyarrow_table()

# Time travel: as of a timestamp (audit model training data).
# load_as_version also accepts a datetime and loads the latest version committed by then.
dt_jan = DeltaTable(TABLE_URI)
dt_jan.load_as_version(datetime(2026, 1, 15, tzinfo=timezone.utc))

# Safe schema evolution: add a nullable column
write_deltalake(
    TABLE_URI,
    new_data_with_extra_column,  # pyarrow Table built upstream with a new 'country_code' column
    mode='append',
    schema_mode='merge',  # adds the column to the Delta schema; existing rows read as NULL
)

# Verify history (newest first); a fresh DeltaTable sees the latest commits
for h in DeltaTable(TABLE_URI).history(limit=5):
    ts = datetime.fromtimestamp(h['timestamp'] / 1000, tz=timezone.utc)  # epoch milliseconds
    print(h.get('version'), ts.isoformat(), h['operation'])
# Illustrative output:
# 7 2026-05-20T00:00:00+00:00 WRITE
# 6 2026-05-19T00:00:00+00:00 MERGE
# 5 2026-05-18T00:00:00+00:00 WRITE

from deltalake import write_deltalake
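from deltalake import DeltaTable
import duckdb
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
from datetime import date

ingestion_date = date.today().isoformat()  # e.g. '2026-05-20'

# Each layer is ONE Delta table partitioned by a column; writing to
# '.../ingestion_date=.../' paths would instead create a separate table per day.

# ── BRONZE: raw ingestion (no transformation) ───────────────────────────────
# Landing files are JSON, so read them with pyarrow.dataset (pq.read_table reads Parquet only)
raw_arrow = ds.dataset(f's3://landing/events/{ingestion_date}/', format='json').to_table()
raw_arrow = raw_arrow.append_column(
    'ingestion_date', pa.array([ingestion_date] * raw_arrow.num_rows, pa.string())
)
write_deltalake(
    's3://datalake/bronze/events/',
    raw_arrow,
    mode='overwrite',
    partition_by=['ingestion_date'],
    predicate=f"ingestion_date = '{ingestion_date}'",  # idempotent: a re-run replaces only this day
)

# ── SILVER: validate, deduplicate, cast types ───────────────────────────────
bronze = DeltaTable('s3://datalake/bronze/events/').to_pyarrow_table(
    partitions=[('ingestion_date', '=', ingestion_date)]
)
silver = (
    bronze
    .filter(pc.field('event_id').is_valid())  # drop rows with null keys
    .filter(pc.field('user_id').is_valid())
)
# Deduplicate on event_id, keeping the first occurrence
silver = silver.append_column('_row', pa.array(range(silver.num_rows), pa.int64()))
first_rows = silver.group_by('event_id').aggregate([('_row', 'min')])['_row_min']
silver = silver.take(first_rows).drop_columns(['_row'])
# Assumes events carry an 'event_date' (business date) column; derive it from the
# event timestamp here if they do not.
write_deltalake(
    's3://datalake/silver/events/',
    silver,
    mode='overwrite',
    partition_by=['event_date'],
    predicate=f"ingestion_date = '{ingestion_date}'",  # replace only this ingestion batch
)

# ── GOLD: business features (ML-ready, versioned) ───────────────────────────
# DuckDB's delta extension reads Delta tables via delta_scan (S3 credentials assumed configured)
con = duckdb.connect()
con.execute("INSTALL delta; LOAD delta;")
gold_features = con.execute("""
    SELECT user_id, COUNT(*) AS events_30d,
           SUM(value) AS value_30d, CURRENT_DATE AS feature_date,
           'v1' AS feature_version  -- hypothetical tag tying rows to the feature logic
    FROM delta_scan('s3://datalake/silver/events/')
    WHERE CAST(event_date AS DATE) >= CURRENT_DATE - INTERVAL 30 DAYS
    GROUP BY user_id
""").fetch_arrow_table()
write_deltalake('s3://datalake/gold/user_features/', gold_features, mode='overwrite')

Delta Lake stores a transaction log (_delta_log/) alongside the Parquet files. Each committed transaction adds a new numbered JSON log entry listing which Parquet files were added or removed. Reads reconstruct the current table state from the latest Parquet checkpoint plus the JSON entries committed after it. ACID properties. Atomicity: a commit becomes visible only when its log entry is written, and the entry must be created with put-if-absent semantics so that two writers cannot both claim the same version (on S3, which historically lacked an atomic put-if-absent, writers coordinate through a locking service such as DynamoDB). Consistency: constraints can be enforced at commit. Isolation: concurrent writers use optimistic concurrency (retry on conflict). Durability: the log is persisted in object storage. The Parquet files themselves are immutable: DELETE and UPDATE write new files (rewriting the affected data) and mark the old ones as removed in the log, rather than modifying existing files.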
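Log replay amounts to folding add and remove actions over commits in version order. This simplified sketch keeps only the path field of each action and ignores checkpoints; real add actions also carry size, partition values, and statistics.

```python
import json


def replay(commits: list[str]) -> set[str]:
    """Return the data files that are live after applying commits in version order."""
    live: set[str] = set()
    for commit_text in commits:                # commit N is the content of _delta_log/N.json
        for line in commit_text.splitlines():  # one JSON action per line
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
            # other actions (commitInfo, metaData, protocol) do not change the file set
    return live


def commit(*actions: dict) -> str:
    return "\n".join(json.dumps(a) for a in actions)


log = [
    commit({"add": {"path": "part-0.parquet"}}, {"add": {"path": "part-1.parquet"}}),
    # A DELETE rewrote part-1: the old file is removed and a new file is added
    commit({"remove": {"path": "part-1.parquet"}}, {"add": {"path": "part-2.parquet"}}),
]
```

Replaying only the first commit gives the table as of version 0, which is exactly how time travel reads an older state.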
Choose Iceberg when: (1) multiple engines must read and write the same table (Spark for ingestion, Trino for BI, Flink for streaming, BigQuery for ad-hoc analysis), since Iceberg is an open standard with broad engine support; (2) you need hidden partitioning, so queries never need to know the partition scheme, reducing query complexity; (3) you want engine-agnostic governance and catalog portability. Choose Delta Lake when you are fully on Databricks (tighter integration, auto-optimize, Delta Sharing) or when you use delta-rs for a Python-native lakehouse without Spark.
Record the Delta table version or Iceberg snapshot ID at training time as part of the experiment metadata (MLflow run parameters or a training config file). To reproduce, call dt.load_as_version(recorded_version) or run SELECT * FROM iceberg_table FOR SYSTEM_VERSION AS OF recorded_snapshot_id. This returns exactly the rows that existed when the model was trained, enabling exact reproduction of training features. It works only while the old data files still exist: Delta VACUUM and Iceberg snapshot expiration permanently delete files once they fall outside the retention window, so extend retention for tables used as training data sources.
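Both Delta and Iceberg: safe = add a nullable column (backward-compatible; old files return NULL), widen a numeric type (INT32→INT64). Delta safe: reorder columns (transparent). Delta unsafe by default: rename a column (breaks name-based readers), drop a column (schema mismatch on old files), narrow a type; with column mapping enabled (delta.columnMapping.mode = 'name'), renames and drops become metadata-only operations. Iceberg safe: rename a column (columns are tracked by ID in metadata, so the rename is transparent to readers), drop a column (metadata-only; the column disappears from the schema, and because IDs are never reused, a later column with the same name does not expose the old values), reorder columns. Iceberg is significantly more flexible for schema evolution out of the box because it tracks columns by ID, not by name.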
Three layers provide independent quality guarantees: Bronze can be reprocessed if Silver logic changes, Silver can be recomputed if Gold logic changes, and each layer is independently auditable. Skip Silver when: the source is already clean and schema-enforced (an internal API with a Pydantic contract), the data volume is small (<1 GB/day), or the pipeline is simple enough that the overhead of three layers exceeds the benefit. Never skip Bronze — the raw source is your only recovery point if downstream processing has a bug.
Bronze: partition by ingestion_date (when the data arrived), not event_date — this makes reprocessing simple (reprocess all data ingested on a specific day). Silver: partition by event_date (the business timestamp of the event) to enable efficient time-range queries. Gold: partition by feature_window or model_training_date — the window relevant to the model consuming it. Different partition granularities per layer reflect the different query patterns of each consumer.
Expectation suites; checkpoints; Data Docs; built-in vs custom expectations; blocking vs warning severity; Airflow integration.
Great Expectations (GX) is a Python data validation framework built around expectations: declarative assertions about the shape, content, and distribution of data. An Expectation Suite is a collection of expectations grouped for a dataset or pipeline stage. A Checkpoint runs a suite against a batch of data and produces a validation result and Data Docs (an auto-generated HTML report). Core built-in expectations: expect_column_values_to_not_be_null (null rate = 0), expect_column_values_to_be_in_set (enum membership), expect_column_values_to_be_between (range check), expect_table_row_count_to_be_between (volume check), expect_column_mean_to_be_between (distribution check), expect_column_unique_value_count_to_be_between (cardinality check). Severity: result_format='COMPLETE' only controls how much detail the result carries, not whether a failure blocks the pipeline. Blocking versus warning behaviour comes from how the pipeline acts on the result, commonly by putting critical and warning expectations in separate suites and failing the task only when the critical suite fails. Use blocking expectations for structural violations (null PK, wrong type) and warning expectations for distribution drift. For AI/ML pipelines, run a GX checkpoint at every ETL/ELT stage boundary: after Bronze ingestion (schema check), after Silver cleaning (null rate and row count), and before Gold feature computation (distribution checks). Integrate with Airflow via the GreatExpectationsOperator from the airflow-provider-great-expectations package, which fails its task, and therefore stops the downstream tasks of the DAG, when critical expectations fail.
Data contracts between producers and consumers; Pydantic runtime validation; schema registry for Kafka; preventing silent schema drift.
A data contract is a formal agreement between a data producer and a data consumer specifying the schema, semantics, SLAs, and ownership of a dataset. Without contracts, schema drift is discovered when a downstream model fails, often weeks after the change. With a contract, the producer gets an automated test failure before merging a breaking change. Pydantic provides runtime schema validation for Python: define a model with typed fields, and Model.model_validate(data) (or Model.model_validate_json(raw) for JSON input) raises a ValidationError if the input violates the schema. Use strict mode (model_config = ConfigDict(strict=True)) to disallow implicit type coercions (e.g., "123" → int). Custom validators with @field_validator enforce business rules: user_id must be a valid UUID, event_timestamp must be in the past, feature_value must be non-negative. For Kafka pipelines, a schema registry (Confluent Schema Registry, AWS Glue Schema Registry) stores versioned Avro, Protobuf, or JSON Schema definitions and enforces compatibility modes: BACKWARD (consumers using the new schema can read data written with the old one), FORWARD (consumers using the old schema can read data written with the new one), FULL (both). For AI/ML systems, validate at every system boundary: API ingress, message queue consumption, feature store writes, model input. A Pydantic validation typically costs microseconds per record; training a model on corrupted data can cost weeks.
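Compatibility modes can be approximated with a small rule set. This is a simplified model of Avro-style schema resolution, not the Schema Registry's implementation: each schema is a hypothetical dict of field name to (type, has_default), type promotions are not modelled, and a reader may only lack a field from the writer's data if it declares a default.

```python
# Each schema maps field name -> (type, has_default). A simplified model of Avro rules.
Schema = dict[str, tuple[str, bool]]


def can_read(reader: Schema, writer: Schema) -> bool:
    """True if data written with `writer` can be decoded with `reader`."""
    for name, (ftype, has_default) in reader.items():
        if name in writer:
            if writer[name][0] != ftype:  # simplified: no type promotion (e.g. int -> long)
                return False
        elif not has_default:  # reader needs a field the writer never wrote
            return False
    return True  # writer-only fields are simply ignored


def is_compatible(new: Schema, old: Schema, mode: str) -> bool:
    if mode == "BACKWARD":  # consumers on the new schema read old data
        return can_read(new, old)
    if mode == "FORWARD":  # consumers on the old schema read new data
        return can_read(old, new)
    if mode == "FULL":
        return can_read(new, old) and can_read(old, new)
    raise ValueError(f"unknown compatibility mode: {mode}")


v1: Schema = {"user_id": ("string", False), "value": ("double", False)}
v2: Schema = {**v1, "country": ("string", True)}   # new field with a default
v3: Schema = {**v1, "country": ("string", False)}  # new field without a default
v4: Schema = {"user_id": ("string", False)}        # required field 'value' removed
```

Under these rules v2 is FULL-compatible with v1, v3 is only FORWARD-compatible, and v4 is only BACKWARD-compatible.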
Freshness expectations; null rate and cardinality drift; row count anomalies; Z-score and PSI alerting; completeness monitoring.
Data freshness SLA: the maximum acceptable age of data at consumption time. For a real-time recommendation model, features must be fresher than 15 minutes; for a daily batch scoring job, features can be up to 26 hours old. Freshness checks: query MAX(event_timestamp) from the feature table, compare it with NOW(), and alert if the gap exceeds the SLA threshold. Completeness: the fraction of expected records that are present. For a feature table expected to have one row per active user (500 000 users), a completeness check verifies row_count ≥ 490 000. A row count drop of more than 10% from the previous day is an anomaly. Null rate drift: if the feature column avg_session_duration has historically had <0.5% nulls and suddenly has 15%, a data source change has likely broken the computation. Cardinality drift: the number of distinct values in a categorical column. If the model_name feature column had 5 distinct values yesterday and has 50 today, either a new model was deployed and the feature pipeline picked it up, or a bug introduced random strings. A Z-score on rolling metric history detects sudden changes; the Population Stability Index (PSI) detects gradual distribution drift. Alert thresholds: PSI > 0.1 for informational, > 0.25 for critical. Null rate: alert if current null rate / baseline null rate > 3.0.
A data contract written before the pipeline is built saves more debugging hours than all the monitoring added after the fact. Validate at boundaries; trust nothing that crosses a system or team boundary.
# Written against the GX 0.17/0.18 "fluent" API; GX 1.x replaced these calls.
import great_expectations as gx

context = gx.get_context()
# features_df: the pandas DataFrame to validate, produced upstream

# Add a pandas datasource for in-memory DataFrame validation
datasource = context.sources.add_pandas('my_pandas')
asset = datasource.add_dataframe_asset('features')
batch_request = asset.build_batch_request(dataframe=features_df)

# Build expectation suite
suite = context.add_or_update_expectation_suite('feature_suite')
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite=suite,
)
# Structural expectations: failures here should block the pipeline
validator.expect_column_values_to_not_be_null('user_id')
validator.expect_column_values_to_not_be_null('feature_date')
validator.expect_column_values_to_be_unique('user_id')
# Type expectations
validator.expect_column_values_to_be_in_type_list(
    'total_value', type_list=['FLOAT', 'DOUBLE', 'float64'],
)
# Distribution expectation. In a single suite its failure also makes
# result.success False; keep warning-only checks in a separate suite.
validator.expect_column_mean_to_be_between(
    'total_value', min_value=50.0, max_value=500.0,
    result_format='COMPLETE',
)
# Row count guard (detects pipeline failures upstream)
validator.expect_table_row_count_to_be_between(
    min_value=100_000, max_value=5_000_000,
)
# Keep all expectations, including ones this batch fails (the default discards them)
validator.save_expectation_suite(discard_failed_expectations=False)

# Register a checkpoint for this validator and run it
checkpoint = context.add_or_update_checkpoint(name='daily_feature_check', validator=validator)
result = checkpoint.run()
if not result.success:  # explicit raise: assert statements are stripped under python -O
    raise RuntimeError(f"Data quality check failed: {result}")  # blocks pipeline

from pydantic import BaseModel, Field, field_validator, ConfigDict
from pydantic import AwareDatetime, ValidationError
from typing import Literal
from datetime import datetime, timezone
import uuid


class UserFeatureEvent(BaseModel):
    model_config = ConfigDict(strict=True)  # no implicit coercions
    event_id: str = Field(..., min_length=36, max_length=36)
    user_id: str = Field(..., min_length=1)
    event_type: Literal['purchase', 'view', 'click', 'search']
    event_ts: AwareDatetime  # timezone required, so it compares safely with UTC now
    value: float = Field(..., ge=0.0)  # non-negative
    session_id: str | None = None

    @field_validator('event_id')
    @classmethod
    def must_be_uuid(cls, v: str) -> str:
        try:
            uuid.UUID(v)
        except ValueError:
            raise ValueError(f'event_id must be a valid UUID, got: {v!r}') from None
        return v

    @field_validator('event_ts')
    @classmethod
    def must_be_past(cls, v: datetime) -> datetime:
        if v > datetime.now(timezone.utc):
            raise ValueError('event_ts cannot be in the future')
        return v


# Validate at the Kafka consumer boundary. Messages arrive as JSON bytes; in strict
# mode model_validate_json still parses ISO 8601 strings into datetimes, whereas
# model_validate on a dict would reject a string for event_ts.
def consume_event(raw: bytes) -> UserFeatureEvent | None:
    try:
        return UserFeatureEvent.model_validate_json(raw)
    except ValidationError as e:
        send_to_dlq(raw, reason=str(e))  # hypothetical dead-letter-queue helper
        return None


# Validate model input at the inference boundary
def predict(features: dict) -> float:
    validated = UserFeatureEvent.model_validate(features)  # raises ValidationError on violation
    return model.predict(validated.model_dump())  # 'model': hypothetical loaded model object

import duckdb
def check_feature_freshness(
    table_path: str,
    sla_minutes: int,
    timestamp_col: str = 'feature_computed_at',
) -> dict:
    # table_path and timestamp_col are interpolated into SQL: pass trusted values only
    con = duckdb.connect()
    con.execute("INSTALL httpfs; LOAD httpfs;")  # S3 credentials assumed configured
    result = con.execute(f"""
        SELECT
            MAX({timestamp_col}) AS latest_ts,
            NOW() AS check_ts,
            DATEDIFF('minute', MAX({timestamp_col}), NOW()) AS age_minutes,
            COUNT(*) AS row_count,
            AVG(CASE WHEN user_id IS NULL THEN 1.0 ELSE 0.0 END) AS user_id_null_rate,
            COUNT(DISTINCT model_name) AS model_name_cardinality
        FROM read_parquet('{table_path}')
    """).fetchone()
    latest_ts, check_ts, age_minutes, row_count, null_rate, cardinality = result

    alerts = []
    if age_minutes is None:  # empty table: MAX() and AVG() return NULL
        alerts.append("FRESHNESS BREACH: no rows found")
    elif age_minutes > sla_minutes:
        alerts.append(f"FRESHNESS BREACH: data is {age_minutes}m old, SLA={sla_minutes}m")
    if null_rate is not None and null_rate > 0.02:
        alerts.append(f"NULL RATE ALERT: user_id null rate={null_rate:.2%} > 2%")
    if row_count < 400_000:
        alerts.append(f"ROW COUNT ALERT: only {row_count:,} rows, expected >= 400K")
    return {
        'age_minutes': age_minutes,
        'row_count': row_count,
        'null_rate': null_rate,
        'cardinality': cardinality,
        'alerts': alerts,
        'passed': len(alerts) == 0,
    }


result = check_feature_freshness(
    table_path='s3://features/user_daily/*.parquet',
    sla_minutes=90,
)
if not result['passed']:
    send_slack_alert(result['alerts'])  # hypothetical alerting helper; pages on-call

A GX checkpoint combines a data source, a batch request, and an expectation suite into a runnable validation step. It reads a batch of data, runs all expectations in the suite, returns a validation result with pass/fail status per expectation, and updates the Data Docs HTML report. In Airflow, add a task that runs the checkpoint (for example the GreatExpectationsOperator from the airflow-provider-great-expectations package) and fails when the checkpoint fails; with the default all_success trigger rule, downstream tasks then do not run. This creates a quality gate that prevents bad data from propagating to model training or feature serving.
Blocking expectations belong in the critical suite whose checkpoint result gates the pipeline: when result.success is False, the pipeline task fails. Use them for non-nullable columns containing nulls, row counts near zero, and wrong schema types, i.e. violations that make downstream processing incorrect or impossible. Warning expectations belong in a separate suite whose results are logged to Data Docs and alerting but never used to fail the task (within a single suite, every failed expectation sets result.success to False). Use them for distribution drift, cardinality changes, and mild mean shifts: anomalies that deserve human attention but may be legitimate (seasonality, marketing events).
A schema defines the structure of data (field names, types, nullability). A data contract is broader: it includes the schema, plus semantic definitions (what does "value" mean — revenue in USD? clicks? impressions?), SLAs (freshness within 1 hour, >99.9% completeness), ownership (who produces this data, who consumes it), and the versioning and compatibility policy (backward-compatible changes only, 30-day deprecation notice). A schema tells you what the data looks like; a data contract tells you what the data means and what you can depend on.
Default (lax) mode allows implicit coercions: "123" → int, "true" → bool, 1.0 → int. This can silently accept malformed data. Strict mode (strict=True) requires exact type matching: if a field is declared as int and the input is "123" (a string), validation raises ValidationError instead of coercing. Strict mode is JSON-aware, though: model_validate_json still accepts an ISO 8601 string for a datetime field, because JSON has no native datetime type. For data contracts where silent coercion could mask upstream bugs, use strict=True. For API endpoints where numbers may legitimately arrive as strings (for example 64-bit IDs serialised as strings because JavaScript numbers lose precision above 2^53), use lax mode and add explicit validators for the fields where coercion is unacceptable.
At minimum: MAX(event_timestamp) age vs SLA threshold (primary freshness signal), row count compared to expected count and to yesterday's count (detects zero-row and duplicate-row bugs), null rate per critical column compared to baseline (detects upstream data source changes), cardinality per categorical column (detects schema or encoding changes). Secondary: p99 value distribution per numerical feature (detects unit changes, scale errors), distinct source_id count (detects loss of a data source). All metrics should have alerting thresholds derived from 30-day historical baselines.
Compute the null rate for each column daily as null_count / COUNT(*), where null_count = COUNT(*) - COUNT(column), and store the result in a metrics table. Compute a rolling baseline (e.g., the 7-day average) and alert when current_null_rate > 3 × baseline_null_rate. For structured monitoring in Great Expectations, use expect_column_values_to_not_be_null with the mostly parameter (e.g., mostly=0.99 tolerates up to 1% nulls) or a custom expectation. For automated detection without predefined thresholds, compute a Z-score on the time series of null rates and alert when |z| > 3.
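The ratio-to-baseline rule and the z-score rule can be combined in one check. This is a minimal sketch that assumes a non-empty daily null-rate history with the most recent day last; when the baseline is zero the ratio is undefined, so only the z-score applies.

```python
import statistics


def null_rate(values: list) -> float:
    """Fraction of entries that are None (0.0 for an empty batch)."""
    return sum(v is None for v in values) / len(values) if values else 0.0


def null_rate_alerts(
    history: list[float], current: float, ratio_limit: float = 3.0, z_limit: float = 3.0
) -> list[str]:
    """Compare today's null rate with a 7-day baseline ratio and a z-score over the history."""
    alerts = []
    baseline = statistics.fmean(history[-7:])  # rolling 7-day average
    if baseline > 0 and current / baseline > ratio_limit:
        alerts.append(f"null rate {current:.2%} is {current / baseline:.1f}x baseline")
    if len(history) >= 2:
        stdev = statistics.stdev(history)
        if stdev > 0:
            z = (current - statistics.fmean(history)) / stdev
            if abs(z) > z_limit:
                alerts.append(f"null rate z-score {z:.1f}")
    return alerts
```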
Table-level vs column-level lineage; impact analysis; OpenLineage standard; Marquez metadata store; dbt lineage graph.
Data lineage tracks the provenance of data: where it came from, what transformed it, and where it goes. Table-level lineage shows which tables feed which tables, which is sufficient for impact analysis at coarse granularity. Column-level lineage tracks which source columns produce which output columns; this is critical for AI/ML, where you need to trace a model input feature back to the exact source column and the transformation applied to it. OpenLineage is an open specification for lineage events. Each event has a run (job + run_id), inputs (datasets read), and outputs (datasets written), with facets for schema, statistics, and custom metadata. Emitting OpenLineage events from dbt, Spark, Airflow, and custom Python jobs makes lineage aggregatable across the entire data stack. Marquez is the reference OpenLineage server: it collects events and provides a lineage graph API and UI. dbt generates lineage automatically from model dependencies: when a model selects from source A (via source()) and model B (via ref()), dbt draws edges from A and B to that model, and dbt docs generate builds a navigable lineage DAG (viewable with dbt docs serve). For ML: trace any Gold feature back to its source columns in Bronze, identify all downstream models consuming a Gold table, and perform impact analysis when a source schema changes ("if raw.events.user_revenue is renamed, which models break?").
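Impact analysis over column-level lineage is a graph traversal. The sketch below assumes lineage has already been collected (for example from OpenLineage events or the dbt manifest) into a hypothetical adjacency dict; it walks breadth-first and lists every affected column and model, nearest first.

```python
from collections import deque

# Column-level lineage: each column maps to the columns (or models) derived from it.
# All names are hypothetical examples.
LINEAGE = {
    "raw.events.user_revenue": ["silver.events.revenue"],
    "silver.events.revenue": ["gold.user_features.value_30d"],
    "gold.user_features.value_30d": ["model:user-ranking-v3"],
    "raw.events.user_id": ["silver.events.user_id"],
}


def impacted(node: str, edges: dict[str, list[str]]) -> list[str]:
    """Breadth-first walk returning everything downstream of `node`, nearest first."""
    seen = {node}
    queue = deque([node])
    order: list[str] = []
    while queue:
        for child in edges.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order
```

Asking what breaks if raw.events.user_revenue is renamed returns the Silver column, the Gold feature, and the model that consumes it.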
Monte Carlo, DataHub, Elementary; automated anomaly detection vs manual rules; circuit breakers; ML input feature monitoring.
Data observability extends monitoring beyond predefined rules to automated anomaly detection across freshness, volume, schema, distribution, and lineage. The key distinction from traditional monitoring: observability platforms learn normal baselines from historical data and alert on deviations without requiring manual threshold tuning for every column and table. Monte Carlo: managed SaaS, automatically profiles all tables and fields, detects anomalies using ML on query logs and table metadata, provides circuit breakers to block downstream pipelines when anomalies are detected, and maps issues to affected assets via lineage. DataHub: open-source data catalog + lineage + observability. Supports metadata ingestion from 50+ sources, lineage from dbt/Spark/Airflow/BigQuery, and basic data quality assertions. Elementary: dbt-native observability; it runs as a dbt package, collects test results and model run statistics, and generates an observability dashboard from dbt artifacts. ML input monitoring: extend observability to the model serving layer. Log feature distributions from every inference request, compare the rolling serving distribution with the training distribution using PSI or KL divergence, and alert when model inputs drift. Input drift is an early-warning signal for model performance degradation and is often detectable well before business metrics decline.
DVC for dataset versioning; MLflow dataset logging; model training data provenance; experiment reproducibility requirements.
ML experiment reproducibility requires knowing exactly which dataset version, which code version, and which configuration (e.g., hyperparameters) produced a specific model artifact. Months after training, a compliance audit or model regression investigation may require reproducing the exact training conditions. DVC (Data Version Control) adds git-like versioning to large data files and datasets. dvc add registers a file or directory, storing a hash in a .dvc file that is committed to git. dvc push uploads the actual data to a remote (S3, GCS, Azure). dvc pull downloads the exact version referenced by the checked-out commit. This makes dataset versions reproducible: git checkout model-v2-branch && dvc pull restores the exact training data used for that branch. The MLflow Datasets API logs dataset metadata (name, digest/hash, schema, source) with each training run via mlflow.log_input(). On reproduction, the logged digest identifies the exact dataset version. Combined with DVC (for the actual file) and git (for the code), this creates a three-way reproducibility anchor. For feature stores: record the Feast offline feature retrieval job parameters (as_of_timestamp, entity_df hash) with the MLflow run. For Delta Lake training data: record the table version number. Together these form a complete provenance chain: code version + data version + feature version = reproducible model.
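The data side of that anchor is a content digest. DVC by default records an MD5 of the file contents in the .dvc file; the sketch below computes the same kind of digest with hashlib and bundles it with a git commit and table versions into one provenance record. The function names, the commit hash, and the version number are placeholder assumptions.

```python
import hashlib
import tempfile
from pathlib import Path


def file_md5(path: Path, chunk_size: int = 1 << 20) -> str:
    """MD5 of the file contents, read in chunks so large datasets never load fully into memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def provenance(data_path: Path, git_commit: str, table_versions: dict[str, int]) -> dict:
    """Bundle code version, data digest, and table versions for one training run."""
    return {
        "git_commit": git_commit,
        "data_md5": file_md5(data_path),
        "table_versions": table_versions,
    }


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "features.parquet"  # stand-in file; any bytes work for hashing
    path.write_bytes(b"example training data")
    record = provenance(path, git_commit="abc123", table_versions={"silver.events": 142})
```

Logging such a record as MLflow run parameters or an artifact ties the run to one exact byte sequence: changing a single byte of the file changes the digest.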
Lineage turns a data quality alert from a fire into a diagnosis — you know immediately which upstream source changed, which downstream features and models are affected, and who owns each component.
# Uses the legacy openlineage-python classes (openlineage.client.run / .facet);
# newer client releases also provide event_v2 / facet_v2 equivalents.
from openlineage.client import OpenLineageClient
from openlineage.client.run import RunEvent, RunState, Run, Job, Dataset
from openlineage.client.facet import SchemaDatasetFacet, SchemaField
from datetime import datetime, timezone
import uuid

client = OpenLineageClient(url='http://marquez:5000')
PRODUCER = 'feature-pipeline/v2.1'

# Lineage for the feature computation job: one START and one COMPLETE event per run
run = Run(runId=str(uuid.uuid4()))
job = Job(namespace='feature_store', name='compute_user_30d_features')
inputs = [
    Dataset(
        namespace='s3://datalake',
        name='silver/events',
        facets={'schema': SchemaDatasetFacet(fields=[
            SchemaField('user_id', 'STRING'),
            SchemaField('event_ts', 'TIMESTAMP'),
            SchemaField('purchase_value', 'DOUBLE'),
        ])},
    )
]
outputs = [
    Dataset(
        namespace='s3://datalake',
        name='gold/user_features',
        facets={'schema': SchemaDatasetFacet(fields=[
            SchemaField('user_id', 'STRING'),
            SchemaField('value_30d', 'DOUBLE'),
            SchemaField('events_30d', 'INTEGER'),
        ])},
    )
]


def emit(state: RunState) -> None:
    client.emit(RunEvent(
        eventType=state,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=run,
        job=job,
        producer=PRODUCER,
        inputs=inputs,
        outputs=outputs,
    ))


emit(RunState.START)
# ... compute features ...
emit(RunState.COMPLETE)  # same runId closes the run in Marquez

# --- dbt: query the lineage graph from target/manifest.json ---
# import json
# with open('target/manifest.json') as f:
#     manifest = json.load(f)
# for node, meta in manifest['nodes'].items():
#     print(node, '->', meta.get('depends_on', {}).get('nodes', []))

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass, UpstreamClass, UpstreamLineageClass,
)
import datahub.emitter.mce_builder as builder
import numpy as np

emitter = DatahubRestEmitter('http://datahub-gms:8080')
# Push lineage: gold table depends on silver table.
# make_dataset_urn(platform, name, env): platform 's3', environment 'PROD'
lineage_mcp = MetadataChangeProposalWrapper(
    entityUrn=builder.make_dataset_urn('s3', 'datalake/gold/user_features', 'PROD'),
    aspect=UpstreamLineageClass(upstreams=[
        UpstreamClass(
            dataset=builder.make_dataset_urn('s3', 'datalake/silver/events', 'PROD'),
            type=DatasetLineageTypeClass.TRANSFORMED,
        )
    ]),
)
emitter.emit_mcp(lineage_mcp)


# Feature distribution monitor: compare serving vs training.
# compute_psi and alert are hypothetical helpers (a PSI implementation and an alert sink).
def monitor_feature_distribution(
    serving_values: list[float],
    training_stats: dict,  # expects 'mean', 'std', and a 'sample' of training values
    feature_name: str,
) -> None:
    serving_mean = float(np.mean(serving_values))
    # Mean shift measured in training standard deviations
    z_mean = abs(serving_mean - training_stats['mean']) / training_stats['std']
    psi = compute_psi(np.array(serving_values), np.array(training_stats['sample']))
    if psi > 0.25:
        alert(f"CRITICAL: {feature_name} PSI={psi:.3f} > 0.25, significant distribution shift")
    elif z_mean > 3:
        alert(f"WARNING: {feature_name} serving mean={serving_mean:.2f} is {z_mean:.1f}σ from training mean")

# ── DVC: version the training dataset ─────────────────────────────────────
# dvc add data/training/features_2026q1.parquet
# → writes data/training/features_2026q1.parquet.dvc (commit it to git) holding the file's hash
# → stores the file content in the local cache (.dvc/cache), addressed by that hash
# dvc push → uploads cached content to the S3 remote
# dvc pull → restores the exact file from the remote by hash
import mlflow
import mlflow.data
import mlflow.sklearn
import pandas as pd
from mlflow.data.pandas_dataset import PandasDataset

# Load training data
df = pd.read_parquet('data/training/features_2026q1.parquet')
with mlflow.start_run(run_name='user-ranking-v3') as run:
    # Log dataset with schema and digest to MLflow
    dataset: PandasDataset = mlflow.data.from_pandas(
        df,
        source='data/training/features_2026q1.parquet',
        name='user_ranking_features',
        targets='label',
    )
    mlflow.log_input(dataset, context='training')
    # Log the DVC pointer file (it contains the data hash) for cross-referencing
    with open('data/training/features_2026q1.parquet.dvc') as f:
        mlflow.log_text(f.read(), 'dvc_metadata.yaml')
    # Log the table versions actually read for feature extraction (values are illustrative)
    mlflow.log_param('silver_events_delta_version', 142)
    mlflow.log_param('gold_features_snapshot_date', '2026-05-19')
    mlflow.log_param('feast_as_of_timestamp', '2026-01-15T00:00:00Z')
    # ... train model ... (produces `model`, e.g. a fitted scikit-learn estimator)
    mlflow.sklearn.log_model(model, 'model')

Table-level lineage shows which tables depend on which other tables: a directed graph of table-to-table relationships. It answers "which tables are affected if raw.events changes?" Column-level lineage tracks which source columns produce which destination columns through each transformation. It answers "which model features are derived from raw.events.purchase_value, and which transformation was applied?" Column-level lineage is essential for ML because model debugging requires tracing a specific feature value back to its exact source column and the transformation that produced it.
OpenLineage is an open specification for capturing data lineage events. Each event describes a job run, its input datasets, and its output datasets, with optional facets (schema, statistics, custom metadata). It is tool-agnostic: dbt, Airflow, Spark, Flink, and custom Python jobs can all emit the same event format through their OpenLineage integrations or client libraries. This enables a single lineage graph across the entire data stack, regardless of which tools produce or consume the data. The alternative is separate proprietary lineage inside each tool (dbt, Airflow, Spark), which creates siloed graphs that cannot be connected across tool boundaries.
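The sketch below makes the event shape concrete. It builds an OpenLineage-style RunEvent as plain JSON using only the Python standard library rather than the official openlineage-python client. The dataset namespaces, job name, producer URI, and the spec version inside schemaURL are assumptions for illustration. Optional facets are left out, and sending the event to a lineage backend is out of scope.

```python
import json
import uuid
from datetime import datetime, timezone

# The spec version in this URL is an assumption; check the OpenLineage spec for the current one.
SCHEMA_URL = "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent"
# Hypothetical URI identifying the code that emits these events.
PRODUCER = "https://example.com/pipelines/feature-builder"


def run_event(event_type: str, run_id: str, job_namespace: str, job_name: str,
              inputs: list[tuple[str, str]], outputs: list[tuple[str, str]]) -> dict:
    """Build a minimal OpenLineage-style RunEvent as a plain dict (no facets attached)."""
    return {
        "eventType": event_type,  # e.g. START, COMPLETE, FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
        "producer": PRODUCER,
        "schemaURL": SCHEMA_URL,
    }


# START and COMPLETE share one runId so a lineage backend can pair them into a single run.
run_id = str(uuid.uuid4())
datasets = {"inputs": [("warehouse", "silver.events")],
            "outputs": [("warehouse", "gold.user_features")]}
start = run_event("START", run_id, "feature-jobs", "build_user_features", **datasets)
complete = run_event("COMPLETE", run_id, "feature-jobs", "build_user_features", **datasets)
print(json.dumps(complete, indent=2))
```

The inputs and outputs lists become edges in the lineage graph. Because every tool emits the same shape, edges produced by different tools can be joined on matching dataset namespace and name.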
Data monitoring: predefined rules and thresholds set by humans (null rate must be <1%, row count must be >100 000). Catches known failure modes. Requires manual maintenance as data evolves. Data observability: automated anomaly detection that learns baselines from historical data and detects deviations without predefined thresholds. Catches novel failure modes that humans didn't anticipate. Platforms like Monte Carlo use ML on table metadata and query patterns to detect anomalies. Observability is proactive; monitoring is reactive. For production AI systems, you need both: observability for unknown failures, monitoring for known SLAs.
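The contrast can be sketched in a few lines. The fixed rules use the thresholds quoted above. The "observability" side is a deliberately simple stand-in, a z-score against a trailing baseline, and not how Monte Carlo or any other platform actually models anomalies. Function names, the 3-sigma threshold, and the sample numbers are assumptions.

```python
import statistics
from typing import Optional


def monitoring_rules(row_count: int, null_rate: float) -> list[str]:
    """Monitoring: fixed, human-set thresholds for known failure modes."""
    failures = []
    if null_rate >= 0.01:
        failures.append(f"null rate {null_rate:.2%} is not below 1%")
    if row_count <= 100_000:
        failures.append(f"row count {row_count:,} is not above 100,000")
    return failures


def observability_check(history: list[float], today: float, z_threshold: float = 3.0) -> Optional[str]:
    """Observability (greatly simplified): learn a baseline from history and flag large deviations."""
    mean = statistics.fmean(history)
    stdev = statistics.stdev(history)  # needs at least two historical points
    if stdev == 0:
        return None if today == mean else f"{today:,.0f} differs from a constant baseline of {mean:,.0f}"
    z = (today - mean) / stdev
    if abs(z) > z_threshold:
        return f"{today:,.0f} is {z:+.1f} standard deviations from the learned baseline {mean:,.0f}"
    return None


# Daily row counts for the last week (illustrative numbers, not real data).
history = [1_000_000, 1_020_000, 990_000, 1_010_000, 1_005_000, 995_000, 1_015_000]
today_rows = 400_000  # roughly a 60% drop
print(monitoring_rules(today_rows, null_rate=0.002))  # fixed rules pass: 400,000 > 100,000
print(observability_check(history, today_rows))       # learned baseline flags the drop
```

The rule-based monitor is satisfied here because 400,000 is still above the 100,000 floor. The baseline check flags the same day because the count sits dozens of standard deviations below last week's typical volume, which is the kind of unanticipated failure no one wrote a rule for.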
A data circuit breaker automatically halts downstream pipelines when a data quality check fails, preventing bad data from propagating to model training, feature serving, or BI reports. Use it for: model training jobs (halt if features have >5% nulls or PSI > 0.25), batch scoring jobs (halt if input data fails freshness SLA), and ETL pipeline stages (halt if row count drops > 20% from yesterday). Circuit breakers require explicit reset (human review + approval) before the downstream pipeline runs again. They convert a passive alert into an active block — appropriate for high-value downstream consumers like production ML models.
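The following is a minimal in-process sketch of the pattern. It assumes batch statistics (null rate, PSI, today's and yesterday's row counts) are computed upstream and passed in as a dict. The class, its method names, and the stats keys are hypothetical. In a real deployment the open/closed state would need to live somewhere shared and durable (for example, a database row) so that every downstream job sees it.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a downstream job tries to start while the breaker is open."""


@dataclass
class DataCircuitBreaker:
    name: str
    checks: dict[str, Callable[[dict], bool]]  # check name -> predicate on batch stats (True = pass)
    is_open: bool = False
    failed_checks: list[str] = field(default_factory=list)

    def evaluate(self, stats: dict) -> None:
        """Run every check and trip the breaker if any fails.

        A later passing run does NOT close it.
        """
        failures = [check for check, passes in self.checks.items() if not passes(stats)]
        if failures:
            self.is_open = True
            self.failed_checks = failures

    def guard(self) -> None:
        """Call before the downstream job starts; blocks it while the breaker is open."""
        if self.is_open:
            raise CircuitOpenError(f"{self.name} is open; failed checks: {self.failed_checks}")

    def reset(self, approved_by: Optional[str]) -> None:
        """Explicit reset: requires a named human approver, never automatic."""
        if not approved_by:
            raise ValueError("reset requires a named approver")
        self.is_open = False
        self.failed_checks = []


breaker = DataCircuitBreaker(
    name="user-ranking-training",
    checks={
        "null_rate<=5%": lambda s: s["null_rate"] <= 0.05,
        "psi<=0.25": lambda s: s["psi"] <= 0.25,
        "row_drop<=20%": lambda s: s["row_count"] >= 0.8 * s["row_count_yesterday"],
    },
)
breaker.evaluate({"null_rate": 0.02, "psi": 0.31, "row_count": 950_000, "row_count_yesterday": 1_000_000})
try:
    breaker.guard()  # training would start here
except CircuitOpenError as err:
    print(err)
breaker.reset(approved_by="data-oncall@example.com")  # hypothetical approver after review
breaker.guard()  # passes only after the explicit reset
```

Keeping evaluate() one-directional (it can open the breaker but never close it) is what turns the alert into a block. Only reset() with a named approver lets the downstream pipeline run again.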
DVC (Data Version Control) versions large data files and directories the way git versions code. It stores a hash of each file in a lightweight .dvc pointer file (committed to git) and uploads the actual data to a remote (S3, GCS). Check out a git branch, run dvc pull, and the exact dataset version is restored. MLflow versions model artifacts, hyperparameters, metrics, and dataset metadata (mlflow.log_input). Together: git tracks code, DVC tracks data files, MLflow tracks training runs and artifacts. This three-way anchor is the backbone of reproducibility: git checkout + dvc pull + the MLflow run details recover the same code, the same data, and the same logged configuration.
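The core idea behind DVC is content-addressed storage plus a small pointer file that git can track, and it fits in a short standard-library sketch. This is a simplified model, not DVC's code or its exact on-disk format: the cache layout and .dvc fields differ between DVC versions. The function names and file names are hypothetical. There is no remote, so push and pull are not modelled.

```python
import hashlib
import shutil
import tempfile
from pathlib import Path


def md5_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash file contents in chunks so large files never need to fit in memory."""
    digest = hashlib.md5()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def cache_path(cache_dir: Path, file_hash: str) -> Path:
    # Shard by the first two hex characters, a common layout for content-addressed caches.
    return cache_dir / file_hash[:2] / file_hash[2:]


def add(data_file: Path, cache_dir: Path) -> Path:
    """Simplified `dvc add`: copy the bytes into the cache and write a pointer file for git."""
    file_hash = md5_of(data_file)
    target = cache_path(cache_dir, file_hash)
    target.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(data_file, target)
    pointer = data_file.with_name(data_file.name + ".dvc")
    pointer.write_text(
        f"outs:\n- md5: {file_hash}\n  size: {data_file.stat().st_size}\n  path: {data_file.name}\n"
    )
    return pointer


def checkout(pointer: Path, cache_dir: Path) -> Path:
    """Simplified `dvc checkout`: restore the exact bytes named by the pointer's hash."""
    fields = {}
    for line in pointer.read_text().splitlines()[1:]:  # skip the "outs:" header
        key, _, value = line.strip().lstrip("- ").partition(": ")
        fields[key] = value
    restored = pointer.with_name(fields["path"])
    shutil.copy2(cache_path(cache_dir, fields["md5"]), restored)
    if md5_of(restored) != fields["md5"]:
        raise ValueError("cache entry is corrupt: hash mismatch")
    return restored


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    data = root / "features.parquet"  # any bytes work; the contents are never parsed
    data.write_bytes(b"training rows, version 1")
    pointer = add(data, root / "cache")
    data.write_bytes(b"accidentally overwritten")  # the workspace drifts
    checkout(pointer, root / "cache")
    print(data.read_bytes())
```

Because the pointer holds only the hash, committing it to git pins the dataset version. Any later checkout of that commit can restore the same bytes, and the hash check fails loudly if the cached copy has changed.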
Minimum: (1) code version (git commit hash), (2) dataset version (DVC hash or Delta table version), (3) feature computation timestamp or Feast as_of_timestamp (for point-in-time correctness), (4) random seeds (numpy, Python, framework), (5) framework and library versions (requirements.txt or conda env). For feature stores: the exact entity_df and as_of_timestamp used in get_historical_features(). For lakehouses: the Delta table version or Iceberg snapshot ID. Without all five, reproduction is probabilistic at best and impossible at worst.
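One way to make the checklist above mechanical is to write a small manifest at the start of every training run. The sketch below is hypothetical: the function names, the manifest keys, and the dataset_version string format are assumptions. It seeds only Python's random module, so numpy and framework seeds would need to be added the same way. The example values reuse the Delta version and Feast timestamp from the MLflow example earlier in this section.

```python
import importlib.metadata
import json
import platform
import random
import subprocess
from typing import Optional


def git_commit() -> Optional[str]:
    """Current git commit hash, or None if git is missing or this is not a repository."""
    try:
        result = subprocess.run(["git", "rev-parse", "HEAD"],
                                capture_output=True, text=True, check=True)
        return result.stdout.strip()
    except (OSError, subprocess.CalledProcessError):
        return None


def package_versions(names: list[str]) -> dict[str, Optional[str]]:
    """Installed version of each distribution, or None if it is not installed."""
    versions: dict[str, Optional[str]] = {}
    for name in names:
        try:
            versions[name] = importlib.metadata.version(name)
        except importlib.metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def build_manifest(dataset_version: str, as_of_timestamp: str, seed: int, packages: list[str]) -> dict:
    """Capture the five minimum items; also seeds Python's RNG as a side effect."""
    random.seed(seed)
    return {
        "git_commit": git_commit(),                   # (1) code version
        "dataset_version": dataset_version,           # (2) DVC hash, Delta version, or Iceberg snapshot ID
        "feature_as_of_timestamp": as_of_timestamp,   # (3) point-in-time cutoff for features
        "seeds": {"python_random": seed},             # (4) random seeds
        "python_version": platform.python_version(),  # (5) runtime and library versions
        "packages": package_versions(packages),
    }


def missing_fields(manifest: dict) -> list[str]:
    """Top-level items that could not be captured; any entry here makes reproduction uncertain."""
    return [key for key, value in manifest.items() if value is None]


manifest = build_manifest(
    dataset_version="delta:silver_events@142",  # hypothetical string encoding of a Delta version
    as_of_timestamp="2026-01-15T00:00:00Z",
    seed=42,
    packages=["numpy", "pandas", "scikit-learn", "mlflow"],
)
print(json.dumps(manifest, indent=2))
print("missing:", missing_fields(manifest))
```

The resulting dict could be logged alongside the run, for example with mlflow.log_dict(manifest, 'repro_manifest.json'). missing_fields() makes a run that lacks a git commit visible instead of silently non-reproducible.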