Fss-Rag-Mini/mini_rag/llm_synthesizer.py
BobAi a189a4fe29 Implement comprehensive context window configuration system
Add intelligent context window management for optimal RAG performance:

## Core Features
- Dynamic context sizing based on model capabilities
- User-friendly configuration menu with Development/Production/Advanced presets
- Automatic validation against model limits (qwen3:0.6b/1.7b = 32K, qwen3:4b = 131K)
- Educational content explaining context window importance for RAG

## Technical Implementation
- Enhanced LLMConfig with context_window and auto_context parameters
- Intelligent _get_optimal_context_size() method with model-specific limits
- Consistent context application across synthesizer and explorer
- YAML configuration output with helpful context explanations

## User Experience Improvements
- Clear context window display in configuration status
- Guided selection: Development (8K), Production (16K), Advanced (32K)
- Memory usage estimates and performance guidance
- Validation prevents invalid context/model combinations

## Educational Value
- Explains why Ollama's default 2048-token context fails for RAG
- Shows relationship between context size and conversation length
- Guides users toward optimal settings for their use case
- Highlights advanced capabilities (15+ results, 4000+ character chunks)

This addresses the critical issue where Ollama's default context severely
limits RAG performance, providing users with proper configuration tools
and understanding of this crucial parameter.
2025-08-15 13:09:53 +10:00

709 lines
31 KiB
Python

#!/usr/bin/env python3
"""
LLM Synthesizer for RAG Results
Provides intelligent synthesis of search results using Ollama LLMs.
Takes raw search results and generates coherent, contextual summaries.
"""
import json
import logging
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import requests
from pathlib import Path
# Optional safeguard layer. When the sibling module cannot be imported the
# synthesizer keeps working: the detector classes become None (callers check
# for that) and the parameter helper returns no overrides.
try:
    from .llm_safeguards import ModelRunawayDetector, SafeguardConfig, get_optimal_ollama_parameters
except ImportError:
    # Graceful fallback if safeguards not available
    ModelRunawayDetector = None
    SafeguardConfig = None

    def get_optimal_ollama_parameters(model_name):
        """Fallback used when llm_safeguards is missing: no per-model overrides."""
        return {}

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
@dataclass
class SynthesisResult:
    """Structured outcome of one LLM synthesis pass over search results."""

    # Prose overview of what the search results show.
    summary: str
    # Individual findings, one string per finding.
    key_points: List[str]
    # Code snippets or patterns worth showing to the user.
    code_examples: List[str]
    # Recommended next steps, one string per action.
    suggested_actions: List[str]
    # Confidence score; the formatter renders it as a percentage.
    confidence: float
class LLMSynthesizer:
"""Synthesizes RAG search results using Ollama LLMs."""
def __init__(self, ollama_url: str = "http://localhost:11434", model: str = None, enable_thinking: bool = False, config=None):
    """Record connection settings; no network call happens here.

    Args:
        ollama_url: Base URL of the Ollama server. A trailing slash is removed
            so endpoint paths can be appended directly.
        model: Model name to use. When falsy, a model is chosen during lazy
            initialization.
        enable_thinking: Whether thinking mode is requested (default False
            for synthesis mode).
        config: Optional configuration object, consulted for model rankings
            and context-window settings.
    """
    # Caller-supplied settings
    self.ollama_url = ollama_url.rstrip('/')
    self.model = model
    self.enable_thinking = enable_thinking
    self.config = config

    # Filled in lazily by _ensure_initialized()
    self.available_models = []
    self._initialized = False

    # The safeguard classes are None when the optional module is missing
    detector_cls = ModelRunawayDetector
    self.safeguard_detector = detector_cls(SafeguardConfig()) if detector_cls else None
def _get_available_models(self) -> List[str]:
    """Ask the Ollama server which models are installed.

    Returns:
        The ``name`` of every entry under ``models`` in the ``/api/tags``
        reply. An empty list when the request raises (a warning is logged)
        or the server answers with a non-200 status.
    """
    try:
        reply = requests.get(f"{self.ollama_url}/api/tags", timeout=5)
        if reply.status_code != 200:
            return []
        installed = reply.json().get('models', [])
        return [entry['name'] for entry in installed]
    except Exception as e:
        # Best effort: an unreachable server simply means "no models"
        logger.warning(f"Could not fetch Ollama models: {e}")
    return []
