
LlamaIndex RAG Pipeline: Production Implementation Guide

Master LlamaIndex RAG pipeline implementation with production-ready code examples, architecture patterns, and optimization strategies for scalable document retrieval systems.

📖 25 min read 📅 March 23, 2026 ✍ By PropTechUSA AI

Building production-ready RAG (Retrieval-Augmented Generation) systems requires more than just connecting a vector database to an LLM. The LlamaIndex framework provides the scaffolding for sophisticated document retrieval pipelines, but implementing them at scale demands careful consideration of architecture, performance, and reliability. This comprehensive guide walks through the essential components of a production LlamaIndex RAG pipeline, from initial setup to deployment optimization.

Understanding LlamaIndex Architecture

Core Components of a RAG Pipeline

LlamaIndex organizes RAG systems around several key abstractions that work together to transform raw documents into queryable knowledge bases. The framework's modular design allows developers to swap components based on specific requirements while maintaining consistent interfaces.

The Document abstraction represents the fundamental unit of information in your pipeline. Unlike simple text strings, LlamaIndex documents carry metadata, relationship information, and transformation history that becomes crucial for advanced retrieval strategies.

```python
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter

documents = [
    Document(
        text="Property management software streamlines tenant communications...",
        metadata={
            "source": "property_management_guide.pdf",
            "section": "tenant_relations",
            "page": 15,
            "document_type": "guide",
            "created_at": "2024-01-15",
        },
    )
]

node_parser = SentenceSplitter(
    chunk_size=512,
    chunk_overlap=50,
    separator=" ",
)

nodes = node_parser.get_nodes_from_documents(documents)
```

Vector Stores and Embedding Models

The choice of vector store significantly impacts both performance and cost at scale. While development might begin with simple in-memory storage, production systems require persistent, distributed solutions that can handle concurrent queries and updates.

```python
import os

import chromadb
from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.chroma import ChromaVectorStore

chroma_client = chromadb.PersistentClient(path="./chroma_db")
# get_or_create_collection avoids an error when the collection already exists
chroma_collection = chroma_client.get_or_create_collection("proptech_documents")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

embedding_model = OpenAIEmbedding(
    model="text-embedding-3-small",
    api_key=os.getenv("OPENAI_API_KEY"),
)

storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model=embedding_model,
)
```

Query Engines and Retrieval Strategies

LlamaIndex supports multiple retrieval strategies beyond basic similarity search. Production systems often benefit from hybrid approaches that combine semantic similarity with keyword matching and metadata filtering.

```python
from llama_index.core import get_response_synthesizer
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.postprocessor import SimilarityPostprocessor

retriever = VectorIndexRetriever(
    index=index,
    similarity_top_k=10,
    vector_store_query_mode="hybrid",
)

response_synthesizer = get_response_synthesizer(
    response_mode="compact",
    streaming=True,
)

query_engine = RetrieverQueryEngine(
    retriever=retriever,
    response_synthesizer=response_synthesizer,
    node_postprocessors=[SimilarityPostprocessor(similarity_cutoff=0.7)],
)
```

Production Architecture Patterns

Microservices-Based RAG Architecture

Production RAG systems benefit from separation of concerns through microservices architecture. This approach enables independent scaling, testing, and deployment of different pipeline components.

```python
import logging
from typing import List, Optional

from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters

logger = logging.getLogger(__name__)
app = FastAPI(title="RAG Pipeline API")

class QueryRequest(BaseModel):
    query: str
    filters: Optional[dict] = None
    top_k: int = 5

class DocumentIngestionRequest(BaseModel):
    documents: List[dict]
    collection_name: str

@app.post("/query")
async def query_documents(request: QueryRequest):
    try:
        # Apply metadata filters, if provided, by building a filtered engine per request
        if request.filters:
            filters = MetadataFilters(
                filters=[ExactMatchFilter(key=k, value=v) for k, v in request.filters.items()]
            )
            engine = index.as_query_engine(similarity_top_k=request.top_k, filters=filters)
        else:
            engine = query_engine

        response = await engine.aquery(request.query)
        return {
            "answer": str(response),
            "source_nodes": [
                {
                    "text": node.text,
                    "metadata": node.metadata,
                    "score": node.score,
                }
                for node in response.source_nodes
            ],
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/ingest")
async def ingest_documents(request: DocumentIngestionRequest, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_documents, request.documents, request.collection_name)
    return {"status": "accepted", "message": "Documents queued for processing"}

async def process_documents(documents: List[dict], collection_name: str):
    # Batch processing with per-batch error handling; process_document_batch
    # is application-specific (parse, embed, and upsert one batch)
    batch_size = 10
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i + batch_size]
        try:
            await process_document_batch(batch, collection_name)
        except Exception as e:
            # Log the error and continue with the next batch
            logger.error(f"Batch processing failed: {e}")
```

Caching and Performance Optimization

Implementing intelligent caching strategies dramatically improves response times and reduces computational costs. Multi-layer caching addresses different performance bottlenecks in the pipeline.

```python
import hashlib
import json
from functools import wraps

import redis

class RAGCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.embedding_cache_ttl = 3600 * 24  # 24 hours
        self.query_cache_ttl = 3600  # 1 hour

    def cache_embeddings(self, func):
        @wraps(func)
        def wrapper(text):
            # Create cache key from text hash
            text_hash = hashlib.md5(text.encode()).hexdigest()
            cache_key = f"embedding:{text_hash}"

            # Check cache first
            cached_result = self.redis.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

            # Generate embedding and cache the result
            result = func(text)
            self.redis.setex(cache_key, self.embedding_cache_ttl, json.dumps(result))
            return result
        return wrapper

    def cache_queries(self, func):
        @wraps(func)
        async def wrapper(query, **kwargs):
            # Create cache key from query and parameters
            cache_data = {"query": query, **kwargs}
            cache_key = f"query:{hashlib.md5(json.dumps(cache_data, sort_keys=True).encode()).hexdigest()}"

            cached_result = self.redis.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

            result = await func(query, **kwargs)
            self.redis.setex(cache_key, self.query_cache_ttl, json.dumps(result, default=str))
            return result
        return wrapper

redis_client = redis.Redis(host='localhost', port=6379, db=0)
rag_cache = RAGCache(redis_client)

@rag_cache.cache_embeddings
def get_embedding(text: str):
    return embedding_model.get_text_embedding(text)

@rag_cache.cache_queries
async def cached_query(query: str, top_k: int = 5):
    return await query_engine.aquery(query)
```

Error Handling and Resilience

Production RAG systems must gracefully handle various failure modes, from API rate limits to vector store connectivity issues.

```python
import asyncio
import time

from tenacity import retry, stop_after_attempt, wait_exponential

class ResilientRAGPipeline:
    def __init__(self, max_retries=3, backoff_factor=1.5):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.circuit_breaker_failures = 0
        self.circuit_breaker_threshold = 5
        self.circuit_breaker_timeout = 300  # 5 minutes
        self.circuit_breaker_last_failure = 0

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    async def resilient_query(self, query: str, **kwargs):
        # Check circuit breaker before attempting the query
        if self.is_circuit_breaker_open():
            raise Exception("Circuit breaker is open")

        try:
            response = await query_engine.aquery(query)
            self.circuit_breaker_failures = 0  # Reset on success
            return response
        except Exception as e:
            self.circuit_breaker_failures += 1
            self.circuit_breaker_last_failure = time.time()

            if "rate_limit" in str(e).lower():
                # Back off exponentially on rate limits before tenacity retries
                await asyncio.sleep(2 ** self.circuit_breaker_failures)
            raise

    def is_circuit_breaker_open(self):
        if self.circuit_breaker_failures < self.circuit_breaker_threshold:
            return False

        time_since_failure = time.time() - self.circuit_breaker_last_failure
        if time_since_failure > self.circuit_breaker_timeout:
            self.circuit_breaker_failures = 0  # Reset after timeout
            return False
        return True

    async def fallback_query(self, query: str):
        """Provide degraded service when the primary pipeline fails."""
        # simple_keyword_search is an application-specific fallback implementation
        try:
            return await simple_keyword_search(query)
        except Exception:
            return {"answer": "Service temporarily unavailable. Please try again later."}
```

Deployment and Monitoring Best Practices

Container Orchestration with Docker

Containerizing RAG components ensures consistent deployment across environments while enabling horizontal scaling based on demand.

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# curl is required by the HEALTHCHECK below; it is not included in the slim image
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

ENV PYTHONPATH=/app
ENV WORKERS=4

HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

CMD ["gunicorn", "--worker-class", "uvicorn.workers.UvicornWorker", "--workers", "4", "--bind", "0.0.0.0:8000", "main:app"]
```

```yaml
version: '3.8'

services:
  rag-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - REDIS_URL=redis://redis:6379
      - CHROMA_HOST=chroma
    depends_on:
      - redis
      - chroma
    volumes:
      - ./data:/app/data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  chroma:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"
    volumes:
      - chroma_data:/chroma/chroma

volumes:
  redis_data:
  chroma_data:
```

Monitoring and Observability

Comprehensive monitoring provides visibility into system performance, user behavior, and potential issues before they impact users.

```python
import os
import time

from fastapi import Response
from prometheus_client import Counter, Histogram, Gauge, generate_latest
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

query_counter = Counter('rag_queries_total', 'Total number of queries', ['status'])
query_duration = Histogram('rag_query_duration_seconds', 'Query processing time')
active_connections = Gauge('rag_active_connections', 'Active WebSocket connections')
embedding_cache_hits = Counter('rag_embedding_cache_hits_total', 'Embedding cache hits')

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

class MonitoredQueryEngine:
    def __init__(self, query_engine):
        self.query_engine = query_engine

    async def aquery(self, query: str, **kwargs):
        with tracer.start_as_current_span("rag_query") as span:
            span.set_attribute("query.length", len(query))
            span.set_attribute("query.top_k", kwargs.get('top_k', 5))
            start_time = time.time()

            try:
                response = await self.query_engine.aquery(query, **kwargs)
                # Record success metrics
                query_counter.labels(status='success').inc()
                span.set_attribute("response.source_nodes_count", len(response.source_nodes))
                span.set_status(trace.Status(trace.StatusCode.OK))
                return response
            except Exception as e:
                # Record failure metrics
                query_counter.labels(status='error').inc()
                span.record_exception(e)
                span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
                raise
            finally:
                # Record timing
                duration = time.time() - start_time
                query_duration.observe(duration)
                span.set_attribute("query.duration", duration)

@app.get("/metrics")
def get_metrics():
    return Response(generate_latest(), media_type="text/plain")

@app.get("/health")
async def health_check():
    health_status = {
        "status": "healthy",
        "timestamp": time.time(),
        "version": os.getenv("APP_VERSION", "unknown"),
        "checks": {},
    }

    # Check vector store connectivity (Chroma exposes heartbeat(), not ping())
    try:
        chroma_client.heartbeat()
        health_status["checks"]["vector_store"] = "ok"
    except Exception as e:
        health_status["checks"]["vector_store"] = f"error: {str(e)}"
        health_status["status"] = "degraded"

    # Check Redis connectivity
    try:
        redis_client.ping()
        health_status["checks"]["cache"] = "ok"
    except Exception as e:
        health_status["checks"]["cache"] = f"error: {str(e)}"
        health_status["status"] = "degraded"

    return health_status
```

💡 Pro Tip: Implement gradual rollouts for pipeline changes using feature flags. This allows you to test new retrieval strategies or model updates with a subset of users before full deployment.
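One simple way to implement such a rollout is deterministic hash-based bucketing, so each user consistently lands on the same pipeline variant across requests. The sketch below is framework-agnostic; the flag name, percentage threshold, and engine dictionary are illustrative assumptions, not part of LlamaIndex:

```python
import hashlib

def assign_variant(user_id: str, rollout_percentage: float,
                   flag_name: str = "hybrid-retrieval") -> str:
    """Deterministically bucket a user into 'new' or 'stable' via a stable hash."""
    digest = hashlib.sha256(f"{flag_name}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0x100000000  # map hash prefix to [0, 1)
    return "new" if bucket < rollout_percentage else "stable"

def select_query_engine(user_id: str, engines: dict, rollout_percentage: float = 0.1):
    """Route a user's queries to the engine chosen by the feature flag."""
    return engines[assign_variant(user_id, rollout_percentage)]
```

With a mapping like `engines = {"stable": query_engine, "new": experimental_engine}`, the same user always hits the same variant, which keeps A/B comparison metrics clean while the percentage is ramped up.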

Performance Testing and Optimization

Regular performance testing ensures your RAG pipeline can handle expected load while maintaining acceptable response times.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import List

import aiohttp

@dataclass
class LoadTestResult:
    total_requests: int
    successful_requests: int
    failed_requests: int
    average_response_time: float
    p95_response_time: float
    requests_per_second: float

async def load_test_rag_endpoint(base_url: str, queries: List[str],
                                 concurrent_users: int = 10,
                                 duration_seconds: int = 60) -> LoadTestResult:
    """Performance test the RAG endpoint under load."""
    results = []
    start_time = time.time()

    async def make_request(session, query):
        request_start = time.time()
        try:
            async with session.post(f"{base_url}/query", json={"query": query}) as response:
                await response.json()
                return time.time() - request_start, response.status == 200
        except Exception:
            return time.time() - request_start, False

    async with aiohttp.ClientSession() as session:
        tasks = []
        query_index = 0

        while time.time() - start_time < duration_seconds:
            if len(tasks) < concurrent_users:
                query = queries[query_index % len(queries)]
                tasks.append(asyncio.create_task(make_request(session, query)))
                query_index += 1

            # Collect completed tasks
            for task in [t for t in tasks if t.done()]:
                try:
                    duration, success = await task
                    results.append((duration, success))
                except Exception:
                    results.append((0, False))
                tasks.remove(task)

            await asyncio.sleep(0.1)  # Small delay to avoid overwhelming the event loop

        # Wait for remaining in-flight tasks
        for task in tasks:
            try:
                duration, success = await task
                results.append((duration, success))
            except Exception:
                results.append((0, False))

    # Calculate metrics
    response_times = sorted(r[0] for r in results)
    successful_requests = sum(1 for r in results if r[1])
    p95_index = int(0.95 * len(response_times))

    return LoadTestResult(
        total_requests=len(results),
        successful_requests=successful_requests,
        failed_requests=len(results) - successful_requests,
        average_response_time=sum(response_times) / len(response_times) if response_times else 0,
        p95_response_time=response_times[p95_index] if response_times else 0,
        requests_per_second=len(results) / duration_seconds,
    )

if __name__ == "__main__":
    test_queries = [
        "What are the best practices for property management?",
        "How do I handle tenant complaints effectively?",
        "What maintenance scheduling tools are recommended?",
    ]

    result = asyncio.run(load_test_rag_endpoint(
        "http://localhost:8000",
        test_queries,
        concurrent_users=20,
        duration_seconds=120,
    ))

    print("Load Test Results:")
    print(f"Total Requests: {result.total_requests}")
    print(f"Success Rate: {result.successful_requests / result.total_requests * 100:.2f}%")
    print(f"Average Response Time: {result.average_response_time:.3f}s")
    print(f"95th Percentile: {result.p95_response_time:.3f}s")
    print(f"Requests/Second: {result.requests_per_second:.2f}")
```

⚠️ Warning: Always test RAG pipeline changes in a staging environment that mirrors production data volumes and query patterns. Performance characteristics can vary significantly with different document collections and query types.

Scaling and Future-Proofing Your RAG Implementation

Multi-Modal and Advanced Retrieval Strategies

As your RAG system matures, consider implementing advanced retrieval strategies that go beyond simple similarity search. Hybrid retrieval combining dense and sparse retrieval often produces superior results.

```python
from typing import List

from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore
from llama_index.retrievers.bm25 import BM25Retriever

class HybridRetriever(BaseRetriever):
    def __init__(self, vector_retriever, bm25_retriever, alpha=0.7, similarity_top_k=10):
        self.vector_retriever = vector_retriever
        self.bm25_retriever = bm25_retriever
        self.alpha = alpha  # Weight for vector search vs BM25
        self.similarity_top_k = similarity_top_k
        super().__init__()

    def _retrieve(self, query_bundle) -> List[NodeWithScore]:
        # Get results from both retrievers
        vector_results = self.vector_retriever.retrieve(query_bundle)
        bm25_results = self.bm25_retriever.retrieve(query_bundle)

        # Combine and re-rank results, keyed by node ID
        all_results = {}

        # Add vector search results
        for result in vector_results:
            node_id = result.node.node_id
            all_results[node_id] = {
                'node': result.node,
                'vector_score': result.score or 0.0,
                'bm25_score': 0.0,
            }

        # Add BM25 results, merging with any overlapping vector hits
        for result in bm25_results:
            node_id = result.node.node_id
            if node_id in all_results:
                all_results[node_id]['bm25_score'] = result.score or 0.0
            else:
                all_results[node_id] = {
                    'node': result.node,
                    'vector_score': 0.0,
                    'bm25_score': result.score or 0.0,
                }

        # Calculate weighted hybrid scores
        final_results = []
        for node_id, data in all_results.items():
            hybrid_score = (self.alpha * data['vector_score'] +
                            (1 - self.alpha) * data['bm25_score'])
            final_results.append(NodeWithScore(node=data['node'], score=hybrid_score))

        # Sort by hybrid score and return top results
        final_results.sort(key=lambda x: x.score, reverse=True)
        return final_results[:self.similarity_top_k]
```

Production RAG systems increasingly need to handle multiple document types and data sources. PropTechUSA.ai's implementation demonstrates how specialized retrievers can be combined to query across property listings, maintenance records, and regulatory documents simultaneously, providing comprehensive responses to complex real estate queries.
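A lightweight way to approximate that multi-source behavior is a keyword router that dispatches each query to the most relevant sub-engine. LlamaIndex ships a `RouterQueryEngine` for LLM-based routing; the sketch below shows the core idea without any model calls, and the source names and keyword sets are hypothetical, not taken from any real implementation:

```python
from typing import Callable, Dict

# Hypothetical routing table: sub-engine name -> trigger keywords
ROUTES: Dict[str, set] = {
    "listings": {"listing", "price", "bedroom", "sale"},
    "maintenance": {"repair", "maintenance", "hvac", "plumbing"},
    "regulatory": {"regulation", "zoning", "compliance", "permit"},
}

def route_query(query: str, default: str = "listings") -> str:
    """Pick the sub-engine whose keyword set overlaps the query the most."""
    words = set(query.lower().split())
    scores = {name: len(words & keywords) for name, keywords in ROUTES.items()}
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else default

def dispatch(query: str, engines: Dict[str, Callable[[str], str]]) -> str:
    """Send the query to the chosen sub-engine (each engine is a callable here)."""
    return engines[route_query(query)](query)
```

In a real deployment each entry in `engines` would be a query engine over its own index, and an LLM-based selector can replace the keyword overlap once routing quality matters more than latency.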

The key to successful LlamaIndex RAG pipeline implementation lies in treating it as an iterative process. Start with a solid foundation of proper document processing, reliable vector storage, and comprehensive monitoring. As your system scales and requirements evolve, you can introduce advanced features like hybrid retrieval, specialized fine-tuned models, and intelligent query routing.

Remember that the most sophisticated RAG pipeline is only as good as the quality of documents you feed into it and the relevance of the retrieved results to your users' needs. Focus on building robust data ingestion processes, implementing comprehensive testing strategies, and maintaining close feedback loops with your users to ensure your system continues to deliver value at scale.
