
FSS-Mini-RAG Technical Deep Dive

How the system actually works under the hood
For developers who want to understand, modify, and extend the implementation

Table of Contents

  • System Architecture
  • Core Components
  • How Text Becomes Searchable
  • The Embedding Pipeline
  • Chunking Strategies
  • Search Algorithm
  • Vector Database Operations
  • Performance Architecture
  • Configuration System
  • Error Handling & Fallbacks

System Architecture

FSS-Mini-RAG implements a hybrid semantic search system with three core stages: intelligent chunking, embedding generation, and hybrid (vector + BM25) search over a local vector store. The full pipeline looks like this:

graph LR
    subgraph "Input Processing"
        Files[📁 Source Files<br/>.py .md .js .json]
        Language[🔤 Language Detection]
        Files --> Language
    end
    
    subgraph "Intelligent Chunking"
        Language --> Python[🐍 Python AST<br/>Functions & Classes]
        Language --> Markdown[📝 Markdown<br/>Header Sections]
        Language --> Code[💻 Other Code<br/>Smart Boundaries]
        Language --> Text[📄 Plain Text<br/>Fixed Size]
    end
    
    subgraph "Embedding Pipeline"
        Python --> Embed[🧠 Generate Embeddings]
        Markdown --> Embed
        Code --> Embed
        Text --> Embed
        
        Embed --> Ollama[🤖 Ollama API]
        Embed --> ML[🧠 ML Models] 
        Embed --> Hash[#️⃣ Hash Fallback]
    end
    
    subgraph "Storage & Search"
        Ollama --> Store[(💾 LanceDB<br/>Vector Database)]
        ML --> Store
        Hash --> Store
        
        Query[❓ Search Query] --> Vector[🎯 Vector Search]
        Query --> Keyword[🔤 BM25 Search]
        
        Store --> Vector
        Vector --> Hybrid[🔄 Hybrid Results]
        Keyword --> Hybrid
        Hybrid --> Ranked[📊 Ranked Output]
    end
    
    style Files fill:#e3f2fd
    style Store fill:#fff3e0
    style Ranked fill:#e8f5e8

Core Components

  1. ProjectIndexer (indexer.py) - Orchestrates the indexing pipeline
  2. CodeChunker (chunker.py) - Breaks files into meaningful pieces
  3. OllamaEmbedder (ollama_embeddings.py) - Converts text to vectors
  4. CodeSearcher (search.py) - Finds and ranks relevant content
  5. FileWatcher (watcher.py) - Monitors changes for incremental updates
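
A minimal sketch of how these pieces might be wired together for one index-and-search cycle; the import paths, constructor arguments, and method names here are assumptions based on the component list above, not the project's exact API:

from pathlib import Path

# Import paths follow the module list above (assumed, not verified against the package)
from claude_rag.chunker import CodeChunker
from claude_rag.ollama_embeddings import OllamaEmbedder
from claude_rag.indexer import ProjectIndexer
from claude_rag.search import CodeSearcher

project = Path("~/code/my-project").expanduser()

chunker = CodeChunker()                      # splits files into CodeChunk objects
embedder = OllamaEmbedder()                  # Ollama -> ML -> hash fallback chain
indexer = ProjectIndexer(project, chunker=chunker, embedder=embedder)
indexer.index_project()                      # writes vectors + manifest under .claude-rag/

searcher = CodeSearcher(project, embedder=embedder)
for result in searcher.hybrid_search("where do we retry failed requests", top_k=5):
    print(result.file_path, result.chunk_type, result.name)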

How Text Becomes Searchable

Step 1: File Discovery and Filtering

The system scans directories recursively, applying these filters:

  • Supported extensions: .py, .js, .md, .json, etc. (50+ types)
  • Size limits: Skip files larger than 10MB (configurable)
  • Exclusion patterns: Skip node_modules, .git, __pycache__, etc.
  • Binary detection: Skip binary files automatically
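
A sketch of how those filters could be applied to a single candidate file; constant names such as SUPPORTED_EXTENSIONS and EXCLUDED_DIRS are illustrative rather than the project's actual identifiers:

from pathlib import Path

SUPPORTED_EXTENSIONS = {'.py', '.js', '.md', '.json', '.yaml', '.toml'}  # subset of the 50+ types
EXCLUDED_DIRS = {'node_modules', '.git', '__pycache__', '.claude-rag'}
MAX_FILE_SIZE = 10 * 1024 * 1024  # 10MB default, configurable


def should_index(file_path: Path) -> bool:
    """Apply the discovery filters described above to one candidate file."""
    if any(part in EXCLUDED_DIRS for part in file_path.parts):
        return False  # inside an excluded directory
    if file_path.suffix.lower() not in SUPPORTED_EXTENSIONS:
        return False  # unsupported extension
    if file_path.stat().st_size > MAX_FILE_SIZE:
        return False  # too large
    # Cheap binary check: a NUL byte in the first 8KB usually means binary
    with open(file_path, 'rb') as f:
        if b'\x00' in f.read(8192):
            return False
    return True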

Step 2: Change Detection (Incremental Updates)

Before processing any file, the system checks if re-indexing is needed:

def _needs_reindex(self, file_path: Path, manifest: Dict) -> bool:
    """Smart change detection to avoid unnecessary work."""
    file_info = manifest.get('files', {}).get(str(file_path))
    
    # Quick checks first (fast)
    current_size = file_path.stat().st_size
    current_mtime = file_path.stat().st_mtime
    
    if not file_info:
        return True  # New file
    
    if (file_info.get('size') != current_size or 
        file_info.get('mtime') != current_mtime):
        return True  # Size or time changed
    
    # Content hash check (slower, only when needed)
    if file_info.get('hash') != self._get_file_hash(file_path):
        return True  # Content actually changed
    
    return False  # File unchanged, skip processing
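
The _get_file_hash helper used above is not shown in this guide; a minimal sketch of what it likely does, assuming a streamed SHA-256 digest so large files never load fully into memory:

import hashlib
from pathlib import Path


def _get_file_hash(self, file_path: Path) -> str:
    """Hash file contents in 64KB blocks so large files stay off the heap."""
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for block in iter(lambda: f.read(65536), b''):
            digest.update(block)
    return digest.hexdigest()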

Step 3: Streaming for Large Files

Files larger than 1MB are processed in chunks to avoid memory issues:

def _read_file_streaming(self, file_path: Path) -> str:
    """Read large files in chunks to manage memory."""
    content_parts = []
    
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        while True:
            chunk = f.read(8192)  # 8KB chunks
            if not chunk:
                break
            content_parts.append(chunk)
    
    return ''.join(content_parts)

The Embedding Pipeline

Three-Tier Embedding System

The system implements graceful degradation across three embedding methods:

Tier 1: Ollama (Best Quality)

def _get_ollama_embedding(self, text: str) -> Optional[np.ndarray]:
    """High-quality embeddings using local Ollama server."""
    try:
        response = requests.post(
            f"{self.ollama_host}/api/embeddings",
            json={
                "model": self.ollama_model,  # nomic-embed-text
                "prompt": text
            },
            timeout=30
        )
        
        if response.status_code == 200:
            embedding = response.json()["embedding"]
            return np.array(embedding, dtype=np.float32)
            
    except (requests.RequestException, KeyError, ValueError):
        return None  # Fall back to next tier

Tier 2: ML Models (Good Quality)

def _get_ml_embedding(self, text: str) -> Optional[np.ndarray]:
    """Fallback using sentence-transformers."""
    try:
        if not self.ml_model:
            from sentence_transformers import SentenceTransformer
            self.ml_model = SentenceTransformer(
                'sentence-transformers/all-MiniLM-L6-v2'
            )
        
        embedding = self.ml_model.encode(text)
        
        # Pad to 768 dimensions to match Ollama
        if len(embedding) < 768:
            padding = np.zeros(768 - len(embedding))
            embedding = np.concatenate([embedding, padding])
        
        return embedding.astype(np.float32)
        
    except Exception:
        return None  # Fall back to hash method

Tier 3: Hash-Based (Always Works)

def _get_hash_embedding(self, text: str) -> np.ndarray:
    """Deterministic hash-based embedding that always works."""
    # Create deterministic 768-dimensional vector from text hash
    hash_val = hashlib.sha256(text.encode()).hexdigest()
    
    # Convert hex to numbers
    numbers = [int(hash_val[i:i+2], 16) for i in range(0, 64, 2)]
    
    # Expand to 768 dimensions with mathematical transformations
    embedding = []
    for i in range(768):
        base_num = numbers[i % len(numbers)]
        # Apply position-dependent transformations
        transformed = (base_num * (i + 1)) % 256
        embedding.append(transformed / 255.0)  # Normalize to [0,1]
    
    return np.array(embedding, dtype=np.float32)
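
A single entry point ties the three tiers together by trying them in quality order; a sketch of that dispatch logic (the embed_text name matches the batching code below, the internals are assumptions):

def embed_text(self, text: str) -> np.ndarray:
    """Try each embedding tier in quality order, falling back on failure."""
    if self.ollama_available:
        embedding = self._get_ollama_embedding(text)
        if embedding is not None:
            return embedding

    embedding = self._get_ml_embedding(text)
    if embedding is not None:
        return embedding

    # Hash embeddings never fail, so search always has something to work with
    return self._get_hash_embedding(text)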

Batch Processing for Efficiency

When processing multiple texts, the system batches requests:

def embed_texts_batch(self, texts: List[str]) -> np.ndarray:
    """Process multiple texts efficiently with batching."""
    embeddings = []
    
    # Process in batches to manage memory and API limits
    batch_size = self.batch_size  # Default: 32
    
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        
        if self.ollama_available:
            # Concurrent Ollama requests
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(self._get_ollama_embedding, text) 
                          for text in batch]
                batch_embeddings = [f.result() for f in futures]
        else:
            # Sequential fallback processing
            batch_embeddings = [self.embed_text(text) for text in batch]
        
        embeddings.extend(batch_embeddings)
    
    return np.array(embeddings)

Chunking Strategies

The system uses different chunking strategies based on file type and content:

Python Files: AST-Based Chunking

def chunk_python_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Parse Python files using AST for semantic boundaries."""
    try:
        tree = ast.parse(content)
        chunks = []
        
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Extract function with context
                start_line = node.lineno
                end_line = getattr(node, 'end_lineno', start_line + 10)
                
                func_content = self._extract_lines(content, start_line, end_line)
                
                chunks.append(CodeChunk(
                    content=func_content,
                    file_path=file_path,
                    start_line=start_line,
                    end_line=end_line,
                    chunk_type='function',
                    name=node.name,
                    language='python'
                ))
                
            elif isinstance(node, ast.ClassDef):
                # Same extraction for classes, tagged as chunk_type='class'
                start_line = node.lineno
                end_line = getattr(node, 'end_lineno', start_line + 10)
                
                chunks.append(CodeChunk(
                    content=self._extract_lines(content, start_line, end_line),
                    file_path=file_path,
                    start_line=start_line,
                    end_line=end_line,
                    chunk_type='class',
                    name=node.name,
                    language='python'
                ))
        
        return chunks
        
    except SyntaxError:
        # Fall back to fixed-size chunking for invalid Python
        return self.chunk_fixed_size(content, file_path)

Markdown Files: Header-Based Chunking

def chunk_markdown_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Split markdown on headers for logical sections."""
    lines = content.split('\n')
    chunks = []
    current_chunk = []
    current_header = None
    
    for line_num, line in enumerate(lines, 1):
        if line.startswith('#'):
            # New header found - save previous chunk
            if current_chunk:
                chunk_content = '\n'.join(current_chunk)
                chunks.append(CodeChunk(
                    content=chunk_content,
                    file_path=file_path,
                    start_line=line_num - len(current_chunk),
                    end_line=line_num - 1,
                    chunk_type='section',
                    name=current_header,
                    language='markdown'
                ))
                current_chunk = []
            
            current_header = line.strip('#').strip()
        
        current_chunk.append(line)
    
    # Don't forget the last chunk
    if current_chunk:
        chunks.append(CodeChunk(
            content='\n'.join(current_chunk),
            file_path=file_path,
            start_line=len(lines) - len(current_chunk) + 1,
            end_line=len(lines),
            chunk_type='section',
            name=current_header,
            language='markdown'
        ))
    
    return chunks

Fixed-Size Chunking with Overlap

def chunk_fixed_size(self, content: str, file_path: str) -> List[CodeChunk]:
    """Fallback chunking for unsupported file types."""
    chunks = []
    max_size = self.config.chunking.max_size  # Default: 2000 chars
    overlap = 200  # Character overlap between chunks
    
    for i in range(0, len(content), max_size - overlap):
        chunk_content = content[i:i + max_size]
        
        # Try to break at word boundaries
        if i + max_size < len(content):
            last_space = chunk_content.rfind(' ')
            if last_space > max_size * 0.8:  # Don't break too early
                chunk_content = chunk_content[:last_space]
        
        if len(chunk_content.strip()) >= self.config.chunking.min_size:
            chunks.append(CodeChunk(
                content=chunk_content.strip(),
                file_path=file_path,
                start_line=None,  # Unknown for fixed-size chunks
                end_line=None,
                chunk_type='text',
                name=None,
                language='text'
            ))
    
    return chunks
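
How a file gets routed to one of these strategies is not shown above; a sketch of the dispatch, where _detect_language, chunk_code_file, and code_languages are assumed names rather than confirmed parts of the chunker:

def chunk_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Route a file to the most specific chunking strategy available."""
    language = self._detect_language(file_path)

    if language == 'python':
        return self.chunk_python_file(content, file_path)      # AST-based
    if language == 'markdown':
        return self.chunk_markdown_file(content, file_path)    # header-based
    if language in self.code_languages:
        return self.chunk_code_file(content, file_path)        # smart boundaries
    return self.chunk_fixed_size(content, file_path)           # fallback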

Search Algorithm

The search combines vector similarity with keyword matching:

def hybrid_search(self, query: str, top_k: int = 10) -> List[SearchResult]:
    """Combine semantic and keyword search for best results."""
    
    # 1. Get semantic results using vector similarity
    query_embedding = self.embedder.embed_text(query)
    semantic_results = self.vector_search(query_embedding, top_k * 2)
    
    # 2. Get keyword results using BM25
    keyword_results = self.keyword_search(query, top_k * 2)
    
    # 3. Combine and re-rank results
    combined_results = self._merge_results(semantic_results, keyword_results)
    
    # 4. Apply final ranking
    final_results = self._rank_results(combined_results, query)
    
    return final_results[:top_k]

def _rank_results(self, results: List[SearchResult], query: str) -> List[SearchResult]:
    """Advanced ranking combining multiple signals."""
    query_terms = set(query.lower().split())
    
    for result in results:
        # Base score from vector similarity
        score = result.similarity_score
        
        # Boost for exact keyword matches
        content_lower = result.content.lower()
        keyword_matches = sum(1 for term in query_terms if term in content_lower)
        keyword_boost = (keyword_matches / len(query_terms)) * 0.3
        
        # Boost for function/class names matching query
        if result.chunk_type in ['function', 'class'] and result.name:
            name_matches = sum(1 for term in query_terms 
                             if term in result.name.lower())
            name_boost = (name_matches / len(query_terms)) * 0.2
        else:
            name_boost = 0
        
        # Penalty for very short chunks (likely incomplete)
        length_penalty = 0
        if len(result.content) < 100:
            length_penalty = 0.1
        
        # Final combined score
        result.final_score = score + keyword_boost + name_boost - length_penalty
    
    return sorted(results, key=lambda r: r.final_score, reverse=True)
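
The keyword_search and _merge_results steps referenced in hybrid_search are not shown; a sketch of both, assuming the rank_bm25 package for BM25 scoring and de-duplication by file path and start line (the project may implement these differently):

from rank_bm25 import BM25Okapi


def keyword_search(self, query: str, limit: int) -> List[SearchResult]:
    """Score indexed chunks against the query with BM25 (rank_bm25 assumed)."""
    rows = self.db.open_table("chunks").to_pandas()
    corpus = [content.lower().split() for content in rows['content']]
    scores = BM25Okapi(corpus).get_scores(query.lower().split())

    results = []
    for idx in scores.argsort()[::-1][:limit]:
        row = rows.iloc[int(idx)]
        results.append(SearchResult(
            content=row['content'],
            file_path=Path(row['file_path']),
            # Note: BM25 scores are unbounded; a real implementation would
            # normalize them before comparing with cosine similarities
            similarity_score=float(scores[idx]),
            start_line=row['start_line'] if row['start_line'] > 0 else None,
            end_line=row['end_line'] if row['end_line'] > 0 else None,
            chunk_type=row['chunk_type'],
            name=row['name'] if row['name'] else None
        ))
    return results


def _merge_results(self, semantic, keyword) -> List[SearchResult]:
    """Union both result lists, keeping the higher-scored copy of duplicates."""
    merged = {}
    for result in semantic + keyword:
        key = (str(result.file_path), result.start_line)
        if key not in merged or result.similarity_score > merged[key].similarity_score:
            merged[key] = result
    return list(merged.values())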

Vector Database Operations

Storage and retrieval using LanceDB:

def _create_vector_table(self, chunks: List[CodeChunk], embeddings: np.ndarray):
    """Create LanceDB table with vectors and metadata."""
    
    # Prepare data for LanceDB
    data = []
    for chunk, embedding in zip(chunks, embeddings):
        data.append({
            'vector': embedding.tolist(),  # LanceDB requires lists
            'content': chunk.content,
            'file_path': str(chunk.file_path),
            'start_line': chunk.start_line or 0,
            'end_line': chunk.end_line or 0,
            'chunk_type': chunk.chunk_type,
            'name': chunk.name or '',
            'language': chunk.language,
            'created_at': datetime.now().isoformat()
        })
    
    # Create table with vector index
    table = self.db.create_table("chunks", data, mode="overwrite")
    
    # Add vector index for fast similarity search
    table.create_index("vector", metric="cosine")
    
    return table

def vector_search(self, query_embedding: np.ndarray, limit: int) -> List[SearchResult]:
    """Fast vector similarity search."""
    table = self.db.open_table("chunks")
    
    # LanceDB vector search
    results = (table
               .search(query_embedding.tolist())
               .limit(limit)
               .to_pandas())
    
    search_results = []
    for _, row in results.iterrows():
        search_results.append(SearchResult(
            content=row['content'],
            file_path=Path(row['file_path']),
            similarity_score=1.0 - row['_distance'],  # Convert distance to similarity
            start_line=row['start_line'] if row['start_line'] > 0 else None,
            end_line=row['end_line'] if row['end_line'] > 0 else None,
            chunk_type=row['chunk_type'],
            name=row['name'] if row['name'] else None
        ))
    
    return search_results

Performance Architecture

Memory Management

The system is designed to handle large codebases efficiently:

class MemoryEfficientIndexer:
    """Streaming indexer that processes files without loading everything into memory."""
    
    def __init__(self, max_memory_mb: int = 500):
        self.max_memory_mb = max_memory_mb
        self.current_batch = []
        self.batch_size_bytes = 0
        
    def process_file_batch(self, files: List[Path]):
        """Process files in memory-efficient batches."""
        for file_path in files:
            file_size = file_path.stat().st_size
            
            # Check if adding this file would exceed memory limit
            if (self.batch_size_bytes + file_size > 
                self.max_memory_mb * 1024 * 1024):
                
                # Process current batch and start new one
                self._process_current_batch()
                self._clear_batch()
            
            self.current_batch.append(file_path)
            self.batch_size_bytes += file_size
        
        # Process remaining files
        if self.current_batch:
            self._process_current_batch()

Concurrent Processing

Multiple files are processed in parallel:

def index_files_parallel(self, file_paths: List[Path]) -> List[CodeChunk]:
    """Process multiple files concurrently."""
    all_chunks = []
    
    # Determine optimal worker count based on CPU and file count
    max_workers = min(4, len(file_paths), os.cpu_count() or 1)
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all files for processing
        future_to_file = {
            executor.submit(self._process_single_file, file_path): file_path
            for file_path in file_paths
        }
        
        # Collect results as they complete
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                chunks = future.result()
                all_chunks.extend(chunks)
                
                # Update progress
                self._update_progress(file_path)
                
            except Exception as e:
                logger.error(f"Failed to process {file_path}: {e}")
                self.failed_files.append(file_path)
    
    return all_chunks

Database Optimization

The optimize_database routine below compacts the table and rebuilds its indexes so vector search stays fast as the index grows:

def optimize_database(self):
    """Optimize database for search performance."""
    table = self.db.open_table("chunks")
    
    # Compact the table to remove deleted rows
    table.compact_files()
    
    # Rebuild vector index for optimal performance
    table.create_index("vector", 
                      metric="cosine",
                      num_partitions=256,  # Optimize for dataset size
                      num_sub_vectors=96)  # Balance speed vs accuracy
    
    # Add secondary indexes for filtering
    table.create_index("file_path")
    table.create_index("chunk_type")
    table.create_index("language")

Configuration System

Hierarchical Configuration

Configuration is loaded from multiple sources with precedence:

def load_configuration(self, project_path: Path) -> RAGConfig:
    """Load configuration with hierarchical precedence."""
    
    # 1. Start with system defaults
    config = RAGConfig()  # Built-in defaults
    
    # 2. Apply global user config if it exists
    global_config_path = Path.home() / '.config' / 'fss-mini-rag' / 'config.yaml'
    if global_config_path.exists():
        global_config = self._load_yaml_config(global_config_path)
        config = self._merge_configs(config, global_config)
    
    # 3. Apply project-specific config
    project_config_path = project_path / '.claude-rag' / 'config.yaml'
    if project_config_path.exists():
        project_config = self._load_yaml_config(project_config_path)
        config = self._merge_configs(config, project_config)
    
    # 4. Apply environment variable overrides
    config = self._apply_env_overrides(config)
    
    return config
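
The _apply_env_overrides step is not shown; a minimal sketch assuming prefixed environment variables like FSS_RAG_OLLAMA_HOST, which are illustrative names rather than the project's actual variables:

import os


def _apply_env_overrides(self, config: RAGConfig) -> RAGConfig:
    """Let environment variables win over file-based settings (names illustrative)."""
    if host := os.environ.get('FSS_RAG_OLLAMA_HOST'):
        config.embedding.ollama_host = host
    if model := os.environ.get('FSS_RAG_OLLAMA_MODEL'):
        config.embedding.ollama_model = model
    if chunk_size := os.environ.get('FSS_RAG_CHUNK_SIZE'):
        config.chunking.max_size = int(chunk_size)
    return config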

Auto-Optimization

The system analyzes projects and suggests optimizations:

class ProjectAnalyzer:
    """Analyzes project characteristics to suggest optimal configuration."""
    
    def analyze_project(self, project_path: Path) -> Dict[str, Any]:
        """Analyze project structure and content patterns."""
        analysis = {
            'total_files': 0,
            'languages': Counter(),
            'file_sizes': [],
            'avg_function_length': 0,
            'documentation_ratio': 0.0
        }
        all_function_lengths = []
        
        for file_path in project_path.rglob('*'):
            if not file_path.is_file():
                continue
                
            analysis['total_files'] += 1
            
            # Detect language from extension
            language = self._detect_language(file_path)
            analysis['languages'][language] += 1
            
            # Analyze file size
            size = file_path.stat().st_size
            analysis['file_sizes'].append(size)
            
            # Collect function lengths across all Python files
            if language == 'python':
                all_function_lengths.extend(self._analyze_python_functions(file_path))
        
        if all_function_lengths:
            analysis['avg_function_length'] = float(np.mean(all_function_lengths))
        
        return analysis
    
    def generate_recommendations(self, analysis: Dict[str, Any]) -> RAGConfig:
        """Generate optimal configuration based on analysis."""
        config = RAGConfig()
        
        # Adjust chunk size based on average function length
        if analysis['avg_function_length'] > 0:
            # Make chunks large enough to contain average function
            optimal_chunk_size = min(4000, int(analysis['avg_function_length'] * 1.5))
            config.chunking.max_size = optimal_chunk_size
        
        # Adjust streaming threshold based on project size
        if analysis['total_files'] > 1000:
            # Use streaming for smaller files in large projects
            config.streaming.threshold_bytes = 512 * 1024  # 512KB
        
        # Optimize for dominant language
        dominant_language = analysis['languages'].most_common(1)[0][0]
        if dominant_language == 'python':
            config.chunking.strategy = 'semantic'  # Use AST parsing
        elif dominant_language in ['markdown', 'text']:
            config.chunking.strategy = 'header'    # Use header-based
        
        return config
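
Using the analyzer is a two-step call; a short usage sketch that prints the recommendation rather than writing it to .claude-rag/config.yaml (how the real tool persists it is an assumption):

from pathlib import Path

analyzer = ProjectAnalyzer()
analysis = analyzer.analyze_project(Path('.'))
config = analyzer.generate_recommendations(analysis)

print(f"{analysis['total_files']} files, dominant language: "
      f"{analysis['languages'].most_common(1)[0][0]}")
print(f"Recommended max chunk size: {config.chunking.max_size}")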

Error Handling & Fallbacks

Graceful Degradation

The system continues working even when components fail:

class RobustIndexer:
    """Indexer with comprehensive error handling and recovery."""
    
    def index_project_with_recovery(self, project_path: Path) -> Dict[str, Any]:
        """Index project with automatic error recovery."""
        results = {
            'files_processed': 0,
            'files_failed': 0,
            'chunks_created': 0,
            'errors': [],
            'fallbacks_used': []
        }
        
        try:
            # Primary indexing path
            return self._index_project_primary(project_path)
            
        except DatabaseCorruptionError as e:
            # Database corrupted - rebuild from scratch
            logger.warning(f"Database corruption detected: {e}")
            self._rebuild_database(project_path)
            results['fallbacks_used'].append('database_rebuild')
            return self._index_project_primary(project_path)
            
        except EmbeddingServiceError as e:
            # Embedding service failed - try fallback
            logger.warning(f"Primary embedding service failed: {e}")
            self.embedder.force_fallback_mode()
            results['fallbacks_used'].append('embedding_fallback')
            return self._index_project_primary(project_path)
            
        except InsufficientMemoryError as e:
            # Out of memory - switch to streaming mode
            logger.warning(f"Memory limit exceeded: {e}")
            self.config.streaming.enabled = True
            self.config.streaming.threshold_bytes = 100 * 1024  # 100KB
            results['fallbacks_used'].append('streaming_mode')
            return self._index_project_primary(project_path)
            
        except Exception as e:
            # Unknown error - attempt minimal indexing
            logger.error(f"Unexpected error during indexing: {e}")
            results['errors'].append(str(e))
            return self._index_project_minimal(project_path, results)
    
    def _index_project_minimal(self, project_path: Path, results: Dict) -> Dict:
        """Minimal indexing mode that processes files individually."""
        # Process files one by one with individual error handling
        for file_path in self._discover_files(project_path):
            try:
                chunks = self._process_single_file_safe(file_path)
                results['chunks_created'] += len(chunks)
                results['files_processed'] += 1
                
            except Exception as e:
                logger.debug(f"Failed to process {file_path}: {e}")
                results['files_failed'] += 1
                results['errors'].append(f"{file_path}: {e}")
        
        return results

Validation and Recovery

The system validates data integrity and can recover from corruption:

def validate_index_integrity(self, project_path: Path) -> bool:
    """Validate that the index is consistent and complete."""
    try:
        rag_dir = project_path / '.claude-rag'
        
        # Check required files exist
        required_files = ['manifest.json', 'database.lance']
        for filename in required_files:
            if not (rag_dir / filename).exists():
                raise IntegrityError(f"Missing required file: {filename}")
        
        # Validate manifest structure
        with open(rag_dir / 'manifest.json') as f:
            manifest = json.load(f)
            
        required_keys = ['file_count', 'chunk_count', 'indexed_at']
        for key in required_keys:
            if key not in manifest:
                raise IntegrityError(f"Missing manifest key: {key}")
        
        # Validate database accessibility
        db = lancedb.connect(rag_dir / 'database.lance')
        table = db.open_table('chunks')
        
        # Quick consistency check
        chunk_count_db = table.count_rows()
        chunk_count_manifest = manifest['chunk_count']
        
        if abs(chunk_count_db - chunk_count_manifest) > 0.1 * chunk_count_manifest:
            raise IntegrityError(f"Chunk count mismatch: DB={chunk_count_db}, Manifest={chunk_count_manifest}")
        
        return True
        
    except Exception as e:
        logger.error(f"Index integrity validation failed: {e}")
        return False

def repair_index(self, project_path: Path) -> bool:
    """Attempt to repair a corrupted index."""
    try:
        rag_dir = project_path / '.claude-rag'
        
        # Create backup of existing index
        backup_dir = rag_dir.parent / f'.claude-rag-backup-{int(time.time())}'
        shutil.copytree(rag_dir, backup_dir)
        
        # Attempt repair operations
        if (rag_dir / 'database.lance').exists():
            # Try to rebuild manifest from database
            db = lancedb.connect(rag_dir / 'database.lance')
            table = db.open_table('chunks')
            
            # Reconstruct manifest
            manifest = {
                'chunk_count': table.count_rows(),
                'file_count': len(set(table.to_pandas()['file_path'])),
                'indexed_at': datetime.now().isoformat(),
                'repaired_at': datetime.now().isoformat(),
                'backup_location': str(backup_dir)
            }
            
            with open(rag_dir / 'manifest.json', 'w') as f:
                json.dump(manifest, f, indent=2)
            
            logger.info(f"Index repaired successfully. Backup saved to {backup_dir}")
            return True
        else:
            # Database missing - need full rebuild
            logger.warning("Database missing - full rebuild required")
            return False
            
    except Exception as e:
        logger.error(f"Index repair failed: {e}")
        return False
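
A typical recovery flow chains validation, repair, and a full rebuild; a sketch that reuses the methods above (combining them on one class is an assumption):

def ensure_healthy_index(self, project_path: Path) -> None:
    """Validate the index, repair if possible, and rebuild as a last resort."""
    if self.validate_index_integrity(project_path):
        return  # index is fine

    logger.warning("Index failed validation, attempting repair")
    if not self.repair_index(project_path):
        logger.warning("Repair failed, rebuilding index from scratch")
        self.index_project_with_recovery(project_path)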

This technical guide provides the deep implementation details that developers need to understand, modify, and extend the system, while keeping the main README focused on getting users started quickly.