def _select_best_model(self) -> str:
"""Select the best available model based on configuration rankings."""
if not self.available_models:
return "qwen2.5:1.5b" # Fallback preference
# Get model rankings from config or use defaults
if self.config and hasattr(self.config, 'llm') and hasattr(self.config.llm, 'model_rankings'):
model_rankings = self.config.llm.model_rankings
else:
# Fallback rankings if no config
model_rankings = [
"qwen3:1.7b", "qwen3:0.6b", "qwen3:4b", "qwen2.5:3b",
"qwen2.5:1.5b", "qwen2.5-coder:1.5b"
]
# Find first available model from our ranked list (exact matches first)
for preferred_model in model_rankings:
for available_model in self.available_models:
# Exact match first (e.g., "qwen3:1.7b" matches "qwen3:1.7b")
if preferred_model.lower() == available_model.lower():
logger.info(f"Selected exact match model: {available_model}")
return available_model
# Partial match with version handling (e.g., "qwen3:1.7b" matches "qwen3:1.7b-q8_0")
preferred_parts = preferred_model.lower().split(':')
available_parts = available_model.lower().split(':')
if len(preferred_parts) >= 2 and len(available_parts) >= 2:
if (preferred_parts[0] == available_parts[0] and
preferred_parts[1] in available_parts[1]):
logger.info(f"Selected version match model: {available_model}")
return available_model
# If no preferred models found, use first available
fallback = self.available_models[0]
logger.warning(f"Using fallback model: {fallback}")
return fallback
def _ensure_initialized(self):
"""Lazy initialization with LLM warmup."""
if self._initialized:
return
# Load available models
self.available_models = self._get_available_models()
if not self.model:
self.model = self._select_best_model()
# Skip warmup - models are fast enough and warmup causes delays
# Warmup removed to eliminate startup delays and unwanted model calls
self._initialized = True
def _get_optimal_context_size(self, model_name: str) -> int:
"""Get optimal context size based on model capabilities and configuration."""
# Get configured context window
if self.config and hasattr(self.config, 'llm'):
configured_context = self.config.llm.context_window
auto_context = getattr(self.config.llm, 'auto_context', True)
else:
configured_context = 16384 # Default to 16K
auto_context = True
# Model-specific maximum context windows (based on research)
model_limits = {
# Qwen3 models with native context support
'qwen3:0.6b': 32768, # 32K native
'qwen3:1.7b': 32768, # 32K native
'qwen3:4b': 131072, # 131K with YaRN extension
# Qwen2.5 models
'qwen2.5:1.5b': 32768, # 32K native
'qwen2.5:3b': 32768, # 32K native
'qwen2.5-coder:1.5b': 32768, # 32K native
# Fallback for unknown models
'default': 8192
}
# Find model limit (check for partial matches)
model_limit = model_limits.get('default', 8192)
for model_pattern, limit in model_limits.items():
if model_pattern != 'default' and model_pattern.lower() in model_name.lower():
model_limit = limit
break
# If auto_context is enabled, respect model limits
if auto_context:
optimal_context = min(configured_context, model_limit)
else:
optimal_context = configured_context
# Ensure minimum usable context for RAG
optimal_context = max(optimal_context, 4096) # Minimum 4K for basic RAG
logger.debug(f"Context for {model_name}: {optimal_context} tokens (configured: {configured_context}, limit: {model_limit})")
return optimal_context
def is_available(self) -> bool:
    """Report whether model discovery found at least one Ollama model.

    Triggers lazy initialization first, so the first call may hit the server.
    """
    self._ensure_initialized()
    return bool(self.available_models)
