From 88ef570fed0d4459539355d1b465b696a1883b37 Mon Sep 17 00:00:00 2001
From: FSSCoding
Date: Thu, 23 Oct 2025 14:12:20 +1100
Subject: [PATCH] Add robust edge case handling to category consolidation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Enhanced _consolidate_categories() with comprehensive validation:

- Edge case guards: Skip if ≤5 categories or no labels
- Parameter validation: Clamp ranges for all config values
- 5-stage validation after LLM response:
  1. Structure check (valid dicts)
  2. Reduction check (consolidation must reduce count)
  3. Target compliance (soft 50% overage limit)
  4. Complete mapping (all old categories mapped)
  5. Valid targets (all mappings point to existing categories)
- Auto-repair for common LLM failures:
  - Unmapped categories → map to first consolidated category
  - Invalid mapping targets → create missing categories
  - Failed updates → log with details
- Fallback consolidation using top-N by count:
  - Triggered on JSON parse errors, validation failures
  - Heuristic-based, no LLM required
  - Guarantees output even if LLM fails

All error paths now have proper handling and logging.
---
 src/calibration/llm_analyzer.py | 256 ++++++++++++++++++++++++++++++++
 1 file changed, 256 insertions(+)

diff --git a/src/calibration/llm_analyzer.py b/src/calibration/llm_analyzer.py
index 7fb2723..e76fc2f 100644
--- a/src/calibration/llm_analyzer.py
+++ b/src/calibration/llm_analyzer.py
@@ -81,6 +81,16 @@ class CalibrationAnalyzer:
 
         logger.info(f"Discovery complete: {len(discovered_categories)} categories found")
 
+        # Step 2: Consolidate overlapping/duplicate categories
+        if len(discovered_categories) > 10:  # Only consolidate if too many categories
+            logger.info(f"Consolidating {len(discovered_categories)} categories...")
+            consolidated = self._consolidate_categories(discovered_categories, email_labels)
+            if len(consolidated) < len(discovered_categories):
+                discovered_categories = consolidated
+                logger.info(f"After consolidation: {len(discovered_categories)} categories")
+            else:
+                logger.warning("Consolidation didn't reduce categories, keeping original")
+
         return discovered_categories, email_labels
 
     def _analyze_batch(self, batch: List[Email], batch_idx: int = 0) -> Dict[str, Any]:
@@ -160,6 +170,252 @@ JSON:
             logger.warning(f"Failed to parse LLM response, returning empty")
             return {'categories': {}, 'labels': []}
 
+    def _consolidate_categories(
+        self,
+        discovered_categories: Dict[str, str],
+        email_labels: List[Tuple[str, str]]
+    ) -> Dict[str, str]:
+        """
+        Consolidate overlapping/duplicate categories using LLM.
+
+        Takes all discovered categories and merges similar ones into
+        a lean, non-redundant set.
+
+        Configuration parameters (from self.config):
+        - target_categories: Max number of final categories (default: 10)
+        - min_category_size: Merge categories with fewer emails (default: 3)
+        - inbox_context: Optional user description of inbox purpose
+        - consolidation_temperature: LLM temperature for consolidation (default: 0.1)
+        - consolidation_examples: Optional list of example merges
+
+        Returns:
+            Dict of consolidated categories with descriptions
+        """
+        if not self.llm_available:
+            logger.warning("LLM unavailable, skipping consolidation")
+            return discovered_categories
+
+        # Edge case: Too few categories to consolidate
+        if len(discovered_categories) <= 5:
+            logger.info(f"Only {len(discovered_categories)} categories, skipping consolidation")
+            return discovered_categories
+
+        # Edge case: Empty labels
+        if not email_labels:
+            logger.warning("No email labels provided, cannot consolidate")
+            return discovered_categories
+
+        # Get configuration parameters with validation
+        target_categories = max(3, self.config.get('target_categories', 10))  # Min 3 categories
+        min_category_size = max(1, self.config.get('min_category_size', 3))  # Min 1 email
+        inbox_context = self.config.get('inbox_context', '')
+        temperature = max(0.0, min(1.0, self.config.get('consolidation_temperature', 0.1)))  # Clamp 0-1
+        user_examples = self.config.get('consolidation_examples', [])
+
+        # Build category list with counts and sort by email count
+        category_counts = {}
+        for _, cat in email_labels:
+            category_counts[cat] = category_counts.get(cat, 0) + 1
+
+        # Sort by count descending for better merging decisions
+        sorted_categories = sorted(
+            discovered_categories.items(),
+            key=lambda x: category_counts.get(x[0], 0),
+            reverse=True
+        )
+
+        category_list = "\n".join([
+            f"- {cat}: {desc} ({category_counts.get(cat, 0)} emails)"
+            for cat, desc in sorted_categories
+        ])
+
+        # Build context section
+        context_parts = []
+        if inbox_context:
+            context_parts.append(f"INBOX CONTEXT: {inbox_context}")
+
+        if user_examples:
+            examples_text = "\n".join([f"  - {ex}" for ex in user_examples])
+            context_parts.append(f"USER MERGE EXAMPLES:\n{examples_text}")
+
+        context_section = "\n\n".join(context_parts) + "\n" if context_parts else ""
+
+        # Build consolidation rules
+        rules = [
+            "1. AGGRESSIVELY merge similar/overlapping categories:",
+            "   - Semantic overlap: 'Meeting Coordination' + 'Meeting Invitations' → 'Meetings'",
+            "   - Variants: 'Survey & Feedback' + 'Survey/Information' → 'Surveys'",
+            "   - Prefixes: All 'Forwarded X' → 'Forwarded'",
+            f"2. Merge categories with <{min_category_size} emails into broader categories",
+            f"3. STRICT TARGET: {target_categories} final categories maximum",
+            "4. Preserve high-count categories when possible",
+            "5. Use SHORT, generic names (1-2 words preferred)",
+            "6. Only keep separate if functionally distinct (e.g., 'Financial' vs 'Technical')",
+            "7. Map EVERY old category to a final category (no unmapped categories)"
+        ]
+
+        rules_text = "\n".join(rules)
+
+        # Build prompt
+        prompt = f"""Consolidate email categories by merging duplicates and overlaps.
+
+DISCOVERED CATEGORIES (sorted by email count):
+{category_list}
+
+{context_section}CONSOLIDATION RULES:
+{rules_text}
+
+OUTPUT FORMAT - Return JSON with consolidated categories and mapping:
+{{
+  "consolidated": {{
+    "FinalCategoryName": "Clear, generic description of what emails fit here"
+  }},
+  "mappings": {{
+    "OldCategoryName": "FinalCategoryName"
+  }}
+}}
+
+IMPORTANT:
+- consolidated dict should have {target_categories} or fewer entries
+- mappings dict must map EVERY old category name to a final category
+- Final category names should be present in both consolidated and mappings
+
+JSON:
+"""
+
+        try:
+            response = self.llm_provider.complete(
+                prompt,
+                temperature=temperature,
+                max_tokens=3000
+            )
+
+            # Parse response (strip <think> reasoning blocks before extracting JSON)
+            cleaned = re.sub(r'<think>.*?</think>', '', response, flags=re.DOTALL)
+            json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
+
+            if json_match:
+                result = json.loads(json_match.group())
+                consolidated = result.get('consolidated', {})
+                mappings = result.get('mappings', {})
+
+                # Validation 1: Check result structure
+                if not isinstance(consolidated, dict) or not isinstance(mappings, dict):
+                    logger.error(f"Invalid LLM response structure: consolidated={type(consolidated)}, mappings={type(mappings)}")
+                    return discovered_categories
+
+                # Validation 2: Check consolidation reduced categories
+                if len(consolidated) >= len(discovered_categories):
+                    logger.warning(f"Consolidation didn't reduce categories: {len(consolidated)} >= {len(discovered_categories)}")
+                    return self._fallback_consolidation(discovered_categories, category_counts, target_categories)
+
+                # Validation 3: Check target compliance (soft limit)
+                if len(consolidated) > target_categories * 1.5:  # Allow 50% overage
+                    logger.warning(f"Consolidation far exceeded target: {len(consolidated)} > {target_categories}")
+
+                # Validation 4: Check all old categories are mapped
+                old_categories = set(discovered_categories.keys())
+                mapped_categories = set(mappings.keys())
+                unmapped_cats = old_categories - mapped_categories
+
+                if unmapped_cats:
+                    logger.error(f"LLM failed to map {len(unmapped_cats)} categories: {list(unmapped_cats)[:3]}")
+                    # Fill in missing mappings with fallback
+                    for cat in unmapped_cats:
+                        # Map to the first consolidated category as a simple fallback
+                        if consolidated:
+                            mappings[cat] = list(consolidated.keys())[0]  # Fallback to first category
+                            logger.warning(f"Auto-mapped unmapped category: {cat} → {mappings[cat]}")
+                        else:
+                            logger.error("Cannot map categories - no consolidated categories exist")
+                            return discovered_categories
+
+                # Validation 5: Check all mapped targets exist in consolidated
+                invalid_mappings = []
+                for old_cat, new_cat in mappings.items():
+                    if new_cat not in consolidated:
+                        invalid_mappings.append((old_cat, new_cat))
+
+                if invalid_mappings:
+                    logger.error(f"Invalid mappings to non-existent categories: {invalid_mappings[:3]}")
+                    # Create missing consolidated categories
+                    for old_cat, new_cat in invalid_mappings:
+                        if old_cat in discovered_categories:
+                            consolidated[new_cat] = discovered_categories[old_cat]
+                            logger.warning(f"Created missing consolidated category: {new_cat}")
+
+                # Update email_labels to use consolidated categories
+                failed_updates = []
+                for i, (email_id, old_cat) in enumerate(email_labels):
+                    if old_cat in mappings:
+                        new_cat = mappings[old_cat]
+                        if new_cat in consolidated:
+                            email_labels[i] = (email_id, new_cat)
+                        else:
+                            failed_updates.append((email_id, old_cat, new_cat))
+                    else:
+                        failed_updates.append((email_id, old_cat, None))
+
+                if failed_updates:
+                    logger.error(f"Failed to update {len(failed_updates)} email labels")
+                    logger.debug(f"First 3 failures: {failed_updates[:3]}")
+
+                logger.info(f"Consolidated {len(discovered_categories)} → {len(consolidated)} categories")
+                for old, new in list(mappings.items())[:5]:
+                    logger.info(f"  Merged: {old} → {new}")
+
+                # Final validation: Check we have valid consolidated categories
+                if not consolidated:
+                    logger.error("Consolidation resulted in 0 categories, using fallback")
+                    return self._fallback_consolidation(discovered_categories, category_counts, target_categories)
+
+                return consolidated
+
+            # No JSON object found in the response; use the heuristic fallback
+            logger.error("Consolidation response contained no parseable JSON")
+            return self._fallback_consolidation(discovered_categories, category_counts, target_categories)
+
+        except json.JSONDecodeError as e:
+            logger.error(f"Consolidation JSON parse error: {e}")
+            logger.debug(f"Response: {response[:500]}")
+            return self._fallback_consolidation(discovered_categories, category_counts, target_categories)
+        except Exception as e:
+            logger.error(f"Consolidation failed: {e}", exc_info=True)
+            return self._fallback_consolidation(discovered_categories, category_counts, target_categories)
+
+    def _fallback_consolidation(
+        self,
+        discovered_categories: Dict[str, str],
+        category_counts: Dict[str, int],
+        target_categories: int
+    ) -> Dict[str, str]:
+        """
+        Fallback consolidation using simple heuristic (top-N by count).
+
+        Used when LLM consolidation fails or produces invalid results.
+        """
+        logger.info(f"Using fallback consolidation: selecting top {target_categories} categories by count")
+
+        # Sort by count descending
+        sorted_by_count = sorted(
+            category_counts.items(),
+            key=lambda x: x[1],
+            reverse=True
+        )
+
+        # Take top N categories
+        top_categories = sorted_by_count[:target_categories]
+
+        # Build consolidated dict
+        consolidated = {}
+        for cat, count in top_categories:
+            if cat in discovered_categories:
+                consolidated[cat] = discovered_categories[cat]
+            else:
+                consolidated[cat] = f"Category with {count} emails"
+
+        logger.info(f"Fallback consolidated to {len(consolidated)} categories (top by count)")
+        for cat, count in top_categories[:5]:
+            logger.info(f"  {cat}: {count} emails")
+
+        return consolidated
+
     def _default_categories(self) -> Dict[str, Any]:
         """Return default categories."""
         return {
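
Note for reviewers: the auto-repair and fallback behaviour above can be exercised in isolation. The sketch below is illustrative only; it restates the same logic outside the class so it runs standalone, the function names (repair_mappings, fallback_consolidation) are not part of the patch, and the sample category data is hypothetical.

# Standalone sketch of the mapping auto-repair (validations 4 and 5) and the
# top-N-by-count fallback described in this patch. Sample data is hypothetical.
from collections import Counter
from typing import Dict, List, Tuple


def repair_mappings(discovered: Dict[str, str],
                    consolidated: Dict[str, str],
                    mappings: Dict[str, str]) -> None:
    # Validation 4: every discovered category must be mapped; unmapped ones
    # fall back to the first consolidated category (the patch bails out
    # entirely when `consolidated` is empty).
    for cat in set(discovered) - set(mappings):
        mappings[cat] = next(iter(consolidated))
    # Validation 5: every mapping target must exist in `consolidated`;
    # otherwise the old description is carried over under the new name.
    for old_cat, new_cat in mappings.items():
        if new_cat not in consolidated and old_cat in discovered:
            consolidated[new_cat] = discovered[old_cat]


def fallback_consolidation(discovered: Dict[str, str],
                           email_labels: List[Tuple[str, str]],
                           target_categories: int = 10) -> Dict[str, str]:
    # Heuristic fallback: keep the N most frequent categories, no LLM involved.
    counts = Counter(cat for _, cat in email_labels)
    top = [cat for cat, _ in counts.most_common(target_categories)]
    return {cat: discovered.get(cat, f"Category with {counts[cat]} emails") for cat in top}


if __name__ == "__main__":
    discovered = {
        "Meeting Coordination": "Scheduling and calendar emails",
        "Meeting Invitations": "Calendar invites",
        "Surveys": "Survey and feedback requests",
        "Financial": "Invoices, receipts, and billing",
    }
    labels = [("m1", "Meeting Coordination"), ("m2", "Meeting Invitations"),
              ("m3", "Surveys"), ("m4", "Financial"), ("m5", "Financial")]

    # A deliberately malformed "LLM result": one category left unmapped, one
    # mapping pointing at a category missing from `consolidated`.
    consolidated = {"Meetings": "Meeting-related email"}
    mappings = {"Meeting Coordination": "Meetings", "Surveys": "Survey emails"}
    repair_mappings(discovered, consolidated, mappings)
    print(consolidated)  # "Survey emails" was created from the old description
    print(mappings)      # every discovered category now has a target

    print(fallback_consolidation(discovered, labels, target_categories=2))
    # Keeps the two most frequent categories by label count.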