- Changed primary model recommendation from qwen3:1.7b to qwen3:4b
- Added Q8 quantization info in technical docs for production users
- Fixed method name error: get_embedding_info() -> get_status()
- Updated all error messages and test files with new recommendations
- Maintained beginner-friendly options (1.7b still very good, 0.6b surprisingly good)
- Added explanation of why small models work well with RAG context
- Comprehensive testing completed - system ready for clean release
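For reference, a minimal sketch of how the new recommendation can be exercised against this module (the import path below is assumed; the `LLMSynthesizer` class, its `model` parameter, and the fallback behaviour come from the code that follows):

from pathlib import Path
from llm_synthesizer import LLMSynthesizer  # assumed import path; adjust to the package layout

# Request the newly recommended model explicitly. If the tag is not pulled,
# _call_ollama() falls back to the first available model; passing model=None
# uses the ranked auto-selection instead (qwen3:4b first, then smaller models).
synthesizer = LLMSynthesizer(model="qwen3:4b")
if synthesizer.is_available():
    hits = ["def authenticate_user(username, password): ..."]  # placeholder results; real calls pass search hits
    synthesis = synthesizer.synthesize_search_results("user authentication", hits, Path("."))
    print(synthesizer.format_synthesis_output(synthesis, "user authentication"))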
#!/usr/bin/env python3
"""
LLM Synthesizer for RAG Results

Provides intelligent synthesis of search results using Ollama LLMs.
Takes raw search results and generates coherent, contextual summaries.
"""

import json
import logging
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import requests
from pathlib import Path

try:
    from .llm_safeguards import ModelRunawayDetector, SafeguardConfig, get_optimal_ollama_parameters
except ImportError:
    # Graceful fallback if safeguards not available
    ModelRunawayDetector = None
    SafeguardConfig = None
    get_optimal_ollama_parameters = lambda x: {}

logger = logging.getLogger(__name__)

@dataclass
class SynthesisResult:
    """Result of LLM synthesis."""
    summary: str
    key_points: List[str]
    code_examples: List[str]
    suggested_actions: List[str]
    confidence: float

class LLMSynthesizer:
    """Synthesizes RAG search results using Ollama LLMs."""

    def __init__(self, ollama_url: str = "http://localhost:11434", model: str = None, enable_thinking: bool = False):
        self.ollama_url = ollama_url.rstrip('/')
        self.available_models = []
        self.model = model
        self.enable_thinking = enable_thinking  # Default False for synthesis mode
        self._initialized = False

        # Initialize safeguards
        if ModelRunawayDetector:
            self.safeguard_detector = ModelRunawayDetector(SafeguardConfig())
        else:
            self.safeguard_detector = None

    def _get_available_models(self) -> List[str]:
        """Get list of available Ollama models."""
        try:
            response = requests.get(f"{self.ollama_url}/api/tags", timeout=5)
            if response.status_code == 200:
                data = response.json()
                return [model['name'] for model in data.get('models', [])]
        except Exception as e:
            logger.warning(f"Could not fetch Ollama models: {e}")
        return []

    def _select_best_model(self) -> str:
        """Select the best available model based on modern performance rankings."""
        if not self.available_models:
            return "qwen2.5:1.5b"  # Fallback preference

        # Modern model preference ranking (CPU-friendly first)
        # Prioritize: Ultra-efficient > Standard efficient > Larger models
        model_rankings = [
            # Recommended model (excellent quality)
            "qwen3:4b",

            # Ultra-efficient models (perfect for CPU-only systems)
            "qwen3:0.6b", "qwen3:1.7b", "llama3.2:1b",

            # Standard efficient models
            "qwen2.5:1.5b", "qwen3:3b",

            # Qwen2.5 models (excellent performance/size ratio)
            "qwen2.5-coder:1.5b", "qwen2.5:1.5b", "qwen2.5:3b", "qwen2.5-coder:3b",
            "qwen2.5:7b", "qwen2.5-coder:7b",

            # Qwen2 models (older but still good)
            "qwen2:1.5b", "qwen2:3b", "qwen2:7b",

            # Mistral models (good quality, reasonable size)
            "mistral:7b", "mistral-nemo", "mistral-small",

            # Llama3.2 models (decent but larger)
            "llama3.2:1b", "llama3.2:3b", "llama3.2", "llama3.2:8b",

            # Fallback to other Llama models
            "llama3.1:8b", "llama3:8b", "llama3",

            # Other decent models
            "gemma2:2b", "gemma2:9b", "phi3:3.8b", "phi3.5",
        ]

        # Find first available model from our ranked list
        for preferred_model in model_rankings:
            for available_model in self.available_models:
                # Match model names (handle version tags)
                available_base = available_model.split(':')[0].lower()
                preferred_base = preferred_model.split(':')[0].lower()

                if preferred_base in available_base or available_base in preferred_base:
                    # Additional size filtering - prefer smaller models
                    if any(size in available_model.lower() for size in ['1b', '1.5b', '2b', '3b']):
                        logger.info(f"Selected efficient model: {available_model}")
                        return available_model
                    elif any(size in available_model.lower() for size in ['7b', '8b']):
                        # Only use larger models if no smaller ones available
                        logger.info(f"Selected larger model: {available_model}")
                        return available_model
                    elif ':' not in available_model:
                        # Handle models without explicit size tags
                        return available_model

        # If no preferred models found, use first available
        fallback = self.available_models[0]
        logger.warning(f"Using fallback model: {fallback}")
        return fallback

    def _ensure_initialized(self):
        """Lazy initialization with LLM warmup."""
        if self._initialized:
            return

        # Load available models
        self.available_models = self._get_available_models()
        if not self.model:
            self.model = self._select_best_model()

        # Warm up LLM with minimal request (ignores response)
        if self.available_models:
            try:
                self._call_ollama("testing, just say 'hi'", temperature=0.1, disable_thinking=True)
            except:
                pass  # Warmup failure is non-critical

        self._initialized = True

    def is_available(self) -> bool:
        """Check if Ollama is available and has models."""
        self._ensure_initialized()
        return len(self.available_models) > 0

    def _call_ollama(self, prompt: str, temperature: float = 0.3, disable_thinking: bool = False) -> Optional[str]:
        """Make a call to Ollama API with safeguards."""
        start_time = time.time()

        try:
            # Use the best available model
            model_to_use = self.model
            if self.model not in self.available_models:
                # Fallback to first available model
                if self.available_models:
                    model_to_use = self.available_models[0]
                else:
                    logger.error("No Ollama models available")
                    return None

            # Handle thinking mode for Qwen3 models
            final_prompt = prompt
            if not self.enable_thinking or disable_thinking:
                if not final_prompt.endswith(" <no_think>"):
                    final_prompt += " <no_think>"

            # Get optimal parameters for this model
            optimal_params = get_optimal_ollama_parameters(model_to_use)

            payload = {
                "model": model_to_use,
                "prompt": final_prompt,
                "stream": False,
                "options": {
                    "temperature": temperature,
                    "top_p": optimal_params.get("top_p", 0.9),
                    "top_k": optimal_params.get("top_k", 40),
                    "num_ctx": optimal_params.get("num_ctx", 32768),
                    "num_predict": optimal_params.get("num_predict", 2000),
                    "repeat_penalty": optimal_params.get("repeat_penalty", 1.1),
                    "presence_penalty": optimal_params.get("presence_penalty", 1.0)
                }
            }

            response = requests.post(
                f"{self.ollama_url}/api/generate",
                json=payload,
                timeout=65  # Slightly longer than safeguard timeout
            )

            if response.status_code == 200:
                result = response.json()
                raw_response = result.get('response', '').strip()

                # Apply safeguards to check response quality
                if self.safeguard_detector and raw_response:
                    is_valid, issue_type, explanation = self.safeguard_detector.check_response_quality(
                        raw_response, prompt[:100], start_time  # First 100 chars of prompt for context
                    )

                    if not is_valid:
                        logger.warning(f"Safeguard triggered: {issue_type}")
                        # Return a safe explanation instead of the problematic response
                        return self._create_safeguard_response(issue_type, explanation, prompt)

                return raw_response
            else:
                logger.error(f"Ollama API error: {response.status_code}")
                return None

        except Exception as e:
            logger.error(f"Ollama call failed: {e}")
            return None

    def _create_safeguard_response(self, issue_type: str, explanation: str, original_prompt: str) -> str:
        """Create a helpful response when safeguards are triggered."""
        return f"""⚠️ Model Response Issue Detected

{explanation}

**Original query context:** {original_prompt[:200]}{'...' if len(original_prompt) > 200 else ''}

**What happened:** The AI model encountered a common issue with small language models and was prevented from giving a problematic response.

**Your options:**
1. **Try again**: Ask the same question (often resolves itself)
2. **Rephrase**: Make your question more specific or break it into parts
3. **Use exploration mode**: `rag-mini explore` for complex questions
4. **Different approach**: Try synthesis mode: `--synthesize` for simpler responses

This is normal with smaller AI models and helps ensure you get quality responses."""

    def synthesize_search_results(self, query: str, results: List[Any], project_path: Path) -> SynthesisResult:
        """Synthesize search results into a coherent summary."""

        self._ensure_initialized()
        if not self.is_available():
            return SynthesisResult(
                summary="LLM synthesis unavailable (Ollama not running or no models)",
                key_points=[],
                code_examples=[],
                suggested_actions=["Install and run Ollama with a model"],
                confidence=0.0
            )

        # Prepare context from search results
        context_parts = []
        for i, result in enumerate(results[:8], 1):  # Limit to top 8 results
            file_path = result.file_path if hasattr(result, 'file_path') else 'unknown'
            content = result.content if hasattr(result, 'content') else str(result)
            score = result.score if hasattr(result, 'score') else 0.0

            context_parts.append(f"""
Result {i} (Score: {score:.3f}):
File: {file_path}
Content: {content[:500]}{'...' if len(content) > 500 else ''}
""")

        context = "\n".join(context_parts)

        # Create synthesis prompt
        prompt = f"""You are a senior software engineer analyzing code search results. Your task is to synthesize the search results into a helpful, actionable summary.

SEARCH QUERY: "{query}"
PROJECT: {project_path.name}

SEARCH RESULTS:
{context}

Please provide a synthesis in the following JSON format:
{{
    "summary": "A 2-3 sentence overview of what the search results show",
    "key_points": [
        "Important finding 1",
        "Important finding 2",
        "Important finding 3"
    ],
    "code_examples": [
        "Relevant code snippet or pattern from the results",
        "Another important code example"
    ],
    "suggested_actions": [
        "What the developer should do next",
        "Additional recommendations"
    ],
    "confidence": 0.85
}}

Focus on:
- What the code does and how it works
- Patterns and relationships between the results
- Practical next steps for the developer
- Code quality observations

Respond with ONLY the JSON, no other text."""

        # Get LLM response
        response = self._call_ollama(prompt, temperature=0.2)

        if not response:
            return SynthesisResult(
                summary="LLM synthesis failed (API error)",
                key_points=[],
                code_examples=[],
                suggested_actions=["Check Ollama status and try again"],
                confidence=0.0
            )

        # Parse JSON response
        try:
            # Extract JSON from response (in case there's extra text)
            start_idx = response.find('{')
            end_idx = response.rfind('}') + 1
            if start_idx >= 0 and end_idx > start_idx:
                json_str = response[start_idx:end_idx]
                data = json.loads(json_str)

                return SynthesisResult(
                    summary=data.get('summary', 'No summary generated'),
                    key_points=data.get('key_points', []),
                    code_examples=data.get('code_examples', []),
                    suggested_actions=data.get('suggested_actions', []),
                    confidence=float(data.get('confidence', 0.5))
                )
            else:
                # Fallback: use the raw response as summary
                return SynthesisResult(
                    summary=response[:300] + '...' if len(response) > 300 else response,
                    key_points=[],
                    code_examples=[],
                    suggested_actions=[],
                    confidence=0.3
                )

        except Exception as e:
            logger.error(f"Failed to parse LLM response: {e}")
            return SynthesisResult(
                summary="LLM synthesis failed (JSON parsing error)",
                key_points=[],
                code_examples=[],
                suggested_actions=["Try the search again or check LLM output"],
                confidence=0.0
            )

    def format_synthesis_output(self, synthesis: SynthesisResult, query: str) -> str:
        """Format synthesis result for display."""

        output = []
        output.append("🧠 LLM SYNTHESIS")
        output.append("=" * 50)
        output.append("")

        output.append(f"📝 Summary:")
        output.append(f" {synthesis.summary}")
        output.append("")

        if synthesis.key_points:
            output.append("🔍 Key Findings:")
            for point in synthesis.key_points:
                output.append(f" • {point}")
            output.append("")

        if synthesis.code_examples:
            output.append("💡 Code Patterns:")
            for example in synthesis.code_examples:
                output.append(f" {example}")
            output.append("")

        if synthesis.suggested_actions:
            output.append("🎯 Suggested Actions:")
            for action in synthesis.suggested_actions:
                output.append(f" • {action}")
            output.append("")

        confidence_emoji = "🟢" if synthesis.confidence > 0.7 else "🟡" if synthesis.confidence > 0.4 else "🔴"
        output.append(f"{confidence_emoji} Confidence: {synthesis.confidence:.1%}")
        output.append("")

        return "\n".join(output)

# Quick test function
def test_synthesizer():
    """Test the synthesizer with sample data."""
    from dataclasses import dataclass

    @dataclass
    class MockResult:
        file_path: str
        content: str
        score: float

    synthesizer = LLMSynthesizer()

    if not synthesizer.is_available():
        print("❌ Ollama not available for testing")
        return

    # Mock search results
    results = [
        MockResult("auth.py", "def authenticate_user(username, password):\n return verify_credentials(username, password)", 0.95),
        MockResult("models.py", "class User:\n def login(self):\n return authenticate_user(self.username, self.password)", 0.87)
    ]

    synthesis = synthesizer.synthesize_search_results(
        "user authentication",
        results,
        Path("/test/project")
    )

    print(synthesizer.format_synthesis_output(synthesis, "user authentication"))

if __name__ == "__main__":
    test_synthesizer()