def _call_ollama(self, prompt: str, temperature: float = 0.3, disable_thinking: bool = False, use_streaming: bool = True, collapse_thinking: bool = True) -> Optional[str]:
"""Make a call to Ollama API with safeguards."""
start_time = time.time()
try:
# Use the best available model
model_to_use = self.model
if self.model not in self.available_models:
# Fallback to first available model
if self.available_models:
model_to_use = self.available_models[0]
else:
logger.error("No Ollama models available")
return None
# Handle thinking mode for Qwen3 models
final_prompt = prompt
use_thinking = self.enable_thinking and not disable_thinking
# For non-thinking mode, add <no_think> tag for Qwen3
if not use_thinking and "qwen3" in model_to_use.lower():
if not final_prompt.endswith(" <no_think>"):
final_prompt += " <no_think>"
# Get optimal parameters for this model
optimal_params = get_optimal_ollama_parameters(model_to_use)
# Qwen3-specific optimal parameters based on research
if "qwen3" in model_to_use.lower():
if use_thinking:
# Thinking mode: Temperature=0.6, TopP=0.95, TopK=20, PresencePenalty=1.5
qwen3_temp = 0.6
qwen3_top_p = 0.95
qwen3_top_k = 20
qwen3_presence = 1.5
else:
# Non-thinking mode: Temperature=0.7, TopP=0.8, TopK=20, PresencePenalty=1.5
qwen3_temp = 0.7
qwen3_top_p = 0.8
qwen3_top_k = 20
qwen3_presence = 1.5
else:
qwen3_temp = temperature
qwen3_top_p = optimal_params.get("top_p", 0.9)
qwen3_top_k = optimal_params.get("top_k", 40)
qwen3_presence = optimal_params.get("presence_penalty", 1.0)
payload = {
"model": model_to_use,
"prompt": final_prompt,
"stream": use_streaming,
"options": {
"temperature": qwen3_temp,
"top_p": qwen3_top_p,
"top_k": qwen3_top_k,
"num_ctx": self._get_optimal_context_size(model_to_use), # Dynamic context based on model and config
"num_predict": optimal_params.get("num_predict", 2000),
"repeat_penalty": optimal_params.get("repeat_penalty", 1.1),
"presence_penalty": qwen3_presence
}
}
# Handle streaming with thinking display
if use_streaming:
return self._handle_streaming_with_thinking_display(payload, model_to_use, use_thinking, start_time, collapse_thinking)
response = requests.post(
f"{self.ollama_url}/api/generate",
json=payload,
timeout=65 # Slightly longer than safeguard timeout
)
if response.status_code == 200:
result = response.json()
# All models use standard response format
# Qwen3 thinking tokens are embedded in the response content itself as <think>...</think>
raw_response = result.get('response', '').strip()
# Log thinking content for Qwen3 debugging
if "qwen3" in model_to_use.lower() and use_thinking and "<think>" in raw_response:
thinking_start = raw_response.find("<think>")
thinking_end = raw_response.find("</think>")
if thinking_start != -1 and thinking_end != -1:
thinking_content = raw_response[thinking_start+7:thinking_end]
logger.info(f"Qwen3 thinking: {thinking_content[:100]}...")
# Apply safeguards to check response quality
if self.safeguard_detector and raw_response:
is_valid, issue_type, explanation = self.safeguard_detector.check_response_quality(
raw_response, prompt[:100], start_time # First 100 chars of prompt for context
)
if not is_valid:
logger.warning(f"Safeguard triggered: {issue_type}")
# Preserve original response but add safeguard warning
return self._create_safeguard_response_with_content(issue_type, explanation, raw_response)
return raw_response
else:
logger.error(f"Ollama API error: {response.status_code}")
return None
except Exception as e:
logger.error(f"Ollama call failed: {e}")
return None
def _create_safeguard_response(self, issue_type: str, explanation: str, original_prompt: str) -> str:
"""Create a helpful response when safeguards are triggered."""
return f"""⚠️ Model Response Issue Detected
{explanation}
**Original query context:** {original_prompt[:200]}{'...' if len(original_prompt) > 200 else ''}
**What happened:** The AI model encountered a common issue with small language models and was prevented from giving a problematic response.
**Your options:**
1. **Try again**: Ask the same question (often resolves itself)
2. **Rephrase**: Make your question more specific or break it into parts
3. **Use exploration mode**: `rag-mini explore` for complex questions
4. **Different approach**: Try synthesis mode: `--synthesize` for simpler responses
This is normal with smaller AI models and helps ensure you get quality responses."""
def _create_safeguard_response_with_content(self, issue_type: str, explanation: str, original_response: str) -> str:
"""Create a response that preserves the original content but adds a safeguard warning."""
# For Qwen3, extract the actual response (after thinking)
actual_response = original_response
if "<think>" in original_response and "</think>" in original_response:
thinking_end = original_response.find("</think>")
if thinking_end != -1:
actual_response = original_response[thinking_end + 8:].strip()
# If we have useful content, preserve it with a warning
if len(actual_response.strip()) > 20:
return f"""⚠️ **Response Quality Warning** ({issue_type})
{explanation}
---
**AI Response (use with caution):**
{actual_response}
---
💡 **Note**: This response may have quality issues. Consider rephrasing your question or trying exploration mode for better results."""
else:
# If content is too short or problematic, use the original safeguard response
return f"""⚠️ Model Response Issue Detected
{explanation}
**What happened:** The AI model encountered a common issue with small language models.
**Your options:**
1. **Try again**: Ask the same question (often resolves itself)
2. **Rephrase**: Make your question more specific or break it into parts
3. **Use exploration mode**: `rag-mini explore` for complex questions
This is normal with smaller AI models and helps ensure you get quality responses."""
def _handle_streaming_with_thinking_display(self, payload: dict, model_name: str, use_thinking: bool, start_time: float, collapse_thinking: bool = True) -> Optional[str]:
    """Handle streaming response with real-time thinking token display.

    Streams ``/api/generate`` line by line, echoing the model's
    ``<think>...</think>`` section to the terminal in gray and the answer
    that follows it in the default color. Answer text is only echoed once a
    thinking section has completed; the full text is always accumulated and
    returned regardless of what was displayed.

    Args:
        payload: JSON body for the generate request.
        model_name: Not used by this method.
        use_thinking: When True, a ``<think>`` tag in the stream starts the
            gray thinking display.
        start_time: Not used by this method.
        collapse_thinking: When True, the printed thinking lines are erased
            with ANSI cursor codes once thinking completes; otherwise they
            stay on screen.

    Returns:
        The concatenated ``response`` text of every chunk (thinking tags
        included), or None on a non-200 status or any request failure.
    """
    # ANSI escape codes for colors and cursor control
    GRAY = '\033[90m'       # Dark gray for thinking
    RESET = '\033[0m'       # Reset color
    CLEAR_LINE = '\033[2K'  # Clear entire line
    CURSOR_UP = '\033[A'    # Move cursor up one line

    try:
        # The context manager closes the streamed connection on every exit
        # path (early return, break on "done", or an exception).
        with requests.post(
            f"{self.ollama_url}/api/generate",
            json=payload,
            stream=True,
            timeout=65
        ) as response:
            if response.status_code != 200:
                logger.error(f"Ollama API error: {response.status_code}")
                return None

            full_response = ""
            thinking_content = ""   # Thinking text not yet printed
            is_in_thinking = False
            is_thinking_complete = False
            thinking_lines_printed = 0

            print(f"\n💭 {GRAY}Thinking...{RESET}", flush=True)

            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                try:
                    chunk_data = json.loads(raw_line.decode('utf-8'))
                    chunk_text = chunk_data.get('response', '')

                    if chunk_text:
                        full_response += chunk_text

                        # Handle thinking tokens
                        if use_thinking and '<think>' in chunk_text:
                            is_in_thinking = True
                            chunk_text = chunk_text.replace('<think>', '')

                        if is_in_thinking and '</think>' in chunk_text:
                            is_in_thinking = False
                            is_thinking_complete = True
                            chunk_text = chunk_text.replace('</think>', '')

                            if collapse_thinking:
                                # Move cursor up to clear the thinking lines
                                # (plus the "Thinking..." banner)
                                for _ in range(thinking_lines_printed + 1):
                                    print(f"{CURSOR_UP}{CLEAR_LINE}", end='', flush=True)
                                print(f"💭 {GRAY}Thinking complete ✓{RESET}", flush=True)
                                thinking_lines_printed = 0
                            else:
                                # Keep thinking visible, just show completion
                                print(f"\n💭 {GRAY}Thinking complete ✓{RESET}", flush=True)

                            print("🤖 AI Response:", flush=True)
                            continue

                        # Display thinking content in gray with better formatting
                        if is_in_thinking and chunk_text.strip():
                            thinking_content += chunk_text

                            # Flush complete sentences once there is a word
                            # boundary or enough buffered text
                            if ' ' in chunk_text or '\n' in chunk_text or len(thinking_content) > 100:
                                sentences = thinking_content.replace('\n', ' ').split('. ')

                                for sentence in sentences[:-1]:  # Process complete sentences
                                    sentence = sentence.strip()
                                    if sentence:
                                        # Word wrap long sentences at ~70 chars
                                        wrapped = ""
                                        for word in sentence.split():
                                            if len(wrapped + " " + word) > 70:
                                                if wrapped:
                                                    print(f"{GRAY} {wrapped.strip()}{RESET}", flush=True)
                                                    thinking_lines_printed += 1
                                                wrapped = word
                                            else:
                                                wrapped += " " + word if wrapped else word
                                        if wrapped.strip():
                                            print(f"{GRAY} {wrapped.strip()}.{RESET}", flush=True)
                                            thinking_lines_printed += 1

                                # Keep the last incomplete sentence for next iteration
                                thinking_content = sentences[-1] if sentences else ""

                        # Display regular response content (skip any leftover thinking)
                        elif not is_in_thinking and is_thinking_complete and chunk_text.strip():
                            # Filter out any remaining thinking tags that might leak through
                            clean_text = chunk_text
                            if '<think>' in clean_text or '</think>' in clean_text:
                                clean_text = clean_text.replace('<think>', '').replace('</think>', '')
                            if clean_text.strip():
                                print(clean_text, end='', flush=True)

                    # Check if response is done
                    if chunk_data.get('done', False):
                        print()  # Final newline
                        break

                except json.JSONDecodeError:
                    # Skip lines that are not valid JSON
                    continue
                except Exception as e:
                    logger.error(f"Error processing stream chunk: {e}")
                    continue

            return full_response

    except Exception as e:
        logger.error(f"Streaming failed: {e}")
        return None
