FSS-Mini-RAG Technical Deep Dive
How the system actually works under the hood
For developers who want to understand, modify, and extend the implementation
Table of Contents
- System Architecture
- How Text Becomes Searchable
- The Embedding Pipeline
- Chunking Strategies
- Search Algorithm
- Performance Architecture
- Configuration System
- Error Handling & Fallbacks
System Architecture
FSS-Mini-RAG implements a hybrid semantic search system with three core stages:
```mermaid
graph LR
    subgraph "Input Processing"
        Files[📁 Source Files<br/>.py .md .js .json]
        Language[🔤 Language Detection]
        Files --> Language
    end

    subgraph "Intelligent Chunking"
        Language --> Python[🐍 Python AST<br/>Functions & Classes]
        Language --> Markdown[📝 Markdown<br/>Header Sections]
        Language --> Code[💻 Other Code<br/>Smart Boundaries]
        Language --> Text[📄 Plain Text<br/>Fixed Size]
    end

    subgraph "Embedding Pipeline"
        Python --> Embed[🧠 Generate Embeddings]
        Markdown --> Embed
        Code --> Embed
        Text --> Embed
        Embed --> Ollama[🤖 Ollama API]
        Embed --> ML[🧠 ML Models]
        Embed --> Hash[#️⃣ Hash Fallback]
    end

    subgraph "Storage & Search"
        Ollama --> Store[(💾 LanceDB<br/>Vector Database)]
        ML --> Store
        Hash --> Store
        Query[❓ Search Query] --> Vector[🎯 Vector Search]
        Query --> Keyword[🔤 BM25 Search]
        Store --> Vector
        Vector --> Hybrid[🔄 Hybrid Results]
        Keyword --> Hybrid
        Hybrid --> Ranked[📊 Ranked Output]
    end

    style Files fill:#e3f2fd
    style Store fill:#fff3e0
    style Ranked fill:#e8f5e8
```
Core Components
- ProjectIndexer (`indexer.py`) - Orchestrates the indexing pipeline
- CodeChunker (`chunker.py`) - Breaks files into meaningful pieces
- OllamaEmbedder (`ollama_embeddings.py`) - Converts text to vectors
- CodeSearcher (`search.py`) - Finds and ranks relevant content
- FileWatcher (`watcher.py`) - Monitors changes for incremental updates
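Roughly, these components chain together as index-then-search. The snippet below is an illustrative sketch only; the constructor arguments and method names are assumptions based on the component descriptions above, not the exact module API.

```python
# Illustrative wiring of the components above (signatures are assumptions).
from pathlib import Path

from claude_rag.indexer import ProjectIndexer
from claude_rag.search import CodeSearcher

project = Path("~/my-project").expanduser()

indexer = ProjectIndexer(project)   # builds its chunker and embedder internally (assumed)
indexer.index_project()             # walk files, chunk, embed, store in LanceDB

searcher = CodeSearcher(project)
for result in searcher.hybrid_search("where is user input validated?", top_k=5):
    print(result.file_path, result.name, round(result.final_score, 3))
```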
How Text Becomes Searchable
Step 1: File Discovery and Filtering
The system scans directories recursively, applying these filters:
- Supported extensions: `.py`, `.js`, `.md`, `.json`, etc. (50+ types)
- Size limits: Skip files larger than 10MB (configurable)
- Exclusion patterns: Skip `node_modules`, `.git`, `__pycache__`, etc.
- Binary detection: Skip binary files automatically (a combined filter sketch follows below)
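A minimal sketch of this filtering logic; the extension set and constants below are abbreviated assumptions (the real list covers 50+ types):

```python
from pathlib import Path

EXCLUDED_DIRS = {"node_modules", ".git", "__pycache__", ".claude-rag"}
SUPPORTED_EXTENSIONS = {".py", ".js", ".md", ".json"}  # abbreviated; real set is larger
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB default, configurable

def should_index(file_path: Path) -> bool:
    """Apply the cheap filters first, the byte-level binary check last."""
    if any(part in EXCLUDED_DIRS for part in file_path.parts):
        return False
    if file_path.suffix.lower() not in SUPPORTED_EXTENSIONS:
        return False
    if file_path.stat().st_size > MAX_FILE_SIZE:
        return False
    # Binary detection: a NUL byte in the first block is a strong signal
    with open(file_path, "rb") as f:
        if b"\x00" in f.read(8192):
            return False
    return True
```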
Step 2: Change Detection (Incremental Updates)
Before processing any file, the system checks if re-indexing is needed:
```python
def _needs_reindex(self, file_path: Path, manifest: Dict) -> bool:
    """Smart change detection to avoid unnecessary work."""
    file_info = manifest.get('files', {}).get(str(file_path))

    # Quick checks first (fast)
    current_size = file_path.stat().st_size
    current_mtime = file_path.stat().st_mtime

    if not file_info:
        return True  # New file

    if (file_info.get('size') != current_size or
            file_info.get('mtime') != current_mtime):
        return True  # Size or time changed

    # Content hash check (slower, only when needed)
    if file_info.get('hash') != self._get_file_hash(file_path):
        return True  # Content actually changed

    return False  # File unchanged, skip processing
```
Step 3: Streaming for Large Files
Files larger than 1MB are processed in chunks to avoid memory issues:
```python
def _read_file_streaming(self, file_path: Path) -> str:
    """Read large files in chunks to manage memory."""
    content_parts = []

    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        while True:
            chunk = f.read(8192)  # 8KB chunks
            if not chunk:
                break
            content_parts.append(chunk)

    return ''.join(content_parts)
```
The Embedding Pipeline
Three-Tier Embedding System
The system implements graceful degradation across three embedding methods:
Tier 1: Ollama (Best Quality)
```python
def _get_ollama_embedding(self, text: str) -> Optional[np.ndarray]:
    """High-quality embeddings using local Ollama server."""
    try:
        response = requests.post(
            f"{self.ollama_host}/api/embeddings",
            json={
                "model": self.ollama_model,  # nomic-embed-text
                "prompt": text
            },
            timeout=30
        )
        if response.status_code == 200:
            embedding = response.json()["embedding"]
            return np.array(embedding, dtype=np.float32)
    except (requests.RequestException, KeyError, ValueError):
        return None  # Fall back to next tier
```
Tier 2: ML Models (Good Quality)
```python
def _get_ml_embedding(self, text: str) -> Optional[np.ndarray]:
    """Fallback using sentence-transformers."""
    try:
        if not self.ml_model:
            from sentence_transformers import SentenceTransformer
            self.ml_model = SentenceTransformer(
                'sentence-transformers/all-MiniLM-L6-v2'
            )

        embedding = self.ml_model.encode(text)

        # Pad to 768 dimensions to match Ollama
        if len(embedding) < 768:
            padding = np.zeros(768 - len(embedding))
            embedding = np.concatenate([embedding, padding])

        return embedding.astype(np.float32)
    except Exception:
        return None  # Fall back to hash method
```
Tier 3: Hash-Based (Always Works)
```python
def _get_hash_embedding(self, text: str) -> np.ndarray:
    """Deterministic hash-based embedding that always works."""
    # Create deterministic 768-dimensional vector from text hash
    hash_val = hashlib.sha256(text.encode()).hexdigest()

    # Convert hex to numbers
    numbers = [int(hash_val[i:i+2], 16) for i in range(0, 64, 2)]

    # Expand to 768 dimensions with mathematical transformations
    embedding = []
    for i in range(768):
        base_num = numbers[i % len(numbers)]
        # Apply position-dependent transformations
        transformed = (base_num * (i + 1)) % 256
        embedding.append(transformed / 255.0)  # Normalize to [0,1]

    return np.array(embedding, dtype=np.float32)
```
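The public `embed_text` call (used by the batch code below) tries these tiers in order. A hedged sketch of that dispatcher; the exact availability bookkeeping is an assumption:

```python
def embed_text(self, text: str) -> np.ndarray:
    """Try each embedding tier in order, degrading gracefully."""
    if self.ollama_available:
        embedding = self._get_ollama_embedding(text)
        if embedding is not None:
            return embedding
        self.ollama_available = False  # stop retrying a dead server (assumed behaviour)

    embedding = self._get_ml_embedding(text)
    if embedding is not None:
        return embedding

    # Hash embeddings never fail, so a vector always comes back
    return self._get_hash_embedding(text)
```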
Batch Processing for Efficiency
When processing multiple texts, the system batches requests:
```python
def embed_texts_batch(self, texts: List[str]) -> np.ndarray:
    """Process multiple texts efficiently with batching."""
    embeddings = []

    # Process in batches to manage memory and API limits
    batch_size = self.batch_size  # Default: 32

    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]

        if self.ollama_available:
            # Concurrent Ollama requests
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(self._get_ollama_embedding, text)
                           for text in batch]
                batch_embeddings = [f.result() for f in futures]
        else:
            # Sequential fallback processing
            batch_embeddings = [self.embed_text(text) for text in batch]

        embeddings.extend(batch_embeddings)

    return np.array(embeddings)
```
Chunking Strategies
The system uses different chunking strategies based on file type and content:
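A plausible top-level dispatcher, inferred from the sections below (the extension sets and the `chunk_code_file` name for the "smart boundaries" path are assumptions):

```python
def chunk_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Route a file to the chunking strategy that fits its type."""
    suffix = Path(file_path).suffix.lower()
    if suffix == '.py':
        return self.chunk_python_file(content, file_path)
    if suffix in {'.md', '.markdown'}:
        return self.chunk_markdown_file(content, file_path)
    if suffix in {'.js', '.ts', '.go', '.rs', '.java'}:
        return self.chunk_code_file(content, file_path)  # "smart boundaries" (assumed name)
    return self.chunk_fixed_size(content, file_path)
```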
Python Files: AST-Based Chunking
```python
def chunk_python_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Parse Python files using AST for semantic boundaries."""
    try:
        tree = ast.parse(content)
        chunks = []

        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Extract function with context
                start_line = node.lineno
                end_line = getattr(node, 'end_lineno', start_line + 10)
                func_content = self._extract_lines(content, start_line, end_line)

                chunks.append(CodeChunk(
                    content=func_content,
                    file_path=file_path,
                    start_line=start_line,
                    end_line=end_line,
                    chunk_type='function',
                    name=node.name,
                    language='python'
                ))
            elif isinstance(node, ast.ClassDef):
                # Similar extraction for classes (same pattern, chunk_type='class')
                pass

        return chunks
    except SyntaxError:
        # Fall back to fixed-size chunking for invalid Python
        return self.chunk_fixed_size(content, file_path)
```
Markdown Files: Header-Based Chunking
```python
def chunk_markdown_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Split markdown on headers for logical sections."""
    lines = content.split('\n')
    chunks = []
    current_chunk = []
    current_header = None

    for line_num, line in enumerate(lines, 1):
        if line.startswith('#'):
            # New header found - save previous chunk
            if current_chunk:
                chunk_content = '\n'.join(current_chunk)
                chunks.append(CodeChunk(
                    content=chunk_content,
                    file_path=file_path,
                    start_line=line_num - len(current_chunk),
                    end_line=line_num - 1,
                    chunk_type='section',
                    name=current_header,
                    language='markdown'
                ))
                current_chunk = []
            current_header = line.strip('#').strip()

        current_chunk.append(line)

    # Don't forget the last chunk
    if current_chunk:
        chunks.append(CodeChunk(
            content='\n'.join(current_chunk),
            file_path=file_path,
            start_line=len(lines) - len(current_chunk) + 1,
            end_line=len(lines),
            chunk_type='section',
            name=current_header,
            language='markdown'
        ))

    return chunks
```
Fixed-Size Chunking with Overlap
```python
def chunk_fixed_size(self, content: str, file_path: str) -> List[CodeChunk]:
    """Fallback chunking for unsupported file types."""
    chunks = []
    max_size = self.config.chunking.max_size  # Default: 2000 chars
    overlap = 200  # Character overlap between chunks

    for i in range(0, len(content), max_size - overlap):
        chunk_content = content[i:i + max_size]

        # Try to break at word boundaries
        if i + max_size < len(content):
            last_space = chunk_content.rfind(' ')
            if last_space > max_size * 0.8:  # Don't break too early
                chunk_content = chunk_content[:last_space]

        if len(chunk_content.strip()) >= self.config.chunking.min_size:
            chunks.append(CodeChunk(
                content=chunk_content.strip(),
                file_path=file_path,
                start_line=None,  # Unknown for fixed-size chunks
                end_line=None,
                chunk_type='text',
                name=None,
                language='text'
            ))

    return chunks
```
Search Algorithm
Hybrid Semantic + Keyword Search
The search combines vector similarity with keyword matching:
```python
def hybrid_search(self, query: str, top_k: int = 10) -> List[SearchResult]:
    """Combine semantic and keyword search for best results."""
    # 1. Get semantic results using vector similarity
    query_embedding = self.embedder.embed_text(query)
    semantic_results = self.vector_search(query_embedding, top_k * 2)

    # 2. Get keyword results using BM25
    keyword_results = self.keyword_search(query, top_k * 2)

    # 3. Combine and re-rank results
    combined_results = self._merge_results(semantic_results, keyword_results)

    # 4. Apply final ranking
    final_results = self._rank_results(combined_results, query)

    return final_results[:top_k]


def _rank_results(self, results: List[SearchResult], query: str) -> List[SearchResult]:
    """Advanced ranking combining multiple signals."""
    query_terms = set(query.lower().split())

    for result in results:
        # Base score from vector similarity
        score = result.similarity_score

        # Boost for exact keyword matches
        content_lower = result.content.lower()
        keyword_matches = sum(1 for term in query_terms if term in content_lower)
        keyword_boost = (keyword_matches / len(query_terms)) * 0.3

        # Boost for function/class names matching query
        if result.chunk_type in ['function', 'class'] and result.name:
            name_matches = sum(1 for term in query_terms
                               if term in result.name.lower())
            name_boost = (name_matches / len(query_terms)) * 0.2
        else:
            name_boost = 0

        # Penalty for very short chunks (likely incomplete)
        length_penalty = 0
        if len(result.content) < 100:
            length_penalty = 0.1

        # Final combined score
        result.final_score = score + keyword_boost + name_boost - length_penalty

    return sorted(results, key=lambda r: r.final_score, reverse=True)
```
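The `keyword_search` half referenced above is not reproduced here. A minimal BM25 version using the `rank_bm25` package might look like this; the package choice, the whitespace tokenization, and the `_load_all_chunks` helper are assumptions:

```python
from rank_bm25 import BM25Okapi

def keyword_search(self, query: str, limit: int) -> List[SearchResult]:
    """Score every chunk against the query with BM25 over simple word tokens."""
    chunks = self._load_all_chunks()  # hypothetical helper returning SearchResult objects
    tokenized = [chunk.content.lower().split() for chunk in chunks]

    bm25 = BM25Okapi(tokenized)
    scores = bm25.get_scores(query.lower().split())

    for chunk, score in zip(chunks, scores):
        chunk.keyword_score = float(score)  # consumed later by _merge_results (assumed)

    ranked = sorted(chunks, key=lambda c: c.keyword_score, reverse=True)
    return ranked[:limit]
```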
Vector Database Operations
Storage and retrieval using LanceDB:
```python
def _create_vector_table(self, chunks: List[CodeChunk], embeddings: np.ndarray):
    """Create LanceDB table with vectors and metadata."""
    # Prepare data for LanceDB
    data = []
    for chunk, embedding in zip(chunks, embeddings):
        data.append({
            'vector': embedding.tolist(),  # LanceDB requires lists
            'content': chunk.content,
            'file_path': str(chunk.file_path),
            'start_line': chunk.start_line or 0,
            'end_line': chunk.end_line or 0,
            'chunk_type': chunk.chunk_type,
            'name': chunk.name or '',
            'language': chunk.language,
            'created_at': datetime.now().isoformat()
        })

    # Create table with vector index
    table = self.db.create_table("chunks", data, mode="overwrite")

    # Add vector index for fast similarity search
    table.create_index("vector", metric="cosine")

    return table


def vector_search(self, query_embedding: np.ndarray, limit: int) -> List[SearchResult]:
    """Fast vector similarity search."""
    table = self.db.open_table("chunks")

    # LanceDB vector search
    results = (table
               .search(query_embedding.tolist())
               .limit(limit)
               .to_pandas())

    search_results = []
    for _, row in results.iterrows():
        search_results.append(SearchResult(
            content=row['content'],
            file_path=Path(row['file_path']),
            similarity_score=1.0 - row['_distance'],  # Convert distance to similarity
            start_line=row['start_line'] if row['start_line'] > 0 else None,
            end_line=row['end_line'] if row['end_line'] > 0 else None,
            chunk_type=row['chunk_type'],
            name=row['name'] if row['name'] else None
        ))

    return search_results
```
Performance Architecture
Memory Management
The system is designed to handle large codebases efficiently:
```python
class MemoryEfficientIndexer:
    """Streaming indexer that processes files without loading everything into memory."""

    def __init__(self, max_memory_mb: int = 500):
        self.max_memory_mb = max_memory_mb
        self.current_batch = []
        self.batch_size_bytes = 0

    def process_file_batch(self, files: List[Path]):
        """Process files in memory-efficient batches."""
        for file_path in files:
            file_size = file_path.stat().st_size

            # Check if adding this file would exceed memory limit
            if (self.batch_size_bytes + file_size >
                    self.max_memory_mb * 1024 * 1024):
                # Process current batch and start new one
                self._process_current_batch()
                self._clear_batch()

            self.current_batch.append(file_path)
            self.batch_size_bytes += file_size

        # Process remaining files
        if self.current_batch:
            self._process_current_batch()
```
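The `_process_current_batch` and `_clear_batch` helpers are referenced but not shown. Under the assumption that a batch simply reuses the chunker, embedder, and store described earlier, they could look roughly like this (`self.chunker` and `self.storage` are hypothetical handles):

```python
def _process_current_batch(self):
    """Chunk and store every file queued in the current batch (sketch)."""
    for file_path in self.current_batch:
        content = self._read_file_streaming(file_path)   # reuse the streaming reader
        chunks = self.chunker.chunk_file(content, str(file_path))
        self.storage.add_chunks(chunks)                   # hypothetical storage handle


def _clear_batch(self):
    """Reset the batch queue and its running byte count."""
    self.current_batch = []
    self.batch_size_bytes = 0
```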
Concurrent Processing
Multiple files are processed in parallel:
```python
def index_files_parallel(self, file_paths: List[Path]) -> List[CodeChunk]:
    """Process multiple files concurrently."""
    all_chunks = []

    # Determine optimal worker count based on CPU and file count
    max_workers = min(4, len(file_paths), os.cpu_count() or 1)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all files for processing
        future_to_file = {
            executor.submit(self._process_single_file, file_path): file_path
            for file_path in file_paths
        }

        # Collect results as they complete
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                chunks = future.result()
                all_chunks.extend(chunks)

                # Update progress
                self._update_progress(file_path)
            except Exception as e:
                logger.error(f"Failed to process {file_path}: {e}")
                self.failed_files.append(file_path)

    return all_chunks
```
Database Optimization
LanceDB is optimized for vector operations:
```python
def optimize_database(self):
    """Optimize database for search performance."""
    table = self.db.open_table("chunks")

    # Compact the table to remove deleted rows
    table.compact_files()

    # Rebuild vector index for optimal performance
    table.create_index("vector",
                       metric="cosine",
                       num_partitions=256,  # Optimize for dataset size
                       num_sub_vectors=96)  # Balance speed vs accuracy

    # Add secondary indexes for filtering
    table.create_index("file_path")
    table.create_index("chunk_type")
    table.create_index("language")
```
Configuration System
Hierarchical Configuration
Configuration is loaded from multiple sources with precedence:
```python
def load_configuration(self, project_path: Path) -> RAGConfig:
    """Load configuration with hierarchical precedence."""
    # 1. Start with system defaults
    config = RAGConfig()  # Built-in defaults

    # 2. Apply global user config if it exists
    global_config_path = Path.home() / '.config' / 'fss-mini-rag' / 'config.yaml'
    if global_config_path.exists():
        global_config = self._load_yaml_config(global_config_path)
        config = self._merge_configs(config, global_config)

    # 3. Apply project-specific config
    project_config_path = project_path / '.claude-rag' / 'config.yaml'
    if project_config_path.exists():
        project_config = self._load_yaml_config(project_config_path)
        config = self._merge_configs(config, project_config)

    # 4. Apply environment variable overrides
    config = self._apply_env_overrides(config)

    return config
```
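`_merge_configs` is referenced but not shown. If configuration sections are handled as nested dicts before being turned back into a `RAGConfig` (an assumption), a typical deep merge where the more specific source wins looks like this:

```python
def _merge_configs(self, base: dict, override: dict) -> dict:
    """Deep-merge two config mappings; keys from `override` win."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = self._merge_configs(merged[key], value)  # recurse into sections
        else:
            merged[key] = value
    return merged
```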
Auto-Optimization
The system analyzes projects and suggests optimizations:
```python
class ProjectAnalyzer:
    """Analyzes project characteristics to suggest optimal configuration."""

    def analyze_project(self, project_path: Path) -> Dict[str, Any]:
        """Analyze project structure and content patterns."""
        analysis = {
            'total_files': 0,
            'languages': Counter(),
            'file_sizes': [],
            'avg_function_length': 0,
            'documentation_ratio': 0.0
        }
        function_lengths = []

        for file_path in project_path.rglob('*'):
            if not file_path.is_file():
                continue

            analysis['total_files'] += 1

            # Detect language from extension
            language = self._detect_language(file_path)
            analysis['languages'][language] += 1

            # Analyze file size
            size = file_path.stat().st_size
            analysis['file_sizes'].append(size)

            # Analyze content patterns for supported languages
            if language == 'python':
                function_lengths.extend(self._analyze_python_functions(file_path))

        # Average across all analyzed functions, not just the last file
        if function_lengths:
            analysis['avg_function_length'] = np.mean(function_lengths)

        return analysis

    def generate_recommendations(self, analysis: Dict[str, Any]) -> RAGConfig:
        """Generate optimal configuration based on analysis."""
        config = RAGConfig()

        # Adjust chunk size based on average function length
        if analysis['avg_function_length'] > 0:
            # Make chunks large enough to contain average function
            optimal_chunk_size = min(4000, int(analysis['avg_function_length'] * 1.5))
            config.chunking.max_size = optimal_chunk_size

        # Adjust streaming threshold based on project size
        if analysis['total_files'] > 1000:
            # Use streaming for smaller files in large projects
            config.streaming.threshold_bytes = 512 * 1024  # 512KB

        # Optimize for dominant language
        dominant_language = analysis['languages'].most_common(1)[0][0]
        if dominant_language == 'python':
            config.chunking.strategy = 'semantic'  # Use AST parsing
        elif dominant_language in ['markdown', 'text']:
            config.chunking.strategy = 'header'  # Use header-based

        return config
```
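`_detect_language` and `_analyze_python_functions` are used above but not shown. An extension-based detector is the likely shape; the exact mapping here is an assumption:

```python
EXTENSION_LANGUAGES = {
    '.py': 'python',
    '.js': 'javascript',
    '.ts': 'typescript',
    '.md': 'markdown',
    '.txt': 'text',
    '.json': 'json',
}

def _detect_language(self, file_path: Path) -> str:
    """Map a file extension to the language label used by the analyzer."""
    return EXTENSION_LANGUAGES.get(file_path.suffix.lower(), 'other')
```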
Error Handling & Fallbacks
Graceful Degradation
The system continues working even when components fail:
```python
class RobustIndexer:
    """Indexer with comprehensive error handling and recovery."""

    def index_project_with_recovery(self, project_path: Path) -> Dict[str, Any]:
        """Index project with automatic error recovery."""
        results = {
            'files_processed': 0,
            'files_failed': 0,
            'chunks_created': 0,
            'errors': [],
            'fallbacks_used': []
        }

        try:
            # Primary indexing path
            return self._index_project_primary(project_path)

        except DatabaseCorruptionError as e:
            # Database corrupted - rebuild from scratch
            logger.warning(f"Database corruption detected: {e}")
            self._rebuild_database(project_path)
            results['fallbacks_used'].append('database_rebuild')
            return self._index_project_primary(project_path)

        except EmbeddingServiceError as e:
            # Embedding service failed - try fallback
            logger.warning(f"Primary embedding service failed: {e}")
            self.embedder.force_fallback_mode()
            results['fallbacks_used'].append('embedding_fallback')
            return self._index_project_primary(project_path)

        except InsufficientMemoryError as e:
            # Out of memory - switch to streaming mode
            logger.warning(f"Memory limit exceeded: {e}")
            self.config.streaming.enabled = True
            self.config.streaming.threshold_bytes = 100 * 1024  # 100KB
            results['fallbacks_used'].append('streaming_mode')
            return self._index_project_primary(project_path)

        except Exception as e:
            # Unknown error - attempt minimal indexing
            logger.error(f"Unexpected error during indexing: {e}")
            results['errors'].append(str(e))
            return self._index_project_minimal(project_path, results)

    def _index_project_minimal(self, project_path: Path, results: Dict) -> Dict:
        """Minimal indexing mode that processes files individually."""
        # Process files one by one with individual error handling
        for file_path in self._discover_files(project_path):
            try:
                chunks = self._process_single_file_safe(file_path)
                results['chunks_created'] += len(chunks)
                results['files_processed'] += 1
            except Exception as e:
                logger.debug(f"Failed to process {file_path}: {e}")
                results['files_failed'] += 1
                results['errors'].append(f"{file_path}: {e}")

        return results
```
Validation and Recovery
The system validates data integrity and can recover from corruption:
```python
def validate_index_integrity(self, project_path: Path) -> bool:
    """Validate that the index is consistent and complete."""
    try:
        rag_dir = project_path / '.claude-rag'

        # Check required files exist
        required_files = ['manifest.json', 'database.lance']
        for filename in required_files:
            if not (rag_dir / filename).exists():
                raise IntegrityError(f"Missing required file: {filename}")

        # Validate manifest structure
        with open(rag_dir / 'manifest.json') as f:
            manifest = json.load(f)

        required_keys = ['file_count', 'chunk_count', 'indexed_at']
        for key in required_keys:
            if key not in manifest:
                raise IntegrityError(f"Missing manifest key: {key}")

        # Validate database accessibility
        db = lancedb.connect(rag_dir / 'database.lance')
        table = db.open_table('chunks')

        # Quick consistency check
        chunk_count_db = table.count_rows()
        chunk_count_manifest = manifest['chunk_count']

        if abs(chunk_count_db - chunk_count_manifest) > 0.1 * chunk_count_manifest:
            raise IntegrityError(
                f"Chunk count mismatch: DB={chunk_count_db}, "
                f"Manifest={chunk_count_manifest}"
            )

        return True

    except Exception as e:
        logger.error(f"Index integrity validation failed: {e}")
        return False
```
```python
def repair_index(self, project_path: Path) -> bool:
    """Attempt to repair a corrupted index."""
    try:
        rag_dir = project_path / '.claude-rag'

        # Create backup of existing index
        backup_dir = rag_dir.parent / f'.claude-rag-backup-{int(time.time())}'
        shutil.copytree(rag_dir, backup_dir)

        # Attempt repair operations
        if (rag_dir / 'database.lance').exists():
            # Try to rebuild manifest from database
            db = lancedb.connect(rag_dir / 'database.lance')
            table = db.open_table('chunks')

            # Reconstruct manifest
            manifest = {
                'chunk_count': table.count_rows(),
                'file_count': len(set(table.to_pandas()['file_path'])),
                'indexed_at': datetime.now().isoformat(),
                'repaired_at': datetime.now().isoformat(),
                'backup_location': str(backup_dir)
            }

            with open(rag_dir / 'manifest.json', 'w') as f:
                json.dump(manifest, f, indent=2)

            logger.info(f"Index repaired successfully. Backup saved to {backup_dir}")
            return True
        else:
            # Database missing - need full rebuild
            logger.warning("Database missing - full rebuild required")
            return False

    except Exception as e:
        logger.error(f"Index repair failed: {e}")
        return False
```
This technical guide provides the deep implementation details that developers need to understand, modify, and extend the system, while keeping the main README focused on getting users started quickly.