Part 05 of 15 · RAG Pipeline · ChromaDB · Embeddings · C# Integration · FreeLearning365
From Zero to Private AI System — Complete Series
RAG — Teach AI Your Company's Own Data: Complete Pipeline Guide
Your AI knows everything the internet taught it — and nothing about your company. It does not know your return policy, your customer contracts, your product catalog, your HR rules, or your 3-year sales history. RAG (Retrieval Augmented Generation) is the technology that fixes this. This part builds a complete RAG pipeline from scratch — embeddings, ChromaDB vector store, intelligent chunking, semantic search, a full Python RAG service, Bengali document support, and a production-ready C# ASP.NET Core integration that lets your ERP query your company knowledge base.
By @FreeLearning365 · Part 05 — RAG Pipeline · Read time: ~50 min · Complete Python RAG service · C# ASP.NET Core integration
"An employee asks your AI: 'What is our return policy for damaged goods?' The AI confidently answers: 'Typically companies allow 30 days for returns...' — which is wrong. Your actual policy allows 7 days for visible damage and 14 days for hidden defects. The AI is not lying — it simply has no access to your policy document. RAG gives it that access. After this part, when an employee asks about your return policy, the AI will read the actual document you wrote, quote the exact clause, and answer correctly every single time."
What this post covers
- What RAG is and why it matters
- The 7-stage RAG pipeline explained
- What embeddings are — plain language
- Embedding model selection & setup
- Document chunking — strategies & code
- ChromaDB setup and configuration
- Alternative vector databases compared
- Indexing company documents — full code
- Semantic search implementation
- Building the complete RAG query engine
- Bengali document RAG support
- ERP data export → RAG pipeline
- Multi-collection namespace strategy
- FastAPI RAG microservice — complete code
- C# ASP.NET Core RAG integration
- Real company document test scenarios
- RAG evaluation & quality measurement
- Troubleshooting — 8 common failures
- Do's, Don'ts, and limitations
Section 1 — What RAG is and why it is the most important AI capability for business
Every LLM you have run so far — LLaMA, Mistral, Qwen — was trained on massive datasets of internet text, books, and code. That training ended months or years ago. The model knows nothing about what happened after that date, and it knows absolutely nothing about your specific company.
RAG (Retrieval Augmented Generation) solves this by adding a retrieval step before generation. Instead of the AI guessing from training memory, it first searches your document store, retrieves the most relevant passages, and uses those passages as context when generating its answer. The result is an AI that speaks from your actual documents rather than from internet averages.
Before RAG vs After RAG — the exact same question, completely different results
Without RAG — AI guesses from training memory
User: What is Dhaka Traders Ltd.'s payment term for credit customers?
AI (without RAG): "Most wholesale companies offer 30 to 60 day payment terms
for credit customers. Standard industry practice is NET-30 for established
accounts, with potential extension to NET-60 for high-volume buyers."
# This answer sounds reasonable but is WRONG for your company.
# Your actual policy is NET-45 with a 2% discount for payment within 10 days.
# The AI invented a plausible answer using general industry knowledge.
With RAG — AI reads your actual policy document
# RAG retrieves this chunk from your credit_policy.pdf:
Retrieved chunk (similarity: 0.94):
"Section 3.2 — Credit Customer Payment Terms:
Standard payment terms for approved credit accounts are NET-45 days from
invoice date. A 2% early payment discount is available for payments received
within 10 days of invoice date (2/10 NET-45). Accounts exceeding 45 days
will incur a 1.5% monthly late charge. Credit limits are reviewed annually."
User: What is Dhaka Traders Ltd.'s payment term for credit customers?
AI (with RAG): "According to your credit policy, standard payment terms for
approved credit accounts are NET-45 days from invoice date. Additionally,
customers can earn a 2% discount if they pay within 10 days (2/10 NET-45).
Late payments beyond 45 days attract a 1.5% monthly charge, and credit
limits are reviewed each year."
# 100% accurate. Directly sourced from your actual document.
# AI cites the retrieved content rather than guessing.
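The "2/10 NET-45" shorthand quoted above can be made concrete with a quick worked calculation. This is a minimal illustrative sketch; the function name and the invoice amounts are made up for the example, not taken from any policy document:

```python
def amount_due(invoice_total: float, days_to_payment: int) -> float:
    """2/10 NET-45: 2% discount if paid within 10 days, full amount due by day 45."""
    if days_to_payment <= 10:
        return invoice_total * 0.98  # early-payment discount applied
    return float(invoice_total)      # full amount (late charges tracked separately)

print(amount_due(100_000, 7))   # 98000.0 (paid on day 7, discount earned)
print(amount_due(100_000, 30))  # 100000.0 (within NET-45, no discount)
```

On a Tk. 100,000 invoice the 10-day window is worth Tk. 2,000, which is exactly the kind of detail RAG lets the AI quote instead of invent.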
Section 2 — The 7-stage RAG pipeline: Every step explained
RAG is not a single operation — it is a pipeline of seven distinct stages. Understanding each stage is essential because failures in any stage cascade into poor final answers. Here is the complete pipeline with what happens at each step.
Document ingestion — load your source documents
Load raw documents from disk: PDFs, Word files, text files, CSV exports from your ERP. Extract text while preserving structure. Handle Bengali Unicode text. This is where your company knowledge enters the pipeline.
Text chunking — split documents into searchable pieces
Split long documents into overlapping chunks of 500–1500 characters. Overlap of 10–15% ensures no information is lost at chunk boundaries. Each chunk must be self-contained enough to answer a question on its own.
Embedding generation — convert text to vectors
Send each chunk to the embedding model (nomic-embed-text via Ollama). The model returns a 768-dimensional vector — a list of numbers that mathematically represents the semantic meaning of that chunk. Similar meaning → similar vectors.
Vector storage — index chunks in ChromaDB
Store each chunk's text, its embedding vector, and metadata (source file, page number, section name, date) in ChromaDB. ChromaDB persists this to disk so you never re-embed unless documents change.
Query embedding — convert the user's question to a vector
When a user asks a question, embed that question using the same embedding model. The resulting vector represents the semantic meaning of the question — what the user is actually asking about, not just the exact words used.
Semantic search — find the most relevant chunks
ChromaDB computes cosine similarity between the query vector and all stored chunk vectors. Returns the top-K most similar chunks (typically 3–7). This retrieval works on meaning, not keywords — it finds "payment terms" when the user asks "how long do I have to pay?"
Augmented generation — AI answers using retrieved context
Construct a prompt: "Given the following context from company documents: [retrieved chunks]. Answer the user's question: [question]. Use only the provided context. If the answer is not in the context, say so." Send to Ollama. The AI generates an answer grounded in your actual documents.
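The seven stages above can be condensed into a toy sketch. This self-contained example fakes the embedding stage with hand-written 3-dimensional vectors so the storage and search math (stages 4 through 6) is visible; a real pipeline calls nomic-embed-text for embeddings and Ollama for the final generation stage:

```python
import numpy as np

# Toy in-memory vector store. The 3-dim vectors are stand-ins for the
# real 768-dim embeddings an embedding model would produce.
store = []  # list of (vector, chunk_text) pairs

def index(vec, text):
    """Stage 4: store a chunk alongside its embedding vector."""
    store.append((np.array(vec, dtype=float), text))

def top_k(query_vec, k=2):
    """Stage 6: rank all stored chunks by cosine similarity to the query."""
    q = np.array(query_vec, dtype=float)
    scored = [(float(q @ v / (np.linalg.norm(q) * np.linalg.norm(v))), t)
              for v, t in store]
    return sorted(scored, reverse=True)[:k]

index([0.9, 0.1, 0.0], "NET-45 payment terms for credit accounts")
index([0.8, 0.2, 0.1], "1.5% monthly late charge after 45 days")
index([0.0, 0.1, 0.9], "deliveries require a signed delivery note")

results = top_k([1.0, 0.0, 0.0])  # stage 5: the embedded user question
print(results[0][1])              # the payment-terms chunk ranks first
```

In stage 7 the top-ranked chunk texts are pasted into the prompt as context; the off-topic delivery chunk never reaches the LLM.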
Section 3 — Embeddings: What they are and why cosine similarity works
An embedding is the translation of text into a point in high-dimensional space. The embedding model learns to place similar-meaning text near each other in this space, regardless of the exact words used. This is why RAG can find "payment terms" when someone asks "how long before an invoice is due" — the meanings are geometrically close even though no words overlap.
Embedding similarity demo — same query, different chunks, cosine similarity scores
Query: "How long do credit customers have to pay?"
"NET-45 days from invoice date for approved credit accounts. 2% discount for payment within 10 days."
→
[0.023, -0.041, 0.187...]
0.94 match
"Outstanding invoices beyond 45 days will incur a 1.5% monthly late payment charge."
→
[0.018, -0.037, 0.162...]
0.87 match
"All deliveries must be accompanied by a signed delivery note and vehicle number."
→
[-0.091, 0.224, -0.043...]
0.12 match
"New customer credit applications require 2 trade references and 6 months bank statement."
→
[0.011, -0.029, 0.143...]
0.61 match
The first two chunks score highly because their meaning is semantically close to the question — even though the question uses "how long" while the document uses "days" and "NET-45". The delivery note chunk scores near zero because it is about a completely different topic. This meaning-based retrieval is what makes RAG vastly superior to keyword search.
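To see concretely why keyword search fails on this query, compare a naive word-overlap (Jaccard) score for the same query and chunk. This is a small illustrative sketch of keyword matching, not production search code:

```python
def keyword_overlap(a: str, b: str) -> float:
    """Jaccard similarity over lowercase word sets; a stand-in for keyword search."""
    wa, wb = set(a.lower().split()), set(b.lower().split())
    return len(wa & wb) / len(wa | wb)

query = "how long do credit customers have to pay"
chunk = "net-45 days from invoice date for approved credit accounts"

# Only "credit" is shared: 1 of 16 unique words
print(round(keyword_overlap(query, chunk), 2))  # 0.06
```

A keyword engine would rank this chunk near the bottom at 0.06, while the embedding model scored the same pair 0.94, because embeddings compare meanings rather than surface words.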
Section 4 — Embedding model setup: nomic-embed-text via Ollama
Install embedding model and verify it works
# Pull the embedding model (274MB — very small)
ollama pull nomic-embed-text
# Verify it generates embeddings correctly:
curl http://localhost:11434/api/embeddings \
-H "Content-Type: application/json" \
-d '{
"model": "nomic-embed-text",
"prompt": "What is our return policy for damaged goods?"
}'
# Expected response structure:
# {
# "embedding": [
# 0.023064255, -0.014368402, -0.009838253,
# 0.035714887, -0.024517306, 0.059927620,
# ... (768 total float values)
# ]
# }
# The embedding vector has 768 dimensions for nomic-embed-text
# Alternative embedding models via Ollama:
ollama pull mxbai-embed-large # 1024 dimensions — higher quality, larger
ollama pull all-minilm # 384 dimensions — fastest, smallest
# Recommendation: Use nomic-embed-text as default
# Switch to mxbai-embed-large only if retrieval quality is insufficient
Python embedding helper — reusable throughout the pipeline
#!/usr/bin/env python3
"""
File: rag/embeddings.py
Embedding utilities for the company RAG pipeline.
"""
import requests
from typing import List
import numpy as np
OLLAMA_URL = "http://localhost:11434"
EMBED_MODEL = "nomic-embed-text"
EMBED_DIM = 768
def get_embedding(text: str) -> List[float]:
"""Generate embedding for a single text string."""
response = requests.post(
f"{OLLAMA_URL}/api/embeddings",
json={"model": EMBED_MODEL, "prompt": text},
timeout=30
)
response.raise_for_status()
return response.json()["embedding"]
def get_embeddings_batch(texts: List[str]) -> List[List[float]]:
"""Embed multiple texts, one API call per text (the Ollama
embeddings endpoint accepts a single prompt per request)."""
return [get_embedding(t) for t in texts]
def cosine_similarity(vec_a: List[float], vec_b: List[float]) -> float:
"""Calculate cosine similarity between two embedding vectors."""
a = np.array(vec_a)
b = np.array(vec_b)
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
# Test the embedding utility:
if __name__ == "__main__":
q = get_embedding("payment terms for credit customers")
d1 = get_embedding("NET-45 days from invoice date for credit accounts")
d2 = get_embedding("delivery must be accompanied by signed note")
print(f"Query vs payment chunk: {cosine_similarity(q, d1):.4f}")
# Query vs payment chunk: 0.8934 ← High similarity ✓
print(f"Query vs delivery chunk: {cosine_similarity(q, d2):.4f}")
# Query vs delivery chunk: 0.1247 ← Low similarity ✓
Section 5 — Document chunking: The most critical and most misunderstood step
Chunking is where most RAG implementations fail silently. Chunks that are too small lose context. Chunks that are too large dilute relevance. Chunks with no overlap lose information at boundaries. Getting chunking right is 50% of RAG quality.
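Chunk size and overlap also set your index size and embedding cost. A back-of-envelope estimate for the sliding-window strategy (a minimal sketch that ignores the min-size filtering a real chunker applies to the final fragment):

```python
import math

def estimated_chunks(text_len: int, chunk_size: int = 1000, overlap: int = 150) -> int:
    """Chunks produced by a window that advances chunk_size - overlap characters."""
    step = chunk_size - overlap
    return math.ceil(text_len / step)

# A 40-page policy PDF is roughly 80,000 characters:
print(estimated_chunks(80_000))           # 95 chunks at 1000/150
print(estimated_chunks(80_000, 500, 50))  # 178 chunks: smaller chunks, more vectors
```

Halving the chunk size nearly doubles the number of vectors to embed, store, and search, which is why the defaults below sit in the 1000-character range.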
The three chunking strategies with trade-offs
Fixed-size chunking with overlap — recommended default
Source text: "Section 3.2 — Credit Customer Payment Terms: Standard payment terms for approved credit accounts are NET-45 days from invoice date. A 2% early payment discount is available for payments received within 10 days of invoice date (2/10 NET-45). Accounts exceeding 45 days will incur a 1.5% monthly late charge..."
Chunk 1 (chars 0–500)
Section 3.2 — Credit Customer Payment Terms: Standard payment terms for approved credit accounts are NET-45 days from invoice date. A 2% early payment discount is available for payments received within 10 days of invoice date (2/10 NET-45). Accounts exceeding 45 days will incur a 1.5% monthly late charge. Credit limits are reviewed annually by the Finance Manager.
Overlap zone (chars 450–550) — shared between chunk 1 and chunk 2
Credit limits are reviewed annually by the Finance Manager. New credit applications require...
Chunk 2 (chars 450–950) — starts in overlap zone
Credit limits are reviewed annually by the Finance Manager. New credit applications require 2 trade references and 6 months of bank statements. Credit decisions are made within 5 business days. The maximum initial credit limit for new accounts is Tk. 5,00,000...
Complete chunking implementation — three strategies
#!/usr/bin/env python3
"""
File: rag/chunker.py
Document chunking strategies for the RAG pipeline.
"""
import re
from typing import List, Dict, Any
from dataclasses import dataclass
@dataclass
class Chunk:
text: str
chunk_id: str
source_file: str
chunk_index: int
char_start: int
char_end: int
metadata: Dict[str, Any]
class DocumentChunker:
def __init__(self,
chunk_size: int = 1000,
overlap: int = 150,
min_size: int = 100):
self.chunk_size = chunk_size
self.overlap = overlap
self.min_size = min_size
# ── Strategy 1: Fixed-size sliding window (recommended default) ──────
def chunk_fixed(self, text: str, source: str) -> List[Chunk]:
"""Split by character count with overlap. Best general-purpose strategy."""
chunks = []
start = 0
idx = 0
while start < len(text):
end = min(start + self.chunk_size, len(text))
chunk_text = text[start:end].strip()
if len(chunk_text) >= self.min_size:
chunks.append(Chunk(
text = chunk_text,
chunk_id = f"{source}_chunk_{idx:04d}",
source_file = source,
chunk_index = idx,
char_start = start,
char_end = end,
metadata = {"strategy": "fixed", "source": source}
))
idx += 1
start += self.chunk_size - self.overlap # Slide forward with overlap
return chunks
# ── Strategy 2: Sentence-aware chunking (better for Q&A) ─────────────
def chunk_sentences(self, text: str, source: str) -> List[Chunk]:
"""Group sentences into chunks. Preserves sentence boundaries."""
# Split on sentence boundaries (handles English and Bengali)
sentences = re.split(r'(?<=[।.!?])\s+', text)
chunks, current, start_char, idx = [], [], 0, 0
for sent in sentences:
current.append(sent)
current_text = " ".join(current)
if len(current_text) >= self.chunk_size:
chunks.append(Chunk(
text = current_text.strip(),
chunk_id = f"{source}_sent_{idx:04d}",
source_file = source,
chunk_index = idx,
char_start = start_char,
char_end = start_char + len(current_text),
metadata = {"strategy": "sentence", "source": source}
))
# Keep last 2 sentences for overlap context
current = current[-2:] if len(current) > 2 else []
# Advance the running offset (approximate: overlap length is not subtracted)
start_char += len(current_text)
idx += 1
if current and len(" ".join(current)) >= self.min_size:
chunks.append(Chunk(
text = " ".join(current).strip(),
chunk_id = f"{source}_sent_{idx:04d}",
source_file = source,
chunk_index = idx,
char_start = start_char,
char_end = len(text),
metadata = {"strategy": "sentence", "source": source}
))
return chunks
# ── Strategy 3: Section-aware chunking (best for policy documents) ───
def chunk_sections(self, text: str, source: str) -> List[Chunk]:
"""Split on section headers. Best for structured policy/HR documents."""
# Detect headers: "Section X" (with optional title), numbered headings, ALL CAPS lines
section_pattern = re.compile(
r'(?m)^(?:Section\s+[\d.]+.*|\d+\.\s+\S.*|[A-Z][A-Z\s]{4,}:?)\s*$'
)
splits = list(section_pattern.finditer(text))
chunks = []
for i, match in enumerate(splits):
section_start = match.start()
section_end = splits[i+1].start() if i+1 < len(splits) else len(text)
section_text = text[section_start:section_end].strip()
section_name = match.group().strip()
if len(section_text) >= self.min_size:
# If section is very long, sub-chunk it
if len(section_text) > self.chunk_size * 2:
sub_chunks = self.chunk_fixed(section_text, source)
for sc in sub_chunks:
sc.metadata["section"] = section_name
chunks.append(sc)
else:
chunks.append(Chunk(
text = section_text,
chunk_id = f"{source}_sec_{i:04d}",
source_file = source,
chunk_index = i,
char_start = section_start,
char_end = section_end,
metadata = {"strategy": "section",
"section": section_name, "source": source}
))
return chunks
# Chunking strategy selection guide:
# Policy documents (HR, Finance): chunk_sections → preserves rule structure
# Long narrative reports: chunk_sentences → preserves readability
# Product catalogs, CSVs, lists: chunk_fixed → consistent size, fast
# Mixed / unknown: chunk_fixed → safe default
Section 6 — ChromaDB: Setup, configuration, and collection design
ChromaDB is the recommended vector database for starting your company RAG deployment. It is free, open source, runs entirely locally as a Python process or persistent server, requires no infrastructure setup beyond pip install, and has an excellent Python API. It stores vector embeddings alongside the original text and arbitrary metadata, and queries return both the nearest chunks and their similarity scores.
ChromaDB
Recommended — start here
Pure Python, zero infrastructure, persistent on disk. Can run embedded in your Python process or as a standalone HTTP server. Perfect for company deployments up to ~500K documents.
+ Easiest setup (pip install chromadb), built-in persistence, great Python API, metadata filtering, Docker-deployable server mode
- Not designed for billion-scale; RAM grows with collection size
Qdrant
Production scale
Dedicated vector database with REST API, Docker deployment, and excellent filtering. Better than ChromaDB for large collections and concurrent users. More setup required.
+ REST API, Docker-native, advanced filtering, payload indexing, production-grade performance
- Separate service to manage, more complex than ChromaDB for getting started
FAISS
Maximum speed
Facebook's library — fastest similarity search, used at massive scale. No built-in persistence (must save/load manually). Best when you need maximum retrieval speed and control all other aspects yourself.
+ Blazing fast retrieval, GPU acceleration support, battle-tested at scale
- No metadata filtering, no persistence, more manual work
Milvus
Enterprise scale
Enterprise-grade distributed vector database. Handles billions of vectors. Kubernetes-deployable. Overkill for most companies but the right choice if you plan to index millions of documents.
+ Billion-scale, distributed, enterprise features, Kubernetes native
- Significant infrastructure investment; start with ChromaDB and migrate later
ChromaDB installation and server setup
# Install ChromaDB
pip install chromadb --break-system-packages
# Option A: Embedded mode (Python process — simplest)
import chromadb
client = chromadb.PersistentClient(path="/data/company-rag/chromadb")
# Data persists to disk at the specified path
# Option B: HTTP server mode (recommended for production — shared by multiple apps)
# Start the server (runs on port 8000):
chroma run --path /data/company-rag/chromadb --host 0.0.0.0 --port 8000
# Connect from Python (embedded or server):
import chromadb
# Embedded:
client = chromadb.PersistentClient(path="/data/company-rag/chromadb")
# HTTP server:
client = chromadb.HttpClient(host="localhost", port=8000)
# Docker Compose deployment (recommended for production):
# docker-compose.yml entry:
chromadb:
image: chromadb/chroma:latest
container_name: company-chromadb
restart: always
ports:
- "8000:8000"
volumes:
- ./data/chromadb:/chroma/chroma
environment:
- CHROMA_SERVER_AUTH_PROVIDER=chromadb.auth.token.TokenAuthServerProvider
- CHROMA_SERVER_AUTH_CREDENTIALS=your-secret-token-here
- CHROMA_SERVER_AUTH_TOKEN_TRANSPORT_HEADER=X-Chroma-Token
Collection namespace strategy — organizing your company knowledge
ChromaDB collection design — one collection per department/domain
import chromadb
client = chromadb.PersistentClient(path="/data/company-rag/chromadb")
# Create separate collections for different knowledge domains
# This allows department-specific RAG without cross-contamination
COLLECTIONS = {
"hr_policies": {
"description": "HR manuals, leave policy, code of conduct, job descriptions",
"metadata": {"department": "hr", "sensitivity": "internal"}
},
"finance_policies": {
"description": "Credit policy, payment terms, expense rules, VAT guidelines",
"metadata": {"department": "finance", "sensitivity": "confidential"}
},
"product_catalog": {
"description": "Product specs, pricing tiers, availability, lead times",
"metadata": {"department": "sales", "sensitivity": "internal"}
},
"customer_contracts": {
"description": "Customer agreements, SLAs, special terms per account",
"metadata": {"department": "sales", "sensitivity": "confidential"}
},
"operations_sops": {
"description": "Warehouse SOPs, inventory procedures, safety guidelines",
"metadata": {"department": "operations", "sensitivity": "internal"}
},
"erp_data_exports": {
"description": "Monthly sales data, customer history, inventory snapshots",
"metadata": {"department": "all", "sensitivity": "confidential"}
},
}
def get_or_create_collection(name: str) -> chromadb.Collection:
"""Get existing collection or create it if it doesn't exist."""
config = COLLECTIONS.get(name, {})
return client.get_or_create_collection(
name = name,
# "hnsw:space": "cosine" makes query distances equal 1 - cosine similarity
metadata = {**config.get("metadata", {}), "hnsw:space": "cosine"}
)
# Create all collections:
for name in COLLECTIONS:
col = get_or_create_collection(name)
print(f"Collection '{name}': {col.count()} documents")
Section 7 — Indexing company documents: Complete ingestion pipeline
This is the core of the RAG system — loading your actual company documents, processing them through the chunker, generating embeddings, and storing everything in ChromaDB. The code below handles PDFs, Word documents, plain text files, and CSV exports from your ERP.
Install document processing dependencies
# All document processing dependencies:
#   pymupdf      - PDF text extraction (PyMuPDF)
#   python-docx  - Word document processing
#   chromadb     - vector database
#   requests     - Ollama API calls
#   numpy        - vector math
#   pandas       - CSV/Excel data processing
#   tqdm         - progress bars for long indexing jobs
pip install pymupdf python-docx chromadb requests numpy pandas tqdm \
--break-system-packages
File: rag/document_loader.py — load any document type
#!/usr/bin/env python3
"""Load and extract text from PDF, DOCX, TXT, and CSV files."""
import fitz # PyMuPDF
import docx
import pandas as pd
from pathlib import Path
from typing import Dict, Any
class DocumentLoader:
def load(self, file_path: str) -> Dict[str, Any]:
"""
Load a document and return its text content with metadata.
Returns: {text, filename, file_type, page_count, char_count}
"""
path = Path(file_path)
ext = path.suffix.lower()
loaders = {
".pdf": self._load_pdf,
".docx": self._load_docx,
".doc": self._load_docx,
".txt": self._load_text,
".md": self._load_text,
".csv": self._load_csv,
".xlsx": self._load_excel,
}
loader = loaders.get(ext)
if not loader:
raise ValueError(f"Unsupported file type: {ext}")
result = loader(file_path)
result["filename"] = path.name
result["file_type"] = ext
result["char_count"] = len(result.get("text", ""))
return result
def _load_pdf(self, path: str) -> Dict:
doc = fitz.open(path)
pages = []
for page_num, page in enumerate(doc):
text = page.get_text("text")
if text.strip():
pages.append(f"[Page {page_num + 1}]\n{text}")
page_count = len(doc)  # capture before close: len() fails on a closed document
doc.close()
return {"text": "\n\n".join(pages), "page_count": page_count}
def _load_docx(self, path: str) -> Dict:
document = docx.Document(path)
full_text = []
for para in document.paragraphs:
if para.text.strip():
prefix = "## " if para.style.name.startswith("Heading") else ""
full_text.append(f"{prefix}{para.text}")
return {"text": "\n".join(full_text), "page_count": 1}
def _load_text(self, path: str) -> Dict:
text = Path(path).read_text(encoding="utf-8")
return {"text": text, "page_count": 1}
def _load_csv(self, path: str) -> Dict:
df = pd.read_csv(path, encoding="utf-8-sig")
rows = []
for _, row in df.iterrows():
# Convert each row to a natural language sentence for embedding
row_text = ". ".join([f"{col}: {val}" for col, val in row.items()
if pd.notna(val)])
rows.append(row_text)
return {"text": "\n".join(rows), "page_count": 1, "rows": len(df)}
def _load_excel(self, path: str) -> Dict:
df = pd.read_excel(path)
rows = []
for _, row in df.iterrows():
# Same row-to-sentence conversion as the CSV loader
row_text = ". ".join(f"{col}: {val}" for col, val in row.items()
if pd.notna(val))
rows.append(row_text)
return {"text": "\n".join(rows), "page_count": 1, "rows": len(df)}
File: rag/indexer.py — complete document indexing pipeline
#!/usr/bin/env python3
"""
File: rag/indexer.py
Index company documents into ChromaDB for RAG retrieval.
"""
import chromadb, time, hashlib
from pathlib import Path
from typing import List
from tqdm import tqdm
from embeddings import get_embedding
from chunker import DocumentChunker, Chunk
from document_loader import DocumentLoader
class RAGIndexer:
def __init__(self, chroma_path: str = "/data/company-rag/chromadb"):
self.client = chromadb.PersistentClient(path=chroma_path)
self.loader = DocumentLoader()
self.chunker = DocumentChunker(chunk_size=1000, overlap=150)
def _file_hash(self, path: str) -> str:
"""MD5 hash of file — detect unchanged files to avoid re-indexing."""
h = hashlib.md5()
h.update(Path(path).read_bytes())
return h.hexdigest()[:12]
def index_file(self,
file_path: str,
collection_name: str,
extra_metadata: dict = None) -> int:
"""
Index a single file into a ChromaDB collection.
Returns number of chunks indexed.
"""
collection = self.client.get_or_create_collection(
collection_name, metadata={"hnsw:space": "cosine"}
)
file_hash = self._file_hash(file_path)
filename = Path(file_path).name
# Skip if already indexed (same file hash)
existing = collection.get(where={"file_hash": {"$eq": file_hash}})
if existing["ids"]:
print(f" SKIPPED (unchanged): {filename} — {len(existing['ids'])} chunks already indexed")
return 0
# Load and chunk the document
print(f" Loading: {filename}")
doc_data = self.loader.load(file_path)
text = doc_data["text"]
# Choose chunking strategy based on file type
ext = Path(file_path).suffix.lower()
if ext in [".csv", ".xlsx"]:
chunks = self.chunker.chunk_fixed(text, filename)
elif ext in [".pdf", ".docx"]:
chunks = self.chunker.chunk_sections(text, filename)
else:
chunks = self.chunker.chunk_sentences(text, filename)
print(f" Chunked: {len(chunks)} chunks from {doc_data['char_count']:,} chars")
# Generate embeddings and index in batches of 50
batch_size = 50
indexed = 0
for i in tqdm(range(0, len(chunks), batch_size), desc=" Embedding"):
batch = chunks[i:i + batch_size]
texts = [c.text for c in batch]
embeddings = [get_embedding(t) for t in texts]
ids = [c.chunk_id for c in batch]
metadatas = [{
"source_file": c.source_file,
"chunk_index": c.chunk_index,
"char_start": c.char_start,
"char_end": c.char_end,
"file_hash": file_hash,
"file_type": ext,
**(extra_metadata or {}),
**c.metadata
} for c in batch]
collection.add(
ids = ids,
embeddings = embeddings,
documents = texts,
metadatas = metadatas
)
indexed += len(batch)
print(f" Indexed: {indexed} chunks → collection '{collection_name}'")
return indexed
def index_directory(self,
directory: str,
collection_name: str,
extensions: List[str] = None) -> int:
"""Index all matching files in a directory."""
extensions = extensions or [".pdf", ".docx", ".txt", ".csv"]
files = [f for ext in extensions
for f in Path(directory).glob(f"**/*{ext}")]
total = 0
print(f"\nIndexing {len(files)} files into '{collection_name}'...")
for file_path in files:
total += self.index_file(str(file_path), collection_name)
print(f"\nTotal chunks indexed: {total}")
return total
# ── Usage example — index your company documents ───────────────────────
if __name__ == "__main__":
indexer = RAGIndexer()
# Index HR policy documents
indexer.index_directory(
directory = "/company-docs/hr",
collection_name = "hr_policies",
extensions = [".pdf", ".docx"]
)
# Index finance documents
indexer.index_directory(
directory = "/company-docs/finance",
collection_name = "finance_policies"
)
# Index ERP monthly export with extra metadata
indexer.index_file(
file_path = "/erp-exports/sales_nov_2024.csv",
collection_name = "erp_data_exports",
extra_metadata = {"period": "2024-11", "data_type": "sales"}
)
Section 8 — The complete RAG query engine
File: rag/query_engine.py — semantic search + augmented generation
#!/usr/bin/env python3
"""
File: rag/query_engine.py
RAG query engine: retrieve relevant chunks then generate grounded answers.
"""
import chromadb, requests
from typing import List, Dict, Optional
from dataclasses import dataclass
from embeddings import get_embedding
OLLAMA_URL = "http://localhost:11434"
@dataclass
class RetrievedChunk:
text: str
source: str
score: float
metadata: Dict
@dataclass
class RAGResponse:
answer: str
retrieved_chunks: List[RetrievedChunk]
model_used: str
sources: List[str]
grounded: bool # True if answer came from retrieved context
class RAGQueryEngine:
def __init__(self,
chroma_path: str = "/data/company-rag/chromadb",
llm_model: str = "llama3.1:8b",
top_k: int = 5,
min_score: float = 0.5):
self.client = chromadb.PersistentClient(path=chroma_path)
self.llm_model = llm_model
self.top_k = top_k
self.min_score = min_score
def retrieve(self,
query: str,
collection_name: str,
filters: Optional[Dict] = None) -> List[RetrievedChunk]:
"""Embed the query and find the top-K most similar chunks."""
collection = self.client.get_or_create_collection(collection_name)
if collection.count() == 0:
return []
query_embedding = get_embedding(query)
query_params = {
"query_embeddings": [query_embedding],
"n_results": min(self.top_k, collection.count()),
"include": ["documents", "metadatas", "distances"]
}
if filters:
query_params["where"] = filters
results = collection.query(**query_params)
chunks = []
for doc, meta, dist in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0]
):
# ChromaDB returns distance (lower = better). With a cosine-space
# collection ("hnsw:space": "cosine"), similarity = 1 - distance.
similarity = 1.0 - dist
if similarity >= self.min_score:
chunks.append(RetrievedChunk(
text = doc,
source = meta.get("source_file", "unknown"),
score = round(similarity, 4),
metadata = meta
))
return sorted(chunks, key=lambda c: c.score, reverse=True)
def generate(self,
query: str,
chunks: List[RetrievedChunk],
language: str = "auto") -> str:
"""Generate an answer using retrieved chunks as context."""
if not chunks:
return ("I could not find relevant information in the company documents "
"to answer this question. Please check with the relevant department.")
context_parts = []
for i, chunk in enumerate(chunks, 1):
context_parts.append(
f"[Source {i}: {chunk.source} | Relevance: {chunk.score:.2f}]\n{chunk.text}"
)
context = "\n\n---\n\n".join(context_parts)
lang_instruction = {
"bn": "Respond in formal Bengali (আনুষ্ঠানিক বাংলা).",
"en": "Respond in professional English.",
"auto": "Respond in the same language the question was asked in."
}.get(language, "Respond in the same language the question was asked in.")
prompt = f"""You are a company knowledge assistant for Dhaka Traders Ltd.
Answer the user's question using ONLY the context provided below.
{lang_instruction}
Rules:
- If the answer is clearly in the context: answer directly and precisely
- If the context partially answers: answer what you can, state what is missing
- If the answer is NOT in the context: say "This information is not available in the provided documents"
- Never invent information not present in the context
- When relevant, mention which source document the answer comes from
CONTEXT FROM COMPANY DOCUMENTS:
{context}
USER QUESTION: {query}
ANSWER:"""
response = requests.post(
f"{OLLAMA_URL}/api/chat",
json={
"model": self.llm_model,
"stream": False,
"options": {"temperature": 0.2, "num_ctx": 8192},
"messages": [{"role": "user", "content": prompt}]
},
timeout=120
)
response.raise_for_status()
return response.json()["message"]["content"]
def query(self,
question: str,
collection_name: str,
language: str = "auto",
filters: Optional[Dict] = None) -> RAGResponse:
"""Complete RAG pipeline: retrieve + generate."""
chunks = self.retrieve(question, collection_name, filters)
answer = self.generate(question, chunks, language)
sources = list({c.source for c in chunks})
return RAGResponse(
answer = answer,
retrieved_chunks = chunks,
model_used = self.llm_model,
sources = sources,
grounded = len(chunks) > 0
)
Real RAG test — company policy document queries
Testing the RAG engine with real company document queries
# Test the complete pipeline with real company scenarios
engine = RAGQueryEngine(
chroma_path = "/data/company-rag/chromadb",
llm_model = "llama3.1:8b",
top_k = 5,
min_score = 0.45
)
# ── Test 1: Finance policy query ───────────────────────────────────────
response = engine.query(
question = "What is the payment term for credit customers and what happens if they pay late?",
collection_name = "finance_policies"
)
print("=== Test 1: Finance Policy ===")
print(f"Answer:\n{response.answer}")
print(f"\nRetrieved {len(response.retrieved_chunks)} chunks:")
for c in response.retrieved_chunks:
print(f" [{c.score:.3f}] {c.source} — {c.text[:80]}...")
"""
=== Test 1: Finance Policy ===
Answer:
According to the credit policy document, approved credit accounts are on NET-45
day terms from invoice date. Customers can receive a 2% discount if payment is
made within 10 days (2/10 NET-45 terms).
For late payments — any amount unpaid beyond 45 days incurs a 1.5% monthly
late charge on the outstanding balance. Additionally, accounts may be placed
on credit hold after 60 days, requiring Finance Manager approval to resume.
Source: credit_policy.pdf (Sections 3.2 and 3.4)
Retrieved 3 chunks:
[0.941] credit_policy.pdf — NET-45 days from invoice date for approved credit accounts...
[0.872] credit_policy.pdf — Outstanding invoices beyond 45 days will incur a 1.5%...
[0.634] credit_policy.pdf — Accounts on credit hold require Finance Manager sign-off...
"""
# ── Test 2: Bengali language query ────────────────────────────────────
response_bn = engine.query(
question = "আমাদের ছুটির নীতি কী? বার্ষিক কত দিন ছুটি পাওয়া যায়?",
collection_name = "hr_policies",
language = "bn"
)
print("\n=== Test 2: Bengali HR Policy Query ===")
print(f"Answer:\n{response_bn.answer}")
"""
=== Test 2: Bengali HR Policy Query ===
Answer:
আমাদের ছুটির নীতি অনুযায়ী (HR Policy Manual, Section 5):
বার্ষিক ছুটি: প্রতিটি নিশ্চিত কর্মচারী বছরে ১৫ দিন বার্ষিক ছুটি পাওয়ার
অধিকারী। ছুটি ভোগের জন্য কমপক্ষে ৭ দিন আগে আবেদন করতে হবে।
অসুস্থতার ছুটি: বছরে সর্বোচ্চ ১০ দিন অসুস্থতার ছুটি অনুমোদিত। ৩ দিনের
বেশি অসুস্থতার জন্য ডাক্তারের সার্টিফিকেট বাধ্যতামূলক।
নৈমিত্তিক ছুটি: বছরে ৩ দিন নৈমিত্তিক ছুটি পাওয়া যাবে।
সূত্র: HR Policy Manual — Section 5: Leave Entitlement
"""
# ── Test 3: ERP data query — sales history ────────────────────────────
response_erp = engine.query(
question = "Which customers had the highest orders in November 2024?",
collection_name = "erp_data_exports",
filters = {"period": {"$eq": "2024-11"}}
)
print("\n=== Test 3: ERP Sales Data Query ===")
print(f"Answer:\n{response_erp.answer}")
"""
=== Test 3: ERP Sales Data Query ===
Answer:
Based on the November 2024 sales export data, the top customers by order value were:
1. Hossain Brothers — Tk. 6,90,000 (12 orders)
2. Karim Textiles — Tk. 5,12,000 (9 orders)
3. Narayan Exports — Tk. 4,48,000 (7 orders)
The total sales for November 2024 across all customers were Tk. 20,95,000.
Note: Rahman Garments and Dhaka Fabrics both showed significant declines versus
October 2024, with drops of 39% and 58% respectively.
Source: sales_nov_2024.csv
"""
Section 9 — FastAPI RAG microservice: Production-ready HTTP API
File: rag/api.py — complete FastAPI service with authentication
#!/usr/bin/env python3
"""
File: rag/api.py
FastAPI microservice that exposes the RAG pipeline as a REST API.
Called by the C# ERP integration in Section 10.
Run: uvicorn api:app --host 0.0.0.0 --port 8080
"""
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Dict
from query_engine import RAGQueryEngine
import os, logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rag-api")
app = FastAPI(title="Company RAG API", version="1.0.0")
engine = RAGQueryEngine()
API_TOKEN = os.getenv("RAG_API_TOKEN", "change-this-secret-token")
app.add_middleware(
CORSMiddleware,
allow_origins = ["http://localhost:5000", "http://192.168.1.100:5000"],
allow_methods = ["POST", "GET"],
allow_headers = ["*"],
)
def verify_token(x_api_token: str = Header(...)):
if x_api_token != API_TOKEN:
raise HTTPException(status_code=401, detail="Invalid API token")
class RAGQueryRequest(BaseModel):
question: str
collection: str = "hr_policies"
language: str = "auto"
top_k: int = 5
min_score: float = 0.45
filters: Optional[Dict] = None
class ChunkResponse(BaseModel):
text: str
source: str
score: float
class RAGQueryResponse(BaseModel):
answer: str
sources: List[str]
chunks: List[ChunkResponse]
grounded: bool
@app.post("/api/rag/query",
response_model=RAGQueryResponse,
dependencies=[Depends(verify_token)])
async def rag_query(request: RAGQueryRequest):
logger.info(f"RAG query: collection={request.collection} q={request.question[:60]}")
    # NOTE: the engine instance is shared by every request in this worker,
    # so these per-request overrides are not safe under concurrent load —
    # prefer passing top_k/min_score through to retrieve() in production
    engine.top_k = request.top_k
    engine.min_score = request.min_score
result = engine.query(
question = request.question,
collection_name = request.collection,
language = request.language,
filters = request.filters
)
return RAGQueryResponse(
answer = result.answer,
sources = result.sources,
grounded = result.grounded,
chunks = [
ChunkResponse(text=c.text[:300], source=c.source, score=c.score)
for c in result.retrieved_chunks
]
)
@app.get("/api/rag/collections", dependencies=[Depends(verify_token)])
async def list_collections():
return {"collections": [c.name for c in engine.client.list_collections()]}
@app.get("/health")
async def health():
return {"status": "ok", "model": engine.llm_model}
# Start command:
# RAG_API_TOKEN=your-secret uvicorn api:app --host 0.0.0.0 --port 8080 --workers 2
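Before wiring up the C# client in Section 10, you can exercise the endpoint from Python. This is a minimal client sketch using only the standard library — the helper names are mine, and the actual HTTP call is left commented out since it needs the service above running locally:

```python
import json
import urllib.request

def build_rag_request(question: str,
                      collection: str = "hr_policies",
                      language: str = "auto",
                      top_k: int = 5,
                      min_score: float = 0.45) -> dict:
    """Build the JSON body expected by POST /api/rag/query."""
    return {
        "question": question,
        "collection": collection,
        "language": language,
        "top_k": top_k,
        "min_score": min_score,
    }

def query_rag(base_url: str, token: str, body: dict) -> dict:
    """Send the request with the X-Api-Token header the service checks."""
    req = urllib.request.Request(
        f"{base_url}/api/rag/query",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "X-Api-Token": token},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        return json.loads(resp.read().decode("utf-8"))

if __name__ == "__main__":
    body = build_rag_request("What is the leave policy?", collection="hr_policies")
    # Requires the rag/api.py service to be running:
    # print(query_rag("http://localhost:8080", "change-this-secret-token", body))
    print(body["collection"])
```

The same payload shape is what the C# `RAGQueryRequest` class below serializes to (snake_case keys), so this makes a quick smoke test when debugging the integration.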
Section 10 — C# ASP.NET Core integration: Query RAG from your ERP
File: Services/RAGService.cs — complete C# RAG client
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;   // required for IOptions<RAGServiceOptions>
using System.Text.Json;
using System.Text.Json.Serialization;
namespace DhakaTraders.ERP.Services
{
/// <summary>Client for the Python RAG microservice.</summary>
public interface IRAGService
{
Task<RAGQueryResult> QueryAsync(RAGQueryRequest request);
Task<string> AskHRAsync(string question, string language = "auto");
Task<string> AskFinanceAsync(string question);
Task<string> AskSalesDataAsync(string question, string? period = null);
}
public class RAGService : IRAGService
{
private readonly IHttpClientFactory _http;
private readonly ILogger<RAGService> _log;
private readonly RAGServiceOptions _opts;
public RAGService(IHttpClientFactory http,
ILogger<RAGService> log,
IOptions<RAGServiceOptions> opts)
{
_http = http;
_log = log;
_opts = opts.Value;
}
public async Task<RAGQueryResult> QueryAsync(RAGQueryRequest request)
{
var client = _http.CreateClient("RAGClient");
client.DefaultRequestHeaders.Add("X-Api-Token", _opts.ApiToken);
var json = JsonSerializer.Serialize(request,
new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower });
var content = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
var response = await client.PostAsync($"{_opts.BaseUrl}/api/rag/query", content);
response.EnsureSuccessStatusCode();
var body = await response.Content.ReadAsStringAsync();
var result = JsonSerializer.Deserialize<RAGQueryResult>(body,
new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
_log.LogInformation("RAG query completed. Grounded: {G} Sources: {S}",
result?.Grounded, string.Join(", ", result?.Sources ?? []));
return result ?? new RAGQueryResult();
}
public async Task<string> AskHRAsync(string question, string language = "auto")
{
var result = await QueryAsync(new RAGQueryRequest
{
Question = question,
Collection = "hr_policies",
Language = language,
TopK = 5,
MinScore = 0.45f
});
return result.Answer;
}
public async Task<string> AskFinanceAsync(string question)
{
var result = await QueryAsync(new RAGQueryRequest
{
Question = question,
Collection = "finance_policies",
TopK = 5,
MinScore = 0.50f
});
return result.Answer;
}
public async Task<string> AskSalesDataAsync(string question, string? period = null)
{
var filters = period != null
? new Dictionary<string, object> { ["period"] = period }
: null;
var result = await QueryAsync(new RAGQueryRequest
{
Question = question,
Collection = "erp_data_exports",
TopK = 7,
MinScore = 0.40f,
Filters = filters
});
return result.Answer;
}
}
public class RAGQueryRequest
{
public string Question { get; set; } = "";
public string Collection { get; set; } = "hr_policies";
public string Language { get; set; } = "auto";
public int TopK { get; set; } = 5;
public float MinScore { get; set; } = 0.45f;
public Dictionary<string, object>? Filters { get; set; }
}
public class RAGQueryResult
{
public string Answer { get; set; } = "";
public List<string> Sources { get; set; } = [];
public bool Grounded { get; set; }
}
public class RAGServiceOptions
{
public string BaseUrl { get; set; } = "http://localhost:8080";
public string ApiToken { get; set; } = "";
}
}
Program.cs — register RAG service in ASP.NET Core DI
// Program.cs additions:
builder.Services.Configure<RAGServiceOptions>(
builder.Configuration.GetSection("RAGService"));
builder.Services.AddHttpClient("RAGClient", client => {
client.Timeout = TimeSpan.FromSeconds(60);
});
builder.Services.AddScoped<IRAGService, RAGService>();
// appsettings.json:
{
"RAGService": {
"BaseUrl": "http://localhost:8080",
"ApiToken": "your-secret-token-here"
}
}
// Usage in any MVC controller:
public class HRController : Controller
{
private readonly IRAGService _rag;
public HRController(IRAGService rag) => _rag = rag;
[HttpGet("hr/policy-answer")]
public async Task<IActionResult> PolicyAnswer(string question, string lang = "auto")
{
var answer = await _rag.AskHRAsync(question, lang);
return Json(new { answer });
}
}
Section 11 — RAG troubleshooting: 8 common failures and exact fixes
Failure 1 — Retrieval returns wrong chunks (low relevance)
Cause: Chunks are too large (diluted meaning), too small (no context), or the embedding model is mismatched.
Fix: Try chunk_size=800 instead of 1000. Verify the same embedding model is used for both indexing and querying — mixing models breaks similarity scores entirely. Lower min_score from 0.5 to 0.4 if good chunks are being filtered out. Print retrieved chunks during development to verify they contain the expected content.
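The "print retrieved chunks during development" advice is worth making concrete. Here is a small sketch of a debug filter — the `(score, source, text)` tuples are hypothetical stand-ins for what a ChromaDB query returns, so you can see exactly which chunks a given `min_score` keeps or drops:

```python
from typing import List, Tuple

def filter_and_debug(results: List[Tuple[float, str, str]],
                     min_score: float = 0.45) -> List[Tuple[float, str, str]]:
    """Keep only (score, source, text) hits at or above min_score, and print
    every candidate so low-scoring near-misses are visible while tuning."""
    kept = []
    for score, source, text in sorted(results, reverse=True):
        marker = "KEEP" if score >= min_score else "drop"
        print(f"[{marker}] {score:.3f} {source}: {text[:60]}")
        if score >= min_score:
            kept.append((score, source, text))
    return kept

# Hypothetical scores — in practice these come from the ChromaDB query
hits = [
    (0.91, "credit_policy.pdf", "NET-45 days from invoice date..."),
    (0.38, "hr_manual.pdf", "Annual leave entitlement..."),
]
good = filter_and_debug(hits, min_score=0.45)
```

If a chunk you expected sits just under the threshold (say 0.42), that is your signal to lower `min_score` rather than blame the generation step.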
Failure 2 — AI gives correct-sounding but wrong answer despite good retrieval
Cause: The LLM is blending retrieved context with its training memory instead of staying grounded in the retrieved content.
Fix: Strengthen the constraint in your generation prompt: "Answer ONLY from the context. If you cannot find the answer in the context, say 'Not found in documents.' Do not use general knowledge." Lower temperature to 0.1. Use LLaMA 3.1 8B rather than smaller models — it follows grounding instructions more reliably.
Failure 3 — Bengali documents return poor results
Cause: nomic-embed-text has weaker multilingual support. Bengali semantic relationships are not as well represented in its vector space.
Fix: Switch embedding model to mxbai-embed-large (better multilingual). Pull with ollama pull mxbai-embed-large. Re-index all Bengali documents with the new model. Use Qwen2.5:7b as the generation model for Bengali RAG — its Bengali comprehension is significantly better than LLaMA 3.1 for grounded response generation.
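Because switching embedding models silently invalidates an existing index, one defensive pattern is to record the model name in the collection's metadata at index time and assert it at query time. This is a sketch of that guard — the `embedding_model` metadata key is my convention, not a ChromaDB built-in:

```python
def check_embedding_model(collection_metadata: dict, current_model: str) -> None:
    """Fail fast if the query-time embedding model differs from the one
    recorded when the collection was indexed (hypothetical metadata key)."""
    indexed_with = collection_metadata.get("embedding_model")
    if indexed_with and indexed_with != current_model:
        raise ValueError(
            f"Collection was indexed with '{indexed_with}' but queries are "
            f"using '{current_model}' — re-index or switch the query model."
        )

# Example: a Bengali collection re-indexed with mxbai-embed-large
meta = {"embedding_model": "mxbai-embed-large"}
check_embedding_model(meta, "mxbai-embed-large")  # passes silently
```

Calling this at query-engine startup turns a subtle "results got worse" failure into an immediate, explicit error.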
Failure 4 — ChromaDB collection count returns 0 despite indexing
Cause: Using a different path in the query engine than in the indexer, or the PersistentClient path directory does not have write permissions.
Fix: Verify: python3 -c "import chromadb; c=chromadb.PersistentClient('/data/company-rag/chromadb'); print(c.list_collections())". Ensure the path is identical in both indexer and query engine. Check permissions: ls -la /data/company-rag/ — the process user must have read/write access.
Failure 5 — PDF text extraction returns empty or garbled text
Cause: The PDF is a scanned image (not text-based) or uses an unusual font encoding that PyMuPDF cannot decode.
Fix: Test: python3 -c "import fitz; doc=fitz.open('your.pdf'); print(doc[0].get_text())". If empty, it is a scanned PDF — run Tesseract OCR first: pip install pytesseract Pillow --break-system-packages then convert to text before indexing. Bengali PDF fonts can cause issues — try re-exporting the source document as UTF-8 text from Word.
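To automate the scanned-PDF check across a whole document folder, a simple heuristic on the extracted text works well. This sketch takes the string that PyMuPDF's `get_text()` would return; the character threshold and the garbled-text ratio are assumptions to tune for your documents:

```python
def needs_ocr(extracted_text: str, min_chars: int = 20) -> bool:
    """True if a page's extracted text is too short or too garbled to index,
    which usually means a scanned image that should go through Tesseract first."""
    stripped = extracted_text.strip()
    if len(stripped) < min_chars:
        return True
    # Many U+FFFD replacement characters usually mean a broken font encoding
    bad = sum(1 for ch in stripped if ch == "\ufffd")
    return bad / len(stripped) > 0.2

print(needs_ocr(""))  # → True (no text layer — scanned page)
print(needs_ocr("NET-45 days from invoice date for approved credit accounts."))  # → False
```

Run this per page before indexing, and route any `True` pages through the OCR step instead of indexing empty chunks.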
Failure 6 — RAG response is slow (30+ seconds)
Cause: The generation model is running on CPU, or the context window is very large (too many retrieved chunks).
Fix: Reduce top_k from 5 to 3 — fewer chunks means shorter prompt means faster generation. Verify GPU is being used by Ollama during the generation step with ollama ps. The retrieval step (ChromaDB query) should take under 1 second — if it is slow, ChromaDB may be loading the collection from disk. Keep the engine instance alive (singleton pattern) so the collection stays warm in memory.
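The singleton pattern mentioned above can be as simple as a cached factory, so every request in a worker process reuses one warm engine instead of reloading the collection. In this sketch, `DummyEngine` stands in for `RAGQueryEngine` so the idea is runnable on its own:

```python
from functools import lru_cache

class DummyEngine:
    """Stand-in for RAGQueryEngine — the real one loads ChromaDB collections,
    which is the expensive step you only want to pay once per process."""
    def __init__(self):
        self.loaded = True

@lru_cache(maxsize=1)
def get_engine() -> DummyEngine:
    """Create the engine once; every later call returns the same object."""
    return DummyEngine()

assert get_engine() is get_engine()  # same warm instance on every request
```

In the FastAPI service from Section 9, the module-level `engine = RAGQueryEngine()` achieves the same effect; the cached factory is useful when you want lazy initialization or dependency injection via `Depends(get_engine)`.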
Failure 7 — Old/stale content returned after document update
Cause: The old chunks are still in ChromaDB after the source document was updated. The indexer's file-hash detection only decides whether a changed file should be re-indexed — it never removes the chunks generated from the previous version, so old and new chunks coexist.
Fix: Before re-indexing a changed file, delete the old chunks: collection.delete(where={"source_file": {"$eq": "old_policy.pdf"}}). Then re-index the updated file. Add this cleanup to your indexer's update workflow. For daily automated re-indexing, use the file_hash detection — only re-index files whose hash has changed.
Failure 8 — C# HttpClient call to RAG API times out
Cause: RAG generation is slow (model loading or CPU inference) and the default HttpClient timeout (100 seconds) is exceeded.
Fix: Set a longer timeout in Program.cs: client.Timeout = TimeSpan.FromSeconds(120);. Also check the FastAPI server is running: curl http://localhost:8080/health. For production, ensure the generation model is GPU-accelerated — CPU inference may exceed any reasonable timeout for complex queries with large context.
Section 12 — Do's, Don'ts, and limitations
Do — RAG best practices
- Use the same embedding model for index and query
- Test retrieval quality before tuning generation
- Use section-aware chunking for policy documents
- Add metadata (department, date, sensitivity) to all chunks
- Use separate collections per knowledge domain
- Store file hash to avoid unnecessary re-indexing
- Set min_score threshold — reject low-quality retrievals
- Print retrieved chunks during development to verify quality
- Use Bengali-capable models (Qwen2.5) for Bengali RAG
- Re-index when source documents change
- Use metadata filters to scope queries by department
- Log all RAG queries for quality monitoring
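The metadata-filter practice above is easy to centralize in one helper that composes ChromaDB-style where clauses from optional scoping fields. A sketch, using the same `$eq` operator syntax as the ERP query in Test 3 (the `$and` combinator is ChromaDB's documented syntax for multiple conditions):

```python
from typing import Dict, Optional

def build_where(department: Optional[str] = None,
                period: Optional[str] = None) -> Optional[Dict]:
    """Compose a ChromaDB-style where filter from optional scoping fields.
    Returns None when no scoping applies (query the whole collection)."""
    clauses = []
    if department:
        clauses.append({"department": {"$eq": department}})
    if period:
        clauses.append({"period": {"$eq": period}})
    if not clauses:
        return None
    if len(clauses) == 1:
        return clauses[0]
    return {"$and": clauses}

print(build_where(period="2024-11"))
# → {'period': {'$eq': '2024-11'}}
```

Routing every query's filters through one builder like this keeps department scoping consistent and makes it harder to accidentally query a collection unscoped.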
Don't — RAG mistakes to avoid
- Don't mix embedding models between indexing and querying
- Don't use chunk sizes below 200 or above 2000 characters
- Don't skip overlap — it causes boundary information loss
- Don't index scanned PDFs without OCR preprocessing
- Don't index every file into one giant collection
- Don't trust AI answers without grounded = true check
- Don't retrieve too many chunks — 3–7 is optimal
- Don't forget to delete old chunks when updating documents
- Don't use temperature 0.7+ for grounded generation
- Don't expose the RAG API without authentication
- Don't index confidential documents into shared collections
- Don't skip quality testing before deploying to employees
Coming next
Part 06 — API Integration: Connect AI to Your .NET ERP System
Build a complete AI integration layer for your ASP.NET Core MVC ERP — AI service architecture, streaming responses in Razor views, real-time report explanation buttons, invoice processing pipeline, department-specific AI endpoints, middleware for audit logging, and a complete working ERP module with AI-powered insights.
#RAG #ChromaDB #Embeddings #VectorDatabase #Ollama #LocalAI #PrivateAI #NomicEmbed #SemanticSearch #LangChain #RetrievalAugmentedGeneration #FastAPI #CSharpAI #ASPNetCore #ERPIntegration #BengaliAI #DocumentAI #KnowledgeBase #FreeLearning365 #BangladeshTech #FAISS #Qdrant #PyMuPDF #PythonAI #VectorSearch #AIForBusiness