def _handle_streaming_with_early_stop(self, payload: dict, model_name: str, use_thinking: bool, start_time: float) -> Optional[str]:
    """Handle streaming response with intelligent early stopping.

    Accumulates streamed text and watches the last 30 words. Once at least
    100 characters have arrived and more than 80% of those recent words are
    repeats, the stream is abandoned and the partial text is returned (with
    ``...`` appended when it does not end in sentence punctuation).

    Args:
        payload: JSON body for the generate request.
        model_name: Model named in the follow-up stop request.
        use_thinking: Not used by this method.
        start_time: Not used by this method.

    Returns:
        The stripped response text, or None on a non-200 status or any
        request failure.
    """
    repetition_window = 30     # Check last 30 words for repetition (more context)
    stop_threshold = 0.8       # Stop only if 80% of recent words are repetitive (very permissive)
    min_response_length = 100  # Don't early stop until we have at least 100 chars

    try:
        # The context manager closes the streamed connection on every exit
        # path, including the early-stop break below.
        with requests.post(
            f"{self.ollama_url}/api/generate",
            json=payload,
            stream=True,
            timeout=65
        ) as response:
            if response.status_code != 200:
                logger.error(f"Ollama API error: {response.status_code}")
                return None

            full_response = ""
            word_buffer = []

            for raw_line in response.iter_lines():
                if not raw_line:
                    continue
                try:
                    chunk_data = json.loads(raw_line.decode('utf-8'))
                except json.JSONDecodeError:
                    # Skip lines that are not valid JSON
                    continue

                chunk_text = chunk_data.get('response', '')
                if chunk_text:
                    full_response += chunk_text

                    # Add words to buffer, keeping only the recent window
                    word_buffer.extend(chunk_text.split())
                    if len(word_buffer) > repetition_window:
                        word_buffer = word_buffer[-repetition_window:]

                    # Check for repetition once there are enough words AND content
                    if len(word_buffer) >= repetition_window and len(full_response) >= min_response_length:
                        repetition_ratio = 1 - (len(set(word_buffer)) / len(word_buffer))

                        # Early stop only if repetition is EXTREMELY high (80%+)
                        if repetition_ratio > stop_threshold:
                            logger.info(f"Early stopping due to repetition: {repetition_ratio:.2f}")

                            # Add a gentle completion to the response
                            if not full_response.strip().endswith(('.', '!', '?')):
                                full_response += "..."

                            # Best-effort stop request; the partial response
                            # is already in hand if it fails.
                            # NOTE(review): confirm Ollama honours a top-level
                            # "stop" flag on /api/generate - closing the stream
                            # may be what actually ends generation.
                            try:
                                stop_payload = {"model": model_name, "stop": True}
                                requests.post(f"{self.ollama_url}/api/generate", json=stop_payload, timeout=2)
                            except Exception as stop_error:
                                logger.debug(f"Stop request failed: {stop_error}")
                            break

                if chunk_data.get('done', False):
                    break

            return full_response.strip()

    except Exception as e:
        logger.error(f"Streaming with early stop failed: {e}")
        return None
