
FSS-Mini-RAG Technical Deep Dive

How the system actually works under the hood
For developers who want to understand, modify, and extend the implementation

System Architecture

FSS-Mini-RAG implements a hybrid semantic search system with three core stages:

graph LR
    subgraph "Input Processing"
        Files[📁 Source Files<br/>.py .md .js .json]
        Language[🔤 Language Detection]
        Files --> Language
    end
    
    subgraph "Intelligent Chunking"
        Language --> Python[🐍 Python AST<br/>Functions & Classes]
        Language --> Markdown[📝 Markdown<br/>Header Sections]
        Language --> Code[💻 Other Code<br/>Smart Boundaries]
        Language --> Text[📄 Plain Text<br/>Fixed Size]
    end
    
    subgraph "Embedding Pipeline"
        Python --> Embed[🧠 Generate Embeddings]
        Markdown --> Embed
        Code --> Embed
        Text --> Embed
        
        Embed --> Ollama[🤖 Ollama API]
        Embed --> ML[🧠 ML Models] 
        Embed --> Hash[#️⃣ Hash Fallback]
    end
    
    subgraph "Storage & Search"
        Ollama --> Store[(💾 LanceDB<br/>Vector Database)]
        ML --> Store
        Hash --> Store
        
        Query[❓ Search Query] --> Vector[🎯 Vector Search]
        Query --> Keyword[🔤 BM25 Search]
        
        Store --> Vector
        Vector --> Hybrid[🔄 Hybrid Results]
        Keyword --> Hybrid
        Hybrid --> Ranked[📊 Ranked Output]
    end
    
    style Files fill:#e3f2fd
    style Store fill:#fff3e0
    style Ranked fill:#e8f5e8

Core Components

  1. ProjectIndexer (indexer.py) - Orchestrates the indexing pipeline
  2. CodeChunker (chunker.py) - Breaks files into meaningful pieces
  3. OllamaEmbedder (ollama_embeddings.py) - Converts text to vectors
  4. CodeSearcher (search.py) - Finds and ranks relevant content
  5. FileWatcher (watcher.py) - Monitors changes for incremental updates
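
How these pieces fit together for one indexing pass can be sketched roughly as follows (the constructor arguments and the index_project / search method names are illustrative assumptions, not the exact API):

from pathlib import Path

def index_then_search(project_path: Path, query: str):
    """Hypothetical wiring of the five components - real signatures may differ."""
    chunker = CodeChunker()                       # 2. splits files into chunks
    embedder = OllamaEmbedder()                   # 3. turns chunks into vectors
    indexer = ProjectIndexer(chunker, embedder)   # 1. orchestrates the pipeline
    searcher = CodeSearcher(project_path)         # 4. finds and ranks content
    # 5. FileWatcher would keep the index fresh on changes (not shown here)

    indexer.index_project(project_path)           # chunk -> embed -> store in LanceDB
    return searcher.search(query, top_k=10)       # hybrid vector + keyword search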

How Text Becomes Searchable

Step 1: File Discovery and Filtering

The system scans directories recursively, applying these filters (a sketch of the discovery loop follows the list):

  • Supported extensions: .py, .js, .md, .json, etc. (50+ types)
  • Size limits: Skip files larger than 10MB (configurable)
  • Exclusion patterns: Skip node_modules, .git, __pycache__, etc.
  • Binary detection: Skip binary files automatically
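
A minimal sketch of this filtering logic (the constants and helper name here are illustrative, not lifted from the project):

from pathlib import Path

SUPPORTED_EXTENSIONS = {'.py', '.js', '.md', '.json'}   # small subset for illustration
EXCLUDED_DIRS = {'node_modules', '.git', '__pycache__'}
MAX_FILE_SIZE = 10 * 1024 * 1024                        # 10MB default, configurable

def discover_files(root: Path):
    """Yield files that pass the extension, size, exclusion, and binary filters."""
    for path in root.rglob('*'):
        if not path.is_file():
            continue
        if any(part in EXCLUDED_DIRS for part in path.parts):
            continue
        if path.suffix.lower() not in SUPPORTED_EXTENSIONS:
            continue
        if path.stat().st_size > MAX_FILE_SIZE:
            continue
        # Cheap binary check: NUL bytes in the first kilobyte usually mean binary
        with path.open('rb') as fh:
            if b'\x00' in fh.read(1024):
                continue
        yield path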

Step 2: Change Detection (Incremental Updates)

Before processing any file, the system checks if re-indexing is needed:

def _needs_reindex(self, file_path: Path, manifest: Dict) -> bool:
    """Smart change detection to avoid unnecessary work."""
    file_info = manifest.get('files', {}).get(str(file_path))
    
    # Quick checks first (fast)
    current_size = file_path.stat().st_size
    current_mtime = file_path.stat().st_mtime
    
    if not file_info:
        return True  # New file
    
    if (file_info.get('size') != current_size or 
        file_info.get('mtime') != current_mtime):
        return True  # Size or time changed
    
    # Content hash check (slower, only when needed)
    if file_info.get('hash') != self._get_file_hash(file_path):
        return True  # Content actually changed
    
    return False  # File unchanged, skip processing
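
The counterpart of this check is writing fresh metadata back once a file has been (re)indexed. A sketch of what recording that manifest entry could look like (the helper name is hypothetical; the fields mirror the checks above):

def _record_file_in_manifest(self, file_path: Path, manifest: Dict) -> None:
    """Store the metadata that _needs_reindex() will compare against next run."""
    stat = file_path.stat()
    manifest.setdefault('files', {})[str(file_path)] = {
        'size': stat.st_size,
        'mtime': stat.st_mtime,
        'hash': self._get_file_hash(file_path),   # same hash used by the check
    }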

Step 3: Streaming for Large Files

Files larger than 1MB are processed in chunks to avoid memory issues:

def _read_file_streaming(self, file_path: Path) -> str:
    """Read large files in chunks to manage memory."""
    content_parts = []
    
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        while True:
            chunk = f.read(8192)  # 8KB chunks
            if not chunk:
                break
            content_parts.append(chunk)
    
    return ''.join(content_parts)

The Embedding Pipeline

Three-Tier Embedding System

The system implements graceful degradation across three embedding methods:

Tier 1: Ollama (Best Quality)

def _get_ollama_embedding(self, text: str) -> Optional[np.ndarray]:
    """High-quality embeddings using local Ollama server."""
    try:
        response = requests.post(
            f"{self.ollama_host}/api/embeddings",
            json={
                "model": self.ollama_model,  # nomic-embed-text
                "prompt": text
            },
            timeout=30
        )
        
        if response.status_code == 200:
            embedding = response.json()["embedding"]
            return np.array(embedding, dtype=np.float32)
        
        return None  # Non-200 response - fall back to next tier
        
    except (requests.RequestException, KeyError, ValueError):
        return None  # Fall back to next tier

Tier 2: ML Models (Good Quality)

def _get_ml_embedding(self, text: str) -> Optional[np.ndarray]:
    """Fallback using sentence-transformers."""
    try:
        if not self.ml_model:
            from sentence_transformers import SentenceTransformer
            self.ml_model = SentenceTransformer(
                'sentence-transformers/all-MiniLM-L6-v2'
            )
        
        embedding = self.ml_model.encode(text)
        
        # Pad to 768 dimensions to match Ollama
        if len(embedding) < 768:
            padding = np.zeros(768 - len(embedding))
            embedding = np.concatenate([embedding, padding])
        
        return embedding.astype(np.float32)
        
    except Exception:
        return None  # Fall back to hash method

Tier 3: Hash-Based (Always Works)

def _get_hash_embedding(self, text: str) -> np.ndarray:
    """Deterministic hash-based embedding that always works."""
    # Create deterministic 768-dimensional vector from text hash
    hash_val = hashlib.sha256(text.encode()).hexdigest()
    
    # Convert hex to numbers
    numbers = [int(hash_val[i:i+2], 16) for i in range(0, 64, 2)]
    
    # Expand to 768 dimensions with mathematical transformations
    embedding = []
    for i in range(768):
        base_num = numbers[i % len(numbers)]
        # Apply position-dependent transformations
        transformed = (base_num * (i + 1)) % 256
        embedding.append(transformed / 255.0)  # Normalize to [0,1]
    
    return np.array(embedding, dtype=np.float32)
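
The three tiers are tried in order, so a failure at one level silently drops to the next. A minimal sketch of how the cascade can be wired together (embed_text is the method name used elsewhere in this guide; the exact control flow here is illustrative):

def embed_text(self, text: str) -> np.ndarray:
    """Return the first embedding that succeeds, falling through the tiers."""
    embedding = self._get_ollama_embedding(text)      # Tier 1: best quality
    if embedding is None:
        embedding = self._get_ml_embedding(text)      # Tier 2: local ML model
    if embedding is None:
        embedding = self._get_hash_embedding(text)    # Tier 3: always works
    return embedding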

Batch Processing for Efficiency

When processing multiple texts, the system batches requests:

def embed_texts_batch(self, texts: List[str]) -> np.ndarray:
    """Process multiple texts efficiently with batching."""
    embeddings = []
    
    # Process in batches to manage memory and API limits
    batch_size = self.batch_size  # Default: 32
    
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        
        if self.ollama_available:
            # Concurrent Ollama requests
            with ThreadPoolExecutor(max_workers=4) as executor:
                futures = [executor.submit(self._get_ollama_embedding, text) 
                          for text in batch]
                batch_embeddings = [f.result() for f in futures]
        else:
            # Sequential fallback processing
            batch_embeddings = [self.embed_text(text) for text in batch]
        
        embeddings.extend(batch_embeddings)
    
    return np.array(embeddings)

Chunking Strategies

The system uses different chunking strategies based on file type and content:

Python Files: AST-Based Chunking

def chunk_python_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Parse Python files using AST for semantic boundaries."""
    try:
        tree = ast.parse(content)
        chunks = []
        
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Extract function with context
                start_line = node.lineno
                end_line = getattr(node, 'end_lineno', start_line + 10)
                
                func_content = self._extract_lines(content, start_line, end_line)
                
                chunks.append(CodeChunk(
                    content=func_content,
                    file_path=file_path,
                    start_line=start_line,
                    end_line=end_line,
                    chunk_type='function',
                    name=node.name,
                    language='python'
                ))
                
            elif isinstance(node, ast.ClassDef):
                # Similar extraction for classes (chunk_type='class')...
                pass
        
        return chunks
        
    except SyntaxError:
        # Fall back to fixed-size chunking for invalid Python
        return self.chunk_fixed_size(content, file_path)

Markdown Files: Header-Based Chunking

def chunk_markdown_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Split markdown on headers for logical sections."""
    lines = content.split('\n')
    chunks = []
    current_chunk = []
    current_header = None
    
    for line_num, line in enumerate(lines, 1):
        if line.startswith('#'):
            # New header found - save previous chunk
            if current_chunk:
                chunk_content = '\n'.join(current_chunk)
                chunks.append(CodeChunk(
                    content=chunk_content,
                    file_path=file_path,
                    start_line=line_num - len(current_chunk),
                    end_line=line_num - 1,
                    chunk_type='section',
                    name=current_header,
                    language='markdown'
                ))
                current_chunk = []
            
            current_header = line.strip('#').strip()
        
        current_chunk.append(line)
    
    # Don't forget the last chunk
    if current_chunk:
        chunks.append(CodeChunk(
            content='\n'.join(current_chunk),
            file_path=file_path,
            start_line=len(lines) - len(current_chunk) + 1,
            end_line=len(lines),
            chunk_type='section',
            name=current_header,
            language='markdown'
        ))
    
    return chunks

Fixed-Size Chunking with Overlap

def chunk_fixed_size(self, content: str, file_path: str) -> List[CodeChunk]:
    """Fallback chunking for unsupported file types."""
    chunks = []
    max_size = self.config.chunking.max_size  # Default: 2000 chars
    overlap = 200  # Character overlap between chunks
    
    for i in range(0, len(content), max_size - overlap):
        chunk_content = content[i:i + max_size]
        
        # Try to break at word boundaries
        if i + max_size < len(content):
            last_space = chunk_content.rfind(' ')
            if last_space > max_size * 0.8:  # Don't break too early
                chunk_content = chunk_content[:last_space]
        
        if len(chunk_content.strip()) >= self.config.chunking.min_size:
            chunks.append(CodeChunk(
                content=chunk_content.strip(),
                file_path=file_path,
                start_line=None,  # Unknown for fixed-size chunks
                end_line=None,
                chunk_type='text',
                name=None,
                language='text'
            ))
    
    return chunks
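
Putting the strategies together, the choice per file can be sketched as a simple dispatch on extension (the mapping here is illustrative; the per-type methods are the ones shown above):

def chunk_file(self, content: str, file_path: str) -> List[CodeChunk]:
    """Route a file to the chunking strategy that fits its type."""
    suffix = Path(file_path).suffix.lower()
    if suffix == '.py':
        return self.chunk_python_file(content, file_path)      # AST-based
    if suffix in ('.md', '.markdown'):
        return self.chunk_markdown_file(content, file_path)    # header-based
    return self.chunk_fixed_size(content, file_path)           # fixed-size fallback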

Search Algorithm

The search combines vector similarity with keyword matching:

def hybrid_search(self, query: str, top_k: int = 10) -> List[SearchResult]:
    """Combine semantic and keyword search for best results."""
    
    # 1. Get semantic results using vector similarity
    query_embedding = self.embedder.embed_text(query)
    semantic_results = self.vector_search(query_embedding, top_k * 2)
    
    # 2. Get keyword results using BM25
    keyword_results = self.keyword_search(query, top_k * 2)
    
    # 3. Combine and re-rank results
    combined_results = self._merge_results(semantic_results, keyword_results)
    
    # 4. Apply final ranking
    final_results = self._rank_results(combined_results, query)
    
    return final_results[:top_k]

def _rank_results(self, results: List[SearchResult], query: str) -> List[SearchResult]:
    """Advanced ranking combining multiple signals."""
    query_terms = set(query.lower().split())
    
    for result in results:
        # Base score from vector similarity
        score = result.similarity_score
        
        # Boost for exact keyword matches
        content_lower = result.content.lower()
        keyword_matches = sum(1 for term in query_terms if term in content_lower)
        keyword_boost = (keyword_matches / len(query_terms)) * 0.3
        
        # Boost for function/class names matching query
        if result.chunk_type in ['function', 'class'] and result.name:
            name_matches = sum(1 for term in query_terms 
                             if term in result.name.lower())
            name_boost = (name_matches / len(query_terms)) * 0.2
        else:
            name_boost = 0
        
        # Penalty for very short chunks (likely incomplete)
        length_penalty = 0
        if len(result.content) < 100:
            length_penalty = 0.1
        
        # Final combined score
        result.final_score = score + keyword_boost + name_boost - length_penalty
    
    return sorted(results, key=lambda r: r.final_score, reverse=True)
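
The keyword side of the hybrid search relies on BM25 scoring. The project's keyword_search implementation isn't reproduced here, but the underlying formula is simple enough to sketch in plain Python (this illustrates the BM25 idea, not the actual code):

import math
from collections import Counter
from typing import List

def bm25_scores(query: str, documents: List[str],
                k1: float = 1.5, b: float = 0.75) -> List[float]:
    """Score each document against the query using the classic BM25 formula."""
    tokenized = [doc.lower().split() for doc in documents]
    n_docs = len(tokenized)
    avg_len = sum(len(doc) for doc in tokenized) / n_docs if n_docs else 1.0

    # Document frequency for each query term
    query_terms = query.lower().split()
    df = {term: sum(1 for doc in tokenized if term in doc) for term in query_terms}

    scores = []
    for doc in tokenized:
        tf = Counter(doc)
        score = 0.0
        for term in query_terms:
            if df[term] == 0:
                continue
            idf = math.log((n_docs - df[term] + 0.5) / (df[term] + 0.5) + 1)
            numerator = tf[term] * (k1 + 1)
            denominator = tf[term] + k1 * (1 - b + b * len(doc) / avg_len)
            score += idf * numerator / denominator
        scores.append(score)
    return scores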

Vector Database Operations

Storage and retrieval using LanceDB:

def _create_vector_table(self, chunks: List[CodeChunk], embeddings: np.ndarray):
    """Create LanceDB table with vectors and metadata."""
    
    # Prepare data for LanceDB
    data = []
    for chunk, embedding in zip(chunks, embeddings):
        data.append({
            'vector': embedding.tolist(),  # LanceDB requires lists
            'content': chunk.content,
            'file_path': str(chunk.file_path),
            'start_line': chunk.start_line or 0,
            'end_line': chunk.end_line or 0,
            'chunk_type': chunk.chunk_type,
            'name': chunk.name or '',
            'language': chunk.language,
            'created_at': datetime.now().isoformat()
        })
    
    # Create table with vector index
    table = self.db.create_table("chunks", data, mode="overwrite")
    
    # Add vector index for fast similarity search
    table.create_index("vector", metric="cosine")
    
    return table

def vector_search(self, query_embedding: np.ndarray, limit: int) -> List[SearchResult]:
    """Fast vector similarity search."""
    table = self.db.open_table("chunks")
    
    # LanceDB vector search
    results = (table
               .search(query_embedding.tolist())
               .limit(limit)
               .to_pandas())
    
    search_results = []
    for _, row in results.iterrows():
        search_results.append(SearchResult(
            content=row['content'],
            file_path=Path(row['file_path']),
            similarity_score=1.0 - row['_distance'],  # Convert distance to similarity
            start_line=row['start_line'] if row['start_line'] > 0 else None,
            end_line=row['end_line'] if row['end_line'] > 0 else None,
            chunk_type=row['chunk_type'],
            name=row['name'] if row['name'] else None
        ))
    
    return search_results

Performance Architecture

Memory Management

The system is designed to handle large codebases efficiently:

class MemoryEfficientIndexer:
    """Streaming indexer that processes files without loading everything into memory."""
    
    def __init__(self, max_memory_mb: int = 500):
        self.max_memory_mb = max_memory_mb
        self.current_batch = []
        self.batch_size_bytes = 0
        
    def process_file_batch(self, files: List[Path]):
        """Process files in memory-efficient batches."""
        for file_path in files:
            file_size = file_path.stat().st_size
            
            # Check if adding this file would exceed memory limit
            if (self.batch_size_bytes + file_size > 
                self.max_memory_mb * 1024 * 1024):
                
                # Process current batch and start new one
                self._process_current_batch()
                self._clear_batch()
            
            self.current_batch.append(file_path)
            self.batch_size_bytes += file_size
        
        # Process remaining files
        if self.current_batch:
            self._process_current_batch()

Concurrent Processing

Multiple files are processed in parallel:

def index_files_parallel(self, file_paths: List[Path]) -> List[CodeChunk]:
    """Process multiple files concurrently."""
    all_chunks = []
    
    # Determine optimal worker count based on CPU and file count
    max_workers = min(4, len(file_paths), os.cpu_count() or 1)
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all files for processing
        future_to_file = {
            executor.submit(self._process_single_file, file_path): file_path
            for file_path in file_paths
        }
        
        # Collect results as they complete
        for future in as_completed(future_to_file):
            file_path = future_to_file[future]
            try:
                chunks = future.result()
                all_chunks.extend(chunks)
                
                # Update progress
                self._update_progress(file_path)
                
            except Exception as e:
                logger.error(f"Failed to process {file_path}: {e}")
                self.failed_files.append(file_path)
    
    return all_chunks

Database Optimization

LanceDB is optimized for vector operations:

def optimize_database(self):
    """Optimize database for search performance."""
    table = self.db.open_table("chunks")
    
    # Compact the table to remove deleted rows
    table.compact_files()
    
    # Rebuild vector index for optimal performance
    table.create_index("vector", 
                      metric="cosine",
                      num_partitions=256,  # Optimize for dataset size
                      num_sub_vectors=96)  # Balance speed vs accuracy
    
    # Add secondary indexes for filtering
    table.create_index("file_path")
    table.create_index("chunk_type")
    table.create_index("language")

Configuration System

Hierarchical Configuration

Configuration is loaded from multiple sources with precedence:

def load_configuration(self, project_path: Path) -> RAGConfig:
    """Load configuration with hierarchical precedence."""
    
    # 1. Start with system defaults
    config = RAGConfig()  # Built-in defaults
    
    # 2. Apply global user config if it exists
    global_config_path = Path.home() / '.config' / 'fss-mini-rag' / 'config.yaml'
    if global_config_path.exists():
        global_config = self._load_yaml_config(global_config_path)
        config = self._merge_configs(config, global_config)
    
    # 3. Apply project-specific config
    project_config_path = project_path / '.mini-rag' / 'config.yaml'
    if project_config_path.exists():
        project_config = self._load_yaml_config(project_config_path)
        config = self._merge_configs(config, project_config)
    
    # 4. Apply environment variable overrides
    config = self._apply_env_overrides(config)
    
    return config
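
The merge step gives later sources precedence while keeping defaults for anything they don't set. A minimal sketch of what _merge_configs could do if the configs are treated as nested dictionaries (the real implementation may merge dataclass fields instead):

def _merge_configs(self, base: dict, override: dict) -> dict:
    """Recursively overlay `override` onto `base`; later sources win."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = self._merge_configs(merged[key], value)   # recurse into sections
        else:
            merged[key] = value                                     # scalar override wins
    return merged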

Auto-Optimization

The system analyzes projects and suggests optimizations:

class ProjectAnalyzer:
    """Analyzes project characteristics to suggest optimal configuration."""
    
    def analyze_project(self, project_path: Path) -> Dict[str, Any]:
        """Analyze project structure and content patterns."""
        analysis = {
            'total_files': 0,
            'languages': Counter(),
            'file_sizes': [],
            'avg_function_length': 0,
            'documentation_ratio': 0.0
        }
        func_lengths = []  # Function lengths gathered across all Python files
        
        for file_path in project_path.rglob('*'):
            if not file_path.is_file():
                continue
                
            analysis['total_files'] += 1
            
            # Detect language from extension
            language = self._detect_language(file_path)
            analysis['languages'][language] += 1
            
            # Analyze file size
            size = file_path.stat().st_size
            analysis['file_sizes'].append(size)
            
            # Collect function lengths for supported languages
            if language == 'python':
                func_lengths.extend(self._analyze_python_functions(file_path))
        
        # Average across the whole project, not just the last file analyzed
        if func_lengths:
            analysis['avg_function_length'] = float(np.mean(func_lengths))
        
        return analysis
    
    def generate_recommendations(self, analysis: Dict[str, Any]) -> RAGConfig:
        """Generate optimal configuration based on analysis."""
        config = RAGConfig()
        
        # Adjust chunk size based on average function length
        if analysis['avg_function_length'] > 0:
            # Make chunks large enough to contain average function
            optimal_chunk_size = min(4000, int(analysis['avg_function_length'] * 1.5))
            config.chunking.max_size = optimal_chunk_size
        
        # Adjust streaming threshold based on project size
        if analysis['total_files'] > 1000:
            # Use streaming for smaller files in large projects
            config.streaming.threshold_bytes = 512 * 1024  # 512KB
        
        # Optimize for dominant language
        dominant_language = analysis['languages'].most_common(1)[0][0]
        if dominant_language == 'python':
            config.chunking.strategy = 'semantic'  # Use AST parsing
        elif dominant_language in ['markdown', 'text']:
            config.chunking.strategy = 'header'    # Use header-based
        
        return config

Error Handling & Fallbacks

Graceful Degradation

The system continues working even when components fail:

class RobustIndexer:
    """Indexer with comprehensive error handling and recovery."""
    
    def index_project_with_recovery(self, project_path: Path) -> Dict[str, Any]:
        """Index project with automatic error recovery."""
        results = {
            'files_processed': 0,
            'files_failed': 0,
            'chunks_created': 0,
            'errors': [],
            'fallbacks_used': []
        }
        
        try:
            # Primary indexing path
            return self._index_project_primary(project_path)
            
        except DatabaseCorruptionError as e:
            # Database corrupted - rebuild from scratch
            logger.warning(f"Database corruption detected: {e}")
            self._rebuild_database(project_path)
            results['fallbacks_used'].append('database_rebuild')
            return self._index_project_primary(project_path)
            
        except EmbeddingServiceError as e:
            # Embedding service failed - try fallback
            logger.warning(f"Primary embedding service failed: {e}")
            self.embedder.force_fallback_mode()
            results['fallbacks_used'].append('embedding_fallback')
            return self._index_project_primary(project_path)
            
        except InsufficientMemoryError as e:
            # Out of memory - switch to streaming mode
            logger.warning(f"Memory limit exceeded: {e}")
            self.config.streaming.enabled = True
            self.config.streaming.threshold_bytes = 100 * 1024  # 100KB
            results['fallbacks_used'].append('streaming_mode')
            return self._index_project_primary(project_path)
            
        except Exception as e:
            # Unknown error - attempt minimal indexing
            logger.error(f"Unexpected error during indexing: {e}")
            results['errors'].append(str(e))
            return self._index_project_minimal(project_path, results)
    
    def _index_project_minimal(self, project_path: Path, results: Dict) -> Dict:
        """Minimal indexing mode that processes files individually."""
        # Process files one by one with individual error handling
        for file_path in self._discover_files(project_path):
            try:
                chunks = self._process_single_file_safe(file_path)
                results['chunks_created'] += len(chunks)
                results['files_processed'] += 1
                
            except Exception as e:
                logger.debug(f"Failed to process {file_path}: {e}")
                results['files_failed'] += 1
                results['errors'].append(f"{file_path}: {e}")
        
        return results

Validation and Recovery

The system validates data integrity and can recover from corruption:

def validate_index_integrity(self, project_path: Path) -> bool:
    """Validate that the index is consistent and complete."""
    try:
        rag_dir = project_path / '.mini-rag'
        
        # Check required files exist
        required_files = ['manifest.json', 'database.lance']
        for filename in required_files:
            if not (rag_dir / filename).exists():
                raise IntegrityError(f"Missing required file: {filename}")
        
        # Validate manifest structure
        with open(rag_dir / 'manifest.json') as f:
            manifest = json.load(f)
            
        required_keys = ['file_count', 'chunk_count', 'indexed_at']
        for key in required_keys:
            if key not in manifest:
                raise IntegrityError(f"Missing manifest key: {key}")
        
        # Validate database accessibility
        db = lancedb.connect(rag_dir / 'database.lance')
        table = db.open_table('chunks')
        
        # Quick consistency check
        chunk_count_db = table.count_rows()
        chunk_count_manifest = manifest['chunk_count']
        
        if abs(chunk_count_db - chunk_count_manifest) > 0.1 * chunk_count_manifest:
            raise IntegrityError(f"Chunk count mismatch: DB={chunk_count_db}, Manifest={chunk_count_manifest}")
        
        return True
        
    except Exception as e:
        logger.error(f"Index integrity validation failed: {e}")
        return False

def repair_index(self, project_path: Path) -> bool:
    """Attempt to repair a corrupted index."""
    try:
        rag_dir = project_path / '.mini-rag'
        
        # Create backup of existing index
        backup_dir = rag_dir.parent / f'.mini-rag-backup-{int(time.time())}'
        shutil.copytree(rag_dir, backup_dir)
        
        # Attempt repair operations
        if (rag_dir / 'database.lance').exists():
            # Try to rebuild manifest from database
            db = lancedb.connect(rag_dir / 'database.lance')
            table = db.open_table('chunks')
            
            # Reconstruct manifest
            manifest = {
                'chunk_count': table.count_rows(),
                'file_count': len(set(table.to_pandas()['file_path'])),
                'indexed_at': datetime.now().isoformat(),
                'repaired_at': datetime.now().isoformat(),
                'backup_location': str(backup_dir)
            }
            
            with open(rag_dir / 'manifest.json', 'w') as f:
                json.dump(manifest, f, indent=2)
            
            logger.info(f"Index repaired successfully. Backup saved to {backup_dir}")
            return True
        else:
            # Database missing - need full rebuild
            logger.warning("Database missing - full rebuild required")
            return False
            
    except Exception as e:
        logger.error(f"Index repair failed: {e}")
        return False

LLM Model Selection & Performance

Model Recommendations by Use Case

FSS-Mini-RAG works well across a range of LLM sizes because its rich retrieved context and guided prompts let even small models perform strongly:

Recommended (Best Balance):

  • qwen3:4b - Excellent quality, good performance
  • qwen3:4b:q8_0 - High-precision quantized version for production

Still Excellent (Faster/CPU-friendly):

  • qwen3:1.7b - Very good results, faster responses
  • qwen3:0.6b - Surprisingly good considering size (522MB)

Why Small Models Work Well Here

Small models can produce excellent results in RAG systems because:

  1. Rich Context: Our chunking provides substantial context around each match
  2. Guided Prompts: Well-structured prompts give models a clear "runway" to continue
  3. Specific Domain: Code analysis is more predictable than general conversation

Without good context, small models tend to get lost and produce erratic output. But with RAG's rich context and focused prompts, even the 0.6B model can provide meaningful analysis.
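
A rough illustration of what such a guided prompt can look like once search results are in hand (the template and function are illustrative, not the project's exact wording):

from typing import List

def build_analysis_prompt(query: str, results: List[SearchResult], max_chunks: int = 5) -> str:
    """Assemble retrieved chunks into a focused prompt for a small local model."""
    context_blocks = []
    for result in results[:max_chunks]:
        location = f"{result.file_path}:{result.start_line or '?'}"
        context_blocks.append(f"--- {location} ---\n{result.content}")

    return (
        "You are analyzing a codebase. Use ONLY the context below.\n\n"
        f"Question: {query}\n\n"
        "Context:\n" + "\n\n".join(context_blocks) + "\n\n"
        "Answer concisely and cite file paths from the context."
    )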

Quantization Benefits

For production deployments, consider quantized models like qwen3:4b:q8_0:

  • Q8_0: 8-bit quantization with minimal quality loss
  • Smaller memory footprint: ~50% reduction vs full precision
  • Better CPU performance: Faster inference on CPU-only systems
  • Production ready: Maintains analysis quality while improving efficiency

This technical guide provides the deep implementation details that developers need to understand, modify, and extend the system, while keeping the main README focused on getting users started quickly.