Add intelligent context window management for optimal RAG performance

## Core Features
- Dynamic context sizing based on model capabilities
- User-friendly configuration menu with Development/Production/Advanced presets
- Automatic validation against model limits (qwen3:0.6b/1.7b = 32K, qwen3:4b = 131K)
- Educational content explaining context window importance for RAG

## Technical Implementation
- Enhanced LLMConfig with context_window and auto_context parameters
- Intelligent _get_optimal_context_size() method with model-specific limits
- Consistent context application across synthesizer and explorer
- YAML configuration output with helpful context explanations

## User Experience Improvements
- Clear context window display in configuration status
- Guided selection: Development (8K), Production (16K), Advanced (32K)
- Memory usage estimates and performance guidance
- Validation prevents invalid context/model combinations

## Educational Value
- Explains why the default 2048 tokens fails for RAG
- Shows the relationship between context size and conversation length
- Guides users toward optimal settings for their use case
- Highlights advanced capabilities (15+ results, 4000+ character chunks)

This addresses the critical issue where Ollama's default context severely limits RAG performance, providing users with proper configuration tools and an understanding of this crucial parameter.
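To make the validation idea concrete, here is a minimal sketch of the clamping logic this commit describes. Only the name `_get_optimal_context_size()` and the limit/preset numbers come from this commit and the menu code below; the function signature, the fallback default, and the helper dict names are assumptions for illustration, not the actual implementation.

```python
# Hypothetical sketch of model-aware context validation; names other than
# _get_optimal_context_size() are illustrative assumptions.

MODEL_CONTEXT_LIMITS = {
    "qwen3:0.6b": 32768,   # 32K
    "qwen3:1.7b": 32768,   # 32K
    "qwen3:4b": 131072,    # 131K
}
DEFAULT_LIMIT = 8192       # conservative fallback for unknown models

PRESETS = {"development": 8192, "production": 16384, "advanced": 32768}


def _get_optimal_context_size(model: str, requested: int) -> int:
    """Clamp a requested context size to what the model supports."""
    limit = DEFAULT_LIMIT
    for pattern, max_tokens in MODEL_CONTEXT_LIMITS.items():
        if pattern in model.lower():
            limit = max_tokens
            break
    # Never drop below a usable RAG minimum, never exceed the model limit.
    return max(4096, min(requested, limit))


# e.g. _get_optimal_context_size("qwen3:1.7b", PRESETS["advanced"]) -> 32768
```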
#!/usr/bin/env python3
"""
FSS-Mini-RAG Text User Interface
Simple, educational TUI that shows CLI commands while providing easy interaction.
"""

import os
import sys
import json
import logging
from pathlib import Path
from typing import Optional, List, Dict, Any

# Module logger (used by summary generation error handling below)
logger = logging.getLogger(__name__)


# Simple TUI without external dependencies
class SimpleTUI:
    def __init__(self):
        self.project_path: Optional[Path] = None
        self.current_config: Dict[str, Any] = {}
        self.search_count = 0  # Track searches for sample reminder
        self.config_dir = Path.home() / '.mini-rag-tui'
        self.config_file = self.config_dir / 'last_project.json'

        # Load last project on startup
        self._load_last_project()

    def _load_last_project(self):
        """Load the last used project from config file, or auto-detect current directory."""
        # First check if current directory has .mini-rag folder (auto-detect)
        current_dir = Path.cwd()
        if (current_dir / '.mini-rag').exists():
            self.project_path = current_dir
            # Save this as the last project too
            self._save_last_project()
            return

        # If no auto-detection, try loading from config file
        try:
            if hasattr(self, 'config_file') and self.config_file.exists():
                with open(self.config_file, 'r') as f:
                    data = json.load(f)
                project_path = Path(data.get('last_project', ''))
                if project_path.exists() and project_path.is_dir():
                    self.project_path = project_path
        except Exception:
            # If loading fails, just continue without last project
            pass

    def _save_last_project(self):
        """Save current project as last used."""
        if not self.project_path:
            return
        try:
            self.config_dir.mkdir(exist_ok=True)
            data = {'last_project': str(self.project_path)}
            with open(self.config_file, 'w') as f:
                json.dump(data, f)
        except Exception:
            # If saving fails, just continue
            pass

    def _get_llm_status(self):
        """Get LLM status for display in main menu."""
        try:
            # Import here to avoid startup delays
            sys.path.insert(0, str(Path(__file__).parent))
            from mini_rag.llm_synthesizer import LLMSynthesizer
            from mini_rag.config import RAGConfig, ConfigManager

            # Load config for model rankings
            if self.project_path:
                config_manager = ConfigManager(self.project_path)
                config = config_manager.load_config()
            else:
                config = RAGConfig()

            synthesizer = LLMSynthesizer(config=config)
            if synthesizer.is_available():
                # Get the model that would be selected
                synthesizer._ensure_initialized()
                model = synthesizer.model
                return "✅ Ready", model
            else:
                return "❌ Ollama not running", None
        except Exception as e:
            return f"❌ Error: {str(e)[:20]}...", None

    def clear_screen(self):
        """Clear the terminal screen."""
        os.system('cls' if os.name == 'nt' else 'clear')

    def print_header(self):
        """Print the main header."""
        print("+====================================================+")
        print("|                  FSS-Mini-RAG TUI                  |")
        print("|           Semantic Code Search Interface           |")
        print("+====================================================+")
        print()

    def print_cli_command(self, command: str, description: str = ""):
        """Show the equivalent CLI command."""
        print(f"💻 CLI equivalent: {command}")
        if description:
            print(f" {description}")
        print()

    def get_input(self, prompt: str, default: str = "") -> str:
        """Get user input with optional default."""
        if default:
            full_prompt = f"{prompt} [{default}]: "
        else:
            full_prompt = f"{prompt}: "

        try:
            result = input(full_prompt).strip()
            return result if result else default
        except (KeyboardInterrupt, EOFError):
            print("\nGoodbye!")
            sys.exit(0)

    def show_menu(self, title: str, options: List[str], show_cli: bool = True, back_option: str = None) -> int:
        """Show a menu and get user selection."""
        print(f"🎯 {title}")
        print("=" * (len(title) + 3))
        print()

        for i, option in enumerate(options, 1):
            print(f"{i}. {option}")

        # Add back/exit option
        if back_option:
            print(f"0. {back_option}")

        if show_cli:
            print()
            print("💡 All these actions can be done via CLI commands")
            print(" You'll see the commands as you use this interface!")

        print()
        while True:
            try:
                choice = int(input("Select option (number): "))
                if choice == 0 and back_option:
                    return -1  # Special value for back/exit
                elif 1 <= choice <= len(options):
                    return choice - 1
                else:
                    valid_range = "0-" + str(len(options)) if back_option else "1-" + str(len(options))
                    print(f"Please enter a number between {valid_range}")
            except ValueError:
                print("Please enter a valid number")
            except (KeyboardInterrupt, EOFError):
                print("\nGoodbye!")
                sys.exit(0)

    def select_project(self):
        """Select or create project directory."""
        self.clear_screen()
        self.print_header()

        print("📁 Project Selection")
        print("==================")
        print()

        # Show current project if any
        if self.project_path:
            print(f"Current project: {self.project_path}")
            print()

        print("💡 New to FSS-Mini-RAG? Select 'Use current directory' to")
        print(" explore this RAG system's own codebase as your first demo!")
        print()

        # If we already have a project, show it prominently and offer quick actions
        if self.project_path:
            rag_dir = self.project_path / '.mini-rag'
            is_indexed = rag_dir.exists()
            status_text = "Ready for search ✅" if is_indexed else "Needs indexing ❌"

            print(f"Current: {self.project_path.name} ({status_text})")
            print()

            options = [
                "Keep current project (go back to main menu)",
                "Use current directory (this folder)",
                "Enter different project path",
                "Browse recent projects"
            ]
        else:
            options = [
                "Use current directory (perfect for beginners - try the RAG codebase!)",
                "Enter project path (if you have a specific project)",
                "Browse recent projects"
            ]

        choice = self.show_menu("Choose project directory", options, show_cli=False, back_option="Back to main menu")

        if choice == -1:  # Back to main menu
            return

        # Handle different choice patterns based on whether we have a project
        if self.project_path:
            if choice == 0:
                # Keep current project - just go back
                return
            elif choice == 1:
                # Use current directory
                self.project_path = Path.cwd()
                print(f"✅ Using current directory: {self.project_path}")
                self._save_last_project()
            elif choice == 2:
                # Enter different project path
                self._enter_project_path()
            elif choice == 3:
                # Browse recent projects
                self.browse_recent_projects()
        else:
            if choice == 0:
                # Use current directory
                self.project_path = Path.cwd()
                print(f"✅ Using current directory: {self.project_path}")
                self._save_last_project()
            elif choice == 1:
                # Enter project path
                self._enter_project_path()
            elif choice == 2:
                # Browse recent projects
                self.browse_recent_projects()

        input("\nPress Enter to continue...")

    def _enter_project_path(self):
        """Helper method to handle manual project path entry."""
        while True:
            path_str = self.get_input("Enter project directory path",
                                      str(self.project_path) if self.project_path else "")

            if not path_str:
                continue

            project_path = Path(path_str).expanduser().resolve()

            if project_path.exists() and project_path.is_dir():
                self.project_path = project_path
                print(f"✅ Selected: {self.project_path}")
                self._save_last_project()
                break
            else:
                print(f"❌ Directory not found: {project_path}")
                retry = input("Try again? (y/N): ").lower()
                if retry != 'y':
                    break

    def browse_recent_projects(self):
        """Browse recently indexed projects."""
        print("🕒 Recent Projects")
        print("=================")
        print()

        # Look for .mini-rag directories in common locations
        search_paths = [
            Path.home(),
            Path.home() / "projects",
            Path.home() / "code",
            Path.home() / "dev",
            Path.cwd().parent,
            Path.cwd()
        ]

        recent_projects = []
        for search_path in search_paths:
            if search_path.exists() and search_path.is_dir():
                try:
                    for item in search_path.iterdir():
                        if item.is_dir():
                            rag_dir = item / '.mini-rag'
                            if rag_dir.exists():
                                recent_projects.append(item)
                except (PermissionError, OSError):
                    continue

        # Remove duplicates and sort by modification time
        recent_projects = list(set(recent_projects))
        try:
            recent_projects.sort(key=lambda p: (p / '.mini-rag').stat().st_mtime, reverse=True)
        except OSError:
            pass

        if not recent_projects:
            print("❌ No recently indexed projects found")
            print(" Projects with .mini-rag directories will appear here")
            return

        print("Found indexed projects:")
        for i, project in enumerate(recent_projects[:10], 1):  # Show up to 10
            try:
                manifest = project / '.mini-rag' / 'manifest.json'
                if manifest.exists():
                    with open(manifest) as f:
                        data = json.load(f)
                    file_count = data.get('file_count', 0)
                    indexed_at = data.get('indexed_at', 'Unknown')
                    print(f"{i}. {project.name} ({file_count} files, {indexed_at})")
                else:
                    print(f"{i}. {project.name} (incomplete index)")
            except Exception:
                print(f"{i}. {project.name} (index status unknown)")

        print()
        try:
            choice = int(input("Select project number (or 0 to cancel): "))
            if 1 <= choice <= len(recent_projects):
                self.project_path = recent_projects[choice - 1]
                print(f"✅ Selected: {self.project_path}")
                self._save_last_project()
        except (ValueError, IndexError):
            print("Selection cancelled")

    def index_project_interactive(self):
        """Interactive project indexing."""
        if not self.project_path:
            print("❌ No project selected")
            input("Press Enter to continue...")
            return

        self.clear_screen()
        self.print_header()

        print("🚀 Project Indexing")
        print("==================")
        print()
        print(f"Project: {self.project_path}")
        print()

        # Check if already indexed
        rag_dir = self.project_path / '.mini-rag'
        if rag_dir.exists():
            force = self._show_existing_index_info(rag_dir)
        else:
            force = False

        # Show CLI command
        cli_cmd = f"./rag-mini index {self.project_path}"
        if force:
            cli_cmd += " --force"

        self.print_cli_command(cli_cmd, "Index project for semantic search")

        # Import here to avoid startup delays
        sys.path.insert(0, str(Path(__file__).parent))
        from mini_rag.indexer import ProjectIndexer

        # Get file count and show preview before starting
        print("🔍 Analyzing project structure...")
        print("=" * 50)

        try:
            indexer = ProjectIndexer(self.project_path)

            # Get files that would be indexed
            files_to_index = indexer._get_files_to_index()
            total_files = len(files_to_index)

            if total_files == 0:
                print("✅ All files are already up to date!")
                print(" No indexing needed.")
                input("\nPress Enter to continue...")
                return

            # Show file analysis
            print(f"📊 Indexing Analysis:")
            print(f" Files to process: {total_files}")

            # Analyze file types
            file_types = {}
            total_size = 0
            for file_path in files_to_index:
                ext = file_path.suffix.lower() or 'no extension'
                file_types[ext] = file_types.get(ext, 0) + 1
                try:
                    total_size += file_path.stat().st_size
                except OSError:
                    pass

            # Show breakdown
            print(f" Total size: {total_size / (1024*1024):.1f}MB")
            print(f" File types:")
            for ext, count in sorted(file_types.items(), key=lambda x: x[1], reverse=True):
                print(f" • {ext}: {count} files")

            # Conservative time estimate for average hardware
            estimated_time = self._estimate_processing_time(total_files, total_size)
            print(f" Estimated time: {estimated_time}")

            print()
            print("💡 What indexing does:")
            print(" • Reads and analyzes each file's content (READ-ONLY)")
            print(" • Breaks content into semantic chunks")
            print(" • Generates embeddings for semantic search")
            print(" • Stores everything in a separate .mini-rag/ database")
            print()
            print("🛡️ SAFETY GUARANTEE:")
            print(" • Your original files are NEVER modified or touched")
            print(" • Only reads files to create the search index")
            print(" • All data stored separately in .mini-rag/ folder")
            print(" • You can delete the .mini-rag/ folder anytime to remove all traces")
            print()

            # Confirmation
            confirm = input("🚀 Proceed with indexing? [Y/n]: ").strip().lower()
            if confirm and confirm != 'y' and confirm != 'yes':
                print("Indexing cancelled.")
                input("Press Enter to continue...")
                return

            print("\n🚀 Starting indexing...")
            print("=" * 50)

            # Actually run the indexing
            result = indexer.index_project(force_reindex=force)

            print()
            print("🎉 INDEXING COMPLETE!")
            print("=" * 50)

            # Comprehensive performance summary
            files_processed = result.get('files_indexed', 0)
            chunks_created = result.get('chunks_created', 0)
            time_taken = result.get('time_taken', 0)
            files_failed = result.get('files_failed', 0)
            files_per_second = result.get('files_per_second', 0)

            print(f"📊 PROCESSING SUMMARY:")
            print(f" ✅ Files successfully processed: {files_processed:,}")
            print(f" 🧩 Semantic chunks created: {chunks_created:,}")
            print(f" ⏱️ Total processing time: {time_taken:.2f} seconds")
            print(f" 🚀 Processing speed: {files_per_second:.1f} files/second")

            if files_failed > 0:
                print(f" ⚠️ Files with issues: {files_failed}")

            # Show what we analyzed
            if chunks_created > 0:
                avg_chunks_per_file = chunks_created / max(files_processed, 1)
                print()
                print(f"🔍 CONTENT ANALYSIS:")
                print(f" • Average chunks per file: {avg_chunks_per_file:.1f}")
                print(f" • Semantic boundaries detected and preserved")
                print(f" • Function and class contexts captured")
                print(f" • Documentation and code comments indexed")

                # Try to show embedding info
                try:
                    embedder = indexer.embedder
                    embed_info = embedder.get_embedding_info()
                    print(f" • Embedding method: {embed_info.get('method', 'Unknown')}")
                    print(f" • Vector dimensions: {embedder.get_embedding_dim()}")
                except Exception:
                    pass

            # Database info
            print()
            print(f"💾 DATABASE CREATED:")
            print(f" • Location: {self.project_path}/.mini-rag/")
            print(f" • Vector database with {chunks_created:,} searchable chunks")
            print(f" • Optimized for fast semantic similarity search")
            print(f" • Supports natural language queries")

            # Performance metrics
            if time_taken > 0:
                print()
                print(f"⚡ PERFORMANCE METRICS:")
                chunks_per_second = chunks_created / time_taken if time_taken > 0 else 0
                print(f" • {chunks_per_second:.0f} chunks processed per second")

                # Estimate search performance
                estimated_search_time = max(0.1, chunks_created / 10000)  # Very rough estimate
                print(f" • Estimated search time: ~{estimated_search_time:.1f}s per query")

                if total_size > 0:
                    mb_per_second = (total_size / (1024*1024)) / time_taken
                    print(f" • Data processing rate: {mb_per_second:.1f} MB/second")

            # What's next
            print()
            print(f"🎯 READY FOR SEARCH!")
            print(f" Your codebase is now fully indexed and searchable.")
            print(f" Try queries like:")
            print(f" • 'authentication logic'")
            print(f" • 'error handling patterns'")
            print(f" • 'database connection setup'")
            print(f" • 'unit tests for validation'")

            if files_failed > 0:
                print()
                print(f"📋 NOTES:")
                print(f" • {files_failed} files couldn't be processed (binary files, encoding issues, etc.)")
                print(f" • This is normal - only text-based files are indexed")
                print(f" • All processable content has been successfully indexed")

        except Exception as e:
            print(f"❌ Indexing failed: {e}")
            print(" Try running the CLI command directly for more details")

        print()
        input("Press Enter to continue...")

    def _show_existing_index_info(self, rag_dir: Path) -> bool:
        """Show essential info about existing index and ask about re-indexing."""
        print("📊 EXISTING INDEX FOUND")
        print("=" * 50)
        print()
        print("🛡️ Your original files are safe and unmodified.")
        print()

        try:
            manifest_path = rag_dir / 'manifest.json'
            if manifest_path.exists():
                from datetime import datetime

                with open(manifest_path, 'r') as f:
                    manifest = json.load(f)

                file_count = manifest.get('file_count', 0)
                chunk_count = manifest.get('chunk_count', 0)
                indexed_at = manifest.get('indexed_at', 'Unknown')

                print(f"• Files indexed: {file_count:,}")
                print(f"• Chunks created: {chunk_count:,}")

                # Show when it was last indexed
                time_ago = None  # Stays None if the timestamp is missing or unparseable
                if indexed_at != 'Unknown':
                    try:
                        dt = datetime.fromisoformat(indexed_at.replace('Z', '+00:00'))
                        time_ago = datetime.now() - dt.replace(tzinfo=None)

                        if time_ago.days > 0:
                            age_str = f"{time_ago.days} day(s) ago"
                        elif time_ago.seconds > 3600:
                            age_str = f"{time_ago.seconds // 3600} hour(s) ago"
                        else:
                            age_str = f"{time_ago.seconds // 60} minute(s) ago"

                        print(f"• Last indexed: {age_str}")
                    except Exception:
                        print(f"• Last indexed: {indexed_at}")
                else:
                    print("• Last indexed: Unknown")

                # Simple recommendation (only when the timestamp could be parsed)
                if time_ago is not None:
                    if time_ago.days >= 7:
                        print(f"\n💡 RECOMMEND: Re-index (index is {time_ago.days} days old)")
                    elif time_ago.days >= 1:
                        print(f"\n💡 MAYBE: Re-index if you've made changes ({time_ago.days} day(s) old)")
                    else:
                        print(f"\n💡 RECOMMEND: Skip (index is recent)")

                estimate = self._estimate_processing_time(file_count, 0)
                print(f"• Re-indexing would take: {estimate}")

            else:
                print("⚠️ Index corrupted - recommend re-indexing")

        except Exception:
            print("⚠️ Could not read index info - recommend re-indexing")

        print()
        choice = input("🚀 Re-index everything? [y/N]: ").strip().lower()
        return choice in ['y', 'yes']

    def _estimate_processing_time(self, file_count: int, total_size_bytes: int) -> str:
        """Conservative time estimates for average hardware (not high-end dev machines)."""
        # Conservative: 2 seconds per file for average hardware (4x buffer from fast machines)
        estimated_seconds = file_count * 2.0 + 15  # +15s startup overhead

        if estimated_seconds < 60:
            return "1-2 minutes"
        elif estimated_seconds < 300:  # 5 minutes
            minutes = int(estimated_seconds / 60)
            return f"{minutes}-{minutes + 1} minutes"
        else:
            minutes = int(estimated_seconds / 60)
            return f"{minutes}+ minutes"

    def search_interactive(self):
        """Interactive search interface."""
        if not self.project_path:
            print("❌ No project selected")
            input("Press Enter to continue...")
            return

        # Check if indexed
        rag_dir = self.project_path / '.mini-rag'
        if not rag_dir.exists():
            print(f"❌ Project not indexed: {self.project_path.name}")
            print(" Index the project first!")
            input("Press Enter to continue...")
            return

        self.clear_screen()
        self.print_header()

        print("🔍 Semantic Search")
        print("=================")
        print()
        print(f"Project: {self.project_path.name}")
        print()

        # More prominent search input
        print("🎯 ENTER YOUR SEARCH QUERY:")
        print(" Ask any question about your codebase using natural language")
        print(" Examples: 'chunking strategy', 'ollama integration', 'embedding generation'")
        print()

        # Primary input - direct query entry
        query = self.get_input("Search query", "").strip()

        # If they didn't enter anything, show sample options
        if not query:
            print()
            print("💡 Need inspiration? Try one of these sample queries:")
            print()

            sample_questions = [
                "chunking strategy",
                "ollama integration",
                "indexing performance",
                "why does indexing take long",
                "how to improve search results",
                "embedding generation"
            ]

            for i, question in enumerate(sample_questions[:3], 1):
                print(f" {i}. {question}")
            print()

            choice_str = self.get_input("Select a sample query (1-3) or press Enter to go back", "")

            if choice_str.isdigit():
                choice = int(choice_str)
                if 1 <= choice <= 3:
                    query = sample_questions[choice - 1]
                    print(f"✅ Using: '{query}'")
                    print()

        # If still no query, return to menu
        if not query:
            return

        # Use a sensible default for results to streamline UX
        top_k = 10  # Good default, advanced users can use CLI for more options

        # Show CLI command
        cli_cmd = f"./rag-mini search {self.project_path} \"{query}\""
        if top_k != 10:
            cli_cmd += f" --top-k {top_k}"

        self.print_cli_command(cli_cmd, "Search for semantic matches")

        print("Searching...")
        print("=" * 50)

        # Actually run the search
        try:
            sys.path.insert(0, str(Path(__file__).parent))
            from mini_rag.search import CodeSearcher

            searcher = CodeSearcher(self.project_path)
            # Enable query expansion in TUI for better results
            searcher.config.search.expand_queries = True
            results = searcher.search(query, top_k=top_k)

            if not results:
                print("❌ No results found")
                print()
                print("💡 Try:")
                print(" • Broader search terms")
                print(" • Different keywords")
                print(" • Concepts instead of exact names")
            else:
                print(f"✅ Found {len(results)} results:")
                print()

                for i, result in enumerate(results, 1):
                    # Add divider and whitespace before each result (except first)
                    if i > 1:
                        print()
                        print("-" * 60)
                        print()

                    # Clean up file path
                    try:
                        if hasattr(result.file_path, 'relative_to'):
                            rel_path = result.file_path.relative_to(self.project_path)
                        else:
                            rel_path = Path(result.file_path).relative_to(self.project_path)
                    except Exception:
                        rel_path = result.file_path

                    print(f"{i}. {rel_path}")
                    print(f" Relevance: {result.score:.3f}")

                    # Show line information if available
                    if hasattr(result, 'start_line') and result.start_line:
                        print(f" Lines: {result.start_line}-{result.end_line}")

                    # Show function/class context if available
                    if hasattr(result, 'name') and result.name:
                        print(f" Context: {result.name}")

                    # Show full content with proper formatting
                    content_lines = result.content.strip().split('\n')
                    print(f" Content:")
                    for line in content_lines[:8]:  # Show up to 8 lines
                        print(f" {line}")

                    if len(content_lines) > 8:
                        print(f" ... ({len(content_lines) - 8} more lines)")

                    print()

                # Offer to view full results
                if len(results) > 1:
                    print("💡 To see more context or specific results:")
                    print(f" Run: ./rag-mini search {self.project_path} \"{query}\" --verbose")

                # Suggest follow-up questions based on the search
                print()
                print("🔍 Suggested follow-up searches:")
                follow_up_questions = self.generate_follow_up_questions(query, results)
                for i, question in enumerate(follow_up_questions, 1):
                    print(f" {i}. {question}")

                # Show additional CLI commands
                print()
                print("💻 CLI Commands:")
                print(f" ./rag-mini search {self.project_path} \"{query}\" --top-k 20 # More results")
                print(f" ./rag-mini explore {self.project_path} # Interactive mode")
                print(f" ./rag-mini search {self.project_path} \"{query}\" --synthesize # With AI summary")

                # Ask if they want to run a follow-up search
                print()
                choice = input("Run a follow-up search? Enter number (1-3) or press Enter to continue: ").strip()
                if choice.isdigit() and 1 <= int(choice) <= len(follow_up_questions):
                    # Recursive search with the follow-up question
                    follow_up_query = follow_up_questions[int(choice) - 1]
                    print(f"\nSearching for: '{follow_up_query}'")
                    print("=" * 50)
                    # Run another search
                    follow_results = searcher.search(follow_up_query, top_k=5)

                    if follow_results:
                        print(f"✅ Found {len(follow_results)} follow-up results:")
                        print()
                        for i, result in enumerate(follow_results[:3], 1):  # Show top 3
                            # Add divider for follow-up results too
                            if i > 1:
                                print()
                                print("-" * 40)
                                print()

                            try:
                                if hasattr(result.file_path, 'relative_to'):
                                    rel_path = result.file_path.relative_to(self.project_path)
                                else:
                                    rel_path = Path(result.file_path).relative_to(self.project_path)
                            except Exception:
                                rel_path = result.file_path
                            print(f"{i}. {rel_path} (Score: {result.score:.3f})")
                            print(f" {result.content.strip()[:100]}...")
                            print()
                    else:
                        print("❌ No follow-up results found")

            # Track searches and show sample reminder
            self.search_count += 1

            # Show sample reminder after 2 searches
            if self.search_count >= 2 and self.project_path.name == '.sample_test':
                print()
                print("⚠️ Sample Limitation Notice")
                print("=" * 30)
                print("You've been searching a small sample project.")
                print("For full exploration of your codebase, you need to index the complete project.")
                print()

                # Show timing estimate if available
                try:
                    with open('/tmp/fss-rag-sample-time.txt', 'r') as f:
                        sample_time = int(f.read().strip())
                    # Rough estimate: multiply by file count ratio
                    estimated_time = sample_time * 20  # Rough multiplier
                    print(f"🕒 Estimated full indexing time: ~{estimated_time} seconds")
                except (OSError, ValueError):
                    print("🕒 Estimated full indexing time: 1-3 minutes for typical projects")

                print()
                choice = input("Index the full project now? [y/N]: ").strip().lower()
                if choice == 'y':
                    # Switch to full project and index
                    parent_dir = self.project_path.parent
                    self.project_path = parent_dir
                    print(f"\nSwitching to full project: {parent_dir}")
                    print("Starting full indexing...")
                    # Note: This would trigger full indexing in real implementation

        except Exception as e:
            print(f"❌ Search failed: {e}")
            print()
            print("💡 Try these CLI commands for more details:")
            print(f" ./rag-mini search {self.project_path} \"{query}\" --verbose")
            print(f" ./rag-mini status {self.project_path}")
            print(" ./rag-mini --help")
            print()
            print("🔧 Common solutions:")
            print(" • Make sure the project is indexed first")
            print(" • Check if Ollama is running: ollama serve")
            print(" • Try a simpler search query")

        print()
        input("Press Enter to continue...")

    def generate_follow_up_questions(self, original_query: str, results) -> List[str]:
        """Generate contextual follow-up questions based on search results."""
        # Simple pattern-based follow-up generation
        follow_ups = []

        # Based on original query patterns
        query_lower = original_query.lower()

        # FSS-Mini-RAG specific follow-ups
        if "chunk" in query_lower:
            follow_ups.extend(["chunk size optimization", "smart chunking boundaries", "chunk overlap strategies"])
        elif "ollama" in query_lower:
            follow_ups.extend(["embedding model comparison", "ollama server setup", "nomic-embed-text performance"])
        elif "index" in query_lower or "performance" in query_lower:
            follow_ups.extend(["indexing speed optimization", "memory usage during indexing", "file processing pipeline"])
        elif "search" in query_lower or "result" in query_lower:
            follow_ups.extend(["search result ranking", "semantic vs keyword search", "query expansion techniques"])
        elif "embed" in query_lower:
            follow_ups.extend(["vector embedding storage", "embedding model fallbacks", "similarity scoring"])
        else:
            # Generic RAG-related follow-ups
            follow_ups.extend(["vector database internals", "search quality tuning", "embedding optimization"])

        # Based on file types found in results (FSS-Mini-RAG specific)
        if results:
            file_extensions = set()
            for result in results[:3]:  # Check first 3 results
                try:
                    # Handle both Path objects and strings
                    if hasattr(result.file_path, 'suffix'):
                        ext = result.file_path.suffix.lower()
                    else:
                        ext = Path(result.file_path).suffix.lower()
                    file_extensions.add(ext)
                except Exception:
                    continue  # Skip if we can't get extension

            if '.py' in file_extensions:
                follow_ups.append("Python module dependencies")
            if '.md' in file_extensions:
                follow_ups.append("documentation implementation")
            if 'chunker' in str(results[0].file_path).lower():
                follow_ups.append("chunking algorithm details")
            if 'search' in str(results[0].file_path).lower():
                follow_ups.append("search algorithm implementation")

        # Return top 3 unique follow-ups
        return list(dict.fromkeys(follow_ups))[:3]

    def explore_interactive(self):
        """Interactive exploration interface with thinking mode."""
        if not self.project_path:
            print("❌ No project selected")
            input("Press Enter to continue...")
            return

        # Check if indexed
        rag_dir = self.project_path / '.mini-rag'
        if not rag_dir.exists():
            print(f"❌ Project not indexed: {self.project_path.name}")
            print(" Index the project first!")
            input("Press Enter to continue...")
            return

        self.clear_screen()
        self.print_header()

        print("🧠 Interactive Exploration Mode")
        print("==============================")
        print()
        print(f"Project: {self.project_path.name}")
        print()
        print("💡 This mode enables:")
        print(" • Thinking-enabled LLM for detailed reasoning")
        print(" • Conversation memory across questions")
        print(" • Perfect for learning and debugging")
        print()

        # Show CLI command
        cli_cmd = f"./rag-mini explore {self.project_path}"
        self.print_cli_command(cli_cmd, "Start interactive exploration session")

        print("Starting exploration mode...")
        print("=" * 50)

        # Launch exploration mode
        try:
            sys.path.insert(0, str(Path(__file__).parent))
            from mini_rag.explorer import CodeExplorer

            explorer = CodeExplorer(self.project_path)

            if not explorer.start_exploration_session():
                print("❌ Could not start exploration mode")
                print(" Make sure Ollama is running with a model installed")
                input("Press Enter to continue...")
                return

            # Show initial prompt
            self._show_exploration_prompt(explorer, is_first=True)

            is_first_question = True
            while True:
                try:
                    question = input("➤ ").strip()

                    # Handle numbered options
                    if question == '0':
                        print(explorer.end_session())
                        break
                    elif question == '1':
                        # Use improved summary function
                        summary = self._generate_conversation_summary(explorer)
                        print(f"\n{summary}")
                        self._show_exploration_prompt(explorer)
                        continue
                    elif question == '2':
                        if hasattr(explorer.current_session, 'conversation_history') and explorer.current_session.conversation_history:
                            print("\n📋 Recent Question History:")
                            print("═" * 40)
                            for i, exchange in enumerate(explorer.current_session.conversation_history[-5:], 1):
                                q = exchange["question"][:60] + "..." if len(exchange["question"]) > 60 else exchange["question"]
                                confidence = exchange["response"].get("confidence", 0)
                                print(f" {i}. {q} (confidence: {confidence:.0f}%)")
                            print()
                        else:
                            print("\n📝 No questions asked yet")
                        self._show_exploration_prompt(explorer)
                        continue
                    elif question == '3':
                        # Generate smart suggestion
                        suggested_question = self._generate_smart_suggestion(explorer)
                        if suggested_question:
                            print(f"\n💡 Suggested question: {suggested_question}")
                            print(" Press Enter to use this, or type your own question:")
                            next_input = input("➤ ").strip()
                            if not next_input:  # User pressed Enter to use suggestion
                                question = suggested_question
                            else:
                                question = next_input
                        else:
                            print("\n💡 No suggestions available yet. Ask a question first!")
                            self._show_exploration_prompt(explorer)
                            continue

                    # Simple exit handling
                    if question.lower() in ['quit', 'exit', 'q', 'back']:
                        print(explorer.end_session())
                        break

                    # Skip empty input
                    if not question:
                        print("💡 Please enter a question or choose an option (0-3)")
                        continue

                    # Simple help
                    if question.lower() in ['help', 'h', '?']:
                        print("\n💡 Exploration Help:")
                        print(" • Just ask any question about the codebase!")
                        print(" • Examples: 'how does search work?' or 'explain the indexing'")
                        print(" • Use options 0-3 for quick actions")
                        self._show_exploration_prompt(explorer)
                        continue

                    # Process the question with streaming
                    print("\n🔍 Starting analysis...")
                    response = explorer.explore_question(question)

                    if response:
                        print(f"\n{response}")
                        is_first_question = False
                        # Show prompt for next question
                        self._show_exploration_prompt(explorer)
                    else:
                        print("❌ Sorry, I couldn't process that question.")
                        print("💡 Try rephrasing or using simpler terms.")
                        self._show_exploration_prompt(explorer)

                except KeyboardInterrupt:
                    print(f"\n{explorer.end_session()}")
                    break
                except EOFError:
                    print(f"\n{explorer.end_session()}")
                    break

        except Exception as e:
            print(f"❌ Exploration mode failed: {e}")
            print(" Try running the CLI command directly for more details")
            input("\nPress Enter to continue...")
            return

        # Exploration session completed successfully, return to menu without extra prompt

    def _get_context_tokens_estimate(self, explorer):
        """Estimate the total tokens used in the conversation context."""
        if not explorer.current_session or not explorer.current_session.conversation_history:
            return 0

        total_chars = 0
        for exchange in explorer.current_session.conversation_history:
            total_chars += len(exchange["question"])
            # Estimate response character count (summary + key points)
            response = exchange["response"]
            total_chars += len(response.get("summary", ""))
            for point in response.get("key_points", []):
                total_chars += len(point)

        # Rough estimate: 4 characters = 1 token
        return total_chars // 4

    def _get_context_limit_estimate(self):
        """Get estimated context limit for current model."""
        # Conservative estimates for common models
        return 32000  # Most models we use have 32k context

    def _format_token_display(self, used_tokens, limit_tokens):
        """Format token usage display with color coding."""
        percentage = (used_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0

        if percentage < 50:
            color = "🟢"  # Green - plenty of space
        elif percentage < 75:
            color = "🟡"  # Yellow - getting full
        else:
            color = "🔴"  # Red - almost full

        return f"{color} Context: {used_tokens}/{limit_tokens} tokens ({percentage:.0f}%)"

    def _show_exploration_prompt(self, explorer, is_first=False):
        """Show standardized input prompt for exploration mode."""
        print()
        print("═" * 60)
        if is_first:
            print("🤔 Ask your first question about the codebase:")
        else:
            print("🤔 What would you like to explore next?")
        print()

        # Show context usage
        used_tokens = self._get_context_tokens_estimate(explorer)
        limit_tokens = self._get_context_limit_estimate()
        token_display = self._format_token_display(used_tokens, limit_tokens)
        print(f"📊 {token_display}")
        print()

        print("🔧 Quick Options:")
        print(" 0 = Quit exploration 1 = Summarize conversation")
        print(" 2 = Show question history 3 = Suggest next question")
        print()
        print("💬 Enter your question or choose an option:")

    def _generate_conversation_summary(self, explorer):
        """Generate a detailed summary of the conversation history."""
        if not explorer.current_session or not explorer.current_session.conversation_history:
            return "📝 No conversation to summarize yet. Ask a question first!"

        try:
            # Build conversation context
            conversation_text = ""
            for i, exchange in enumerate(explorer.current_session.conversation_history, 1):
                conversation_text += f"Question {i}: {exchange['question']}\n"
                conversation_text += f"Response {i}: {exchange['response']['summary']}\n"
                # Add key points if available
                if exchange['response'].get('key_points'):
                    for point in exchange['response']['key_points']:
                        conversation_text += f"- {point}\n"
                conversation_text += "\n"

            # Determine summary length based on conversation length
            char_count = len(conversation_text)
            if char_count < 500:
                target_length = "brief"
                target_words = "50-80"
            elif char_count < 2000:
                target_length = "moderate"
                target_words = "100-150"
            else:
                target_length = "comprehensive"
                target_words = "200-300"

            # Create summary prompt for natural conversation style
            prompt = f"""Please summarize this conversation about the project we've been exploring. Write a {target_length} summary ({target_words} words) in a natural, conversational style that captures:

1. Main topics we explored together
2. Key insights we discovered
3. Important details we learned
4. Overall understanding we gained

Conversation:
{conversation_text.strip()}

Write your summary as if you're explaining to a colleague what we discussed. Use a friendly, informative tone and avoid JSON or structured formats."""

            # Use the synthesizer to generate summary with streaming and thinking
            print("\n💭 Generating summary...")
            response = explorer.synthesizer._call_ollama(prompt, temperature=0.1, disable_thinking=False, use_streaming=True)

            if response:
                return f"📋 **Conversation Summary**\n\n{response.strip()}"
            else:
                # Fallback summary
                return self._generate_fallback_summary(explorer.current_session.conversation_history)

        except Exception as e:
            logger.error(f"Summary generation failed: {e}")
            return self._generate_fallback_summary(explorer.current_session.conversation_history)

    def _generate_fallback_summary(self, conversation_history):
        """Generate a simple fallback summary when AI summary fails."""
        if not conversation_history:
            return "📝 No conversation to summarize yet."

        question_count = len(conversation_history)
        topics = []

        # Extract simple topics from questions
        for exchange in conversation_history:
            question = exchange["question"].lower()
            if "component" in question or "part" in question:
                topics.append("system components")
            elif "error" in question or "bug" in question:
                topics.append("error handling")
            elif "security" in question or "auth" in question:
                topics.append("security/authentication")
            elif "test" in question:
                topics.append("testing")
            elif "config" in question or "setting" in question:
                topics.append("configuration")
            elif "performance" in question or "speed" in question:
                topics.append("performance")
            else:
                # Extract first few words as topic
                words = question.split()[:3]
                topics.append(" ".join(words))

        unique_topics = list(dict.fromkeys(topics))  # Remove duplicates while preserving order

        summary = "📋 **Conversation Summary**\n\n"
        summary += f"Questions asked: {question_count}\n"
        summary += f"Topics explored: {', '.join(unique_topics[:5])}\n"
        summary += f"Session duration: {len(conversation_history) * 2} minutes (estimated)\n\n"
        summary += "💡 Use option 2 to see recent question history for more details."

        return summary

    def _generate_smart_suggestion(self, explorer):
        """Generate a smart follow-up question based on conversation context."""
        if not explorer.current_session or not explorer.current_session.conversation_history:
            # First question - provide a random starter question
            import random
            starters = [
                "What are the main components of this project?",
                "How is error handling implemented?",
                "Show me the authentication and security logic",
                "What are the key functions I should understand first?",
                "How does data flow through this system?",
                "What configuration options are available?",
                "Show me the most important files to understand"
            ]
            return random.choice(starters)

        try:
            # Get recent conversation context
            recent_exchanges = explorer.current_session.conversation_history[-2:]  # Last 2 exchanges
            context_summary = ""

            for i, exchange in enumerate(recent_exchanges, 1):
                q = exchange["question"]
                summary = exchange["response"]["summary"][:100] + "..." if len(exchange["response"]["summary"]) > 100 else exchange["response"]["summary"]
                context_summary += f"Q{i}: {q}\nA{i}: {summary}\n\n"

            # Create a very focused prompt that encourages short responses
            prompt = f"""Based on this recent conversation about a codebase, suggest ONE short follow-up question (under 10 words).

Recent conversation:
{context_summary.strip()}

Respond with ONLY a single short question that would logically explore deeper or connect to what was discussed. Examples:
- "Why does this approach work better?"
- "What could go wrong here?"
- "How is this tested?"
- "Where else is this pattern used?"

Your suggested question (under 10 words):"""

            # Use the synthesizer to generate suggestion with thinking collapse
            response = explorer.synthesizer._call_ollama(prompt, temperature=0.3, disable_thinking=False, use_streaming=True, collapse_thinking=True)

            if response:
                # Clean up the response - extract just the question
                lines = response.strip().split('\n')
                for line in lines:
                    line = line.strip()
                    if line and ('?' in line or line.lower().startswith(('what', 'how', 'why', 'where', 'when', 'which', 'who'))):
                        # Remove any prefixes like "Question:" or numbers
                        cleaned = line.split(':', 1)[-1].strip()
                        if len(cleaned) < 80 and ('?' in cleaned or cleaned.lower().startswith(('what', 'how', 'why', 'where', 'when', 'which', 'who'))):
                            return cleaned

                # Fallback: use first non-empty line if it looks like a question
                first_line = lines[0].strip() if lines else ""
                if first_line and len(first_line) < 80:
                    return first_line

            # Fallback: pattern-based suggestions if LLM fails
            return self._get_fallback_suggestion(recent_exchanges)

        except Exception:
            # Silent fail with pattern-based fallback
            recent_exchanges = explorer.current_session.conversation_history[-2:] if explorer.current_session.conversation_history else []
            return self._get_fallback_suggestion(recent_exchanges)

    def _get_fallback_suggestion(self, recent_exchanges):
        """Generate pattern-based suggestions as fallback."""
        if not recent_exchanges:
            return None

        last_question = recent_exchanges[-1]["question"].lower()

        # Simple pattern matching for common follow-ups
        if "how" in last_question and "work" in last_question:
            return "What could go wrong with this approach?"
        elif "what" in last_question and ("is" in last_question or "does" in last_question):
            return "How is this implemented?"
        elif "implement" in last_question or "code" in last_question:
            return "How is this tested?"
        elif "error" in last_question or "bug" in last_question:
            return "How can this be prevented?"
        elif "performance" in last_question or "speed" in last_question:
            return "What are the bottlenecks here?"
        elif "security" in last_question or "safe" in last_question:
            return "What other security concerns exist?"
        elif "test" in last_question:
            return "What edge cases should be considered?"
        else:
            # Generic follow-ups
            fallbacks = [
                "How is this used elsewhere?",
                "What are the alternatives?",
                "Why was this approach chosen?",
                "What happens when this fails?",
                "How can this be improved?"
            ]
            import random
            return random.choice(fallbacks)

    def show_status(self):
        """Show project and system status."""
        self.clear_screen()
        self.print_header()

        print("📊 System Status")
        print("===============")
        print()

        if self.project_path:
            cli_cmd = f"./rag-mini status {self.project_path}"
            self.print_cli_command(cli_cmd, "Show detailed status information")

            # Check project status
            rag_dir = self.project_path / '.mini-rag'
            if rag_dir.exists():
                try:
                    manifest = rag_dir / 'manifest.json'
                    if manifest.exists():
                        with open(manifest) as f:
                            data = json.load(f)

                        print(f"Project: {self.project_path.name}")
                        print("✅ Indexed")
                        print(f" Files: {data.get('file_count', 0)}")
                        print(f" Chunks: {data.get('chunk_count', 0)}")
                        print(f" Last update: {data.get('indexed_at', 'Unknown')}")
                    else:
                        print("⚠️ Index incomplete")
                except Exception as e:
                    print(f"❌ Could not read status: {e}")
            else:
                print(f"Project: {self.project_path.name}")
                print("❌ Not indexed")
        else:
            print("❌ No project selected")

        print()

        # Show embedding system status
        try:
            sys.path.insert(0, str(Path(__file__).parent))
            from mini_rag.ollama_embeddings import OllamaEmbedder

            embedder = OllamaEmbedder()
            info = embedder.get_status()

            print("🧠 Embedding System:")
            method = info.get('method', 'unknown')
            if method == 'ollama':
                print(" ✅ Ollama (high quality)")
            elif method == 'ml':
                print(" ✅ ML fallback (good quality)")
            elif method == 'hash':
                print(" ⚠️ Hash fallback (basic quality)")
            else:
                print(f" ❓ Unknown: {method}")

        except Exception as e:
            print(f"🧠 Embedding System: ❌ Error: {e}")

        print()
        input("Press Enter to continue...")

    def show_configuration(self):
        """Show and manage configuration options with interactive editing."""
        if not self.project_path:
            print("❌ No project selected")
            input("Press Enter to continue...")
            return

        while True:
            self.clear_screen()
            self.print_header()

            print("⚙️ Configuration Manager")
            print("========================")
            print()
            print(f"Project: {self.project_path.name}")
            print()

            # Load current configuration
            try:
                from mini_rag.config import ConfigManager
                config_manager = ConfigManager(self.project_path)
                config = config_manager.load_config()
                config_path = self.project_path / '.mini-rag' / 'config.yaml'

                print("📋 Current Settings:")
                print(f" 🤖 AI model: {config.llm.synthesis_model}")
                print(f" 🧠 Context window: {config.llm.context_window} tokens")
                print(f" 📁 Chunk size: {config.chunking.max_size} characters")
                print(f" 🔄 Chunking strategy: {config.chunking.strategy}")
                print(f" 🔍 Search results: {config.search.default_top_k} results")
                print(f" 📊 Embedding method: {config.embedding.preferred_method}")
                print(f" 🚀 Query expansion: {'enabled' if config.search.expand_queries else 'disabled'}")
                print(f" ⚡ LLM synthesis: {'enabled' if config.llm.enable_synthesis else 'disabled'}")
                print()

                print("🛠️ Quick Configuration Options:")
                print(" 1. Select AI model (Fast/Recommended/Quality)")
                print(" 2. Configure context window (Development/Production/Advanced)")
                print(" 3. Adjust chunk size (performance vs accuracy)")
                print(" 4. Toggle query expansion (smarter searches)")
                print(" 5. Configure search behavior")
                print(" 6. View/edit full configuration file")
                print(" 7. Reset to defaults")
                print(" 8. Advanced settings")
                print()
                print(" V. View current config file")
                print(" B. Back to main menu")

            except Exception as e:
                print(f"❌ Error loading configuration: {e}")
                print(" A default config will be created when needed")
                print()
                print(" B. Back to main menu")

            print()
            choice = input("Choose option: ").strip().lower()

            if choice in ('b', '', '0'):
                break
            elif choice == 'v':
                self._show_config_file(config_path)
            elif choice == '1':
                self._configure_llm_model(config_manager, config)
            elif choice == '2':
                self._configure_context_window(config_manager, config)
            elif choice == '3':
                self._configure_chunk_size(config_manager, config)
            elif choice == '4':
                self._toggle_query_expansion(config_manager, config)
            elif choice == '5':
                self._configure_search_behavior(config_manager, config)
            elif choice == '6':
                self._edit_config_file(config_path)
            elif choice == '7':
                self._reset_config(config_manager)
            elif choice == '8':
                self._advanced_settings(config_manager, config)
            else:
                print("Invalid option. Press Enter to continue...")
                input()

    def _show_config_file(self, config_path):
        """Display the full configuration file."""
        self.clear_screen()
        print("📄 Configuration File Contents")
        print("=" * 50)
        print()

        if config_path.exists():
            try:
                with open(config_path) as f:
                    content = f.read()
                print(content)
            except Exception as e:
                print(f"❌ Could not read file: {e}")
        else:
            print("⚠️ Configuration file doesn't exist yet")
            print(" It will be created when you first index a project")

        print("\n" + "=" * 50)
        input("Press Enter to continue...")

    def _configure_llm_model(self, config_manager, config):
        """Interactive LLM model selection with download capability."""
        self.clear_screen()
        print("🤖 AI Model Configuration")
        print("=========================")
        print()

        # Check if Ollama is available
        import subprocess
        import requests

        ollama_available = False
        try:
            subprocess.run(['ollama', '--version'], capture_output=True, check=True)
            response = requests.get("http://localhost:11434/api/version", timeout=3)
            ollama_available = response.status_code == 200
        except Exception:
            pass

        if not ollama_available:
            print("❌ Ollama not available")
            print()
            print("To use AI features, please:")
            print(" 1. Install Ollama: https://ollama.com/download")
            print(" 2. Start the service: ollama serve")
            print(" 3. Return to this menu")
            print()
            input("Press Enter to continue...")
            return

        # Get available models
        try:
            available_models = subprocess.run(['ollama', 'list'], capture_output=True, text=True, check=True)
            model_lines = available_models.stdout.strip().split('\n')[1:]  # Skip header
            installed_models = [line.split()[0] for line in model_lines if line.strip()]
        except Exception:
            installed_models = []

        print("🧠 Why Small Models Work Great for RAG")
        print("=====================================")
        print()
        print("RAG systems like FSS-Mini-RAG don't need massive models because:")
        print("• The relevant code/docs are provided as context")
        print("• Models focus on analysis, not memorizing facts")
        print("• Even 0.6B models give excellent results with good context")
        print("• Smaller models = faster responses = better user experience")
        print()
        print("💡 Advanced Use: For heavy development work with 15+ results")
        print(" and 4000+ character chunks, even these models excel!")
        print(" The 4B Qwen3 model will help you code remarkably well.")
        print()

        # Model options
        model_options = {
            'fast': {
                'model': 'qwen3:0.6b',
                'description': 'Ultra-fast responses (~500MB)',
                'details': 'Perfect for quick searches and exploration. Surprisingly capable!'
            },
            'recommended': {
                'model': 'qwen3:1.7b',
                'description': 'Best balance of speed and quality (~1.4GB)',
                'details': 'Ideal for most users. Great analysis with good speed.'
            },
            'quality': {
                'model': 'qwen3:4b',
                'description': 'Highest quality responses (~2.5GB)',
                'details': 'Excellent for coding assistance and detailed analysis.'
            }
        }

        print("🎯 Recommended Models:")
        print()
        for key, info in model_options.items():
            is_installed = any(info['model'] in model for model in installed_models)
            status = "✅ Installed" if is_installed else "📥 Available for download"

            print(f" {key.upper()}: {info['model']}")
            print(f" {info['description']} - {status}")
            print(f" {info['details']}")
            print()

        current_model = config.llm.synthesis_model
        print(f"Current model: {current_model}")
        print()

        print("Options:")
        print(" F. Select Fast model (qwen3:0.6b)")
        print(" R. Select Recommended model (qwen3:1.7b)")
        print(" Q. Select Quality model (qwen3:4b)")
        print(" C. Keep current model")
        print(" B. Back to configuration menu")
        print()

        choice = input("Choose option: ").strip().lower()

        selected_model = None
        if choice == 'f':
            selected_model = model_options['fast']['model']
        elif choice == 'r':
            selected_model = model_options['recommended']['model']
        elif choice == 'q':
            selected_model = model_options['quality']['model']
        elif choice == 'c':
            print("Keeping current model.")
            input("Press Enter to continue...")
            return
        elif choice == 'b':
            return
        else:
            print("Invalid option.")
            input("Press Enter to continue...")
            return

        # Check if model is installed
        model_installed = any(selected_model in model for model in installed_models)

        if not model_installed:
            print(f"\n📥 Model {selected_model} not installed.")
            print("Would you like to download it now?")
            print("This may take 2-5 minutes depending on your internet speed.")
            print()

            download = input("Download now? [Y/n]: ").strip().lower()
            if download not in ('n', 'no'):
                print(f"\n🔄 Downloading {selected_model}...")
                print("This may take a few minutes...")

                try:
                    subprocess.run(['ollama', 'pull', selected_model],
                                   capture_output=True, text=True, check=True)
                    print(f"✅ Successfully downloaded {selected_model}")
                    model_installed = True
                except subprocess.CalledProcessError as e:
                    print(f"❌ Download failed: {e}")
                    print("You can try downloading manually later with:")
                    print(f" ollama pull {selected_model}")
                    input("Press Enter to continue...")
                    return
            else:
                print("Model not downloaded. Configuration not changed.")
                input("Press Enter to continue...")
                return

        if model_installed:
            # Update configuration
            config.llm.synthesis_model = selected_model
            config.llm.expansion_model = selected_model  # Keep them in sync

            try:
                config_manager.save_config(config)
                print(f"\n✅ Model updated to {selected_model}")
                print("Configuration saved successfully!")
            except Exception as e:
                print(f"❌ Error saving configuration: {e}")

        print()
        input("Press Enter to continue...")

def _configure_context_window(self, config_manager, config):
|
|
"""Interactive context window configuration."""
|
|
self.clear_screen()
|
|
print("🧠 Context Window Configuration")
|
|
print("===============================")
|
|
print()
|
|
|
|
print("💡 Why Context Window Size Matters for RAG")
|
|
print("==========================================")
|
|
print()
|
|
print("Context window determines how much text the AI can 'remember' during conversation:")
|
|
print()
|
|
print("❌ Default 2048 tokens = Only 1-2 responses before forgetting")
|
|
print("✅ Proper context = 5-15+ responses with maintained conversation")
|
|
print()
|
|
print("For RAG systems like FSS-Mini-RAG:")
|
|
print("• Larger context = better analysis of multiple code files")
|
|
print("• Thinking tokens consume ~200-500 tokens per response")
|
|
print("• Search results can be 1000-3000 tokens depending on chunk size")
|
|
print("• Conversation history builds up over time")
|
|
print()
|
|
print("💻 Memory Usage Impact:")
|
|
print("• 8K context ≈ 6MB memory per conversation")
|
|
print("• 16K context ≈ 12MB memory per conversation")
|
|
print("• 32K context ≈ 24MB memory per conversation")
|
|
print()
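
        # Worked example using the rough figures above (assumed, not measured):
        #   system prompt + expanded query   ~500 tokens
        #   10 search results x ~200 each   ~2000 tokens
        #   thinking + answer                ~700 tokens
        # => ~3200 tokens per exchange, so with history an 8K window holds
        #    roughly 2 exchanges, 16K about 5, and 32K 10 or more.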

        current_context = config.llm.context_window
        current_model = config.llm.synthesis_model

        # Get model capabilities
        model_limits = {
            'qwen3:0.6b': 32768,
            'qwen3:1.7b': 32768,
            'qwen3:4b': 131072,
            'qwen2.5:1.5b': 32768,
            'qwen2.5:3b': 32768,
            'default': 8192
        }

        model_limit = model_limits.get('default', 8192)
        for model_pattern, limit in model_limits.items():
            if model_pattern != 'default' and model_pattern.lower() in current_model.lower():
                model_limit = limit
                break
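
        # Substring matching lets variant tags inherit their family's limit
        # (a hypothetical "qwen3:4b-instruct" would match "qwen3:4b" and get
        # 131K); unrecognized models fall back to the 8192 default above.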

        print(f"Current model: {current_model}")
        print(f"Model maximum: {model_limit:,} tokens")
        print(f"Current setting: {current_context:,} tokens")
        print()

        # Context options
        context_options = {
            'development': {
                'size': 8192,
                'description': 'Fast and efficient for most development work',
                'details': 'Perfect for code exploration and basic analysis. Quick responses.',
                'memory': '~6MB'
            },
            'production': {
                'size': 16384,
                'description': 'Balanced performance for professional use',
                'details': 'Ideal for most users. Handles complex analysis well.',
                'memory': '~12MB'
            },
            'advanced': {
                'size': 32768,
                'description': 'Maximum performance for heavy development',
                'details': 'For large codebases, 15+ search results, complex analysis.',
                'memory': '~24MB'
            }
        }

        print("🎯 Recommended Context Sizes:")
        print()
        for key, info in context_options.items():
            # Check if this size is supported by current model
            if info['size'] <= model_limit:
                status = "✅ Supported"
            else:
                status = f"❌ Exceeds model limit ({model_limit:,})"

            print(f" {key.upper()}: {info['size']:,} tokens ({info['memory']})")
            print(f" {info['description']} - {status}")
            print(f" {info['details']}")
            print()

        print("Options:")
        print(" D. Development (8K tokens - fast)")
        print(" P. Production (16K tokens - balanced)")
        print(" A. Advanced (32K tokens - maximum)")
        print(" C. Custom size (manual entry)")
        print(" K. Keep current setting")
        print(" B. Back to configuration menu")
        print()

        choice = input("Choose option: ").strip().lower()

        new_context = None
        if choice == 'd':
            new_context = context_options['development']['size']
        elif choice == 'p':
            new_context = context_options['production']['size']
        elif choice == 'a':
            new_context = context_options['advanced']['size']
        elif choice == 'c':
            print()
            print("Enter custom context size in tokens:")
            print(" Minimum: 4096 (4K)")
            print(f" Maximum for {current_model}: {model_limit:,}")
            print()
            try:
                custom_size = int(input("Context size: ").strip())
                if custom_size < 4096:
                    print("❌ Context too small. Minimum is 4096 tokens for RAG.")
                    input("Press Enter to continue...")
                    return
                elif custom_size > model_limit:
                    print(f"❌ Context too large. Maximum for {current_model} is {model_limit:,} tokens.")
                    input("Press Enter to continue...")
                    return
                else:
                    new_context = custom_size
            except ValueError:
                print("❌ Invalid number.")
                input("Press Enter to continue...")
                return
        elif choice == 'k':
            print("Keeping current context setting.")
            input("Press Enter to continue...")
            return
        elif choice == 'b':
            return
        else:
            print("Invalid option.")
            input("Press Enter to continue...")
            return

        if new_context:
            # Validate against model capabilities
            if new_context > model_limit:
                print(f"⚠️ Warning: {new_context:,} tokens exceeds {current_model} limit of {model_limit:,}")
                print("The system will automatically cap at the model limit.")
                print()

            # Update configuration
            config.llm.context_window = new_context

            try:
                config_manager.save_config(config)
                print(f"✅ Context window updated to {new_context:,} tokens")
                print()

                # Provide usage guidance
                if new_context >= 32768:
                    print("🚀 Advanced context enabled!")
                    print("• Perfect for large codebases and complex analysis")
                    print("• Try cranking up search results to 15+ for deep exploration")
                    print("• Increase chunk size to 4000+ characters for comprehensive context")
                elif new_context >= 16384:
                    print("⚖️ Balanced context configured!")
                    print("• Great for professional development work")
                    print("• Supports extended conversations and analysis")
                elif new_context >= 8192:
                    print("⚡ Development context set!")
                    print("• Fast responses with good conversation length")
                    print("• Perfect for code exploration and basic analysis")

                print("Configuration saved successfully!")
            except Exception as e:
                print(f"❌ Error saving configuration: {e}")

        print()
        input("Press Enter to continue...")

    def _configure_chunk_size(self, config_manager, config):
        """Interactive chunk size configuration."""
        self.clear_screen()
        print("📁 Chunk Size Configuration")
        print("===========================")
        print()
        print("Chunk size affects both performance and search accuracy:")
        print("• Smaller chunks (500-1000): More precise but may miss context")
        print("• Medium chunks (1500-2500): Good balance (recommended)")
        print("• Larger chunks (3000+): More context but less precise")
        print()
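        # Rule of thumb (rough - English text and code average ~4 characters
        # per token): a 2000-char chunk is ~500 tokens, so ten results take
        # roughly 5K tokens of the context window.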
        print(f"Current chunk size: {config.chunking.max_size} characters")
        print()

        print("Quick presets:")
        print(" 1. Small (1000) - Precise searching")
        print(" 2. Medium (2000) - Balanced (default)")
        print(" 3. Large (3000) - More context")
        print(" 4. Custom size")
        print()

        choice = input("Choose preset or enter custom size: ").strip()

        new_size = None
        if choice == '1':
            new_size = 1000
        elif choice == '2':
            new_size = 2000
        elif choice == '3':
            new_size = 3000
        elif choice == '4':
            try:
                new_size = int(input("Enter custom chunk size (500-5000): "))
                if new_size < 500 or new_size > 5000:
                    print("❌ Size must be between 500 and 5000")
                    input("Press Enter to continue...")
                    return
            except ValueError:
                print("❌ Invalid number")
                input("Press Enter to continue...")
                return
        elif choice.isdigit():
            # isdigit() guarantees int() succeeds, so no try/except is needed
            new_size = int(choice)
            if new_size < 500 or new_size > 5000:
                print("❌ Size must be between 500 and 5000")
                input("Press Enter to continue...")
                return

        if new_size and new_size != config.chunking.max_size:
            config.chunking.max_size = new_size
            config_manager.save_config(config)
            print(f"\n✅ Chunk size updated to {new_size} characters")
            print("💡 Tip: Re-index your project for changes to take effect")
        input("Press Enter to continue...")

    def _toggle_query_expansion(self, config_manager, config):
        """Toggle query expansion on/off."""
        self.clear_screen()
        print("🚀 Query Expansion Configuration")
        print("================================")
        print()
        print("Query expansion automatically adds related terms to your searches")
        print("to improve result quality. This uses an LLM to understand your")
        print("intent and find related concepts.")
        print()
        print("Benefits:")
        print("• Find relevant results even with different terminology")
        print("• Better semantic understanding of queries")
        print("• Improved search for complex technical concepts")
        print()
        print("Requirements:")
        print("• Ollama with a language model (e.g., qwen3:1.7b)")
        print("• Slightly slower search (1-2 seconds)")
        print()

        current_status = "enabled" if config.search.expand_queries else "disabled"
        print(f"Current status: {current_status}")
        print()

        if config.search.expand_queries:
            choice = input("Query expansion is currently ON. Turn OFF? [y/N]: ").strip().lower()
            if choice == 'y':
                config.search.expand_queries = False
                config_manager.save_config(config)
                print("✅ Query expansion disabled")
        else:
            choice = input("Query expansion is currently OFF. Turn ON? [y/N]: ").strip().lower()
            if choice == 'y':
                config.search.expand_queries = True
                config_manager.save_config(config)
                print("✅ Query expansion enabled")
                print("💡 Make sure Ollama is running with a language model")

        input("\nPress Enter to continue...")

    def _configure_search_behavior(self, config_manager, config):
        """Configure search behavior settings."""
        self.clear_screen()
        print("🔍 Search Behavior Configuration")
        print("================================")
        print()
        print("Current settings:")
        print(f"• Default results: {config.search.default_top_k}")
        print(f"• BM25 keyword boost: {'enabled' if config.search.enable_bm25 else 'disabled'}")
        print(f"• Similarity threshold: {config.search.similarity_threshold}")
        print()

        print("Configuration options:")
        print(" 1. Change default number of results")
        print(" 2. Toggle BM25 keyword matching")
        print(" 3. Adjust similarity threshold")
        print(" B. Back")
        print()

        choice = input("Choose option: ").strip().lower()

        if choice == '1':
            try:
                new_top_k = int(input(f"Enter default number of results (current: {config.search.default_top_k}): "))
                if 1 <= new_top_k <= 100:
                    config.search.default_top_k = new_top_k
                    config_manager.save_config(config)
                    print(f"✅ Default results updated to {new_top_k}")
                else:
                    print("❌ Number must be between 1 and 100")
            except ValueError:
                print("❌ Invalid number")
        elif choice == '2':
            config.search.enable_bm25 = not config.search.enable_bm25
            config_manager.save_config(config)
            status = "enabled" if config.search.enable_bm25 else "disabled"
            print(f"✅ BM25 keyword matching {status}")
        elif choice == '3':
            try:
                new_threshold = float(input(f"Enter similarity threshold 0.0-1.0 (current: {config.search.similarity_threshold}): "))
                if 0.0 <= new_threshold <= 1.0:
                    config.search.similarity_threshold = new_threshold
                    config_manager.save_config(config)
                    print(f"✅ Similarity threshold updated to {new_threshold}")
                else:
                    print("❌ Threshold must be between 0.0 and 1.0")
            except ValueError:
                print("❌ Invalid number")

        if choice not in ('b', ''):
            input("Press Enter to continue...")

    def _edit_config_file(self, config_path):
        """Provide instructions for editing the config file."""
        self.clear_screen()
        print("📝 Edit Configuration File")
        print("=========================")
        print()

        if config_path.exists():
            print("Configuration file location:")
            print(f" {config_path}")
            print()
            print("To edit the configuration:")
            print(" • Use any text editor (nano, vim, VS Code, etc.)")
            print(" • The file is in YAML format with helpful comments")
            print(" • Changes take effect after saving")
            print()
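            # Sketch of the YAML layout (key names as used by this menu;
            # the generated file's exact nesting may differ):
            #   llm:
            #     synthesis_model: qwen3:1.7b
            #     context_window: 16384
            #   chunking:
            #     max_size: 2000
            #   search:
            #     default_top_k: 10
            #     expand_queries: false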
            print("Quick edit commands:")
            self.print_cli_command(f"nano {config_path}", "Edit with nano")
            self.print_cli_command(f"code {config_path}", "Edit with VS Code")
            self.print_cli_command(f"vim {config_path}", "Edit with vim")
        else:
            print("⚠️ Configuration file doesn't exist yet")
            print(" It will be created automatically when you index a project")

        input("\nPress Enter to continue...")

    def _reset_config(self, config_manager):
        """Reset configuration to defaults."""
        self.clear_screen()
        print("🔄 Reset Configuration")
        print("=====================")
        print()
        print("This will reset all settings to default values:")
        print("• Chunk size: 2000 characters")
        print("• Chunking strategy: semantic")
        print("• Query expansion: disabled")
        print("• Search results: 10")
        print("• Embedding method: auto")
        print()

        confirm = input("Are you sure you want to reset to defaults? [y/N]: ").strip().lower()
        if confirm == 'y':
            from mini_rag.config import RAGConfig
            default_config = RAGConfig()
            config_manager.save_config(default_config)
            print("✅ Configuration reset to defaults")
            print("💡 You may want to re-index for changes to take effect")
        else:
            print("❌ Reset cancelled")

        input("Press Enter to continue...")

    def _advanced_settings(self, config_manager, config):
        """Configure advanced settings."""
        self.clear_screen()
        print("⚙️ Advanced Configuration")
        print("==========================")
        print()
        print("Advanced settings for power users:")
        print()
        print("Current advanced settings:")
        print(f"• Min file size: {config.files.min_file_size} bytes")
        print(f"• Streaming threshold: {config.streaming.threshold_bytes} bytes")
        print(f"• Embedding batch size: {config.embedding.batch_size}")
        print(f"• LLM synthesis: {'enabled' if config.llm.enable_synthesis else 'disabled'}")
        print()

        print("Advanced options:")
        print(" 1. Configure file filtering")
        print(" 2. Adjust performance settings")
        print(" 3. LLM model preferences")
        print(" B. Back")
        print()

        choice = input("Choose option: ").strip().lower()

        if choice == '1':
            print("\n📁 File filtering settings:")
            print(f"Minimum file size: {config.files.min_file_size} bytes")
            print(f"Excluded patterns: {len(config.files.exclude_patterns)} patterns")
            print("\n💡 Edit the config file directly for detailed file filtering")
        elif choice == '2':
            print("\n⚡ Performance settings:")
            print(f"Embedding batch size: {config.embedding.batch_size}")
            print(f"Streaming threshold: {config.streaming.threshold_bytes}")
            print("\n💡 Higher batch sizes = faster indexing but more memory")
        elif choice == '3':
            print("\n🧠 LLM model preferences:")
            if hasattr(config.llm, 'model_rankings') and config.llm.model_rankings:
                print("Current model priority order:")
                for i, model in enumerate(config.llm.model_rankings[:5], 1):
                    print(f" {i}. {model}")
                print("\n💡 Edit config file to change model preferences")

        if choice not in ('b', ''):
            input("Press Enter to continue...")

    def show_cli_reference(self):
        """Show CLI command reference."""
        self.clear_screen()
        self.print_header()

        print("💻 CLI Command Reference")
        print("=======================")
        print()
        print("All TUI actions can be done via the command line:")
        print()

        print("🚀 Basic Commands:")
        print(" ./rag-mini index <project_path> # Index project")
        print(" ./rag-mini search <project_path> <query> --synthesize # Fast synthesis")
        print(" ./rag-mini explore <project_path> # Interactive thinking mode")
        print(" ./rag-mini status <project_path> # Show status")
        print()

        print("🎯 Enhanced Commands:")
        print(" ./rag-mini-enhanced search <project_path> <query> # Smart search")
        print(" ./rag-mini-enhanced similar <project_path> <query> # Find patterns")
        print(" ./rag-mini-enhanced analyze <project_path> # Optimization")
        print()

        print("🛠️ Quick Scripts:")
        print(" ./run_mini_rag.sh index <project_path> # Simple indexing")
        print(" ./run_mini_rag.sh search <project_path> <query> # Simple search")
        print()

        print("⚙️ Options:")
        print(" --force # Force complete re-index")
        print(" --top-k N # Number of top results to return")
        print(" --verbose # Show detailed output")
        print()

        print("💡 Pro tip: Start with the TUI, then try the CLI commands!")
        print(" The CLI is more powerful and faster for repeated tasks.")
        print()

        input("Press Enter to continue...")

    def main_menu(self):
        """Main application loop."""
        while True:
            self.clear_screen()
            self.print_header()

            # Show current project status prominently
            if self.project_path:
                rag_dir = self.project_path / '.mini-rag'
                is_indexed = rag_dir.exists()
                status_icon = "✅" if is_indexed else "❌"
                status_text = "Ready for search" if is_indexed else "Needs indexing"

                # Check LLM status
                llm_status, llm_model = self._get_llm_status()

                print("╔" + "═" * 52 + "╗")
                # Pad each row to the box's 52-character interior width
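                # Emoji status icons (✅/❌) render double-width in most
                # terminals while format-spec padding counts code points, so
                # rows containing them can drift a column; cosmetic only.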
                project_line = f" Current Project: {self.project_path.name}"
                print(f"║{project_line:<52}║")

                status_line = f" Index Status: {status_icon} {status_text}"
                print(f"║{status_line:<52}║")

                llm_line = f" LLM Status: {llm_status}"
                print(f"║{llm_line:<52}║")

                if llm_model:
                    model_line = f" Model: {llm_model}"
                    print(f"║{model_line:<52}║")

                if is_indexed:
                    # Show quick stats if indexed
                    try:
                        manifest = rag_dir / 'manifest.json'
                        if manifest.exists():
                            with open(manifest) as f:
                                data = json.load(f)
                                file_count = data.get('file_count', 0)
                                files_line = f" Files indexed: {file_count}"
                                print(f"║{files_line:<52}║")
                    except Exception:
                        pass
                print("╚" + "═" * 52 + "╝")
                print()
            else:
                # Show beginner tips when no project selected
                print("🎯 Welcome to FSS-Mini-RAG!")
                print(" Search through code, documents, emails, notes - anything text-based!")
                print(" Start by selecting a project directory below.")
                print()

            # Create options with visual cues based on project status
            if self.project_path:
                rag_dir = self.project_path / '.mini-rag'
                is_indexed = rag_dir.exists()

                if is_indexed:
                    options = [
                        "Select project directory",
                        "\033[2mIndex project for search (already indexed)\033[0m",
                        "Search project (Fast synthesis)",
                        "Explore project (Deep thinking)",
                        "View status",
                        "Configuration",
                        "CLI command reference"
                    ]
                else:
                    options = [
                        "Select project directory",
                        "Index project for search",
                        "\033[2mSearch project (needs indexing first)\033[0m",
                        "\033[2mExplore project (needs indexing first)\033[0m",
                        "View status",
                        "Configuration",
                        "CLI command reference"
                    ]
            else:
                # No project selected - gray out project-dependent options
                options = [
                    "Select project directory",
                    "\033[2mIndex project for search (select project first)\033[0m",
                    "\033[2mSearch project (select project first)\033[0m",
                    "\033[2mExplore project (select project first)\033[0m",
                    "\033[2mView status (select project first)\033[0m",
                    "Configuration",
                    "CLI command reference"
                ]

            choice = self.show_menu("Main Menu", options, back_option="Exit")

            if choice == -1:  # Exit (0 option)
                print("\nThanks for using FSS-Mini-RAG! 🚀")
                print("Try the CLI commands for even more power!")
                break
            elif choice == 0:
                self.select_project()
            elif choice == 1:
                self.index_project_interactive()
            elif choice == 2:
                self.search_interactive()
            elif choice == 3:
                self.explore_interactive()
            elif choice == 4:
                self.show_status()
            elif choice == 5:
                self.show_configuration()
            elif choice == 6:
                self.show_cli_reference()


def main():
    """Main entry point."""
    try:
        # Check if we can import dependencies
        try:
            sys.path.insert(0, str(Path(__file__).parent))
            from mini_rag.venv_checker import check_and_warn_venv
            check_and_warn_venv("rag-tui", force_exit=False)
        except ImportError as e:
            # Dependencies missing - show helpful message
            script_dir = Path(__file__).parent
            print("❌ FSS-Mini-RAG dependencies not found!")
            print("")
            print("🔧 To fix this:")
            print(f" 1. Run the installer: {script_dir}/install_mini_rag.sh")
            print(f" 2. Or use the wrapper script: {script_dir}/rag-tui")
            print(" 3. Or activate the virtual environment first:")
            print(f" cd {script_dir}")
            print(" source .venv/bin/activate")
            print(f" python3 {script_dir}/rag-tui.py")
            print("")
            print(f"💡 Dependencies missing: {e}")
            input("\nPress Enter to exit...")
            return

        tui = SimpleTUI()
        tui.main_menu()
    except (KeyboardInterrupt, EOFError):
        print("\n\nGoodbye! 👋")
    except Exception as e:
        print(f"\nUnexpected error: {e}")
        print("Try running the CLI commands directly if this continues.")


if __name__ == "__main__":
    main()