def synthesize_search_results(self, query: str, results: List[Any], project_path: Path) -> SynthesisResult:
    """Synthesize search results into a coherent summary.

    Builds a prompt from the top 8 results (each truncated to 500 characters
    of content), asks the model for a JSON answer and converts it into a
    ``SynthesisResult``. Parsing is tolerant:

    * a leading ``<think>...</think>`` section is dropped before the JSON
      object is located, so braces in the reasoning cannot confuse it;
    * a non-numeric ``confidence`` falls back to 0.5 instead of discarding
      the whole answer, and the value is clamped to the 0..1 range;
    * list fields given as a single string become a one-item list.

    Args:
        query: The user's search query.
        results: Search hits. ``file_path``, ``content`` and ``score``
            attributes are used when present; otherwise ``str(result)`` is
            the content.
        project_path: Project root; only its final path component is shown
            to the model. A plain string is accepted as well as a ``Path``.

    Returns:
        A ``SynthesisResult``. Confidence is 0.0 when Ollama is unavailable,
        the call fails or the JSON is invalid, and 0.3 when the answer
        contains no JSON object and is used verbatim as the summary.
    """
    self._ensure_initialized()
    if not self.is_available():
        return SynthesisResult(
            summary="LLM synthesis unavailable (Ollama not running or no models)",
            key_points=[],
            code_examples=[],
            suggested_actions=["Install and run Ollama with a model"],
            confidence=0.0
        )

    # Prepare context from search results
    context_parts = []
    for i, result in enumerate((results or [])[:8], 1):  # Limit to top 8 results
        file_path = getattr(result, 'file_path', 'unknown')
        content = result.content if hasattr(result, 'content') else str(result)
        if not isinstance(content, str):
            content = str(content)
        score = getattr(result, 'score', 0.0)
        context_parts.append(f"""
Result {i} (Score: {score:.3f}):
File: {file_path}
Content: {content[:500]}{'...' if len(content) > 500 else ''}
""")
    context = "\n".join(context_parts)
    project_name = Path(project_path).name

    # Create synthesis prompt
    prompt = f"""You are a senior software engineer analyzing code search results. Your task is to synthesize the search results into a helpful, actionable summary.
SEARCH QUERY: "{query}"
PROJECT: {project_name}
SEARCH RESULTS:
{context}
Please provide a synthesis in the following JSON format:
{{
"summary": "A 2-3 sentence overview of what the search results show",
"key_points": [
"Important finding 1",
"Important finding 2",
"Important finding 3"
],
"code_examples": [
"Relevant code snippet or pattern from the results",
"Another important code example"
],
"suggested_actions": [
"What the developer should do next",
"Additional recommendations"
],
"confidence": 0.85
}}
Focus on:
- What the code does and how it works
- Patterns and relationships between the results
- Practical next steps for the developer
- Code quality observations
Respond with ONLY the JSON, no other text."""

    # Get LLM response
    response = self._call_ollama(prompt, temperature=0.2)
    if not response:
        return SynthesisResult(
            summary="LLM synthesis failed (API error)",
            key_points=[],
            code_examples=[],
            suggested_actions=["Check Ollama status and try again"],
            confidence=0.0
        )

    # Ignore any thinking section: only the text after it is the answer
    answer_text = response
    if "<think>" in response and "</think>" in response:
        answer_text = response[response.find("</think>") + len("</think>"):]

    # Extract JSON from the answer (in case there's extra text around it)
    start_idx = answer_text.find('{')
    end_idx = answer_text.rfind('}') + 1
    if start_idx < 0 or end_idx <= start_idx:
        # Fallback: use the raw answer as summary
        fallback_text = answer_text.strip() or response
        return SynthesisResult(
            summary=fallback_text[:300] + '...' if len(fallback_text) > 300 else fallback_text,
            key_points=[],
            code_examples=[],
            suggested_actions=[],
            confidence=0.3
        )

    try:
        data = json.loads(answer_text[start_idx:end_idx])
    except (ValueError, TypeError) as e:
        logger.error(f"Failed to parse LLM response: {e}")
        return SynthesisResult(
            summary="LLM synthesis failed (JSON parsing error)",
            key_points=[],
            code_examples=[],
            suggested_actions=["Try the search again or check LLM output"],
            confidence=0.0
        )

    def as_text_list(value):
        """Coerce a JSON value into a list of strings."""
        if isinstance(value, str):
            return [value] if value.strip() else []
        if isinstance(value, (list, tuple)):
            return [str(item) for item in value if item is not None]
        return []

    # Small models sometimes answer "high" or a percentage; keep the rest
    try:
        confidence = float(data.get('confidence', 0.5))
    except (TypeError, ValueError):
        confidence = 0.5
    if confidence != confidence:  # NaN
        confidence = 0.5
    confidence = min(max(confidence, 0.0), 1.0)

    return SynthesisResult(
        summary=str(data.get('summary') or 'No summary generated'),
        key_points=as_text_list(data.get('key_points')),
        code_examples=as_text_list(data.get('code_examples')),
        suggested_actions=as_text_list(data.get('suggested_actions')),
        confidence=confidence
    )
