"""LLM-based calibration analysis.""" import logging import json import re from typing import List, Dict, Any, Optional, Tuple from src.email_providers.base import Email from src.llm.base import BaseLLMProvider from src.calibration.category_cache import CategoryCache logger = logging.getLogger(__name__) class CalibrationAnalyzer: """ Use LLM to discover natural categories in email sample. This runs ONCE during calibration to understand what categories exist naturally in this inbox. """ def __init__( self, llm_provider: BaseLLMProvider, config: Dict[str, Any], embedding_model=None ): """Initialize calibration analyzer.""" self.llm_provider = llm_provider self.config = config self.llm_available = llm_provider.is_available() # Initialize category cache for cross-mailbox consistency cache_path = config.get('category_cache_path', 'src/models/category_cache.json') self.category_cache = CategoryCache(cache_path, embedding_model=embedding_model, llm_provider=llm_provider) if not self.llm_available: logger.warning("LLM not available for calibration analysis") def discover_categories( self, sample_emails: List[Email] ) -> Tuple[Dict[str, Any], List[Tuple[str, str]]]: """ Discover natural categories in email sample. Args: sample_emails: Stratified sample of emails Returns: (category_map, email_labels) where: - category_map: discovered categories with descriptions - email_labels: list of (email_id, assigned_category) """ if not self.llm_available: logger.warning("LLM unavailable, using default categories") return self._default_categories(), [] logger.info(f"Starting LLM category discovery on {len(sample_emails)} emails") # Batch emails for analysis batch_size = 20 discovered_categories = {} email_labels = [] for batch_idx in range(0, len(sample_emails), batch_size): batch = sample_emails[batch_idx:batch_idx + batch_size] try: batch_results = self._analyze_batch(batch, batch_idx) logger.debug(f"Batch results: {len(batch_results.get('categories', {}))} categories, {len(batch_results.get('labels', []))} labels") # Merge categories for category, desc in batch_results.get('categories', {}).items(): if category not in discovered_categories: discovered_categories[category] = desc logger.debug(f"Discovered new category: {category}") # Collect labels for email_id, category in batch_results.get('labels', []): email_labels.append((email_id, category)) logger.debug(f"Label: {email_id} -> {category}") except Exception as e: logger.error(f"Error analyzing batch {batch_idx}: {e}", exc_info=True) logger.info(f"Discovery complete: {len(discovered_categories)} categories found") # Step 2: Consolidate overlapping/duplicate categories if len(discovered_categories) > 10: # Only consolidate if too many categories logger.info(f"Consolidating {len(discovered_categories)} categories...") consolidated = self._consolidate_categories(discovered_categories, email_labels) if len(consolidated) < len(discovered_categories): discovered_categories = consolidated logger.info(f"After consolidation: {len(discovered_categories)} categories") else: logger.warning("Consolidation didn't reduce categories, keeping original") # Step 3: Snap to cached categories for cross-mailbox consistency use_cache = self.config.get('use_category_cache', True) if use_cache and self.category_cache: similarity_threshold = self.config.get('cache_similarity_threshold', 0.7) allow_new = self.config.get('cache_allow_new', True) max_new = self.config.get('cache_max_new', 3) logger.info(f"Snapping to cached categories (threshold={similarity_threshold}, 
            final_categories, snap_mapping, cache_worthy = self.category_cache.snap_to_cache(
                discovered_categories,
                similarity_threshold=similarity_threshold,
                allow_new=allow_new,
                max_new=max_new
            )

            # Update email labels with snapped categories
            for i, (email_id, old_cat) in enumerate(email_labels):
                if old_cat in snap_mapping:
                    email_labels[i] = (email_id, snap_mapping[old_cat])

            logger.info(f"After cache snap: {len(final_categories)} categories")
            discovered_categories = final_categories

            # Update cache with usage counts AND add cache-worthy new categories
            category_counts = {}
            for _, cat in email_labels:
                category_counts[cat] = category_counts.get(cat, 0) + 1

            # Add cache-worthy categories to persistent cache
            if cache_worthy:
                cache_additions = {name: desc for name, desc in cache_worthy}
                logger.info(f"Adding {len(cache_worthy)} LLM-approved categories to persistent cache: {list(cache_additions.keys())}")
                self.category_cache.update_cache(cache_additions, category_counts)
            else:
                # Just update usage counts for existing categories
                self.category_cache.update_cache(discovered_categories, category_counts)

        return discovered_categories, email_labels
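
    # Illustrative shape of discover_categories() output (example values only):
    #   category_map = {"Work Communication": "Daily business email",
    #                   "Financial": "Invoices, budgets and reports"}
    #   email_labels = [("maildir_example__sent_1", "Work Communication"), ...]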

    def _analyze_batch(self, batch: List[Email], batch_idx: int = 0) -> Dict[str, Any]:
        """Analyze single batch of emails."""
        # Calculate analytical patterns
        sender_domains = {}
        recipients_count = []
        has_attachments = 0
        avg_subject_len = 0
        common_keywords = {}

        for e in batch:
            # Domain analysis
            if '@' in e.sender:
                domain = e.sender.split('@')[1].lower()
                sender_domains[domain] = sender_domains.get(domain, 0) + 1

            # Recipient count
            recipient_count = len(e.recipients) if hasattr(e, 'recipients') else 1
            recipients_count.append(recipient_count)

            # Attachments
            if hasattr(e, 'has_attachments') and e.has_attachments:
                has_attachments += 1

            # Subject length
            avg_subject_len += len(e.subject)

            # Extract keywords from subject (simple word frequency)
            words = e.subject.lower().split()
            for word in words:
                if len(word) > 3:  # Skip short words
                    common_keywords[word] = common_keywords.get(word, 0) + 1

        # Build statistics summary
        top_domains = sorted(sender_domains.items(), key=lambda x: x[1], reverse=True)[:5]
        top_keywords = sorted(common_keywords.items(), key=lambda x: x[1], reverse=True)[:10]
        avg_recipients = sum(recipients_count) / len(recipients_count) if recipients_count else 0
        avg_subject_len = avg_subject_len / len(batch) if batch else 0

        stats_summary = f"""BATCH STATISTICS ({len(batch)} emails):
- Top sender domains: {', '.join([f'{d} ({c})' for d, c in top_domains])}
- Avg recipients per email: {avg_recipients:.1f}
- Emails with attachments: {has_attachments}/{len(batch)}
- Avg subject length: {avg_subject_len:.0f} chars
- Common keywords: {', '.join([f'{w}({c})' for w, c in top_keywords[:5]])}"""

        # Build email summary with actual IDs
        email_list = []
        for i, e in enumerate(batch):
            email_list.append(f"{i+1}. ID: {e.id}\n   From: {e.sender}\n   Subject: {e.subject}\n   Preview: {e.body_snippet[:100]}...")
        email_summary = "\n\n".join(email_list)

        # Use first email ID as example
        example_id = batch[0].id if batch else "maildir_example__sent_1"

        prompt = f"""You are analyzing emails to discover natural categories for an automatic classification system.

GOAL: Identify broad, reusable categories that will help train a machine learning model to sort thousands of emails automatically.

GUIDELINES FOR GOOD CATEGORIES:
- BROAD & TIMELESS: "Financial" not "Q3 Budget Review"
- USER-FOCUSED: Think "what would help someone find this email later?"
- LEARNABLE: ML model needs consistent patterns (sender domains, keywords, structure)
- FUNCTIONAL: Each category serves a distinct purpose
- 3-10 categories ideal: Too many = noise, too few = useless

{stats_summary}

EMAILS TO ANALYZE:
{email_summary}

TASK:
1. Identify natural groupings based on PURPOSE, not just topic
2. Create SHORT (1-3 word) category names
3. Assign each email to exactly one category
4. CRITICAL: Copy EXACT email IDs - if email #1 shows ID "{example_id}", use exactly "{example_id}" in labels

EXAMPLES OF GOOD CATEGORIES:
- "Work Communication" (daily business emails)
- "Financial" (invoices, budgets, reports)
- "Urgent" (time-sensitive requests)
- "Technical" (system alerts, dev discussions)
- "Administrative" (HR, policies, announcements)

Return JSON:
{{
    "categories": {{"category_name": "which user need this serves", ...}},
    "labels": [["{example_id}", "category"], ...]
}}

JSON:"""

        try:
            response = self.llm_provider.complete(
                prompt,
                temperature=0.1,
                max_tokens=2000
            )

            # Save first batch for debugging
            if batch_idx == 0:
                with open('debug_prompt.txt', 'w') as f:
                    f.write(prompt)
                with open('debug_response.txt', 'w') as f:
                    f.write(response)
                logger.info("Saved first batch prompt and response to debug_*.txt")

            logger.debug(f"LLM raw response preview: {response[:500]}")

            parsed = self._parse_response(response)

            # Log parsing result
            if batch_idx == 0:
                with open('debug_parsed.txt', 'w') as f:
                    f.write(json.dumps(parsed, indent=2))

            return parsed

        except Exception as e:
            logger.error(f"LLM analysis failed: {e}")
            return {'categories': {}, 'labels': []}

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM JSON response."""
        try:
            # Strip <think>...</think> reasoning tags if present
            cleaned = re.sub(r'<think>.*?</think>', '', response, flags=re.DOTALL)

            # Extract JSON
            json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
            if json_match:
                parsed = json.loads(json_match.group())
                logger.debug(f"Successfully parsed JSON: {len(parsed.get('categories', {}))} categories, {len(parsed.get('labels', []))} labels")
                return parsed
        except json.JSONDecodeError as e:
            logger.warning(f"JSON parse error: {e}")
            logger.debug(f"Response preview: {response[:200]}")

        logger.warning("Failed to parse LLM response, returning empty")
        return {'categories': {}, 'labels': []}
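
    # Example of a raw completion that _parse_response is written to tolerate
    # (hypothetical text): optional reasoning tags followed by the JSON payload.
    #
    #   <think>Mostly billing and invoice emails in this batch...</think>
    #   {"categories": {"Financial": "Invoices and receipts"},
    #    "labels": [["maildir_example__sent_1", "Financial"]]}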

    def _consolidate_categories(
        self,
        discovered_categories: Dict[str, str],
        email_labels: List[Tuple[str, str]]
    ) -> Dict[str, str]:
        """
        Consolidate overlapping/duplicate categories using LLM.

        Takes all discovered categories and merges similar ones into
        a lean, non-redundant set.

        Configuration parameters (from self.config):
        - target_categories: Max number of final categories (default: 10)
        - min_category_size: Merge categories with fewer emails (default: 3)
        - inbox_context: Optional user description of inbox purpose
        - consolidation_temperature: LLM temperature for consolidation (default: 0.1)
        - consolidation_examples: Optional list of example merges

        Returns:
            Dict of consolidated categories with descriptions
        """
        if not self.llm_available:
            logger.warning("LLM unavailable, skipping consolidation")
            return discovered_categories

        # Edge case: Too few categories to consolidate
        if len(discovered_categories) <= 5:
            logger.info(f"Only {len(discovered_categories)} categories, skipping consolidation")
            return discovered_categories

        # Edge case: Empty labels
        if not email_labels:
            logger.warning("No email labels provided, cannot consolidate")
            return discovered_categories

        # Get configuration parameters with validation
        target_categories = max(3, self.config.get('target_categories', 10))  # Min 3 categories
        min_category_size = max(1, self.config.get('min_category_size', 3))  # Min 1 email
        inbox_context = self.config.get('inbox_context', '')
        temperature = max(0.0, min(1.0, self.config.get('consolidation_temperature', 0.1)))  # Clamp 0-1
        user_examples = self.config.get('consolidation_examples', [])

        # Build category list with counts and sort by email count
        category_counts = {}
        for _, cat in email_labels:
            category_counts[cat] = category_counts.get(cat, 0) + 1

        # Sort by count descending for better merging decisions
        sorted_categories = sorted(
            discovered_categories.items(),
            key=lambda x: category_counts.get(x[0], 0),
            reverse=True
        )

        category_list = "\n".join([
            f"- {cat}: {desc} ({category_counts.get(cat, 0)} emails)"
            for cat, desc in sorted_categories
        ])

        # Build context section
        context_parts = []

        # Add cached categories as consolidation hints
        if self.category_cache:
            cached_cats = self.category_cache.get_cached_categories()
            if cached_cats:
                cache_stats = self.category_cache.get_stats()
                cache_list = "\n".join([
                    f"  - {name}: {desc}"
                    for name, desc in list(cached_cats.items())[:15]  # Show top 15
                ])
                context_parts.append(f"""CACHED CATEGORIES ({cache_stats['total_categories']} total, showing top 15):
These are established categories from previous mailboxes. PREFER consolidating to these
when semantically appropriate to maintain cross-mailbox consistency.
{cache_list}""")

        if inbox_context:
            context_parts.append(f"INBOX CONTEXT: {inbox_context}")

        if user_examples:
            examples_text = "\n".join([f"  - {ex}" for ex in user_examples])
            context_parts.append(f"USER MERGE EXAMPLES:\n{examples_text}")

        context_section = "\n\n".join(context_parts) + "\n" if context_parts else ""

        # Build consolidation rules
        rules = [
            "1. AGGRESSIVELY merge similar/overlapping categories:",
            "   - Semantic overlap: 'Meeting Coordination' + 'Meeting Invitations' → 'Meetings'",
            "   - Variants: 'Survey & Feedback' + 'Survey/Information' → 'Surveys'",
            "   - Prefixes: All 'Forwarded X' → 'Forwarded'",
            f"2. Merge categories with <{min_category_size} emails into broader categories",
            f"3. STRICT TARGET: {target_categories} final categories maximum",
            "4. Preserve high-count categories when possible",
            "5. Use SHORT, generic names (1-2 words preferred)",
            "6. Only keep separate if functionally distinct (e.g., 'Financial' vs 'Technical')",
            "7. Map EVERY old category to a final category (no unmapped categories)",
        ]
        rules_text = "\n".join(rules)

        # Build prompt
        prompt = f"""You are helping build an email classification system that will automatically sort thousands of emails.

TASK: Consolidate the discovered categories below into a lean, effective set for training a machine learning classifier.

WHY THIS MATTERS: These categories will be used to:
1. Train a LightGBM classifier on email features (embeddings, patterns, structure)
2. Automatically label thousands of emails without human intervention
3. Help users quickly find emails by category (like Gmail labels)

WHAT MAKES GOOD CATEGORIES:
- BROAD & REUSABLE: "Meetings" not "Q3 Planning Meeting" - applies to many emails
- FUNCTIONALLY DISTINCT: Each category serves a different user need
- BALANCED: Avoid 1 huge category + many tiny ones
- LEARNABLE: ML model needs clear patterns to distinguish categories
- TIMELESS: "Financial Reports" not "2023 Budget Review"
- ACTION-ORIENTED: Users ask "show me all X" - what is X?

DISCOVERED CATEGORIES (sorted by email count):
{category_list}

{context_section}CONSOLIDATION STRATEGY:
{rules_text}

THINK LIKE A USER: If you had to sort 10,000 emails, what categories would help you find things fast?
- "Work Communication" catches daily business emails
- "Urgent" flags time-sensitive items
- "Financial" groups all money-related emails
- "Technical" vs "Administrative" serves different workflows

OUTPUT FORMAT - Return JSON with consolidated categories and mapping:
{{
    "consolidated": {{
        "FinalCategoryName": "Clear description of which user need this serves"
    }},
    "mappings": {{
        "OldCategoryName": "FinalCategoryName"
    }}
}}

CRITICAL REQUIREMENTS:
- Maximum {target_categories} final categories (strict limit)
- Map EVERY old category to exactly one final category
- Final category names must be SHORT (1-3 words), GENERIC, and REUSABLE
- Think: "Would this category still make sense in 5 years?"
JSON: """ try: response = self.llm_provider.complete( prompt, temperature=temperature, max_tokens=3000 ) # Parse response cleaned = re.sub(r'.*?', '', response, flags=re.DOTALL) json_match = re.search(r'\{.*\}', cleaned, re.DOTALL) if json_match: result = json.loads(json_match.group()) consolidated = result.get('consolidated', {}) mappings = result.get('mappings', {}) # Validation 1: Check result structure if not isinstance(consolidated, dict) or not isinstance(mappings, dict): logger.error(f"Invalid LLM response structure: consolidated={type(consolidated)}, mappings={type(mappings)}") return discovered_categories # Validation 2: Check consolidation reduced categories if len(consolidated) >= len(discovered_categories): logger.warning(f"Consolidation didn't reduce categories: {len(consolidated)} >= {len(discovered_categories)}") return self._fallback_consolidation(discovered_categories, category_counts, target_categories) # Validation 3: Check target compliance (soft limit) if len(consolidated) > target_categories * 1.5: # Allow 50% overage logger.warning(f"Consolidation far exceeded target: {len(consolidated)} > {target_categories}") # Validation 4: Check all old categories are mapped old_categories = set(discovered_categories.keys()) mapped_categories = set(mappings.keys()) unmapped_cats = old_categories - mapped_categories if unmapped_cats: logger.error(f"LLM failed to map {len(unmapped_cats)} categories: {list(unmapped_cats)[:3]}") # Fill in missing mappings with fallback for cat in unmapped_cats: # Map to most similar consolidated category or create new one if consolidated: mappings[cat] = list(consolidated.keys())[0] # Fallback to first category logger.warning(f"Auto-mapped unmapped category: {cat} → {mappings[cat]}") else: logger.error("Cannot map categories - no consolidated categories exist") return discovered_categories # Validation 5: Check all mapped targets exist in consolidated invalid_mappings = [] for old_cat, new_cat in mappings.items(): if new_cat not in consolidated: invalid_mappings.append((old_cat, new_cat)) if invalid_mappings: logger.error(f"Invalid mappings to non-existent categories: {invalid_mappings[:3]}") # Create missing consolidated categories for old_cat, new_cat in invalid_mappings: if old_cat in discovered_categories: consolidated[new_cat] = discovered_categories[old_cat] logger.warning(f"Created missing consolidated category: {new_cat}") # Update email_labels to use consolidated categories failed_updates = [] for i, (email_id, old_cat) in enumerate(email_labels): if old_cat in mappings: new_cat = mappings[old_cat] if new_cat in consolidated: email_labels[i] = (email_id, new_cat) else: failed_updates.append((email_id, old_cat, new_cat)) else: failed_updates.append((email_id, old_cat, None)) if failed_updates: logger.error(f"Failed to update {len(failed_updates)} email labels") logger.debug(f"First 3 failures: {failed_updates[:3]}") logger.info(f"Consolidated {len(discovered_categories)} → {len(consolidated)} categories") for old, new in list(mappings.items())[:5]: logger.info(f" Merged: {old} → {new}") # Final validation: Check we have valid consolidated categories if not consolidated: logger.error("Consolidation resulted in 0 categories, using fallback") return self._fallback_consolidation(discovered_categories, category_counts, target_categories) return consolidated except json.JSONDecodeError as e: logger.error(f"Consolidation JSON parse error: {e}") logger.debug(f"Response: {response[:500]}") return self._fallback_consolidation(discovered_categories, 
        except Exception as e:
            logger.error(f"Consolidation failed: {e}", exc_info=True)
            return self._fallback_consolidation(discovered_categories, category_counts, target_categories)

    def _fallback_consolidation(
        self,
        discovered_categories: Dict[str, str],
        category_counts: Dict[str, int],
        target_categories: int
    ) -> Dict[str, str]:
        """
        Fallback consolidation using simple heuristic (top-N by count).

        Used when LLM consolidation fails or produces invalid results.
        """
        logger.info(f"Using fallback consolidation: selecting top {target_categories} categories by count")

        # Sort by count descending
        sorted_by_count = sorted(
            category_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )

        # Take top N categories
        top_categories = sorted_by_count[:target_categories]

        # Build consolidated dict
        consolidated = {}
        for cat, count in top_categories:
            if cat in discovered_categories:
                consolidated[cat] = discovered_categories[cat]
            else:
                consolidated[cat] = f"Category with {count} emails"

        logger.info(f"Fallback consolidated to {len(consolidated)} categories (top by count)")
        for cat, count in top_categories[:5]:
            logger.info(f"  {cat}: {count} emails")

        return consolidated

    def _default_categories(self) -> Dict[str, Any]:
        """Return default categories."""
        return {
            'junk': 'Spam and unwanted emails',
            'transactional': 'Receipts and confirmations',
            'auth': 'Authentication and security',
            'newsletters': 'Newsletters and subscriptions',
            'work': 'Work correspondence',
            'personal': 'Personal emails',
            'finance': 'Financial documents',
            'unknown': 'Unclassified'
        }
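

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not executed anywhere in this module).
# `SomeLLMProvider` and `load_stratified_sample()` are hypothetical stand-ins
# for a concrete BaseLLMProvider subclass and a sampling helper; the config
# keys shown are the ones read via self.config.get() above.
#
#     analyzer = CalibrationAnalyzer(
#         llm_provider=SomeLLMProvider(),
#         config={
#             'category_cache_path': 'src/models/category_cache.json',
#             'use_category_cache': True,
#             'cache_similarity_threshold': 0.7,
#             'target_categories': 10,
#             'min_category_size': 3,
#         },
#     )
#     categories, labels = analyzer.discover_categories(load_stratified_sample())
#     # categories -> {"Work Communication": "...", "Financial": "...", ...}
#     # labels     -> [(email_id, category_name), ...]
# ---------------------------------------------------------------------------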