Monitoring and Observability for RAG Systems
Monitor RAG systems in production: track latency, cost, accuracy, and user satisfaction with metrics and dashboards.
Key Metrics to Track
Performance:
- Latency (p50, p95, p99)
- Throughput (queries/second)
- Error rate
Quality:
- Retrieval precision
- Answer quality
- User feedback
Cost:
- API cost per query (see the cost sketch after this list)
- Storage costs
- Compute costs
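The per-query API cost is usually derived from token counts. A minimal sketch, with placeholder prices per 1K tokens (not real vendor rates) and a hypothetical query_cost_usd helper:

```python
# Placeholder prices per 1K tokens: substitute your provider's actual rates
PRICE_PER_1K = {
    "embedding": 0.0001,
    "llm_input": 0.0025,
    "llm_output": 0.01,
}

def query_cost_usd(embedding_tokens: int, prompt_tokens: int, completion_tokens: int) -> float:
    """Estimate the API cost of a single RAG query from its token counts."""
    return (
        embedding_tokens / 1000 * PRICE_PER_1K["embedding"]
        + prompt_tokens / 1000 * PRICE_PER_1K["llm_input"]
        + completion_tokens / 1000 * PRICE_PER_1K["llm_output"]
    )
```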
Prometheus + Grafana Setup
```yaml
# docker-compose.yml
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
```
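The compose file mounts a prometheus.yml that is not shown here. A minimal scrape configuration sketch, assuming the instrumented RAG service exposes its metrics on port 8000 as in the code below:

```yaml
# prometheus.yml (minimal sketch, adjust targets to your deployment)
global:
  scrape_interval: 15s

# Also mount prometheus_alerts.yml (shown later) into the container to load the alert rules
rule_files:
  - prometheus_alerts.yml

scrape_configs:
  - job_name: rag_pipeline
    static_configs:
      # host.docker.internal reaches the host from Docker Desktop;
      # on Linux, use the host IP or run the app on the same compose network
      - targets: ["host.docker.internal:8000"]
```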
Instrumenting the RAG Pipeline
```python
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import logging
import time

# embed(), vector_db and llm_call() are the application's own pipeline helpers
logger = logging.getLogger(__name__)

# Metrics
query_counter = Counter('rag_queries_total', 'Total RAG queries')
latency_histogram = Histogram('rag_latency_seconds', 'RAG latency', buckets=[0.1, 0.5, 1, 2, 5])
error_counter = Counter('rag_errors_total', 'Total RAG errors', ['error_type'])
cost_counter = Counter('rag_cost_usd', 'Total costs', ['component'])
relevance_gauge = Gauge('rag_relevance_score', 'Average relevance score')

def instrumented_rag(query):
    query_counter.inc()
    start = time.time()

    try:
        # Embedding
        emb_start = time.time()
        embedding = embed(query)
        emb_time = time.time() - emb_start
        cost_counter.labels(component='embedding').inc(0.001)

        # Retrieval
        ret_start = time.time()
        docs = vector_db.search(embedding, limit=5)
        ret_time = time.time() - ret_start

        # LLM
        llm_start = time.time()
        response = llm_call(query, docs)
        llm_time = time.time() - llm_start
        cost_counter.labels(component='llm').inc(0.01)

        # Track latency
        total_time = time.time() - start
        latency_histogram.observe(total_time)

        # Log breakdown
        logger.info(f"Latency breakdown: emb={emb_time:.3f}s, ret={ret_time:.3f}s, llm={llm_time:.3f}s")

        return response

    except Exception as e:
        error_counter.labels(error_type=type(e).__name__).inc()
        raise

# Start metrics server
start_http_server(8000)
```
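start_http_server(8000) serves these series on http://localhost:8000/metrics for Prometheus to scrape. The fixed cost increments (0.001 and 0.01 USD per call) are placeholders; a real pipeline would compute them from token counts, as in the cost sketch above.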
Tracking Retrieval Quality
```python
import time

from sklearn.metrics import ndcg_score

def track_retrieval_quality(query, retrieved_docs, ground_truth_docs):
    # Binary relevance: 1 if the retrieved document is in the labeled ground truth
    relevance_scores = []
    for doc in retrieved_docs:
        if doc.id in ground_truth_docs:
            relevance_scores.append(1)
        else:
            relevance_scores.append(0)

    # Calculate nDCG: predicted scores must decrease with rank so the retrieval order is preserved
    predicted_scores = list(range(len(relevance_scores), 0, -1))
    ndcg = ndcg_score([relevance_scores], [predicted_scores])

    # Update metric
    relevance_gauge.set(ndcg)

    # Log to database
    db.insert({
        "timestamp": time.time(),
        "query": query,
        "ndcg": ndcg,
        "retrieved_ids": [doc.id for doc in retrieved_docs]
    })
```
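nDCG needs labeled ground truth, so this typically runs against a periodic evaluation set rather than on live traffic. A minimal sketch, assuming a small hand-labeled eval_set dictionary and the embed/vector_db helpers from the instrumented pipeline:

```python
# Hypothetical labeled evaluation set: query -> ids of the documents that should be retrieved
eval_set = {
    "how do I reset my password?": {"doc_42", "doc_108"},
    "what is the refund policy?": {"doc_7"},
}

def run_retrieval_eval():
    for query, relevant_ids in eval_set.items():
        docs = vector_db.search(embed(query), limit=5)
        track_retrieval_quality(query, docs, relevant_ids)
```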
Collecting User Feedback
```python
import time

from fastapi import FastAPI
from prometheus_client import Counter
from pydantic import BaseModel

app = FastAPI()

class Feedback(BaseModel):
    query_id: str
    rating: int  # 1-5
    helpful: bool
    comment: str | None = None

feedback_counter = Counter(
    'rag_feedback_total',
    'User feedback',
    ['rating']
)

@app.post("/feedback")
def collect_feedback(feedback: Feedback):
    # Track in Prometheus
    feedback_counter.labels(rating=str(feedback.rating)).inc()

    # Store in database
    db.insert({
        "query_id": feedback.query_id,
        "rating": feedback.rating,
        "helpful": feedback.helpful,
        "comment": feedback.comment,
        "timestamp": time.time()
    })

    return {"status": "ok"}
```
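The frontend calls POST /feedback after each answer, passing back the query_id it received with the response so the rating can be joined to the logged query. The per-rating counter feeds the user-satisfaction panel in the dashboard queries below.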
Distributed Tracing with OpenTelemetry
```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Setup tracing
trace.set_tracer_provider(TracerProvider())
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

tracer = trace.get_tracer(__name__)

def traced_rag(query):
    with tracer.start_as_current_span("rag_pipeline") as span:
        span.set_attribute("query", query)

        # Embedding
        with tracer.start_as_current_span("embedding"):
            embedding = embed(query)

        # Retrieval
        with tracer.start_as_current_span("retrieval"):
            docs = vector_db.search(embedding, limit=5)
            span.set_attribute("num_docs_retrieved", len(docs))

        # Reranking
        with tracer.start_as_current_span("reranking"):
            reranked = rerank(query, docs)

        # LLM
        with tracer.start_as_current_span("llm_generation"):
            response = llm_call(query, reranked)
            span.set_attribute("response_length", len(response))

        return response
```
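Each stage becomes a child span of rag_pipeline, so a single Jaeger trace shows whether embedding, retrieval, reranking, or generation dominated a slow request, a per-request complement to the aggregated latency histogram above.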
Logging Best Practices
```python
import time

import structlog

logger = structlog.get_logger()

# retrieve(), llm_call() and estimate_tokens() are the application's own helpers

def logged_rag(query, user_id):
    logger.info(
        "rag_query_started",
        query=query,
        user_id=user_id,
        timestamp=time.time()
    )

    try:
        # Retrieval
        docs = retrieve(query)
        logger.info(
            "documents_retrieved",
            num_docs=len(docs),
            doc_ids=[d.id for d in docs],
            scores=[d.score for d in docs]
        )

        # LLM
        response = llm_call(query, docs)
        logger.info(
            "rag_query_completed",
            response_length=len(response),
            tokens_used=estimate_tokens(query, docs, response)
        )

        return response

    except Exception as e:
        logger.error(
            "rag_query_failed",
            error=str(e),
            error_type=type(e).__name__
        )
        raise
```
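The snippet relies on structlog's default renderer. To emit machine-parseable JSON lines for a log collector, a minimal configuration sketch:

```python
import structlog

structlog.configure(
    processors=[
        structlog.processors.add_log_level,           # include the level in each event
        structlog.processors.TimeStamper(fmt="iso"),  # ISO-8601 timestamp
        structlog.processors.JSONRenderer(),          # one JSON object per line
    ]
)
```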
Alerting Rules
```yaml
# prometheus_alerts.yml
groups:
  - name: rag_alerts
    interval: 30s
    rules:
      # High latency (p95 computed from the histogram buckets)
      - alert: HighRAGLatency
        expr: histogram_quantile(0.95, rate(rag_latency_seconds_bucket[5m])) > 2
        for: 5m
        annotations:
          summary: "RAG p95 latency > 2s"

      # High error rate (errors as a share of queries)
      - alert: HighErrorRate
        expr: sum(rate(rag_errors_total[5m])) / rate(rag_queries_total[5m]) > 0.05
        for: 2m
        annotations:
          summary: "RAG error rate > 5%"

      # Low relevance
      - alert: LowRelevance
        expr: rag_relevance_score < 0.7
        for: 10m
        annotations:
          summary: "RAG relevance score < 0.7"

      # High costs (the Python client exposes the rag_cost_usd counter as rag_cost_usd_total)
      - alert: HighDailyCost
        expr: sum(increase(rag_cost_usd_total[24h])) > 100
        annotations:
          summary: "RAG costs > $100/day"
```
Dashboard Queries (Grafana)
```promql
# Average latency over time
rate(rag_latency_seconds_sum[5m]) / rate(rag_latency_seconds_count[5m])

# Query throughput
rate(rag_queries_total[1m])

# Error rate
sum(rate(rag_errors_total[5m])) / rate(rag_queries_total[5m])

# Cost per day (the Python client exposes the rag_cost_usd counter as rag_cost_usd_total)
sum(increase(rag_cost_usd_total[24h]))

# User satisfaction (share of 5-star ratings)
sum(rag_feedback_total{rating="5"}) / sum(rag_feedback_total)
```
A/B Testing Framework
```python
import hashlib
import time

from prometheus_client import Counter

# Assumed metric definition (the original snippet used variant_counter without defining it)
variant_counter = Counter('rag_ab_variant_total', 'A/B test traffic', ['variant'])

def ab_test_rag(query, user_id):
    # Assign user to a variant with a stable hash; Python's built-in hash() is
    # randomized per process, so it would reshuffle users on every restart
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 2
    variant = "A" if bucket == 0 else "B"

    if variant == "A":
        # Control: current RAG
        response = rag_pipeline_v1(query)
    else:
        # Treatment: new RAG
        response = rag_pipeline_v2(query)

    # Track variant
    variant_counter.labels(variant=variant).inc()

    # Log for analysis
    db.insert({
        "query": query,
        "user_id": user_id,
        "variant": variant,
        "timestamp": time.time()
    })

    return response, variant
```
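To close the loop, the logged variant can be joined with the user ratings collected above. A minimal sketch, assuming a hypothetical db.query helper that returns rows already joined on query_id:

```python
from statistics import mean

def compare_variants():
    # db.query is a hypothetical helper returning dicts with "variant" and "rating" keys
    rows = db.query("SELECT variant, rating FROM queries JOIN feedback USING (query_id)")
    for variant in ("A", "B"):
        ratings = [row["rating"] for row in rows if row["variant"] == variant]
        if ratings:
            print(f"variant {variant}: n={len(ratings)}, mean rating={mean(ratings):.2f}")
```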
Production RAG needs monitoring. Track everything, alert on anomalies, and iterate based on the data.