def format_synthesis_output(self, synthesis: SynthesisResult, query: str) -> str:
    """Render a synthesis result as display text.

    Sections for key findings, code patterns and suggested actions are only
    emitted when the corresponding list is non-empty. The confidence line is
    prefixed with a green, yellow or red marker for values above 0.7, above
    0.4, or lower.

    Args:
        synthesis: The result to render.
        query: Accepted but not used in the output.

    Returns:
        The newline-joined text, ending with a trailing newline.
    """
    lines = [
        "🧠 LLM SYNTHESIS",
        "=" * 50,
        "",
        f"📝 Summary:",
        f" {synthesis.summary}",
        "",
    ]

    if synthesis.key_points:
        lines.append("🔍 Key Findings:")
        lines.extend(f"{point}" for point in synthesis.key_points)
        lines.append("")

    if synthesis.code_examples:
        lines.append("💡 Code Patterns:")
        lines.extend(f" {example}" for example in synthesis.code_examples)
        lines.append("")

    if synthesis.suggested_actions:
        lines.append("🎯 Suggested Actions:")
        lines.extend(f"{action}" for action in synthesis.suggested_actions)
        lines.append("")

    # Traffic-light marker for the confidence level
    if synthesis.confidence > 0.7:
        confidence_emoji = "🟢"
    elif synthesis.confidence > 0.4:
        confidence_emoji = "🟡"
    else:
        confidence_emoji = "🔴"
    lines.append(f"{confidence_emoji} Confidence: {synthesis.confidence:.1%}")
    lines.append("")

    return "\n".join(lines)
# Quick test function
def test_synthesizer():
    """Manual smoke test: synthesize two canned search hits and print the result.

    Prints a notice and returns early when no Ollama model is available.
    """
    from dataclasses import dataclass

    @dataclass
    class MockResult:
        # Minimal stand-in exposing the attributes the synthesizer reads
        file_path: str
        content: str
        score: float

    synthesizer = LLMSynthesizer()
    if synthesizer.is_available():
        query = "user authentication"

        # Mock search results
        hits = [
            MockResult("auth.py", "def authenticate_user(username, password):\n return verify_credentials(username, password)", 0.95),
            MockResult("models.py", "class User:\n def login(self):\n return authenticate_user(self.username, self.password)", 0.87),
        ]

        synthesis = synthesizer.synthesize_search_results(query, hits, Path("/test/project"))
        print(synthesizer.format_synthesis_output(synthesis, query))
    else:
        print("❌ Ollama not available for testing")
# Script entry point: executing this module directly runs the quick manual
# check in test_synthesizer(); importing it does not.
if __name__ == "__main__":
    test_synthesizer()