4. StorageAvancé

Milvus : Recherche Vectorielle à l'Échelle Milliards

15 novembre 2025
13 min de lecture
Équipe de Recherche Ailog

Déployez Milvus pour un RAG à l'Échelle Production Gérant des Milliards de Vecteurs avec Mise à l'Échelle Horizontale et Accélération GPU.

Pourquoi Milvus ?

Conçu pour l'échelle :

  • Des milliards de vecteurs
  • Accélération GPU
  • Mise à l'échelle horizontale
  • Stockage S3/MinIO
  • Natif Kubernetes

Utilisé par : Shopify, NVIDIA, Salesforce

Configuration Docker

DEVELOPERbash
# Standalone (développement) docker run -d --name milvus -p 19530:19530 -p 9091:9091 \ milvusdb/milvus:v2.3.4 milvus run standalone

Déploiement Distribué

DEVELOPERyaml
# docker-compose.yml (production) version: '3.8' services: etcd: image: quay.io/coreos/etcd:v3.5.5 minio: image: minio/minio:RELEASE.2023-03-20T20-16-18Z milvus-proxy: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "proxy"] depends_on: - etcd - minio milvus-querynode: image: milvusdb/milvus:v2.3.4 command: ["milvus", "run", "querynode"] deploy: replicas: 3 # Mise à l'échelle horizontale

Client Python

DEVELOPERpython
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType # Connexion connections.connect("default", host="localhost", port="19530") # Définir le schéma fields = [ FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536), FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535), FieldSchema(name="metadata", dtype=DataType.JSON) ] schema = CollectionSchema(fields=fields, description="RAG documents") # Créer la collection collection = Collection(name="documents", schema=schema)

Stratégies d'Indexation

DEVELOPERpython
# IVF_FLAT (équilibré) index_params = { "index_type": "IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 1024} } # HNSW (recherche plus rapide, plus de mémoire) index_params = { "index_type": "HNSW", "metric_type": "COSINE", "params": { "M": 16, "efConstruction": 256 } } # Index GPU (10x plus rapide) index_params = { "index_type": "GPU_IVF_FLAT", "metric_type": "COSINE", "params": {"nlist": 2048} } collection.create_index(field_name="embedding", index_params=index_params)

Insertion de Données

DEVELOPERpython
# Insertion par lots data = [ [embedding1, embedding2, ...], # embeddings ["text1", "text2", ...], # text [{"category": "A"}, {"category": "B"}, ...] # metadata ] collection.insert(data) collection.flush() # Persister sur disque

Recherche

DEVELOPERpython
# Charger la collection en mémoire collection.load() # Recherche search_params = { "metric_type": "COSINE", "params": {"nprobe": 16} # Plus élevé = plus précis mais plus lent } results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, output_fields=["text", "metadata"] ) for hits in results: for hit in hits: print(f"Score: {hit.score}, Text: {hit.entity.get('text')}")

Filtrage

DEVELOPERpython
# Filtrage des métadonnées avec expressions booléennes results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, expr='metadata["category"] == "tech"', output_fields=["text", "metadata"] )

Partitionnement

Diviser la collection pour des requêtes plus rapides :

DEVELOPERpython
# Créer des partitions collection.create_partition("partition_2024") collection.create_partition("partition_2025") # Insérer dans une partition spécifique collection.insert(data, partition_name="partition_2025") # Rechercher uniquement dans une partition spécifique results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, partition_names=["partition_2025"], limit=10 )

Time Travel

Interroger les données historiques :

DEVELOPERpython
import time # Obtenir le timestamp avant la suppression ts_before = int(time.time() * 1000) # Supprimer des données collection.delete(expr="id in [1, 2, 3]") # Interroger les données telles qu'elles étaient avant la suppression results = collection.search( data=[query_embedding], anns_field="embedding", param=search_params, limit=10, travel_timestamp=ts_before )

Groupes de Ressources

Isoler les charges de travail sur différents nœuds :

DEVELOPERpython
from pymilvus import utility # Créer des groupes de ressources utility.create_resource_group("rg1", config={"node_num": 2}) utility.create_resource_group("rg2", config={"node_num": 1}) # Assigner une collection à un groupe de ressources collection.set_properties({"resource_groups": ["rg1"]})

Surveillance

DEVELOPERpython
# Statistiques de la collection stats = collection.get_stats() print(f"Row count: {stats['row_count']}") # Progression de l'index index = collection.index() print(f"Index state: {index.state}") # Métriques de requête (endpoint Prometheus) # http://localhost:9091/metrics

Pipeline RAG de Production

DEVELOPERpython
from pymilvus import Collection, connections import openai connections.connect("default", host="milvus-proxy", port="19530") collection = Collection("documents") collection.load() def milvus_rag(query): # Créer l'embedding de la requête query_emb = openai.Embedding.create( input=query, model="text-embedding-3-small" )['data'][0]['embedding'] # Rechercher dans Milvus results = collection.search( data=[query_emb], anns_field="embedding", param={"metric_type": "COSINE", "params": {"nprobe": 32}}, limit=5, output_fields=["text"] ) # Construire le contexte context = "\n\n".join([hit.entity.get('text') for hit in results[0]]) # Générer la réponse response = openai.ChatCompletion.create( model="gpt-4-turbo", messages=[{ "role": "user", "content": f"Context: {context}\n\nQuestion: {query}" }] ) return response.choices[0].message.content # Utilisation answer = milvus_rag("What is Milvus?")

Milvus gère l'échelle de milliards de vecteurs avec facilité. Parfait pour les déploiements RAG en entreprise.

Tags

milvusbase-de-données-vectorielleéchellestorageperformance

Articles connexes

Ailog Assistant

Ici pour vous aider

Salut ! Pose-moi des questions sur Ailog et comment intégrer votre RAG dans vos projets !