Chapter 7: Production Operations
Chapter at a Glance
Prerequisites: Chapter 1 (evaluation framework), Chapter 3 (feedback collection), Chapter 6 (routing and orchestration), understanding of basic infrastructure concepts
What You Will Learn:
- Why shipping is the starting line: production success requires cost-aware design, observability, and graceful degradation
- How to estimate and compare end-to-end RAG costs across write-time, read-time, retrieval, and generation
- When to choose write-time vs read-time computation based on content stability and query patterns
- How to implement multi-level caching strategies including semantic caching
- What metrics to monitor and how to connect them to the evaluation frameworks from earlier chapters
- How to design fallback and degradation strategies for high availability
- When to scale horizontally vs vertically and how to manage cost-effective growth
Case Study Reference: Construction company maintained improvement velocity in production, reducing cost per query from $0.09 to $0.04 while increasing overall success from 78% to 84% over 12 months
Time to Complete: 75-90 minutes
Key Insight
Shipping is the starting line—production success comes from cost-aware design, observability, and graceful degradation. The gap between a working prototype and a production system is significant. A system that works for 10 queries might fail at 10,000. Features matter less than operational excellence—reliability, cost-effectiveness, and maintainability. Optimize for total cost of ownership, not just model quality.
Learning Objectives
By the end of this chapter, you will be able to:
- Estimate and compare end-to-end RAG costs including write-time processing, read-time computation, retrieval infrastructure, and generation costs
- Choose between write-time and read-time computation based on content stability, query patterns, and latency requirements
- Design multi-level caching strategies including embedding caches, result caches, and semantic caches with appropriate invalidation policies
- Define and monitor key metrics for RAG systems, connecting production monitoring to the evaluation frameworks from Chapter 1
- Implement fallback and degradation strategies to maintain availability under failure conditions
- Select storage and retrieval backends based on scale, operational constraints, and cost requirements
- Apply security and compliance basics including PII handling, RBAC, and audit logging
Introduction
The journey from Chapter 1 to Chapter 6 built a comprehensive RAG system. But shipping that system is just the beginning—production is where the improvement flywheel must keep spinning while managing costs, reliability, and scale.
The Complete System in Production:
You have built a system with:
- Evaluation framework (Chapter 1) measuring 95% routing x 82% retrieval = 78% overall
- Fine-tuned embeddings (Chapter 2) delivering 6-10% improvements
- Feedback collection (Chapter 3) gathering 40 submissions daily vs original 10
- Query segmentation (Chapter 4) identifying high-value patterns
- Specialized retrievers (Chapter 5) each optimized for specific content types
- Intelligent routing (Chapter 6) directing queries to appropriate tools
The Production Challenge: Maintaining this flywheel at scale means:
- Keeping costs predictable as usage grows from 100 to 50,000 queries/day
- Monitoring the 78% success rate and detecting degradation before users notice
- Updating retrievers and routing without breaking the system
- Collecting feedback that improves the system rather than just tracking complaints
For Product Managers
This chapter establishes the operational framework for running RAG systems at scale. Focus on understanding cost structures, the ROI of different optimization strategies, and how to maintain the improvement flywheel in production. The construction company case study demonstrates how to reduce costs while improving quality over time.
For Engineers
This chapter provides concrete implementation patterns for production RAG systems. Pay attention to caching strategies, monitoring implementation, and graceful degradation patterns. The code examples demonstrate production-ready patterns including semantic caching and multi-level fallbacks.
Core Content
Understanding Token Economics
Before optimizing costs, you need to understand where money goes in a RAG system.
For Product Managers
Typical cost breakdown:
| Component | Percentage of Costs | Optimization Potential |
|---|---|---|
| LLM Generation | 60-75% | High (caching, smaller models) |
| Retrieval Infrastructure | 10-20% | Medium (storage tiering) |
| Embedding Generation | 5-10% | Medium (caching, batching) |
| Logging/Monitoring | 5-10% | Low (necessary overhead) |
Key insight: LLM generation dominates costs. Most optimization efforts should focus there first.
Cost Calculation Framework:
Always calculate expected costs before choosing an approach. Open-source models may be roughly 8x cheaper per token than APIs, but the absolute dollar difference is often small enough that it does not justify the engineering effort of self-hosting.
Example: E-commerce search (50K queries/day)
| Approach | Monthly Cost | Engineering Effort |
|---|---|---|
| Pure API | $765/month | Low |
| Pure Self-Hosted | $3,150/month + $8,000 initial | High |
| Hybrid (self-host embeddings, API for generation) | $1,800/month | Medium |
The hybrid approach won because it avoided full self-hosted complexity while controlling high-volume embedding costs.
For Engineers
Cost calculation template:
from dataclasses import dataclass
from typing import Optional
@dataclass
class CostEstimate:
"""Estimate costs for a RAG deployment."""
daily_queries: int
avg_input_tokens: int
avg_output_tokens: int
document_count: int
avg_tokens_per_doc: int
# Pricing (per 1M tokens)
embedding_cost: float = 0.02 # text-embedding-3-small
input_cost: float = 0.15 # gpt-4o-mini input
output_cost: float = 0.60 # gpt-4o-mini output
def calculate_monthly_costs(self) -> dict:
"""Calculate monthly cost breakdown."""
# One-time embedding cost (amortized over 12 months)
embedding_tokens = self.document_count * self.avg_tokens_per_doc
monthly_embedding = (embedding_tokens / 1_000_000 * self.embedding_cost) / 12
# Daily query costs
daily_input = self.daily_queries * self.avg_input_tokens / 1_000_000 * self.input_cost
daily_output = self.daily_queries * self.avg_output_tokens / 1_000_000 * self.output_cost
monthly_query = (daily_input + daily_output) * 30
return {
"monthly_embedding": monthly_embedding,
"monthly_query": monthly_query,
"monthly_total": monthly_embedding + monthly_query,
"cost_per_query": monthly_query / (self.daily_queries * 30)
}
# Example calculation
estimate = CostEstimate(
daily_queries=50_000,
avg_input_tokens=1000,
avg_output_tokens=500,
document_count=100_000,
avg_tokens_per_doc=500
)
costs = estimate.calculate_monthly_costs()
# monthly_total: ~$675 with these assumptions
# cost_per_query: ~$0.00045
Hidden costs to account for:
- Re-ranking API calls (can add 20-50% to retrieval costs)
- Failed requests requiring retries (typically 2-5% overhead)
- Development and maintenance time (often larger than infrastructure)
- Monitoring and logging infrastructure
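A rough way to fold these into the estimate above; a sketch in which the retry rate and the extra line items are placeholder assumptions to replace with your own figures:

def adjust_for_hidden_costs(
    base_monthly: float,
    monthly_rerank: float = 0.0,        # your re-ranking API bill, if any
    retry_overhead: float = 0.03,       # assumed 2-5% of requests are retried
    monthly_observability: float = 0.0  # logging/monitoring infrastructure
) -> float:
    """Layer hidden costs on top of the base CostEstimate output (rough sketch)."""
    return base_monthly * (1 + retry_overhead) + monthly_rerank + monthly_observability

# Example: adjust_for_hidden_costs(675.0, monthly_rerank=150.0) -> ~$845/month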
Semantic Caching
Semantic caching reduces costs by recognizing that similar queries can share responses.
For Product Managers
Business value of semantic caching:
A customer support system found that 30% of queries were semantically similar. Using a 0.95 similarity threshold:
- Reduced LLM calls by 28%
- Saved $8,000/month in API costs
- Improved response latency for cached queries from 2s to 50ms
When semantic caching helps most:
| Scenario | Cache Hit Rate | ROI |
|---|---|---|
| Customer support (repetitive questions) | 25-40% | High |
| Internal knowledge base | 15-25% | Medium |
| Research/exploration (unique queries) | 5-10% | Low |
Key decision: Semantic caching requires upfront investment in similarity infrastructure. Calculate expected hit rate before implementing.
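A quick back-of-the-envelope check before committing to the build; a sketch in which the per-lookup cost is an assumption to replace with your own measurement:

def semantic_cache_monthly_savings(
    daily_queries: int,
    expected_hit_rate: float,
    cost_per_llm_call: float,
    cost_per_cache_lookup: float = 0.0001  # assumed embedding + similarity lookup cost
) -> float:
    """Estimate monthly savings from a semantic cache before building it."""
    saved_per_hit = cost_per_llm_call - cost_per_cache_lookup
    return daily_queries * 30 * expected_hit_rate * saved_per_hit

# e.g. 50K queries/day at a 25% hit rate and ~$0.02 per generation call
# -> roughly $7,500/month, the same order as the support example above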
For Engineers
Multi-level caching architecture:
from typing import Optional, List
import hashlib
import numpy as np
from datetime import datetime, timedelta
class MultiLevelCache:
"""Three-tier caching for RAG systems."""
def __init__(
self,
embedding_model,
exact_cache_size: int = 10000,
semantic_threshold: float = 0.95,
ttl_hours: int = 24
):
self.embedding_model = embedding_model
self.semantic_threshold = semantic_threshold
self.ttl = timedelta(hours=ttl_hours)
# Level 1: Exact match cache (fastest)
self.exact_cache: dict = {}
# Level 2: Semantic cache (embedding similarity)
self.semantic_cache: List[dict] = []
# Level 3: Result cache (full responses)
self.result_cache: dict = {}
def _get_exact_key(self, query: str) -> str:
"""Generate exact match cache key."""
normalized = query.lower().strip()
return hashlib.md5(normalized.encode()).hexdigest()
def get(self, query: str) -> Optional[dict]:
"""
Try to retrieve from cache at each level.
Returns:
Cached result with metadata, or None if cache miss
"""
# Level 1: Exact match
exact_key = self._get_exact_key(query)
if exact_key in self.exact_cache:
entry = self.exact_cache[exact_key]
if datetime.now() - entry["timestamp"] < self.ttl:
return {"result": entry["result"], "cache_level": "exact"}
# Level 2: Semantic similarity
query_embedding = self.embedding_model.embed(query)
for entry in self.semantic_cache:
if datetime.now() - entry["timestamp"] > self.ttl:
continue
similarity = np.dot(query_embedding, entry["embedding"]) / (
np.linalg.norm(query_embedding) * np.linalg.norm(entry["embedding"])
)
if similarity >= self.semantic_threshold:
return {
"result": entry["result"],
"cache_level": "semantic",
"similarity": similarity
}
return None
def set(self, query: str, result: dict) -> None:
"""Store result in all cache levels."""
timestamp = datetime.now()
exact_key = self._get_exact_key(query)
embedding = self.embedding_model.embed(query)
# Store in exact cache
self.exact_cache[exact_key] = {
"result": result,
"timestamp": timestamp
}
# Store in semantic cache
self.semantic_cache.append({
"query": query,
"embedding": embedding,
"result": result,
"timestamp": timestamp
})
        # Prune expired entries (this sketch prunes on every write)
self._prune_expired()
def _prune_expired(self) -> None:
"""Remove expired cache entries."""
now = datetime.now()
self.semantic_cache = [
entry for entry in self.semantic_cache
if now - entry["timestamp"] < self.ttl
]
Provider-specific caching:
- Anthropic: Prompt caching is opt-in (you mark cacheable prompt prefixes with cache-control breakpoints); cached prefixes are reused for roughly five minutes after their last use
- OpenAI: Automatically caches repeated prompt prefixes on long prompts (roughly 1,024 tokens and up) and discounts the cached input tokens
- Self-hosted: Implement Redis-based caching for embeddings
Prompt caching impact: With 50+ examples in prompts, caching can reduce costs by 70-90% for repeat queries.
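For the self-hosted case, a minimal Redis-backed embedding cache sketch; it assumes the redis-py client, a reachable Redis instance, and an embedding_model object with an embed() method, and the key prefix is arbitrary:

import hashlib
import json

import redis  # redis-py

class RedisEmbeddingCache:
    """Cache embeddings so repeated texts are only embedded once."""

    def __init__(self, embedding_model, ttl_seconds: int = 86400):
        self.client = redis.Redis(host="localhost", port=6379, db=0)
        self.embedding_model = embedding_model
        self.ttl_seconds = ttl_seconds

    def _key(self, text: str) -> str:
        return "emb:" + hashlib.sha256(text.strip().lower().encode()).hexdigest()

    def embed(self, text: str) -> list:
        key = self._key(text)
        cached = self.client.get(key)
        if cached is not None:
            return json.loads(cached)
        embedding = self.embedding_model.embed(text)
        self.client.setex(key, self.ttl_seconds, json.dumps(embedding))
        return embedding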
Write-Time vs Read-Time Computation
A fundamental architectural decision that affects cost, latency, and flexibility.
For Product Managers
The tradeoff:
| Factor | Write-Time (Preprocessing) | Read-Time (On-Demand) |
|---|---|---|
| Storage Cost | Higher | Lower |
| Query Latency | Lower | Higher |
| Content Freshness | May be stale | Always current |
| Best For | Stable content | Dynamic content |
Decision framework:
- Choose write-time when: Content changes rarely, query latency is critical, storage is cheap relative to compute
- Choose read-time when: Content changes frequently, storage is expensive, freshness matters more than speed
Example decisions:
| Content Type | Recommended Approach | Rationale |
|---|---|---|
| Product catalog | Write-time embeddings | Changes weekly, queries need sub-second response |
| News articles | Hybrid (recent = read-time, archive = write-time) | Recent content needs freshness |
| Legal documents | Write-time with versioning | Rarely changes, accuracy critical |
| Real-time data | Read-time | Freshness is the primary requirement |
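The decision framework above can be encoded as a simple helper; a sketch with illustrative thresholds rather than fixed rules:

from enum import Enum

class ComputeStrategy(Enum):
    WRITE_TIME = "write_time"
    READ_TIME = "read_time"
    HYBRID = "hybrid"

def choose_strategy(
    updates_per_day: float,
    latency_budget_ms: int,
    freshness_window_minutes: int
) -> ComputeStrategy:
    """Rough heuristic mirroring the table above (thresholds are illustrative)."""
    if freshness_window_minutes <= 5:
        return ComputeStrategy.READ_TIME   # freshness dominates (real-time data)
    if updates_per_day < 1 and latency_budget_ms < 1000:
        return ComputeStrategy.WRITE_TIME  # stable content with a tight latency budget
    return ComputeStrategy.HYBRID          # precompute stable content, compute the rest on read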
For Engineers
Write-time preprocessing pipeline:
from typing import List, Dict, Any
from datetime import datetime
import asyncio
class WriteTimeProcessor:
"""Preprocess documents at write time for faster retrieval."""
def __init__(self, embedding_model, chunk_size: int = 512):
self.embedding_model = embedding_model
self.chunk_size = chunk_size
async def process_document(self, document: Dict[str, Any]) -> Dict[str, Any]:
"""
Process a document at write time.
Generates:
- Chunks with embeddings
- Extracted metadata
- Summary for routing
"""
# Chunk the document
chunks = self._chunk_document(document["content"])
# Generate embeddings in parallel
embeddings = await asyncio.gather(*[
self._embed_chunk(chunk) for chunk in chunks
])
# Extract metadata
metadata = await self._extract_metadata(document)
# Generate summary for routing
summary = await self._generate_summary(document["content"])
return {
"document_id": document["id"],
"chunks": [
{
"text": chunk,
"embedding": emb,
"position": i
}
for i, (chunk, emb) in enumerate(zip(chunks, embeddings))
],
"metadata": metadata,
"summary": summary,
"processed_at": datetime.now().isoformat()
}
def _chunk_document(self, content: str) -> List[str]:
"""Split document into chunks."""
# Simple chunking - production would use semantic boundaries
words = content.split()
chunks = []
for i in range(0, len(words), self.chunk_size):
chunk = " ".join(words[i:i + self.chunk_size])
chunks.append(chunk)
return chunks
async def _embed_chunk(self, chunk: str) -> List[float]:
"""Generate embedding for a chunk."""
return self.embedding_model.embed(chunk)
async def _extract_metadata(self, document: Dict[str, Any]) -> Dict[str, Any]:
"""Extract structured metadata from document."""
# Use LLM for metadata extraction
# Implementation depends on document type
return {}
async def _generate_summary(self, content: str) -> str:
"""Generate summary for routing decisions."""
# Use LLM to generate concise summary
return ""
Read-time computation pattern:
from typing import List, Dict, Any
from datetime import datetime
class ReadTimeProcessor:
"""Process queries at read time for maximum freshness."""
def __init__(self, embedding_model, llm_client):
self.embedding_model = embedding_model
self.llm_client = llm_client
async def process_query(
self,
query: str,
documents: List[Dict[str, Any]]
) -> Dict[str, Any]:
"""
Process query and documents at read time.
Trade-off: Higher latency for fresher results.
"""
# Embed query
query_embedding = self.embedding_model.embed(query)
# Score documents (could be pre-embedded or embedded now)
scored_docs = []
for doc in documents:
if "embedding" in doc:
# Use pre-computed embedding
score = self._cosine_similarity(query_embedding, doc["embedding"])
else:
# Compute embedding on demand
doc_embedding = self.embedding_model.embed(doc["content"])
score = self._cosine_similarity(query_embedding, doc_embedding)
scored_docs.append((doc, score))
# Sort by relevance
scored_docs.sort(key=lambda x: x[1], reverse=True)
return {
"query": query,
"results": scored_docs[:10],
"processed_at": datetime.now().isoformat()
}
def _cosine_similarity(self, a: List[float], b: List[float]) -> float:
"""Calculate cosine similarity between two vectors."""
import numpy as np
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
Monitoring and Observability
Production monitoring builds directly on the evaluation frameworks from Chapter 1 and feedback collection from Chapter 3.
For Product Managers
Connecting to earlier chapters:
From Chapter 1's evaluation framework:
- Retrieval Recall: Track the 85% blueprint search accuracy in production—alert if it drops below 80%
- Precision Metrics: Monitor whether retrieved documents are relevant
- Experiment Velocity: Continue running A/B tests on retrieval improvements
From Chapter 3's feedback collection:
- User Satisfaction: The 40 daily submissions should maintain or increase
- Feedback Response Time: How quickly you address reported issues
- Citation Interactions: Which sources users trust and click
From Chapter 6's routing metrics:
- Routing Accuracy: The 95% routing success rate should be monitored per tool
- Tool Usage Distribution: Ensure queries are balanced across tools as expected
- End-to-End Success: 95% routing x 82% retrieval = 78% overall (track this daily)
Key metrics dashboard:
| Metric Category | Metrics | Alert Threshold |
|---|---|---|
| Performance | p50, p95, p99 latency | p95 > 3s |
| Quality | Retrieval recall, routing accuracy | Drop > 5% week-over-week |
| Cost | Cost per query, daily spend | > 120% of budget |
| User | Satisfaction rate, feedback volume | Satisfaction < 70% |
For Engineers
Monitoring implementation:
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from datetime import datetime, timedelta
import statistics
@dataclass
class MetricsCollector:
"""Collect and aggregate RAG system metrics."""
# Latency tracking
latencies: List[float] = field(default_factory=list)
# Quality tracking
routing_decisions: List[Dict] = field(default_factory=list)
retrieval_scores: List[float] = field(default_factory=list)
# Cost tracking
token_usage: Dict[str, int] = field(default_factory=lambda: {
"input": 0, "output": 0, "embedding": 0
})
# User feedback
feedback_scores: List[int] = field(default_factory=list)
def record_query(
self,
latency_ms: float,
input_tokens: int,
output_tokens: int,
routing_decision: Optional[Dict] = None,
retrieval_score: Optional[float] = None
) -> None:
"""Record metrics for a single query."""
self.latencies.append(latency_ms)
self.token_usage["input"] += input_tokens
self.token_usage["output"] += output_tokens
if routing_decision:
self.routing_decisions.append(routing_decision)
if retrieval_score is not None:
self.retrieval_scores.append(retrieval_score)
def record_feedback(self, score: int) -> None:
"""Record user feedback (1-5 scale)."""
self.feedback_scores.append(score)
def get_summary(self) -> Dict:
"""Get summary metrics."""
return {
"latency": {
"p50": self._percentile(self.latencies, 50),
"p95": self._percentile(self.latencies, 95),
"p99": self._percentile(self.latencies, 99),
},
"quality": {
"avg_retrieval_score": (
statistics.mean(self.retrieval_scores)
if self.retrieval_scores else None
),
"routing_accuracy": self._calculate_routing_accuracy(),
},
"cost": {
"total_input_tokens": self.token_usage["input"],
"total_output_tokens": self.token_usage["output"],
"estimated_cost": self._estimate_cost(),
},
"user": {
"avg_feedback": (
statistics.mean(self.feedback_scores)
if self.feedback_scores else None
),
"feedback_count": len(self.feedback_scores),
}
}
def _percentile(self, data: List[float], p: int) -> Optional[float]:
"""Calculate percentile of data."""
if not data:
return None
sorted_data = sorted(data)
idx = int(len(sorted_data) * p / 100)
return sorted_data[min(idx, len(sorted_data) - 1)]
def _calculate_routing_accuracy(self) -> Optional[float]:
"""Calculate routing accuracy from decisions."""
if not self.routing_decisions:
return None
correct = sum(1 for d in self.routing_decisions if d.get("correct", False))
return correct / len(self.routing_decisions)
def _estimate_cost(self) -> float:
"""Estimate cost based on token usage."""
# Using gpt-4o-mini pricing
input_cost = self.token_usage["input"] / 1_000_000 * 0.15
output_cost = self.token_usage["output"] / 1_000_000 * 0.60
return input_cost + output_cost
class AlertManager:
"""Manage alerts for RAG system metrics."""
def __init__(self):
self.thresholds = {
"latency_p95_ms": 3000,
"retrieval_score_min": 0.75,
"routing_accuracy_min": 0.90,
"cost_daily_max": 100.0,
"feedback_min": 3.5,
}
def check_alerts(self, metrics: Dict) -> List[str]:
"""Check metrics against thresholds and return alerts."""
alerts = []
# Latency alert
if metrics["latency"]["p95"] and metrics["latency"]["p95"] > self.thresholds["latency_p95_ms"]:
alerts.append(
f"High latency: p95 = {metrics['latency']['p95']:.0f}ms "
f"(threshold: {self.thresholds['latency_p95_ms']}ms)"
)
# Quality alerts
if metrics["quality"]["avg_retrieval_score"]:
if metrics["quality"]["avg_retrieval_score"] < self.thresholds["retrieval_score_min"]:
alerts.append(
f"Low retrieval quality: {metrics['quality']['avg_retrieval_score']:.2f} "
f"(threshold: {self.thresholds['retrieval_score_min']})"
)
if metrics["quality"]["routing_accuracy"]:
if metrics["quality"]["routing_accuracy"] < self.thresholds["routing_accuracy_min"]:
alerts.append(
f"Low routing accuracy: {metrics['quality']['routing_accuracy']:.2%} "
f"(threshold: {self.thresholds['routing_accuracy_min']:.0%})"
)
# User feedback alert
if metrics["user"]["avg_feedback"]:
if metrics["user"]["avg_feedback"] < self.thresholds["feedback_min"]:
alerts.append(
f"Low user satisfaction: {metrics['user']['avg_feedback']:.1f}/5 "
f"(threshold: {self.thresholds['feedback_min']})"
)
return alerts
Cost Optimization Strategies
Systematic approaches to reducing costs while maintaining quality.
For Product Managers
Cost optimization priority matrix:
| Strategy | Effort | Savings | Risk |
|---|---|---|---|
| Prompt caching | Low | 20-40% | Low |
| Semantic caching | Medium | 15-30% | Low |
| Model downgrade for simple queries | Medium | 30-50% | Medium |
| Self-host embeddings | High | 10-20% | Medium |
| Full self-hosting | Very High | 50-70% | High |
Recommended approach: Start with caching (low effort, low risk), then consider model selection strategies. Only pursue self-hosting when volume justifies the engineering investment.
ROI calculation example:
Current monthly cost: $5,000
Prompt caching implementation: 8 hours engineering time
Expected savings: 25% = $1,250/month
Break-even: about one month (8 hours at $150/hour = $1,200 of engineering time, recovered by roughly $1,250/month in savings)
For Engineers
Model selection based on query complexity:
from enum import Enum
from typing import Optional
class QueryComplexity(Enum):
SIMPLE = "simple" # Direct lookups, yes/no questions
MODERATE = "moderate" # Single-hop reasoning
COMPLEX = "complex" # Multi-hop reasoning, synthesis
class CostOptimizedRouter:
"""Route queries to appropriate models based on complexity."""
def __init__(self):
self.model_config = {
QueryComplexity.SIMPLE: {
"model": "gpt-4o-mini",
"max_tokens": 256,
"cost_per_1k_input": 0.00015,
"cost_per_1k_output": 0.0006,
},
QueryComplexity.MODERATE: {
"model": "gpt-4o-mini",
"max_tokens": 1024,
"cost_per_1k_input": 0.00015,
"cost_per_1k_output": 0.0006,
},
QueryComplexity.COMPLEX: {
"model": "gpt-4o",
"max_tokens": 4096,
"cost_per_1k_input": 0.0025,
"cost_per_1k_output": 0.01,
},
}
def classify_complexity(self, query: str, context_length: int) -> QueryComplexity:
"""
Classify query complexity based on heuristics.
Simple: Short queries, direct questions, small context
Moderate: Medium queries, single reasoning step
Complex: Long queries, multi-step reasoning, large context
"""
# Heuristic classification
query_words = len(query.split())
if query_words < 10 and context_length < 1000:
return QueryComplexity.SIMPLE
elif query_words < 30 and context_length < 4000:
return QueryComplexity.MODERATE
else:
return QueryComplexity.COMPLEX
def get_model_config(self, complexity: QueryComplexity) -> dict:
"""Get model configuration for complexity level."""
return self.model_config[complexity]
def estimate_cost(
self,
complexity: QueryComplexity,
input_tokens: int,
output_tokens: int
) -> float:
"""Estimate cost for a query."""
config = self.model_config[complexity]
input_cost = input_tokens / 1000 * config["cost_per_1k_input"]
output_cost = output_tokens / 1000 * config["cost_per_1k_output"]
return input_cost + output_cost
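A brief usage sketch of the router; the example query and token counts are made up:

router = CostOptimizedRouter()
complexity = router.classify_complexity(
    query="What is the warranty period for model X?",
    context_length=600
)
config = router.get_model_config(complexity)  # SIMPLE -> gpt-4o-mini, 256 max tokens
estimated = router.estimate_cost(complexity, input_tokens=800, output_tokens=150)  # ~$0.0002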
Batch processing for cost efficiency:
import asyncio
from typing import List, Dict, Any
class BatchProcessor:
"""Batch similar queries for cost efficiency."""
def __init__(self, batch_size: int = 10, max_wait_ms: int = 100):
self.batch_size = batch_size
self.max_wait_ms = max_wait_ms
self.pending_queries: List[Dict] = []
self.lock = asyncio.Lock()
    async def add_query(self, query: Dict[str, Any]) -> Any:
        """
        Add a query to the batch and wait for it to be processed.

        Simplified sketch: this returns the whole batch's results; a production
        implementation would map each result back to its originating caller.
        Batching reduces API calls and can leverage batch pricing.
        """
async with self.lock:
self.pending_queries.append(query)
# Process batch if full
if len(self.pending_queries) >= self.batch_size:
return await self._process_batch()
# Wait for more queries or timeout
await asyncio.sleep(self.max_wait_ms / 1000)
async with self.lock:
if self.pending_queries:
return await self._process_batch()
async def _process_batch(self) -> List[Dict[str, Any]]:
"""Process all pending queries as a batch."""
batch = self.pending_queries.copy()
self.pending_queries.clear()
# Process batch (implementation depends on API)
results = await self._call_api_batch(batch)
return results
async def _call_api_batch(self, batch: List[Dict]) -> List[Dict]:
"""Call API with batched queries."""
# Implementation depends on provider
# Some providers offer batch endpoints with discounts
pass
Graceful Degradation
Design systems that maintain availability even when components fail.
For Product Managers
Degradation strategy tiers:
| Tier | Condition | Response | User Impact |
|---|---|---|---|
| Normal | All systems healthy | Full functionality | None |
| Degraded | LLM latency high | Use cached responses, simpler models | Slightly slower |
| Limited | Primary retriever down | Use fallback retriever | Reduced quality |
| Minimal | Multiple failures | Pre-computed FAQ responses | Limited functionality |
Example: Financial advisory degradation
- Primary: Complex multi-index RAG with real-time data
- Fallback 1: Single-index semantic search with 5-minute stale data
- Fallback 2: Pre-computed FAQ responses for common questions
- Result: 99.9% availability even during API outages
Key principle: Users prefer a degraded response over no response. Design fallbacks that provide value even when imperfect.
For Engineers
Fallback implementation pattern:
from typing import Optional, List, Callable, Any
from dataclasses import dataclass
import asyncio
import logging
logger = logging.getLogger(__name__)
@dataclass
class FallbackConfig:
"""Configuration for a fallback tier."""
name: str
handler: Callable
timeout_ms: int
max_retries: int = 1
class GracefulDegradation:
"""Implement graceful degradation with multiple fallback tiers."""
def __init__(self, fallbacks: List[FallbackConfig]):
self.fallbacks = fallbacks
self.health_status: dict = {f.name: True for f in fallbacks}
async def execute(self, query: str, context: dict) -> dict:
"""
Execute query with fallback chain.
Tries each fallback tier until one succeeds.
"""
last_error = None
for fallback in self.fallbacks:
if not self.health_status[fallback.name]:
logger.warning(f"Skipping unhealthy fallback: {fallback.name}")
continue
for attempt in range(fallback.max_retries):
try:
result = await asyncio.wait_for(
fallback.handler(query, context),
timeout=fallback.timeout_ms / 1000
)
return {
"result": result,
"fallback_tier": fallback.name,
"degraded": fallback != self.fallbacks[0]
}
except asyncio.TimeoutError:
logger.warning(
f"Timeout on {fallback.name} (attempt {attempt + 1})"
)
last_error = "timeout"
except Exception as e:
logger.error(f"Error on {fallback.name}: {e}")
last_error = str(e)
# Mark as unhealthy after all retries fail
self.health_status[fallback.name] = False
# All fallbacks failed
return {
"result": self._get_emergency_response(query),
"fallback_tier": "emergency",
"degraded": True,
"error": last_error
}
def _get_emergency_response(self, query: str) -> str:
"""Return emergency response when all fallbacks fail."""
return (
"I'm experiencing technical difficulties and cannot fully "
"answer your question right now. Please try again in a few "
"minutes, or contact support if the issue persists."
)
async def health_check(self) -> None:
"""Periodically check and restore fallback health."""
for fallback in self.fallbacks:
if not self.health_status[fallback.name]:
try:
# Simple health check query
await asyncio.wait_for(
fallback.handler("health check", {}),
timeout=fallback.timeout_ms / 1000
)
self.health_status[fallback.name] = True
logger.info(f"Restored health: {fallback.name}")
except Exception:
pass # Still unhealthy
# Example usage
async def primary_rag(query: str, context: dict) -> str:
"""Primary RAG with full capabilities."""
# Full retrieval + generation pipeline
pass
async def fallback_simple(query: str, context: dict) -> str:
"""Simplified retrieval with cached embeddings."""
# Use pre-computed embeddings, simpler model
pass
async def fallback_faq(query: str, context: dict) -> str:
"""Return pre-computed FAQ response."""
# Match against FAQ database
pass
degradation = GracefulDegradation([
FallbackConfig("primary", primary_rag, timeout_ms=5000, max_retries=2),
FallbackConfig("simple", fallback_simple, timeout_ms=2000, max_retries=1),
FallbackConfig("faq", fallback_faq, timeout_ms=500, max_retries=1),
])
Circuit breaker pattern:
from datetime import datetime, timedelta
from enum import Enum
from typing import Optional
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing recovery
class CircuitBreaker:
"""Prevent cascade failures with circuit breaker pattern."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 30,
half_open_requests: int = 3
):
self.failure_threshold = failure_threshold
self.recovery_timeout = timedelta(seconds=recovery_timeout)
self.half_open_requests = half_open_requests
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time: Optional[datetime] = None
self.half_open_successes = 0
def can_execute(self) -> bool:
"""Check if request should be allowed."""
if self.state == CircuitState.CLOSED:
return True
if self.state == CircuitState.OPEN:
# Check if recovery timeout has passed
if self.last_failure_time and datetime.now() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_successes = 0
return True
return False
if self.state == CircuitState.HALF_OPEN:
return True
return False
    def record_success(self) -> None:
        """Record successful request."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_successes += 1
            if self.half_open_successes >= self.half_open_requests:
                self.state = CircuitState.CLOSED
        self.failure_count = 0
def record_failure(self) -> None:
"""Record failed request."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
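A minimal sketch of wrapping an LLM call with the breaker; call_llm is a hypothetical async client call standing in for your provider SDK:

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)

async def guarded_llm_call(prompt: str) -> str:
    if not breaker.can_execute():
        # Circuit is open: skip the provider and serve a fallback instead
        raise RuntimeError("LLM circuit open; use a degraded response")
    try:
        response = await call_llm(prompt)  # hypothetical provider call
        breaker.record_success()
        return response
    except Exception:
        breaker.record_failure()
        raise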
Scaling Strategies
Growing from hundreds to millions of queries requires deliberate architecture decisions.
For Product Managers
Scaling decision framework:
| Scale | Queries/Day | Recommended Architecture |
|---|---|---|
| Small | < 1,000 | Single server, managed services |
| Medium | 1,000 - 50,000 | Load balancing, dedicated vector DB |
| Large | 50,000 - 500,000 | Sharded indices, read replicas |
| Very Large | > 500,000 | Distributed systems, edge caching |
Database selection by scale:
| Document Count | Recommended Solution |
|---|---|
| < 1M | PostgreSQL with pgvector |
| 1M - 10M | Dedicated vector database (Pinecone, Weaviate) |
| > 10M | Distributed solutions (Elasticsearch with vectors) |
Key insight from TurboPuffer: Object storage-first architectures can reduce costs by up to 95% for large-scale deployments where only a subset of data is actively accessed.
For Engineers
Horizontal scaling patterns:
import asyncio
import hashlib
from typing import Any, Dict, List, Optional
class ShardedRetriever:
"""Distribute retrieval across multiple shards."""
def __init__(self, shards: List[Any], shard_key: str = "category"):
self.shards = shards
self.shard_key = shard_key
self.shard_map: Dict[str, int] = {}
def get_shard(self, document: Dict[str, Any]) -> int:
"""Determine which shard a document belongs to."""
key_value = document.get(self.shard_key, "default")
if key_value not in self.shard_map:
# Consistent hashing for shard assignment
hash_value = int(hashlib.md5(key_value.encode()).hexdigest(), 16)
self.shard_map[key_value] = hash_value % len(self.shards)
return self.shard_map[key_value]
async def search(
self,
query: str,
        target_shards: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""
Search across shards.
If target_shards specified, only search those.
Otherwise, search all shards and merge results.
"""
if target_shards:
# Search specific shards
shard_indices = [
self.shard_map.get(s, 0) for s in target_shards
]
shards_to_search = [self.shards[i] for i in set(shard_indices)]
else:
# Search all shards
shards_to_search = self.shards
# Parallel search across shards
results = await asyncio.gather(*[
shard.search(query) for shard in shards_to_search
])
# Merge and re-rank results
all_results = []
for shard_results in results:
all_results.extend(shard_results)
# Sort by score and return top results
all_results.sort(key=lambda x: x.get("score", 0), reverse=True)
return all_results[:10]
class ReadReplicaManager:
"""Manage read replicas for query distribution."""
def __init__(self, primary, replicas: List[Any]):
self.primary = primary
self.replicas = replicas
self.replica_index = 0
def get_read_connection(self):
"""Get connection for read operations (round-robin)."""
if not self.replicas:
return self.primary
replica = self.replicas[self.replica_index]
self.replica_index = (self.replica_index + 1) % len(self.replicas)
return replica
def get_write_connection(self):
"""Get connection for write operations (always primary)."""
return self.primary
Async processing for heavy operations:
import asyncio
from typing import Callable, Any
from collections import deque
class AsyncQueue:
"""Queue heavy operations for async processing."""
def __init__(self, max_workers: int = 10):
self.queue: deque = deque()
self.max_workers = max_workers
self.active_workers = 0
async def enqueue(
self,
operation: Callable,
*args,
**kwargs
) -> str:
"""
Enqueue an operation for async processing.
Returns job ID for status tracking.
"""
import uuid
job_id = str(uuid.uuid4())
self.queue.append({
"id": job_id,
"operation": operation,
"args": args,
"kwargs": kwargs,
"status": "pending"
})
# Start worker if capacity available
if self.active_workers < self.max_workers:
asyncio.create_task(self._process_queue())
return job_id
async def _process_queue(self) -> None:
"""Process queued operations."""
self.active_workers += 1
try:
while self.queue:
job = self.queue.popleft()
job["status"] = "processing"
try:
result = await job["operation"](
*job["args"],
**job["kwargs"]
)
job["status"] = "completed"
job["result"] = result
except Exception as e:
job["status"] = "failed"
job["error"] = str(e)
finally:
self.active_workers -= 1
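For example, write-time document processing (a heavy operation) can be pushed onto the queue instead of blocking the request path; processor here is assumed to be an instance of the WriteTimeProcessor sketched earlier:

queue = AsyncQueue(max_workers=4)

async def ingest_document(document: dict) -> str:
    # Returns immediately with a job ID; chunking and embedding run in the background
    return await queue.enqueue(processor.process_document, document)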
Security and Compliance
Critical considerations for production deployments.
For Product Managers
Security checklist:
- PII detection and masking in queries and responses
- Audit logging for all queries (who asked what, when)
- Role-based access control (RBAC) for document access
- Data retention policies (how long to keep logs, feedback)
- Encryption at rest and in transit
Industry-specific requirements:
| Industry | Key Requirements |
|---|---|
| Healthcare | HIPAA compliance, patient data isolation |
| Financial | SOC2 compliance, transaction auditing |
| Legal | Privilege preservation, citation accuracy |
Reality check: In regulated industries, technical implementation is 20% of the work. The other 80% is compliance, audit trails, and governance.
For Engineers
PII detection and masking:
import re
from typing import List, Tuple
class PIIDetector:
"""Detect and mask personally identifiable information."""
def __init__(self):
self.patterns = {
"email": r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"phone": r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
"ssn": r'\b\d{3}-\d{2}-\d{4}\b',
"credit_card": r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
}
def detect(self, text: str) -> List[Tuple[str, str, int, int]]:
"""
Detect PII in text.
Returns list of (pii_type, value, start, end) tuples.
"""
findings = []
for pii_type, pattern in self.patterns.items():
for match in re.finditer(pattern, text):
findings.append((
pii_type,
match.group(),
match.start(),
match.end()
))
return findings
def mask(self, text: str) -> str:
"""Mask all detected PII in text."""
findings = self.detect(text)
# Sort by position (reverse) to avoid offset issues
findings.sort(key=lambda x: x[2], reverse=True)
masked = text
for pii_type, value, start, end in findings:
mask_char = "*"
masked = masked[:start] + mask_char * (end - start) + masked[end:]
return masked
class AuditLogger:
"""Log all RAG system interactions for compliance."""
def __init__(self, storage_backend):
self.storage = storage_backend
self.pii_detector = PIIDetector()
async def log_query(
self,
user_id: str,
query: str,
response: str,
metadata: dict
) -> str:
"""
Log a query interaction.
Masks PII before storage for privacy compliance.
"""
import uuid
from datetime import datetime
log_entry = {
"id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"user_id": user_id,
"query_masked": self.pii_detector.mask(query),
"response_masked": self.pii_detector.mask(response),
"metadata": metadata,
"pii_detected": len(self.pii_detector.detect(query)) > 0,
}
await self.storage.insert(log_entry)
return log_entry["id"]
Case Study Deep Dive
Construction Company: Maintaining the Flywheel in Production
The construction company from previous chapters provides a complete example of production operations.
For Product Managers
Production journey over 12 months:
| Metric | Month 1-2 | Month 3-6 | Month 7-12 |
|---|---|---|---|
| Daily Queries | 500 | 500 | 2,500 |
| Routing Accuracy | 95% | 95% | 96% |
| Retrieval Accuracy | 82% | 85% | 87% |
| Overall Success | 78% | 81% | 84% |
| Daily Cost | $45 | $32 | $98 |
| Cost per Query | $0.09 | $0.064 | $0.04 |
| Feedback/Day | 40 | 45 | 60 |
Month 1-2 (Initial Deploy):
- Baseline established with evaluation framework from Chapter 1
- Feedback collection from Chapter 3 generating 40 submissions daily
- Cost per query: $0.09
Month 3-6 (First Improvement Cycle):
- Used feedback to identify schedule search issues (dates parsed incorrectly)
- Fine-tuned date extraction (Chapter 2 techniques)
- Implemented prompt caching: $45/day to $32/day (29% reduction)
- Overall success improved from 78% to 81%
Month 7-12 (Sustained Improvement):
- 5x query growth while improving unit economics
- Added new tool for permit search based on usage patterns
- Updated routing with 60 examples per tool
- Cost per query dropped to $0.04 despite increased complexity
Key insight: Production success meant maintaining the improvement flywheel while managing costs and reliability. The evaluation framework from Chapter 1, feedback from Chapter 3, and routing from Chapter 6 all remained active in production—continuously measuring, collecting data, and improving.
For Engineers
Implementation details:
Cost optimization timeline:
import time
# Month 1-2: Baseline monitoring
class ProductionMonitor:
    def __init__(self, rag_pipeline):
        # The assembled pipeline from earlier chapters (routing + retrieval + generation)
        self.rag_pipeline = rag_pipeline
        self.metrics = MetricsCollector()
        self.alerts = AlertManager()
async def process_query(self, query: str) -> dict:
start_time = time.time()
# Process query
result = await self.rag_pipeline.process(query)
# Record metrics
latency = (time.time() - start_time) * 1000
self.metrics.record_query(
latency_ms=latency,
input_tokens=result["input_tokens"],
output_tokens=result["output_tokens"],
routing_decision=result.get("routing"),
retrieval_score=result.get("retrieval_score")
)
return result
# Month 3-6: Added caching
class CachedPipeline:
    def __init__(self, embedding_model):
self.cache = MultiLevelCache(
embedding_model=embedding_model,
semantic_threshold=0.95,
ttl_hours=24
)
async def process_query(self, query: str) -> dict:
# Check cache first
cached = self.cache.get(query)
if cached:
return {**cached["result"], "cache_hit": True}
# Process and cache
result = await self._process_uncached(query)
self.cache.set(query, result)
return {**result, "cache_hit": False}
# Month 7-12: Added graceful degradation
class ProductionPipeline:
def __init__(self):
self.cache = MultiLevelCache(...)
self.degradation = GracefulDegradation([
FallbackConfig("primary", self._primary_rag, timeout_ms=5000),
FallbackConfig("simple", self._simple_rag, timeout_ms=2000),
FallbackConfig("cached", self._cached_only, timeout_ms=500),
])
async def process_query(self, query: str) -> dict:
# Check cache
cached = self.cache.get(query)
if cached:
return {**cached["result"], "cache_hit": True}
# Process with fallbacks
result = await self.degradation.execute(query, {})
# Cache successful results
if not result.get("degraded"):
self.cache.set(query, result["result"])
return result
Monitoring dashboard queries:
from datetime import datetime
# Daily health check
async def daily_health_report() -> dict:
metrics = metrics_collector.get_summary()
alerts = alert_manager.check_alerts(metrics)
return {
"date": datetime.now().date().isoformat(),
"metrics": metrics,
"alerts": alerts,
"status": "healthy" if not alerts else "degraded"
}
# Weekly trend analysis
async def weekly_trends() -> dict:
# Compare this week to last week
this_week = await get_metrics_for_period(days=7)
last_week = await get_metrics_for_period(days=14, offset=7)
return {
"latency_change": (
this_week["latency"]["p95"] - last_week["latency"]["p95"]
) / last_week["latency"]["p95"],
"quality_change": (
this_week["quality"]["avg_retrieval_score"] -
last_week["quality"]["avg_retrieval_score"]
),
"cost_change": (
this_week["cost"]["estimated_cost"] -
last_week["cost"]["estimated_cost"]
) / last_week["cost"]["estimated_cost"],
}
Implementation Guide
Quick Start for PMs: Production Readiness Assessment
Step 1: Cost analysis
Current monthly cost: $____
Cost per query: $____
Largest cost component: ____
Optimization opportunity: ____
Step 2: Reliability assessment
| Component | Current Availability | Target | Gap |
|---|---|---|---|
| Retrieval | ___% | 99.9% | |
| Generation | ___% | 99.5% | |
| Overall | ___% | 99% | |
Step 3: Monitoring coverage
- Latency tracking (p50, p95, p99)
- Quality metrics (retrieval score, routing accuracy)
- Cost tracking (daily, per-query)
- User feedback collection
- Alerting configured
Step 4: Degradation planning
| Failure Scenario | Fallback Strategy | User Impact |
|---|---|---|
| LLM API down | | |
| Vector DB slow | | |
| High load | | |
Detailed Implementation for Engineers
Step 1: Set up monitoring
# Initialize monitoring
from production import MetricsCollector, AlertManager
metrics = MetricsCollector()
alerts = AlertManager()
# Configure thresholds
alerts.thresholds = {
"latency_p95_ms": 3000,
"retrieval_score_min": 0.75,
"routing_accuracy_min": 0.90,
"cost_daily_max": 100.0,
}
Step 2: Implement caching
# Set up multi-level cache
from production import MultiLevelCache
cache = MultiLevelCache(
embedding_model=embedding_model,
semantic_threshold=0.95,
ttl_hours=24
)
# Integrate with pipeline
async def cached_query(query: str) -> dict:
cached = cache.get(query)
if cached:
return cached["result"]
result = await process_query(query)
cache.set(query, result)
return result
Step 3: Add graceful degradation
# Configure fallback chain
from production import GracefulDegradation, FallbackConfig
degradation = GracefulDegradation([
FallbackConfig("primary", primary_rag, timeout_ms=5000, max_retries=2),
FallbackConfig("simple", simple_rag, timeout_ms=2000, max_retries=1),
FallbackConfig("cached", cached_only, timeout_ms=500, max_retries=1),
])
# Use in production
async def production_query(query: str) -> dict:
return await degradation.execute(query, {})
Step 4: Set up cost tracking
# Track costs per query
from datetime import datetime

class CostTracker:
    def __init__(self):
        self.daily_costs = []
        # Pricing per 1M tokens (input, output); figures match the models used earlier
        self.pricing = {"gpt-4o-mini": (0.15, 0.60), "gpt-4o": (2.50, 10.00)}

    def _calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        input_rate, output_rate = self.pricing.get(model, (0.15, 0.60))
        return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

    def record(self, input_tokens: int, output_tokens: int, model: str):
        cost = self._calculate_cost(input_tokens, output_tokens, model)
self.daily_costs.append({
"timestamp": datetime.now(),
"cost": cost,
"model": model
})
def get_daily_total(self) -> float:
today = datetime.now().date()
return sum(
c["cost"] for c in self.daily_costs
if c["timestamp"].date() == today
)
Common Pitfalls
PM Pitfalls
PM Pitfall: Optimizing for model quality over total cost of ownership
The mistake: Choosing the most accurate model without considering operational costs.
Why it happens: Demo performance is measured on quality alone; production costs are discovered later.
The fix: Calculate total cost of ownership including infrastructure, maintenance, and API costs before selecting models. A 5% quality improvement rarely justifies a 10x cost increase.
PM Pitfall: Underestimating maintenance burden
The mistake: Planning for build cost but not ongoing maintenance.
Why it happens: Self-hosting looks cheap when only considering infrastructure costs.
The fix: Include engineering time in cost calculations. 20 hours/month of maintenance at $150/hour = $3,000/month—often more than API costs.
PM Pitfall: No degradation planning
The mistake: Assuming 100% availability from external APIs.
Why it happens: APIs work reliably during development; failures happen at scale.
The fix: Design fallback strategies before launch. Users prefer degraded responses over errors.
Engineering Pitfalls
Engineering Pitfall: Caching without invalidation strategy
The mistake: Implementing caching without considering when to invalidate.
Why it happens: Cache hits feel like wins; stale data problems emerge slowly.
The fix: Define TTL policies based on content freshness requirements. Implement cache invalidation on document updates.
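A minimal invalidation sketch, assuming cached results record the source document IDs they were built from:

def invalidate_for_document(cache: MultiLevelCache, document_id: str) -> None:
    """Drop semantic-cache entries whose results cite an updated document."""
    cache.semantic_cache = [
        entry for entry in cache.semantic_cache
        if document_id not in entry["result"].get("source_document_ids", [])
    ]
    # The exact-match and result caches would need the same treatment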
Engineering Pitfall: Monitoring only happy paths
The mistake: Tracking successful queries but not failures.
Why it happens: Success metrics are easier to collect and more pleasant to review.
The fix: Log all queries including failures. Track error rates, timeout rates, and fallback usage.
Engineering Pitfall: No circuit breakers
The mistake: Retrying failed requests indefinitely, causing cascade failures.
Why it happens: Retry logic is added without considering system-wide effects.
The fix: Implement circuit breakers that stop retries after threshold failures. Allow recovery time before resuming.
Engineering Pitfall: Premature optimization
The mistake: Building complex caching and sharding before understanding actual load.
Why it happens: Engineers anticipate scale that may never materialize.
The fix: Start simple. Add complexity only when metrics show it is needed. Measure before optimizing.
Related Content
Expert Talks
TurboPuffer: Object Storage-First Vector Database Architecture - Simon
Key insights:
- Object storage-first architecture can reduce costs by up to 95%
- Three-tier storage hierarchy: object storage (cold), NVMe SSD (warm), RAM (hot)
- Clustered indexes outperform graph-based approaches for object storage
- Companies like Notion and Cursor use this architecture at scale
Production relevance: Consider object storage-first databases when you have large datasets with power-law access patterns (most data rarely accessed).
The RAG Mistakes That Are Killing Your AI - Skylar Payne
Key insights:
- 21% of documents silently dropped due to encoding issues in one medical chatbot
- Teams that iterate quickly on data quality win
- Evaluate beyond your retrieval window to catch false negatives
- 90% of complexity additions perform worse when properly evaluated
Production relevance: Monitor document counts at each pipeline stage. Silent failures are the most dangerous.
Office Hours
Cohort 2 Week 6 Summary
Key discussions:
- Deep Research as RAG with strong reasoning capabilities
- Long context windows vs chunking tradeoffs
- Human-labeled data remains essential for high-quality systems
- Structured reports provide more business value than ad-hoc answers
Action Items
For Product Teams
- Conduct cost analysis (Week 1)
  - Calculate current cost per query
  - Identify largest cost components
  - Estimate savings from caching and model optimization
- Define SLAs (Week 1)
  - Set latency targets (p50, p95, p99)
  - Define availability requirements
  - Establish quality thresholds
- Plan degradation strategy (Week 2)
  - Map failure scenarios to fallback responses
  - Define acceptable degraded states
  - Communicate expectations to stakeholders
- Establish monitoring dashboard (Week 2)
  - Track key metrics daily
  - Set up alerting for threshold breaches
  - Review trends weekly
For Engineering Teams
- Implement monitoring (Week 1)
  - Add latency tracking to all queries
  - Log token usage for cost tracking
  - Record routing decisions and outcomes
- Add caching layer (Week 1-2)
  - Implement exact-match cache first
  - Add semantic cache if hit rate justifies
  - Define TTL and invalidation policies
- Build fallback chain (Week 2)
  - Implement primary and fallback handlers
  - Add timeout handling
  - Test failure scenarios
- Set up alerting (Week 2)
  - Configure alerts for latency spikes
  - Alert on quality degradation
  - Alert on cost overruns
- Document runbooks (Week 3)
  - Document common failure scenarios
  - Write recovery procedures
  - Create on-call escalation paths
Reflection Questions
- What is your current cost per query, and what is your target? Consider whether you have measured all cost components including infrastructure and maintenance.
- What happens when your LLM provider has an outage? Think about whether you have fallback strategies that maintain user value.
- How would you detect a 10% degradation in retrieval quality? Consider whether your monitoring would catch gradual quality decline before users notice.
- What is the ROI of your next optimization investment? Calculate expected savings against implementation cost before starting.
- How do you balance freshness vs cost in your caching strategy? Think about which queries need real-time data vs which can tolerate staleness.
Summary
Key Takeaways for Product Managers
- Cost structure matters: LLM generation is 60-75% of costs. Focus optimization there first. Calculate total cost of ownership including maintenance before choosing self-hosting.
- Reliability requires planning: Design fallback strategies before launch. Users prefer degraded responses over errors. 99.9% availability requires multiple fallback tiers.
- Monitoring connects to the flywheel: Production metrics should extend the evaluation framework from Chapter 1. Track the same quality metrics in production that you measured in development.
- Scaling is a business decision: Choose architecture based on actual scale needs. Premature optimization wastes engineering resources. Measure before optimizing.
Key Takeaways for Engineers
- Multi-level caching reduces costs: Exact match cache (fastest), semantic cache (flexible), result cache (complete responses). Define TTL and invalidation policies upfront.
- Graceful degradation maintains availability: Implement fallback chains with timeouts. Use circuit breakers to prevent cascade failures. Test failure scenarios regularly.
- Monitor everything: Track latency (p50, p95, p99), quality metrics, costs, and user feedback. Alert on threshold breaches. Review trends weekly.
- Start simple, add complexity when needed: Measure actual load before building complex infrastructure. Most optimization opportunities come from caching and model selection, not architectural complexity.
Further Reading
Books and Resources
- Google SRE Book - Reliability engineering principles
- High Performance Browser Networking - Latency optimization
- Designing Data-Intensive Applications - Scalability patterns
Tools
- Prometheus - Metrics collection and alerting
- Grafana - Monitoring dashboards
- Redis - Caching infrastructure
Related Chapters
- Chapter 1: Evaluation-First Development - Evaluation framework that becomes production monitoring
- Chapter 3: Feedback Systems and UX - Feedback collection that continues in production
- Chapter 6: Query Routing and Orchestration - Routing metrics to monitor
Navigation
- Previous: Chapter 6: Query Routing and Orchestration - Building intelligent routing systems
- Next: Chapter 8: Hybrid Search - Combining semantic and lexical search
- Reference: Glossary | Quick Reference
- Book Index: Book Overview