Build a Complete RAG Pipeline with ChromaDB, Ollama & Python: Local AI Knowledge Base Guide | FreeLearning365 [ PART 05 ]

 



Part 05 of 15 · RAG Pipeline · ChromaDB · Embeddings · C# Integration · FreeLearning365

From Zero to Private AI System — Complete Series

RAG — Teach AI Your Company's Own Data: Complete Pipeline Guide

Your AI knows everything the internet taught it — and nothing about your company. It does not know your return policy, your customer contracts, your product catalog, your HR rules, or your 3-year sales history. RAG (Retrieval Augmented Generation) is the technology that fixes this. This part builds a complete RAG pipeline from scratch — embeddings, ChromaDB vector store, intelligent chunking, semantic search, a full Python RAG service, Bengali document support, and a production-ready C# ASP.NET Core integration that lets your ERP query your company knowledge base.

By @FreeLearning365 · Part 05 — RAG Pipeline · Read time: ~50 min · Complete Python RAG service · C# ASP.NET Core integration

"An employee asks your AI: 'What is our return policy for damaged goods?' The AI confidently answers: 'Typically companies allow 30 days for returns...' — which is wrong. Your actual policy allows 7 days for visible damage and 14 days for hidden defects. The AI is not lying — it simply has no access to your policy document. RAG gives it that access. After this part, when an employee asks about your return policy, the AI will read the actual document you wrote, quote the exact clause, and answer correctly every single time."

7 pipeline stages · 4 vector DBs covered · 100% local — no cloud · Bengali document support
What this post covers
  • What RAG is and why it matters
  • The 7-stage RAG pipeline explained
  • What embeddings are — plain language
  • Embedding model selection & setup
  • Document chunking — strategies & code
  • ChromaDB setup and configuration
  • Alternative vector databases compared
  • Indexing company documents — full code
  • Semantic search implementation
  • Building the complete RAG query engine
  • Bengali document RAG support
  • ERP data export → RAG pipeline
  • Multi-collection namespace strategy
  • FastAPI RAG microservice — complete code
  • C# ASP.NET Core RAG integration
  • Real company document test scenarios
  • RAG evaluation & quality measurement
  • Troubleshooting — 8 common failures
  • Do's, Don'ts, Limitations, SEO & banner

Section 1 — What RAG is and why it is the most important AI capability for business

Every LLM you have run so far — LLaMA, Mistral, Qwen — was trained on massive datasets of internet text, books, and code. That training ended months or years ago. The model knows nothing about what happened after that date, and it knows absolutely nothing about your specific company.

RAG (Retrieval Augmented Generation) solves this by adding a retrieval step before generation. Instead of the AI guessing from training memory, it first searches your document store, retrieves the most relevant passages, and uses those passages as context when generating its answer. The result is an AI that speaks from your actual documents rather than from internet averages.
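Stripped of infrastructure, RAG is retrieve-then-generate. The sketch below shows that loop with toy stand-ins — a fixed-vocabulary bag-of-words `embed`, a dot-product `search`, and an echo `llm`. None of these are the real Ollama or ChromaDB calls; this part builds those properly, step by step.

```python
# Minimal sketch of the RAG loop. embed/search/llm are toy stand-ins
# (fixed-vocabulary bag-of-words + echo "LLM"), NOT the real Ollama or
# ChromaDB calls built later in this part.

VOCAB = ["payment", "terms", "net-45", "deliveries", "signed", "note"]

def embed(text: str) -> list[float]:
    """Toy embedder: count how often each vocabulary word appears."""
    words = text.lower().split()
    return [float(words.count(w)) for w in VOCAB]

def search(query_vec: list[float], store: dict, k: int = 1) -> list[str]:
    """Return the k chunks whose vectors have the largest dot product with the query."""
    score = lambda c: sum(a * b for a, b in zip(query_vec, store[c]))
    return sorted(store, key=score, reverse=True)[:k]

def llm(prompt: str) -> str:
    """Toy "LLM": echoes the context it was grounded on."""
    return f"Grounded answer using: {prompt}"

# 1) Index two chunks, 2) retrieve the best match, 3) generate from it:
store = {c: embed(c) for c in ["payment terms are net-45 days",
                               "deliveries need a signed note"]}
chunks = search(embed("what are the payment terms"), store, k=1)
print(llm(chunks[0]))  # → Grounded answer using: payment terms are net-45 days
```

The real pipeline swaps each stand-in for production machinery — nomic-embed-text for `embed`, ChromaDB for `search`, and an Ollama chat model for `llm` — but the control flow stays exactly this simple.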

Before RAG vs After RAG — the exact same question, completely different results

Without RAG — AI guesses from training memory

User: What is Dhaka Traders Ltd.'s payment term for credit customers?

AI (without RAG): "Most wholesale companies offer 30 to 60 day payment terms
for credit customers. Standard industry practice is NET-30 for established
accounts, with potential extension to NET-60 for high-volume buyers."

# This answer sounds reasonable but is WRONG for your company.
# Your actual policy is NET-45 with a 2% discount for payment within 10 days.
# The AI invented a plausible answer using general industry knowledge.

With RAG — AI reads your actual policy document

# RAG retrieves this chunk from your credit_policy.pdf:

Retrieved chunk (similarity: 0.94):
"Section 3.2 — Credit Customer Payment Terms: Standard payment terms for
approved credit accounts are NET-45 days from invoice date. A 2% early
payment discount is available for payments received within 10 days of
invoice date (2/10 NET-45). Accounts exceeding 45 days will incur a 1.5%
monthly late charge. Credit limits are reviewed annually."

User: What is Dhaka Traders Ltd.'s payment term for credit customers?

AI (with RAG): "According to your credit policy, standard payment terms for
approved credit accounts are NET-45 days from invoice date. Additionally,
customers can earn a 2% discount if they pay within 10 days (2/10 NET-45).
Late payments beyond 45 days attract a 1.5% monthly charge, and credit
limits are reviewed each year."

# 100% accurate. Directly sourced from your actual document.
# The AI cites the retrieved content rather than guessing.

Section 2 — The 7-stage RAG pipeline: Every step explained

RAG is not a single operation — it is a pipeline of seven distinct stages. Understanding each stage is essential because failures in any stage cascade into poor final answers. Here is the complete pipeline with what happens at each step.

Stage 1 — Document ingestion: load your source documents
Load raw documents from disk: PDFs, Word files, text files, CSV exports from your ERP. Extract text while preserving structure. Handle Bengali Unicode text. This is where your company knowledge enters the pipeline.

Stage 2 — Text chunking: split documents into searchable pieces
Split long documents into overlapping chunks of 500–1500 characters. Overlap of 10–15% ensures no information is lost at chunk boundaries. Each chunk must be self-contained enough to answer a question on its own.

Stage 3 — Embedding generation: convert text to vectors
Send each chunk to the embedding model (nomic-embed-text via Ollama). The model returns a 768-dimensional vector — a list of numbers that mathematically represents the semantic meaning of that chunk. Similar meaning → similar vectors.

Stage 4 — Vector storage: index chunks in ChromaDB
Store each chunk's text, its embedding vector, and metadata (source file, page number, section name, date) in ChromaDB. ChromaDB persists this to disk so you never re-embed unless documents change.

Stage 5 — Query embedding: convert the user's question to a vector
When a user asks a question, embed that question using the same embedding model. The resulting vector represents the semantic meaning of the question — what the user is actually asking about, not just the exact words used.

Stage 6 — Semantic search: find the most relevant chunks
ChromaDB computes cosine similarity between the query vector and all stored chunk vectors, then returns the top-K most similar chunks (typically 3–7). This retrieval works on meaning, not keywords — it finds "payment terms" when the user asks "how long do I have to pay?"

Stage 7 — Augmented generation: AI answers using retrieved context
Construct a prompt: "Given the following context from company documents: [retrieved chunks]. Answer the user's question: [question]. Use only the provided context. If the answer is not in the context, say so." Send it to Ollama. The AI generates an answer grounded in your actual documents.
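The final stage is plain string assembly, not library magic. Here is a minimal sketch of a stage-7 prompt builder — `build_rag_prompt` is a hypothetical helper and the wording is illustrative; the full production prompt appears in the query engine later in this part:

```python
def build_rag_prompt(question: str, chunks: list[str]) -> str:
    """Assemble the augmented-generation prompt from retrieved chunks (stage 7)."""
    context = "\n\n---\n\n".join(chunks)  # separate chunks so the model sees boundaries
    return (
        "Given the following context from company documents:\n"
        f"{context}\n\n"
        f"Answer the user's question: {question}\n"
        "Use only the provided context. If the answer is not in the context, say so."
    )

prompt = build_rag_prompt(
    "What is our return policy?",
    ["Section 4.1 — Returns: 7 days for visible damage, 14 days for hidden defects."]
)
print(prompt.splitlines()[0])  # → Given the following context from company documents:
```

The closing instruction ("Use only the provided context…") is what keeps answers grounded: it gives the model an explicit, honest exit when retrieval comes back empty.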

Section 3 — Embeddings: What they are and why cosine similarity works

An embedding is the translation of text into a point in high-dimensional space. The embedding model learns to place similar-meaning text near each other in this space, regardless of the exact words used. This is why RAG can find "payment terms" when someone asks "how long before an invoice is due" — the meanings are geometrically close even though no words overlap.
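The geometry is easy to verify by hand. Cosine similarity is the dot product divided by the product of the vector lengths, so it measures direction and ignores magnitude. A tiny 2-D sketch with made-up numbers (real embeddings have 768 dimensions):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot product divided by the product of the norms."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))

# Same direction → 1.0, even though the second vector is twice as long:
print(round(cosine([1.0, 2.0], [2.0, 4.0]), 4))   # → 1.0
# Orthogonal directions → 0.0 (unrelated meaning):
print(round(cosine([1.0, 0.0], [0.0, 1.0]), 4))   # → 0.0
# Nearby directions → close to 1.0 (similar meaning):
print(round(cosine([1.0, 2.0], [1.5, 2.0]), 4))   # → 0.9839
```

Ignoring magnitude matters: a long chunk and a short chunk about the same topic should score as similar, and cosine similarity delivers exactly that.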

Embedding similarity demo — same query, different chunks, cosine similarity scores

Query: "How long do credit customers have to pay?"

Chunk: "NET-45 days from invoice date for approved credit accounts. 2% discount for payment within 10 days."
Embedding: [0.023, -0.041, 0.187, ...] — similarity 0.94 (strong match)

Chunk: "Outstanding invoices beyond 45 days will incur a 1.5% monthly late payment charge."
Embedding: [0.018, -0.037, 0.162, ...] — similarity 0.87 (strong match)

Chunk: "All deliveries must be accompanied by a signed delivery note and vehicle number."
Embedding: [-0.091, 0.224, -0.043, ...] — similarity 0.12 (no match)

Chunk: "New customer credit applications require 2 trade references and 6 months bank statement."
Embedding: [0.011, -0.029, 0.143, ...] — similarity 0.61 (partial match)

The first two chunks score highly because their meaning is semantically close to the question — even though the question uses "how long" while the document uses "days" and "NET-45". The delivery note chunk scores near zero because it is about a completely different topic. This meaning-based retrieval is what makes RAG vastly superior to keyword search.


Section 4 — Embedding model setup: nomic-embed-text via Ollama

Install embedding model and verify it works
# Pull the embedding model (274 MB — very small)
ollama pull nomic-embed-text

# Verify it generates embeddings correctly:
curl http://localhost:11434/api/embeddings \
  -H "Content-Type: application/json" \
  -d '{
    "model": "nomic-embed-text",
    "prompt": "What is our return policy for damaged goods?"
  }'

# Expected response structure:
# {
#   "embedding": [
#     0.023064255, -0.014368402, -0.009838253,
#     0.035714887, -0.024517306, 0.059927620,
#     ... (768 total float values)
#   ]
# }
# The embedding vector has 768 dimensions for nomic-embed-text

# Alternative embedding models via Ollama:
ollama pull mxbai-embed-large   # 1024 dimensions — higher quality, larger
ollama pull all-minilm          # 384 dimensions — fastest, smallest

# Recommendation: use nomic-embed-text as the default.
# Switch to mxbai-embed-large only if retrieval quality is insufficient.
Python embedding helper — reusable throughout the pipeline
#!/usr/bin/env python3
"""
File: rag/embeddings.py
Embedding utilities for the company RAG pipeline.
"""
import requests
from typing import List
import numpy as np

OLLAMA_URL = "http://localhost:11434"
EMBED_MODEL = "nomic-embed-text"
EMBED_DIM = 768


def get_embedding(text: str) -> List[float]:
    """Generate an embedding for a single text string."""
    response = requests.post(
        f"{OLLAMA_URL}/api/embeddings",
        json={"model": EMBED_MODEL, "prompt": text},
        timeout=30
    )
    response.raise_for_status()
    return response.json()["embedding"]


def get_embeddings_batch(texts: List[str]) -> List[List[float]]:
    """Generate embeddings for multiple texts (one request per text)."""
    return [get_embedding(t) for t in texts]


def cosine_similarity(vec_a: List[float], vec_b: List[float]) -> float:
    """Calculate cosine similarity between two embedding vectors."""
    a = np.array(vec_a)
    b = np.array(vec_b)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


# Test the embedding utility:
if __name__ == "__main__":
    q = get_embedding("payment terms for credit customers")
    d1 = get_embedding("NET-45 days from invoice date for credit accounts")
    d2 = get_embedding("delivery must be accompanied by signed note")
    print(f"Query vs payment chunk:  {cosine_similarity(q, d1):.4f}")
    # Query vs payment chunk:  0.8934  ← high similarity ✓
    print(f"Query vs delivery chunk: {cosine_similarity(q, d2):.4f}")
    # Query vs delivery chunk: 0.1247  ← low similarity ✓

Section 5 — Document chunking: The most critical and most misunderstood step

Chunking is where most RAG implementations fail silently. Chunks that are too small lose context. Chunks that are too large dilute relevance. Chunks with no overlap lose information at boundaries. Getting chunking right is 50% of RAG quality.
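Because a sliding window advances by chunk_size − overlap characters each step, the chunk count for a document is easy to estimate before indexing. A quick sketch (the estimate ignores trailing slivers shorter than the chunker's minimum size, so actual counts can differ by one or two):

```python
import math

def estimate_chunks(doc_chars: int, chunk_size: int = 1000, overlap: int = 150) -> int:
    """Estimate how many windows a fixed-size sliding chunker will produce."""
    if doc_chars <= chunk_size:
        return 1
    stride = chunk_size - overlap  # how far the window advances each step
    return math.ceil((doc_chars - chunk_size) / stride) + 1

# A ~40-page policy PDF (~80,000 characters) at the defaults used in this part:
print(estimate_chunks(80_000))             # → 94
print(estimate_chunks(80_000, overlap=0))  # → 80 — fewer chunks, but boundary info is lost
```

The overlap costs roughly 15% more embeddings and storage here — a cheap price for never losing a sentence that straddles a chunk boundary.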

The three chunking strategies with trade-offs

Fixed-size chunking with overlap — recommended default

Source text: "Section 3.2 — Credit Customer Payment Terms: Standard payment terms for approved credit accounts are NET-45 days from invoice date. A 2% early payment discount is available for payments received within 10 days of invoice date (2/10 NET-45). Accounts exceeding 45 days will incur a 1.5% monthly late charge..."

Chunk 1 (chars 0–500)
Section 3.2 — Credit Customer Payment Terms: Standard payment terms for approved credit accounts are NET-45 days from invoice date. A 2% early payment discount is available for payments received within 10 days of invoice date (2/10 NET-45). Accounts exceeding 45 days will incur a 1.5% monthly late charge. Credit limits are reviewed annually by the Finance Manager.
Overlap zone (chars 450–550) — shared between chunk 1 and chunk 2
Credit limits are reviewed annually by the Finance Manager. New credit applications require...
Chunk 2 (chars 450–950) — starts in overlap zone
Credit limits are reviewed annually by the Finance Manager. New credit applications require 2 trade references and 6 months of bank statements. Credit decisions are made within 5 business days. The maximum initial credit limit for new accounts is Tk. 5,00,000...
Complete chunking implementation — three strategies
#!/usr/bin/env python3
"""
File: rag/chunker.py
Document chunking strategies for the RAG pipeline.
"""
import re
from typing import List, Dict, Any
from dataclasses import dataclass


@dataclass
class Chunk:
    text: str
    chunk_id: str
    source_file: str
    chunk_index: int
    char_start: int
    char_end: int
    metadata: Dict[str, Any]


class DocumentChunker:
    def __init__(self, chunk_size: int = 1000, overlap: int = 150, min_size: int = 100):
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.min_size = min_size

    # ── Strategy 1: Fixed-size sliding window (recommended default) ──────
    def chunk_fixed(self, text: str, source: str) -> List[Chunk]:
        """Split by character count with overlap. Best general-purpose strategy."""
        chunks = []
        start = 0
        idx = 0
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            chunk_text = text[start:end].strip()
            if len(chunk_text) >= self.min_size:
                chunks.append(Chunk(
                    text        = chunk_text,
                    chunk_id    = f"{source}_chunk_{idx:04d}",
                    source_file = source,
                    chunk_index = idx,
                    char_start  = start,
                    char_end    = end,
                    metadata    = {"strategy": "fixed", "source": source}
                ))
                idx += 1
            start += self.chunk_size - self.overlap  # Slide forward with overlap
        return chunks

    # ── Strategy 2: Sentence-aware chunking (better for Q&A) ─────────────
    def chunk_sentences(self, text: str, source: str) -> List[Chunk]:
        """Group sentences into chunks. Preserves sentence boundaries."""
        # Split on sentence boundaries (handles English and Bengali)
        sentences = re.split(r'(?<=[।.!?])\s+', text)
        chunks, current, start_char, idx = [], [], 0, 0
        for sent in sentences:
            current.append(sent)
            current_text = " ".join(current)
            if len(current_text) >= self.chunk_size:
                chunks.append(Chunk(
                    text        = current_text.strip(),
                    chunk_id    = f"{source}_sent_{idx:04d}",
                    source_file = source,
                    chunk_index = idx,
                    char_start  = start_char,
                    char_end    = start_char + len(current_text),
                    metadata    = {"strategy": "sentence", "source": source}
                ))
                # Keep last 2 sentences for overlap context
                current = current[-2:] if len(current) > 2 else []
                start_char += len(current_text)
                idx += 1
        if current and len(" ".join(current)) >= self.min_size:
            chunks.append(Chunk(
                text        = " ".join(current).strip(),
                chunk_id    = f"{source}_sent_{idx:04d}",
                source_file = source,
                chunk_index = idx,
                char_start  = start_char,
                char_end    = len(text),
                metadata    = {"strategy": "sentence", "source": source}
            ))
        return chunks

    # ── Strategy 3: Section-aware chunking (best for policy documents) ───
    def chunk_sections(self, text: str, source: str) -> List[Chunk]:
        """Split on section headers. Best for structured policy/HR documents."""
        # Detect headers: "Section X", numbered headings, ALL CAPS lines
        section_pattern = re.compile(
            r'(?m)^(?:Section\s+[\d.]+|[\d]+\.|[A-Z\s]{5,}:?)\s*$'
        )
        splits = list(section_pattern.finditer(text))
        chunks = []
        for i, match in enumerate(splits):
            section_start = match.start()
            section_end = splits[i + 1].start() if i + 1 < len(splits) else len(text)
            section_text = text[section_start:section_end].strip()
            section_name = match.group().strip()
            if len(section_text) >= self.min_size:
                # If a section is very long, sub-chunk it
                if len(section_text) > self.chunk_size * 2:
                    sub_chunks = self.chunk_fixed(section_text, source)
                    for sc in sub_chunks:
                        sc.metadata["section"] = section_name
                        chunks.append(sc)
                else:
                    chunks.append(Chunk(
                        text        = section_text,
                        chunk_id    = f"{source}_sec_{i:04d}",
                        source_file = source,
                        chunk_index = i,
                        char_start  = section_start,
                        char_end    = section_end,
                        metadata    = {"strategy": "section",
                                       "section": section_name,
                                       "source": source}
                    ))
        return chunks

# Chunking strategy selection guide:
#   Policy documents (HR, Finance): chunk_sections  → preserves rule structure
#   Long narrative reports:         chunk_sentences → preserves readability
#   Product catalogs, CSVs, lists:  chunk_fixed     → consistent size, fast
#   Mixed / unknown:                chunk_fixed     → safe default

Section 6 — ChromaDB: Setup, configuration, and collection design

ChromaDB is the recommended vector database for starting your company RAG deployment. It is free, open source, runs entirely locally as a Python process or persistent server, requires no infrastructure setup beyond pip install, and has an excellent Python API. It stores vector embeddings alongside the original text and arbitrary metadata, and queries return both the nearest chunks and their similarity scores.

ChromaDB
Recommended — start here
Pure Python, zero infrastructure, persistent on disk. Can run embedded in your Python process or as a standalone HTTP server. Perfect for company deployments up to ~500K documents.
+ Easiest setup (pip install chromadb), built-in persistence, great Python API, metadata filtering, Docker-deployable server mode
- Not designed for billion-scale; RAM grows with collection size
Qdrant
Production scale
Dedicated vector database with REST API, Docker deployment, and excellent filtering. Better than ChromaDB for large collections and concurrent users. More setup required.
+ REST API, Docker-native, advanced filtering, payload indexing, production-grade performance
- Separate service to manage, more complex than ChromaDB for getting started
FAISS
Maximum speed
Facebook's library — fastest similarity search, used at massive scale. No built-in persistence (must save/load manually). Best when you need maximum retrieval speed and control all other aspects yourself.
+ Blazing fast retrieval, GPU acceleration support, battle-tested at scale
- No metadata filtering, no persistence, more manual work
Milvus
Enterprise scale
Enterprise-grade distributed vector database. Handles billions of vectors. Kubernetes-deployable. Overkill for most companies but the right choice if you plan to index millions of documents.
+ Billion-scale, distributed, enterprise features, Kubernetes native
- Significant infrastructure investment; start with ChromaDB and migrate later
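A rough capacity check helps choose between these four options. Each nomic-embed-text vector is 768 float32 values, so raw embedding storage is simple arithmetic — a back-of-envelope sketch (real deployments add index, metadata, and chunk-text overhead on top):

```python
def embedding_storage_mb(num_chunks: int, dims: int = 768, bytes_per_float: int = 4) -> float:
    """Raw size of the embedding matrix alone, in megabytes."""
    return num_chunks * dims * bytes_per_float / 1024 / 1024

# 1,000 documents × ~100 chunks each = 100,000 chunks:
print(f"{embedding_storage_mb(100_000):.0f} MB")     # → 293 MB — comfortable for ChromaDB
# 10 million chunks:
print(f"{embedding_storage_mb(10_000_000):.0f} MB")  # → 29297 MB (~29 GB) — consider Qdrant or Milvus
```

For a typical company knowledge base this math lands comfortably in ChromaDB territory, which is why the migration path (start with ChromaDB, move up only when the numbers demand it) is the sensible default.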
ChromaDB installation and server setup
# Install ChromaDB
pip install chromadb --break-system-packages

# Option A: Embedded mode (runs inside your Python process — simplest)
import chromadb
client = chromadb.PersistentClient(path="/data/company-rag/chromadb")
# Data persists to disk at the specified path

# Option B: HTTP server mode (recommended for production — shared by multiple apps)
# Start the server (runs on port 8000):
chroma run --path /data/company-rag/chromadb --host 0.0.0.0 --port 8000

# Connect from Python:
import chromadb
# Embedded:
client = chromadb.PersistentClient(path="/data/company-rag/chromadb")
# HTTP server:
client = chromadb.HttpClient(host="localhost", port=8000)

# Docker Compose deployment (recommended for production):
# docker-compose.yml entry:
chromadb:
  image: chromadb/chroma:latest
  container_name: company-chromadb
  restart: always
  ports:
    - "8000:8000"
  volumes:
    - ./data/chromadb:/chroma/chroma
  environment:
    - CHROMA_SERVER_AUTH_PROVIDER=chromadb.auth.token.TokenAuthServerProvider
    - CHROMA_SERVER_AUTH_CREDENTIALS=your-secret-token-here
    - CHROMA_SERVER_AUTH_TOKEN_TRANSPORT_HEADER=X-Chroma-Token

Collection namespace strategy — organizing your company knowledge

ChromaDB collection design — one collection per department/domain
import chromadb

client = chromadb.PersistentClient(path="/data/company-rag/chromadb")

# Create separate collections for different knowledge domains.
# This allows department-specific RAG without cross-contamination.
COLLECTIONS = {
    "hr_policies": {
        "description": "HR manuals, leave policy, code of conduct, job descriptions",
        "metadata": {"department": "hr", "sensitivity": "internal"}
    },
    "finance_policies": {
        "description": "Credit policy, payment terms, expense rules, VAT guidelines",
        "metadata": {"department": "finance", "sensitivity": "confidential"}
    },
    "product_catalog": {
        "description": "Product specs, pricing tiers, availability, lead times",
        "metadata": {"department": "sales", "sensitivity": "internal"}
    },
    "customer_contracts": {
        "description": "Customer agreements, SLAs, special terms per account",
        "metadata": {"department": "sales", "sensitivity": "confidential"}
    },
    "operations_sops": {
        "description": "Warehouse SOPs, inventory procedures, safety guidelines",
        "metadata": {"department": "operations", "sensitivity": "internal"}
    },
    "erp_data_exports": {
        "description": "Monthly sales data, customer history, inventory snapshots",
        "metadata": {"department": "all", "sensitivity": "confidential"}
    },
}

def get_or_create_collection(name: str) -> chromadb.Collection:
    """Get an existing collection or create it if it doesn't exist."""
    config = COLLECTIONS.get(name, {})
    return client.get_or_create_collection(
        name     = name,
        metadata = config.get("metadata", {})
    )

# Create all collections:
for name in COLLECTIONS:
    col = get_or_create_collection(name)
    print(f"Collection '{name}': {col.count()} documents")

Section 7 — Indexing company documents: Complete ingestion pipeline

This is the core of the RAG system — loading your actual company documents, processing them through the chunker, generating embeddings, and storing everything in ChromaDB. The code below handles PDFs, Word documents, plain text files, and CSV exports from your ERP.

Install document processing dependencies
# All document processing dependencies
pip install \
  pymupdf \
  python-docx \
  chromadb \
  requests \
  numpy \
  pandas \
  tqdm \
  --break-system-packages

# pymupdf     — PDF text extraction (PyMuPDF)
# python-docx — Word document processing
# chromadb    — vector database
# requests    — Ollama API calls
# numpy       — vector math
# pandas      — CSV/Excel data processing
# tqdm        — progress bars for long indexing jobs
File: rag/document_loader.py — load any document type
#!/usr/bin/env python3
"""Load and extract text from PDF, DOCX, TXT, and CSV files."""
import fitz  # PyMuPDF
import docx
import pandas as pd
from pathlib import Path
from typing import Dict, Any


class DocumentLoader:
    def load(self, file_path: str) -> Dict[str, Any]:
        """
        Load a document and return its text content with metadata.
        Returns: {text, filename, file_type, page_count, char_count}
        """
        path = Path(file_path)
        ext = path.suffix.lower()
        loaders = {
            ".pdf":  self._load_pdf,
            ".docx": self._load_docx,
            ".doc":  self._load_docx,
            ".txt":  self._load_text,
            ".md":   self._load_text,
            ".csv":  self._load_csv,
            ".xlsx": self._load_excel,
        }
        loader = loaders.get(ext)
        if not loader:
            raise ValueError(f"Unsupported file type: {ext}")
        result = loader(file_path)
        result["filename"] = path.name
        result["file_type"] = ext
        result["char_count"] = len(result.get("text", ""))
        return result

    def _load_pdf(self, path: str) -> Dict:
        doc = fitz.open(path)
        page_count = len(doc)  # Capture before closing the document
        pages = []
        for page_num, page in enumerate(doc):
            text = page.get_text("text")
            if text.strip():
                pages.append(f"[Page {page_num + 1}]\n{text}")
        doc.close()
        return {"text": "\n\n".join(pages), "page_count": page_count}

    def _load_docx(self, path: str) -> Dict:
        document = docx.Document(path)
        full_text = []
        for para in document.paragraphs:
            if para.text.strip():
                prefix = "## " if para.style.name.startswith("Heading") else ""
                full_text.append(f"{prefix}{para.text}")
        return {"text": "\n".join(full_text), "page_count": 1}

    def _load_text(self, path: str) -> Dict:
        text = Path(path).read_text(encoding="utf-8")
        return {"text": text, "page_count": 1}

    def _load_csv(self, path: str) -> Dict:
        df = pd.read_csv(path, encoding="utf-8-sig")
        return self._rows_to_text(df)

    def _load_excel(self, path: str) -> Dict:
        df = pd.read_excel(path)
        return self._rows_to_text(df)

    @staticmethod
    def _rows_to_text(df: pd.DataFrame) -> Dict:
        rows = []
        for _, row in df.iterrows():
            # Convert each row to a natural-language sentence for embedding
            row_text = ". ".join(f"{col}: {val}" for col, val in row.items()
                                 if pd.notna(val))
            rows.append(row_text)
        return {"text": "\n".join(rows), "page_count": 1, "rows": len(df)}
File: rag/indexer.py — complete document indexing pipeline
#!/usr/bin/env python3
"""
File: rag/indexer.py
Index company documents into ChromaDB for RAG retrieval.
"""
import hashlib
from pathlib import Path
from typing import List

import chromadb
from tqdm import tqdm

from embeddings import get_embedding
from chunker import DocumentChunker
from document_loader import DocumentLoader


class RAGIndexer:
    def __init__(self, chroma_path: str = "/data/company-rag/chromadb"):
        self.client = chromadb.PersistentClient(path=chroma_path)
        self.loader = DocumentLoader()
        self.chunker = DocumentChunker(chunk_size=1000, overlap=150)

    def _file_hash(self, path: str) -> str:
        """MD5 hash of the file — detects unchanged files to avoid re-indexing."""
        h = hashlib.md5()
        h.update(Path(path).read_bytes())
        return h.hexdigest()[:12]

    def index_file(self, file_path: str, collection_name: str,
                   extra_metadata: dict = None) -> int:
        """
        Index a single file into a ChromaDB collection.
        Returns the number of chunks indexed.
        """
        collection = self.client.get_or_create_collection(collection_name)
        file_hash = self._file_hash(file_path)
        filename = Path(file_path).name

        # Skip if already indexed (same file hash)
        existing = collection.get(where={"file_hash": {"$eq": file_hash}})
        if existing["ids"]:
            print(f"  SKIPPED (unchanged): {filename} — "
                  f"{len(existing['ids'])} chunks already indexed")
            return 0

        # Load and chunk the document
        print(f"  Loading: {filename}")
        doc_data = self.loader.load(file_path)
        text = doc_data["text"]

        # Choose a chunking strategy based on file type
        ext = Path(file_path).suffix.lower()
        if ext in [".csv", ".xlsx"]:
            chunks = self.chunker.chunk_fixed(text, filename)
        elif ext in [".pdf", ".docx"]:
            chunks = self.chunker.chunk_sections(text, filename)
        else:
            chunks = self.chunker.chunk_sentences(text, filename)
        print(f"  Chunked: {len(chunks)} chunks from {doc_data['char_count']:,} chars")

        # Generate embeddings and index in batches of 50
        batch_size = 50
        indexed = 0
        for i in tqdm(range(0, len(chunks), batch_size), desc="  Embedding"):
            batch = chunks[i:i + batch_size]
            texts = [c.text for c in batch]
            embeddings = [get_embedding(t) for t in texts]
            ids = [c.chunk_id for c in batch]
            metadatas = [{
                "source_file": c.source_file,
                "chunk_index": c.chunk_index,
                "char_start":  c.char_start,
                "char_end":    c.char_end,
                "file_hash":   file_hash,
                "file_type":   ext,
                **(extra_metadata or {}),
                **c.metadata
            } for c in batch]
            collection.add(
                ids        = ids,
                embeddings = embeddings,
                documents  = texts,
                metadatas  = metadatas
            )
            indexed += len(batch)
        print(f"  Indexed: {indexed} chunks → collection '{collection_name}'")
        return indexed

    def index_directory(self, directory: str, collection_name: str,
                        extensions: List[str] = None) -> int:
        """Index all matching files in a directory (recursively)."""
        extensions = extensions or [".pdf", ".docx", ".txt", ".csv"]
        files = [f for ext in extensions for f in Path(directory).glob(f"**/*{ext}")]
        total = 0
        print(f"\nIndexing {len(files)} files into '{collection_name}'...")
        for file_path in files:
            total += self.index_file(str(file_path), collection_name)
        print(f"\nTotal chunks indexed: {total}")
        return total


# ── Usage example — index your company documents ───────────────────────
if __name__ == "__main__":
    indexer = RAGIndexer()

    # Index HR policy documents
    indexer.index_directory(
        directory       = "/company-docs/hr",
        collection_name = "hr_policies",
        extensions      = [".pdf", ".docx"]
    )

    # Index finance documents
    indexer.index_directory(
        directory       = "/company-docs/finance",
        collection_name = "finance_policies"
    )

    # Index an ERP monthly export with extra metadata
    indexer.index_file(
        file_path       = "/erp-exports/sales_nov_2024.csv",
        collection_name = "erp_data_exports",
        extra_metadata  = {"period": "2024-11", "data_type": "sales"}
    )

Section 8 — The complete RAG query engine

File: rag/query_engine.py — semantic search + augmented generation
#!/usr/bin/env python3
"""
File: rag/query_engine.py
RAG query engine: retrieve relevant chunks, then generate grounded answers.
"""
import chromadb
import requests
from typing import List, Dict, Optional
from dataclasses import dataclass
from embeddings import get_embedding

OLLAMA_URL = "http://localhost:11434"

@dataclass
class RetrievedChunk:
    text: str
    source: str
    score: float
    metadata: Dict

@dataclass
class RAGResponse:
    answer: str
    retrieved_chunks: List[RetrievedChunk]
    model_used: str
    sources: List[str]
    grounded: bool  # True if the answer came from retrieved context

class RAGQueryEngine:
    def __init__(self,
                 chroma_path: str = "/data/company-rag/chromadb",
                 llm_model: str = "llama3.1:8b",
                 top_k: int = 5,
                 min_score: float = 0.5):
        self.client = chromadb.PersistentClient(path=chroma_path)
        self.llm_model = llm_model
        self.top_k = top_k
        self.min_score = min_score

    def retrieve(self, query: str, collection_name: str,
                 filters: Optional[Dict] = None) -> List[RetrievedChunk]:
        """Embed the query and find the top-K most similar chunks."""
        collection = self.client.get_or_create_collection(collection_name)
        if collection.count() == 0:
            return []

        query_embedding = get_embedding(query)
        query_params = {
            "query_embeddings": [query_embedding],
            "n_results": min(self.top_k, collection.count()),
            "include": ["documents", "metadatas", "distances"]
        }
        if filters:
            query_params["where"] = filters

        results = collection.query(**query_params)

        chunks = []
        for doc, meta, dist in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0]
        ):
            # ChromaDB returns a distance (lower = better); convert to similarity.
            # Note: this 1 - distance conversion assumes the collection was created
            # with cosine distance (metadata={"hnsw:space": "cosine"}).
            similarity = 1.0 - dist
            if similarity >= self.min_score:
                chunks.append(RetrievedChunk(
                    text = doc,
                    source = meta.get("source_file", "unknown"),
                    score = round(similarity, 4),
                    metadata = meta
                ))
        return sorted(chunks, key=lambda c: c.score, reverse=True)

    def generate(self, query: str, chunks: List[RetrievedChunk],
                 language: str = "auto") -> str:
        """Generate an answer using retrieved chunks as context."""
        if not chunks:
            return ("I could not find relevant information in the company documents "
                    "to answer this question. Please check with the relevant department.")

        context_parts = []
        for i, chunk in enumerate(chunks, 1):
            context_parts.append(
                f"[Source {i}: {chunk.source} | Relevance: {chunk.score:.2f}]\n{chunk.text}"
            )
        context = "\n\n---\n\n".join(context_parts)

        lang_instruction = {
            "bn": "Respond in formal Bengali (আনুষ্ঠানিক বাংলা).",
            "en": "Respond in professional English.",
            "auto": "Respond in the same language the question was asked in."
        }.get(language, "Respond in the same language the question was asked in.")

        prompt = f"""You are a company knowledge assistant for Dhaka Traders Ltd.
Answer the user's question using ONLY the context provided below.
{lang_instruction}

Rules:
- If the answer is clearly in the context: answer directly and precisely
- If the context partially answers: answer what you can, state what is missing
- If the answer is NOT in the context: say "This information is not available in the provided documents"
- Never invent information not present in the context
- When relevant, mention which source document the answer comes from

CONTEXT FROM COMPANY DOCUMENTS:
{context}

USER QUESTION: {query}

ANSWER:"""

        response = requests.post(
            f"{OLLAMA_URL}/api/chat",
            json={
                "model": self.llm_model,
                "stream": False,
                "options": {"temperature": 0.2, "num_ctx": 8192},
                "messages": [{"role": "user", "content": prompt}]
            },
            timeout=120
        )
        response.raise_for_status()
        return response.json()["message"]["content"]

    def query(self, question: str, collection_name: str,
              language: str = "auto",
              filters: Optional[Dict] = None) -> RAGResponse:
        """Complete RAG pipeline: retrieve + generate."""
        chunks = self.retrieve(question, collection_name, filters)
        answer = self.generate(question, chunks, language)
        sources = list({c.source for c in chunks})
        return RAGResponse(
            answer = answer,
            retrieved_chunks = chunks,
            model_used = self.llm_model,
            sources = sources,
            grounded = len(chunks) > 0
        )
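The distance-to-similarity conversion inside retrieve() is worth sanity-checking on its own. A minimal sketch (a standalone helper of my own, assuming the collection stores cosine distances, where lower means more similar):

```python
def to_similarity(distances, min_score=0.5):
    """Convert cosine distances (lower = better) into similarity scores
    (higher = better), dropping anything below the threshold."""
    scored = [1.0 - d for d in distances]
    return sorted((s for s in scored if s >= min_score), reverse=True)

# Four retrieved chunks at increasing distance; the weakest falls below min_score.
print(to_similarity([0.06, 0.13, 0.37, 0.62]))
```

If your collection uses ChromaDB's default L2 distance instead, this 1 - d conversion is not meaningful; create collections with metadata={"hnsw:space": "cosine"} so the scores behave as above.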

Real RAG test — company policy document queries

Testing the RAG engine with real company document queries
# Test the complete pipeline with real company scenarios
engine = RAGQueryEngine(
    chroma_path = "/data/company-rag/chromadb",
    llm_model   = "llama3.1:8b",
    top_k       = 5,
    min_score   = 0.45
)

# ── Test 1: Finance policy query ───────────────────────────────────────
response = engine.query(
    question = "What is the payment term for credit customers and what happens if they pay late?",
    collection_name = "finance_policies"
)
print("=== Test 1: Finance Policy ===")
print(f"Answer:\n{response.answer}")
print(f"\nRetrieved {len(response.retrieved_chunks)} chunks:")
for c in response.retrieved_chunks:
    print(f"  [{c.score:.3f}] {c.source} — {c.text[:80]}...")

"""
=== Test 1: Finance Policy ===
Answer:
According to the credit policy document, approved credit accounts are on
NET-45 day terms from invoice date. Customers can receive a 2% discount if
payment is made within 10 days (2/10 NET-45 terms).

For late payments — any amount unpaid beyond 45 days incurs a 1.5% monthly
late charge on the outstanding balance. Additionally, accounts may be placed
on credit hold after 60 days, requiring Finance Manager approval to resume.

Source: credit_policy.pdf (Sections 3.2 and 3.4)

Retrieved 3 chunks:
  [0.941] credit_policy.pdf — NET-45 days from invoice date for approved credit accounts...
  [0.872] credit_policy.pdf — Outstanding invoices beyond 45 days will incur a 1.5%...
  [0.634] credit_policy.pdf — Accounts on credit hold require Finance Manager sign-off...
"""

# ── Test 2: Bengali language query ─────────────────────────────────────
response_bn = engine.query(
    question = "আমাদের ছুটির নীতি কী? বার্ষিক কত দিন ছুটি পাওয়া যায়?",
    collection_name = "hr_policies",
    language = "bn"
)
print("\n=== Test 2: Bengali HR Policy Query ===")
print(f"Answer:\n{response_bn.answer}")

"""
=== Test 2: Bengali HR Policy Query ===
Answer:
আমাদের ছুটির নীতি অনুযায়ী (HR Policy Manual, Section 5):

বার্ষিক ছুটি: প্রতিটি নিশ্চিত কর্মচারী বছরে ১৫ দিন বার্ষিক ছুটি পাওয়ার অধিকারী।
ছুটি ভোগের জন্য কমপক্ষে ৭ দিন আগে আবেদন করতে হবে।

অসুস্থতার ছুটি: বছরে সর্বোচ্চ ১০ দিন অসুস্থতার ছুটি অনুমোদিত। ৩ দিনের বেশি
অসুস্থতার জন্য ডাক্তারের সার্টিফিকেট বাধ্যতামূলক।

নৈমিত্তিক ছুটি: বছরে ৩ দিন নৈমিত্তিক ছুটি পাওয়া যাবে।

সূত্র: HR Policy Manual — Section 5: Leave Entitlement
"""

# ── Test 3: ERP data query — sales history ─────────────────────────────
response_erp = engine.query(
    question = "Which customers had the highest orders in November 2024?",
    collection_name = "erp_data_exports",
    filters = {"period": {"$eq": "2024-11"}}
)
print("\n=== Test 3: ERP Sales Data Query ===")
print(f"Answer:\n{response_erp.answer}")

"""
=== Test 3: ERP Sales Data Query ===
Answer:
Based on the November 2024 sales export data, the top customers by order value were:

1. Hossain Brothers — Tk. 6,90,000 (12 orders)
2. Karim Textiles — Tk. 5,12,000 (9 orders)
3. Narayan Exports — Tk. 4,48,000 (7 orders)

The total sales for November 2024 across all customers were Tk. 20,95,000.

Note: Rahman Garments and Dhaka Fabrics both showed significant declines versus
October 2024, with drops of 39% and 58% respectively.

Source: sales_nov_2024.csv
"""

Section 9 — FastAPI RAG microservice: Production-ready HTTP API

File: rag/api.py — complete FastAPI service with authentication
#!/usr/bin/env python3
"""
File: rag/api.py
FastAPI microservice that exposes the RAG pipeline as a REST API.
Called by the C# ERP integration in Section 10.

Run: uvicorn api:app --host 0.0.0.0 --port 8080
"""
from fastapi import FastAPI, HTTPException, Depends, Header
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Dict
from query_engine import RAGQueryEngine
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rag-api")

app = FastAPI(title="Company RAG API", version="1.0.0")
engine = RAGQueryEngine()
API_TOKEN = os.getenv("RAG_API_TOKEN", "change-this-secret-token")

app.add_middleware(
    CORSMiddleware,
    allow_origins = ["http://localhost:5000", "http://192.168.1.100:5000"],
    allow_methods = ["POST", "GET"],
    allow_headers = ["*"],
)

def verify_token(x_api_token: str = Header(...)):
    if x_api_token != API_TOKEN:
        raise HTTPException(status_code=401, detail="Invalid API token")

class RAGQueryRequest(BaseModel):
    question: str
    collection: str = "hr_policies"
    language: str = "auto"
    top_k: int = 5
    min_score: float = 0.45
    filters: Optional[Dict] = None

class ChunkResponse(BaseModel):
    text: str
    source: str
    score: float

class RAGQueryResponse(BaseModel):
    answer: str
    sources: List[str]
    chunks: List[ChunkResponse]
    grounded: bool

@app.post("/api/rag/query", response_model=RAGQueryResponse,
          dependencies=[Depends(verify_token)])
async def rag_query(request: RAGQueryRequest):
    logger.info(f"RAG query: collection={request.collection} q={request.question[:60]}")
    # Note: this mutates the shared engine instance per request. That is fine
    # for a single worker; under concurrent requests, pass per-call parameters
    # or guard these writes instead.
    engine.top_k = request.top_k
    engine.min_score = request.min_score
    result = engine.query(
        question = request.question,
        collection_name = request.collection,
        language = request.language,
        filters = request.filters
    )
    return RAGQueryResponse(
        answer = result.answer,
        sources = result.sources,
        grounded = result.grounded,
        chunks = [
            ChunkResponse(text=c.text[:300], source=c.source, score=c.score)
            for c in result.retrieved_chunks
        ]
    )

@app.get("/api/rag/collections", dependencies=[Depends(verify_token)])
async def list_collections():
    return {"collections": [c.name for c in engine.client.list_collections()]}

@app.get("/health")
async def health():
    return {"status": "ok", "model": engine.llm_model}

# Start command:
# RAG_API_TOKEN=your-secret uvicorn api:app --host 0.0.0.0 --port 8080 --workers 2

Section 10 — C# ASP.NET Core integration: Query RAG from your ERP

File: Services/RAGService.cs — complete C# RAG client
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Options;   // required for IOptions<T>
using System.Text.Json;
using System.Text.Json.Serialization;

namespace DhakaTraders.ERP.Services
{
    /// <summary>Client for the Python RAG microservice.</summary>
    public interface IRAGService
    {
        Task<RAGQueryResult> QueryAsync(RAGQueryRequest request);
        Task<string> AskHRAsync(string question, string language = "auto");
        Task<string> AskFinanceAsync(string question);
        Task<string> AskSalesDataAsync(string question, string? period = null);
    }

    public class RAGService : IRAGService
    {
        private readonly IHttpClientFactory _http;
        private readonly ILogger<RAGService> _log;
        private readonly RAGServiceOptions _opts;

        public RAGService(IHttpClientFactory http,
                          ILogger<RAGService> log,
                          IOptions<RAGServiceOptions> opts)
        {
            _http = http;
            _log = log;
            _opts = opts.Value;
        }

        public async Task<RAGQueryResult> QueryAsync(RAGQueryRequest request)
        {
            var client = _http.CreateClient("RAGClient");
            client.DefaultRequestHeaders.Add("X-Api-Token", _opts.ApiToken);

            var json = JsonSerializer.Serialize(request, new JsonSerializerOptions
            {
                PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower // .NET 8+
            });
            var content = new StringContent(json, System.Text.Encoding.UTF8, "application/json");

            var response = await client.PostAsync($"{_opts.BaseUrl}/api/rag/query", content);
            response.EnsureSuccessStatusCode();

            var body = await response.Content.ReadAsStringAsync();
            var result = JsonSerializer.Deserialize<RAGQueryResult>(body,
                new JsonSerializerOptions { PropertyNameCaseInsensitive = true });

            _log.LogInformation("RAG query completed. Grounded: {G} Sources: {S}",
                result?.Grounded, string.Join(", ", result?.Sources ?? []));

            return result ?? new RAGQueryResult();
        }

        public async Task<string> AskHRAsync(string question, string language = "auto")
        {
            var result = await QueryAsync(new RAGQueryRequest
            {
                Question = question,
                Collection = "hr_policies",
                Language = language,
                TopK = 5,
                MinScore = 0.45f
            });
            return result.Answer;
        }

        public async Task<string> AskFinanceAsync(string question)
        {
            var result = await QueryAsync(new RAGQueryRequest
            {
                Question = question,
                Collection = "finance_policies",
                TopK = 5,
                MinScore = 0.50f
            });
            return result.Answer;
        }

        public async Task<string> AskSalesDataAsync(string question, string? period = null)
        {
            var filters = period != null
                ? new Dictionary<string, object> { ["period"] = period }
                : null;
            var result = await QueryAsync(new RAGQueryRequest
            {
                Question = question,
                Collection = "erp_data_exports",
                TopK = 7,
                MinScore = 0.40f,
                Filters = filters
            });
            return result.Answer;
        }
    }

    public class RAGQueryRequest
    {
        public string Question { get; set; } = "";
        public string Collection { get; set; } = "hr_policies";
        public string Language { get; set; } = "auto";
        public int TopK { get; set; } = 5;
        public float MinScore { get; set; } = 0.45f;
        public Dictionary<string, object>? Filters { get; set; }
    }

    public class RAGQueryResult
    {
        public string Answer { get; set; } = "";
        public List<string> Sources { get; set; } = [];
        public bool Grounded { get; set; }
    }

    public class RAGServiceOptions
    {
        public string BaseUrl { get; set; } = "http://localhost:8080";
        public string ApiToken { get; set; } = "";
    }
}
Program.cs — register RAG service in ASP.NET Core DI
// Program.cs additions:
builder.Services.Configure<RAGServiceOptions>(
    builder.Configuration.GetSection("RAGService"));

builder.Services.AddHttpClient("RAGClient", client =>
{
    client.Timeout = TimeSpan.FromSeconds(60);
});

builder.Services.AddScoped<IRAGService, RAGService>();

// appsettings.json:
// {
//   "RAGService": {
//     "BaseUrl": "http://localhost:8080",
//     "ApiToken": "your-secret-token-here"
//   }
// }

// Usage in any MVC controller:
public class HRController : Controller
{
    private readonly IRAGService _rag;
    public HRController(IRAGService rag) => _rag = rag;

    [HttpGet("hr/policy-answer")]
    public async Task<IActionResult> PolicyAnswer(string question, string lang = "auto")
    {
        var answer = await _rag.AskHRAsync(question, lang);
        return Json(new { answer });
    }
}

Section 11 — RAG troubleshooting: 8 common failures and exact fixes

Failure 1 — Retrieval returns wrong chunks (low relevance)
Cause: Chunks are too large (diluted meaning), too small (no context), or the embedding model is mismatched.

Fix: Try chunk_size=800 instead of 1000. Verify the same embedding model is used for both indexing and querying — mixing models breaks similarity scores entirely. Lower min_score from 0.5 to 0.4 if good chunks are being filtered out. Print retrieved chunks during development to verify they contain the expected content.
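A small helper makes that last step routine. A sketch, assuming retrieved chunks arrive as dicts with score, source, and text keys (adapt the field access if you use the RetrievedChunk dataclass from the query engine above; the 0.6 "strong hit" threshold is my own starting point):

```python
def debug_retrieval(query, chunks, min_good=0.6):
    """Print a retrieval-quality report for one query.
    Returns True when at least one chunk looks strongly relevant."""
    print(f"Query: {query!r} -> {len(chunks)} chunk(s)")
    for c in sorted(chunks, key=lambda c: c["score"], reverse=True):
        flag = "OK " if c["score"] >= min_good else "LOW"
        print(f"  [{flag}] {c['score']:.3f} {c['source']}: {c['text'][:60]}...")
    return any(c["score"] >= min_good for c in chunks)

chunks = [
    {"score": 0.91, "source": "credit_policy.pdf", "text": "NET-45 days from invoice date..."},
    {"score": 0.42, "source": "hr_manual.pdf", "text": "Annual leave entitlement..."},
]
debug_retrieval("payment terms for credit customers", chunks)
```

If every query comes back all-LOW, suspect chunking or an embedding-model mismatch before touching the generation prompt.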
Failure 2 — AI gives correct-sounding but wrong answer despite good retrieval
Cause: The LLM is blending retrieved context with its training memory instead of staying grounded in the retrieved content.

Fix: Strengthen the constraint in your generation prompt: "Answer ONLY from the context. If you cannot find the answer in the context, say 'Not found in documents.' Do not use general knowledge." Lower temperature to 0.1. Use LLaMA 3.1 8B rather than smaller models — it follows grounding instructions more reliably.
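A cheap extra guard for this failure is to measure lexical overlap between the answer and the retrieved context: a grounded answer usually reuses wording from its sources. A rough heuristic sketch (the four-character word cutoff and any alert threshold you pick are arbitrary assumptions; a real guard would use an NLI model or citation checking, and this only works for Latin-script text):

```python
import re

def overlap_ratio(answer: str, context: str) -> float:
    """Fraction of non-trivial answer words that also appear in the context."""
    def words(s):
        return {w for w in re.findall(r"[a-z']+", s.lower()) if len(w) > 3}
    a, c = words(answer), words(context)
    return len(a & c) / len(a) if a else 0.0

context = "Approved credit accounts are on NET-45 day terms from invoice date."
good = "Credit accounts follow NET-45 terms from the invoice date."
bad  = "Most companies typically allow thirty days for returns."
print(overlap_ratio(good, context) > overlap_ratio(bad, context))  # True
```

Flag answers whose ratio falls below your threshold for human review; do not use this alone to declare an answer grounded.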
Failure 3 — Bengali documents return poor results
Cause: nomic-embed-text has weaker multilingual support. Bengali semantic relationships are not as well represented in its vector space.

Fix: Switch embedding model to mxbai-embed-large (better multilingual). Pull with ollama pull mxbai-embed-large. Re-index all Bengali documents with the new model. Use Qwen2.5:7b as the generation model for Bengali RAG — its Bengali comprehension is significantly better than LLaMA 3.1 for grounded response generation.
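Re-indexing with a new model also means every future query must embed with that same model. One way to enforce it is to stamp the model name into the collection metadata at index time and verify it at query time. A sketch; the embedding_model key is my own convention, not a built-in ChromaDB feature:

```python
from typing import Optional

def check_embedding_model(collection_metadata: Optional[dict], query_model: str) -> None:
    """Raise if the query-time model differs from the index-time model.
    Create collections with metadata={"embedding_model": "mxbai-embed-large"}
    and pass collection.metadata here before querying."""
    indexed_with = (collection_metadata or {}).get("embedding_model")
    if indexed_with and indexed_with != query_model:
        raise ValueError(
            f"Collection indexed with {indexed_with!r} but queried with "
            f"{query_model!r}; similarity scores will be meaningless. Re-index."
        )

# Matching models pass silently; a mismatch raises before any wasted query.
check_embedding_model({"embedding_model": "mxbai-embed-large"}, "mxbai-embed-large")
```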
Failure 4 — ChromaDB collection count returns 0 despite indexing
Cause: Using a different path in the query engine than in the indexer, or the PersistentClient path directory does not have write permissions.

Fix: Verify: python3 -c "import chromadb; c=chromadb.PersistentClient('/data/company-rag/chromadb'); print(c.list_collections())". Ensure the path is identical in both indexer and query engine. Check permissions: ls -la /data/company-rag/ — the process user must have read/write access.
Failure 5 — PDF text extraction returns empty or garbled text
Cause: The PDF is a scanned image (not text-based) or uses an unusual font encoding that PyMuPDF cannot decode.

Fix: Test: python3 -c "import fitz; doc=fitz.open('your.pdf'); print(doc[0].get_text())". If empty, it is a scanned PDF — run Tesseract OCR first: pip install pytesseract Pillow --break-system-packages then convert to text before indexing. Bengali PDF fonts can cause issues — try re-exporting the source document as UTF-8 text from Word.
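In an indexing pipeline this check can be automated: measure how much real text extraction produced before falling back to OCR. A dependency-free sketch of the decision logic only; the 25-character threshold is a guess to tune, and the actual PyMuPDF/pytesseract calls are shown as comments with assumed names:

```python
def needs_ocr(extracted_text: str, min_chars: int = 25) -> bool:
    """True when a page's extracted text is too thin to be real content,
    i.e. the PDF page is probably a scanned image."""
    # Count only meaningful characters; scanned pages often yield
    # whitespace or a handful of stray glyphs.
    meaningful = [ch for ch in extracted_text if ch.isalnum()]
    return len(meaningful) < min_chars

# In the indexer (assumed names, not runnable here):
#   text = page.get_text()                 # PyMuPDF extraction
#   if needs_ocr(text):
#       ...render the page to an image and run pytesseract (lang="ben+eng")...
print(needs_ocr("  \n \x0c "))   # True: effectively empty page
```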
Failure 6 — RAG response is slow (30+ seconds)
Cause: The generation model is running on CPU, or the context window is very large (too many retrieved chunks).

Fix: Reduce top_k from 5 to 3 — fewer chunks means shorter prompt means faster generation. Verify GPU is being used by Ollama during the generation step with ollama ps. The retrieval step (ChromaDB query) should take under 1 second — if it is slow, ChromaDB may be loading the collection from disk. Keep the engine instance alive (singleton pattern) so the collection stays warm in memory.
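To see which stage is actually slow, time retrieval and generation separately rather than the whole request. A minimal sketch using a context manager (the sleep calls stand in for the real ChromaDB and Ollama calls):

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(stage: str, timings: dict):
    """Record wall-clock seconds for one pipeline stage."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[stage] = time.perf_counter() - start

timings = {}
with timed("retrieve", timings):
    time.sleep(0.05)   # stand-in for collection.query(...)
with timed("generate", timings):
    time.sleep(0.10)   # stand-in for the Ollama /api/chat call
print({k: f"{v:.2f}s" for k, v in timings.items()})
```

Retrieval well above ~1 second points at ChromaDB cold-loading; slow generation points at CPU inference or an oversized context.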
Failure 7 — Old/stale content returned after document update
Cause: The old chunks are still in ChromaDB after the source document was updated. The indexer's file-hash check correctly triggers re-indexing when the hash changes, but indexing only adds new chunks; it does not remove the stale ones first.

Fix: Before re-indexing a changed file, delete the old chunks: collection.delete(where={"source_file": {"$eq": "old_policy.pdf"}}). Then re-index the updated file. Add this cleanup to your indexer's update workflow. For daily automated re-indexing, use the file_hash detection — only re-index files whose hash has changed.
Failure 8 — C# HttpClient call to RAG API times out
Cause: RAG generation is slow (model loading or CPU inference) and the default HttpClient timeout (100 seconds) is exceeded.

Fix: Set a longer timeout in Program.cs: client.Timeout = TimeSpan.FromSeconds(120);. Also check the FastAPI server is running: curl http://localhost:8080/health. For production, ensure the generation model is GPU-accelerated — CPU inference may exceed any reasonable timeout for complex queries with large context.

Section 12 — Do's, Don'ts, and limitations

Do — RAG best practices
  • Use the same embedding model for index and query
  • Test retrieval quality before tuning generation
  • Use section-aware chunking for policy documents
  • Add metadata (department, date, sensitivity) to all chunks
  • Use separate collections per knowledge domain
  • Store file hash to avoid unnecessary re-indexing
  • Set min_score threshold — reject low-quality retrievals
  • Print retrieved chunks during development to verify quality
  • Use Bengali-capable models (Qwen2.5) for Bengali RAG
  • Re-index when source documents change
  • Use metadata filters to scope queries by department
  • Log all RAG queries for quality monitoring
Don't — RAG mistakes to avoid
  • Don't mix embedding models between indexing and querying
  • Don't use chunk sizes below 200 or above 2000 characters
  • Don't skip chunk overlap — without it, information at chunk boundaries is lost
  • Don't index scanned PDFs without OCR preprocessing
  • Don't index every file into one giant collection
  • Don't trust AI answers without grounded = true check
  • Don't retrieve too many chunks — 3–7 is optimal
  • Don't forget to delete old chunks when updating documents
  • Don't use temperature 0.7+ for grounded generation
  • Don't expose the RAG API without authentication
  • Don't index confidential documents into shared collections
  • Don't skip quality testing before deploying to employees
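The overlap rule from both lists above is easy to demonstrate: with a sliding window, the tail of each chunk is repeated at the head of the next, so a sentence straddling a boundary survives intact in at least one chunk. A character-level sketch (real chunkers, like the section-aware ones earlier in this part, split on sentences or sections, but the arithmetic is the same):

```python
def chunk_with_overlap(text: str, size: int = 30, overlap: int = 10) -> list:
    """Split text into fixed-size chunks where consecutive chunks share
    `overlap` characters, so boundary content appears in both neighbours."""
    if overlap >= size:
        raise ValueError("overlap must be smaller than chunk size")
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]

doc = "Returns within 7 days for visible damage; 14 days for hidden defects."
for c in chunk_with_overlap(doc):
    print(repr(c))
```

Note how the clause around each boundary appears whole in at least one chunk; with overlap=0 it would be split mid-sentence and neither half would embed well.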




Coming next
Part 06 — API Integration: Connect AI to Your .NET ERP System

Build a complete AI integration layer for your ASP.NET Core MVC ERP — AI service architecture, streaming responses in Razor views, real-time report explanation buttons, invoice processing pipeline, department-specific AI endpoints, middleware for audit logging, and a complete working ERP module with AI-powered insights.

#RAG#ChromaDB#Embeddings#VectorDatabase#Ollama#LocalAI#PrivateAI#NomicEmbed#SemanticSearch#LangChain#RetrievalAugmentedGeneration#FastAPI#CSharpAI#ASPNetCore#ERPIntegration#BengaliAI#DocumentAI#KnowledgeBase#FreeLearning365#BangladeshTech#FAISS#Qdrant#PyMuPDF#PythonAI#VectorSearch#AIForBusiness
