7. OptimizationAvancé

Surveillance et Observabilité des Systèmes RAG

11 novembre 2025
12 min de lecture
Équipe de Recherche Ailog

Surveillez les systèmes RAG en production : suivez la latence, les coûts, la précision et la satisfaction utilisateur avec des métriques et tableaux de bord.

Métriques Clés à Suivre

Performance :

  • Latence (p50, p95, p99)
  • Débit (requêtes/seconde)
  • Taux d'erreur

Qualité :

  • Précision de la récupération
  • Qualité de la réponse
  • Feedback utilisateur

Coût :

  • Coûts API par requête
  • Coûts de stockage
  • Coûts de calcul

Configuration Prometheus + Grafana

DEVELOPERyaml
# docker-compose.yml version: '3.8' services: prometheus: image: prom/prometheus:latest ports: - "9090:9090" volumes: - ./prometheus.yml:/etc/prometheus/prometheus.yml grafana: image: grafana/grafana:latest ports: - "3000:3000" environment: - GF_SECURITY_ADMIN_PASSWORD=admin

Instrumentation du Pipeline RAG

DEVELOPERpython
from prometheus_client import Counter, Histogram, Gauge, start_http_server import time # Metrics query_counter = Counter('rag_queries_total', 'Total RAG queries') latency_histogram = Histogram('rag_latency_seconds', 'RAG latency', buckets=[0.1, 0.5, 1, 2, 5]) error_counter = Counter('rag_errors_total', 'Total RAG errors', ['error_type']) cost_counter = Counter('rag_cost_usd', 'Total costs', ['component']) relevance_gauge = Gauge('rag_relevance_score', 'Average relevance score') def instrumented_rag(query): query_counter.inc() start = time.time() try: # Embedding emb_start = time.time() embedding = embed(query) emb_time = time.time() - emb_start cost_counter.labels(component='embedding').inc(0.001) # Retrieval ret_start = time.time() docs = vector_db.search(embedding, limit=5) ret_time = time.time() - ret_start # LLM llm_start = time.time() response = llm_call(query, docs) llm_time = time.time() - llm_start cost_counter.labels(component='llm').inc(0.01) # Track latency total_time = time.time() - start latency_histogram.observe(total_time) # Log breakdown logger.info(f"Latency breakdown: emb={emb_time:.3f}s, ret={ret_time:.3f}s, llm={llm_time:.3f}s") return response except Exception as e: error_counter.labels(error_type=type(e).__name__).inc() raise # Start metrics server start_http_server(8000)

Suivi de la Qualité de Récupération

DEVELOPERpython
from sklearn.metrics import ndcg_score import numpy as np def track_retrieval_quality(query, retrieved_docs, ground_truth_docs): # Calculate nDCG relevance_scores = [] for doc in retrieved_docs: if doc.id in ground_truth_docs: relevance_scores.append(1) else: relevance_scores.append(0) ndcg = ndcg_score([relevance_scores], [range(len(relevance_scores))]) # Update metric relevance_gauge.set(ndcg) # Log to database db.insert({ "timestamp": time.time(), "query": query, "ndcg": ndcg, "retrieved_ids": [doc.id for doc in retrieved_docs] })

Collecte de Feedback Utilisateur

DEVELOPERpython
from fastapi import FastAPI from pydantic import BaseModel app = FastAPI() class Feedback(BaseModel): query_id: str rating: int # 1-5 helpful: bool comment: str | None feedback_counter = Counter( 'rag_feedback_total', 'User feedback', ['rating'] ) @app.post("/feedback") def collect_feedback(feedback: Feedback): # Track in Prometheus feedback_counter.labels(rating=str(feedback.rating)).inc() # Store in database db.insert({ "query_id": feedback.query_id, "rating": feedback.rating, "helpful": feedback.helpful, "comment": feedback.comment, "timestamp": time.time() }) return {"status": "ok"}

Traçage Distribué avec OpenTelemetry

DEVELOPERpython
from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.jaeger.thrift import JaegerExporter # Setup tracing trace.set_tracer_provider(TracerProvider()) jaeger_exporter = JaegerExporter( agent_host_name="localhost", agent_port=6831, ) trace.get_tracer_provider().add_span_processor( BatchSpanProcessor(jaeger_exporter) ) tracer = trace.get_tracer(__name__) def traced_rag(query): with tracer.start_as_current_span("rag_pipeline") as span: span.set_attribute("query", query) # Embedding with tracer.start_as_current_span("embedding"): embedding = embed(query) # Retrieval with tracer.start_as_current_span("retrieval"): docs = vector_db.search(embedding, limit=5) span.set_attribute("num_docs_retrieved", len(docs)) # Reranking with tracer.start_as_current_span("reranking"): reranked = rerank(query, docs) # LLM with tracer.start_as_current_span("llm_generation"): response = llm_call(query, reranked) span.set_attribute("response_length", len(response)) return response

Bonnes Pratiques de Logging

DEVELOPERpython
import structlog import json logger = structlog.get_logger() def logged_rag(query, user_id): logger.info( "rag_query_started", query=query, user_id=user_id, timestamp=time.time() ) try: # Retrieval docs = retrieve(query) logger.info( "documents_retrieved", num_docs=len(docs), doc_ids=[d.id for d in docs], scores=[d.score for d in docs] ) # LLM response = llm_call(query, docs) logger.info( "rag_query_completed", response_length=len(response), tokens_used=estimate_tokens(query, docs, response) ) return response except Exception as e: logger.error( "rag_query_failed", error=str(e), error_type=type(e).__name__ ) raise

Règles d'Alertes

DEVELOPERyaml
# prometheus_alerts.yml groups: - name: rag_alerts interval: 30s rules: # High latency - alert: HighRAGLatency expr: histogram_quantile(0.95, rag_latency_seconds) > 2 for: 5m annotations: summary: "RAG p95 latency > 2s" # High error rate - alert: HighErrorRate expr: rate(rag_errors_total[5m]) > 0.05 for: 2m annotations: summary: "RAG error rate > 5%" # Low relevance - alert: LowRelevance expr: rag_relevance_score < 0.7 for: 10m annotations: summary: "RAG relevance score < 0.7" # High costs - alert: HighDailyCost expr: increase(rag_cost_usd[24h]) > 100 annotations: summary: "RAG costs > $100/day"

Requêtes Dashboard (Grafana)

DEVELOPERpromql
# Latence moyenne au fil du temps rate(rag_latency_seconds_sum[5m]) / rate(rag_latency_seconds_count[5m]) # Débit de requêtes rate(rag_queries_total[1m]) # Taux d'erreur rate(rag_errors_total[5m]) / rate(rag_queries_total[5m]) # Coût par jour increase(rag_cost_usd[24h]) # Satisfaction utilisateur avg(rag_feedback_total{rating="5"}) / sum(rag_feedback_total)

Framework de Tests A/B

DEVELOPERpython
import random def ab_test_rag(query, user_id): # Assign user to variant variant = "A" if hash(user_id) % 2 == 0 else "B" if variant == "A": # Control: current RAG response = rag_pipeline_v1(query) else: # Treatment: new RAG response = rag_pipeline_v2(query) # Track variant variant_counter.labels(variant=variant).inc() # Log for analysis db.insert({ "query": query, "user_id": user_id, "variant": variant, "timestamp": time.time() }) return response, variant

Le RAG en production nécessite une surveillance. Suivez tout, alertez sur les anomalies, itérez basé sur les données.

Tags

optimizationsurveillanceobservabilityproduction

Articles connexes

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !