Add robust edge case handling to category consolidation

Enhanced _consolidate_categories() with comprehensive validation:

- Edge case guards: Skip if ≤5 categories or no labels
- Parameter validation: Clamp ranges for all config values
- 5-stage validation after the LLM response (see the sketch after this list):
  1. Structure check (valid dicts)
  2. Reduction check (consolidation must reduce count)
  3. Target compliance (soft 50% overage limit)
  4. Complete mapping (all old categories mapped)
  5. Valid targets (all mappings point to existing categories)
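
A condensed sketch of the five checks (illustrative helper only; the real
code inlines them in _consolidate_categories, see the diff below):

    def validate(consolidated, mappings, old_cats, target):
        if not isinstance(consolidated, dict) or not isinstance(mappings, dict):
            return "bad-structure"   # 1. structure
        if len(consolidated) >= len(old_cats):
            return "no-reduction"    # 2. reduction
        if len(consolidated) > target * 1.5:
            pass                     # 3. soft target: warn, don't reject
        if old_cats - set(mappings):
            return "unmapped"        # 4. complete mapping
        if any(new not in consolidated for new in mappings.values()):
            return "bad-target"      # 5. valid targets
        return "ok"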

- Auto-repair for common LLM failures (minimal sketch below):
  - Unmapped categories → map to first consolidated category
  - Invalid mapping targets → create missing categories
  - Failed updates → log with details
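
A minimal sketch of the repair pass, assuming a non-empty consolidated
dict (the real code guards the empty case and logs each repair):

    def repair(consolidated, mappings, old_descriptions):
        # Unmapped old categories fall back to the first consolidated category
        for cat in set(old_descriptions) - set(mappings):
            mappings[cat] = next(iter(consolidated))
        # Mappings to a missing category create it, reusing the old description
        for old, new in list(mappings.items()):
            if new not in consolidated and old in old_descriptions:
                consolidated[new] = old_descriptions[old]
        return consolidated, mappings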

- Fallback consolidation using top-N by count (toy example below)
  - Triggered on JSON parse errors, validation failures
  - Heuristic-based, no LLM required
  - Guarantees output even if LLM fails
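
Toy example of the fallback heuristic with made-up counts and a target of 3:

    counts = {"Meetings": 40, "Surveys": 12, "Forwarded": 7, "Receipts": 2}
    top = sorted(counts.items(), key=lambda x: x[1], reverse=True)[:3]
    # -> [("Meetings", 40), ("Surveys", 12), ("Forwarded", 7)]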

All error paths now have proper handling and logging.
Author: FSSCoding
Date:   2025-10-23 14:12:20 +11:00
Parent: 50ddaa4b39
Commit: 88ef570fed

@@ -81,6 +81,16 @@ class CalibrationAnalyzer:
logger.info(f"Discovery complete: {len(discovered_categories)} categories found") logger.info(f"Discovery complete: {len(discovered_categories)} categories found")
# Step 2: Consolidate overlapping/duplicate categories
if len(discovered_categories) > 10: # Only consolidate if too many categories
logger.info(f"Consolidating {len(discovered_categories)} categories...")
consolidated = self._consolidate_categories(discovered_categories, email_labels)
if len(consolidated) < len(discovered_categories):
discovered_categories = consolidated
logger.info(f"After consolidation: {len(discovered_categories)} categories")
else:
logger.warning("Consolidation didn't reduce categories, keeping original")
return discovered_categories, email_labels return discovered_categories, email_labels
def _analyze_batch(self, batch: List[Email], batch_idx: int = 0) -> Dict[str, Any]: def _analyze_batch(self, batch: List[Email], batch_idx: int = 0) -> Dict[str, Any]:
@@ -160,6 +170,252 @@ JSON:
logger.warning(f"Failed to parse LLM response, returning empty") logger.warning(f"Failed to parse LLM response, returning empty")
return {'categories': {}, 'labels': []} return {'categories': {}, 'labels': []}
    def _consolidate_categories(
        self,
        discovered_categories: Dict[str, str],
        email_labels: List[Tuple[str, str]]
    ) -> Dict[str, str]:
        """
        Consolidate overlapping/duplicate categories using LLM.

        Takes all discovered categories and merges similar ones into
        a lean, non-redundant set.

        Configuration parameters (from self.config):
        - target_categories: Max number of final categories (default: 10)
        - min_category_size: Merge categories with fewer emails (default: 3)
        - inbox_context: Optional user description of inbox purpose
        - consolidation_temperature: LLM temperature for consolidation (default: 0.1)
        - consolidation_examples: Optional list of example merges

        Returns:
            Dict of consolidated categories with descriptions
        """
        if not self.llm_available:
            logger.warning("LLM unavailable, skipping consolidation")
            return discovered_categories

        # Edge case: Too few categories to consolidate
        if len(discovered_categories) <= 5:
            logger.info(f"Only {len(discovered_categories)} categories, skipping consolidation")
            return discovered_categories

        # Edge case: Empty labels
        if not email_labels:
            logger.warning("No email labels provided, cannot consolidate")
            return discovered_categories

        # Get configuration parameters with validation
        target_categories = max(3, self.config.get('target_categories', 10))  # Min 3 categories
        min_category_size = max(1, self.config.get('min_category_size', 3))  # Min 1 email
        inbox_context = self.config.get('inbox_context', '')
        temperature = max(0.0, min(1.0, self.config.get('consolidation_temperature', 0.1)))  # Clamp 0-1
        user_examples = self.config.get('consolidation_examples', [])

        # Build category list with counts and sort by email count
        category_counts = {}
        for _, cat in email_labels:
            category_counts[cat] = category_counts.get(cat, 0) + 1

        # Sort by count descending for better merging decisions
        sorted_categories = sorted(
            discovered_categories.items(),
            key=lambda x: category_counts.get(x[0], 0),
            reverse=True
        )

        category_list = "\n".join([
            f"- {cat}: {desc} ({category_counts.get(cat, 0)} emails)"
            for cat, desc in sorted_categories
        ])

        # Build context section
        context_parts = []
        if inbox_context:
            context_parts.append(f"INBOX CONTEXT: {inbox_context}")
        if user_examples:
            examples_text = "\n".join([f"  - {ex}" for ex in user_examples])
            context_parts.append(f"USER MERGE EXAMPLES:\n{examples_text}")
        context_section = "\n\n".join(context_parts) + "\n" if context_parts else ""

        # Build consolidation rules
        rules = [
            "1. AGGRESSIVELY merge similar/overlapping categories:",
            "   - Semantic overlap: 'Meeting Coordination' + 'Meeting Invitations' → 'Meetings'",
            "   - Variants: 'Survey & Feedback' + 'Survey/Information' → 'Surveys'",
            "   - Prefixes: All 'Forwarded X' → 'Forwarded'",
            f"2. Merge categories with <{min_category_size} emails into broader categories",
            f"3. STRICT TARGET: {target_categories} final categories maximum",
            "4. Preserve high-count categories when possible",
            "5. Use SHORT, generic names (1-2 words preferred)",
            "6. Only keep separate if functionally distinct (e.g., 'Financial' vs 'Technical')",
            "7. Map EVERY old category to a final category (no unmapped categories)"
        ]
        rules_text = "\n".join(rules)
        # Build prompt
        prompt = f"""<no_think>Consolidate email categories by merging duplicates and overlaps.

DISCOVERED CATEGORIES (sorted by email count):
{category_list}

{context_section}CONSOLIDATION RULES:
{rules_text}

OUTPUT FORMAT - Return JSON with consolidated categories and mapping:
{{
  "consolidated": {{
    "FinalCategoryName": "Clear, generic description of what emails fit here"
  }},
  "mappings": {{
    "OldCategoryName": "FinalCategoryName"
  }}
}}

IMPORTANT:
- consolidated dict should have {target_categories} or fewer entries
- mappings dict must map EVERY old category name to a final category
- Final category names should be present in both consolidated and mappings

JSON:
"""
        try:
            response = self.llm_provider.complete(
                prompt,
                temperature=temperature,
                max_tokens=3000
            )

            # Parse response
            cleaned = re.sub(r'<think>.*?</think>', '', response, flags=re.DOTALL)
            json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)

            if json_match:
                result = json.loads(json_match.group())
                consolidated = result.get('consolidated', {})
                mappings = result.get('mappings', {})

                # Validation 1: Check result structure
                if not isinstance(consolidated, dict) or not isinstance(mappings, dict):
                    logger.error(f"Invalid LLM response structure: consolidated={type(consolidated)}, mappings={type(mappings)}")
                    return discovered_categories

                # Validation 2: Check consolidation reduced categories
                if len(consolidated) >= len(discovered_categories):
                    logger.warning(f"Consolidation didn't reduce categories: {len(consolidated)} >= {len(discovered_categories)}")
                    return self._fallback_consolidation(discovered_categories, category_counts, target_categories)

                # Validation 3: Check target compliance (soft limit)
                if len(consolidated) > target_categories * 1.5:  # Allow 50% overage
                    logger.warning(f"Consolidation far exceeded target: {len(consolidated)} > {target_categories}")

                # Validation 4: Check all old categories are mapped
                old_categories = set(discovered_categories.keys())
                mapped_categories = set(mappings.keys())
                unmapped_cats = old_categories - mapped_categories
                if unmapped_cats:
                    logger.error(f"LLM failed to map {len(unmapped_cats)} categories: {list(unmapped_cats)[:3]}")
                    # Fill in missing mappings with fallback
                    for cat in unmapped_cats:
                        # Map to most similar consolidated category or create new one
                        if consolidated:
                            mappings[cat] = list(consolidated.keys())[0]  # Fallback to first category
                            logger.warning(f"Auto-mapped unmapped category: {cat} → {mappings[cat]}")
                        else:
                            logger.error("Cannot map categories - no consolidated categories exist")
                            return discovered_categories

                # Validation 5: Check all mapped targets exist in consolidated
                invalid_mappings = []
                for old_cat, new_cat in mappings.items():
                    if new_cat not in consolidated:
                        invalid_mappings.append((old_cat, new_cat))
                if invalid_mappings:
                    logger.error(f"Invalid mappings to non-existent categories: {invalid_mappings[:3]}")
                    # Create missing consolidated categories
                    for old_cat, new_cat in invalid_mappings:
                        if old_cat in discovered_categories:
                            consolidated[new_cat] = discovered_categories[old_cat]
                            logger.warning(f"Created missing consolidated category: {new_cat}")

                # Update email_labels to use consolidated categories
                failed_updates = []
                for i, (email_id, old_cat) in enumerate(email_labels):
                    if old_cat in mappings:
                        new_cat = mappings[old_cat]
                        if new_cat in consolidated:
                            email_labels[i] = (email_id, new_cat)
                        else:
                            failed_updates.append((email_id, old_cat, new_cat))
                    else:
                        failed_updates.append((email_id, old_cat, None))

                if failed_updates:
                    logger.error(f"Failed to update {len(failed_updates)} email labels")
                    logger.debug(f"First 3 failures: {failed_updates[:3]}")

                logger.info(f"Consolidated {len(discovered_categories)} → {len(consolidated)} categories")
                for old, new in list(mappings.items())[:5]:
                    logger.info(f"  Merged: {old} → {new}")

                # Final validation: Check we have valid consolidated categories
                if not consolidated:
                    logger.error("Consolidation resulted in 0 categories, using fallback")
                    return self._fallback_consolidation(discovered_categories, category_counts, target_categories)

                return consolidated

            # No JSON object found in the response; fall back so the caller
            # still gets a usable category set instead of an implicit None
            logger.error("No JSON found in consolidation response")
            return self._fallback_consolidation(discovered_categories, category_counts, target_categories)

        except json.JSONDecodeError as e:
            logger.error(f"Consolidation JSON parse error: {e}")
            logger.debug(f"Response: {response[:500]}")
            return self._fallback_consolidation(discovered_categories, category_counts, target_categories)
        except Exception as e:
            logger.error(f"Consolidation failed: {e}", exc_info=True)
            return self._fallback_consolidation(discovered_categories, category_counts, target_categories)
    def _fallback_consolidation(
        self,
        discovered_categories: Dict[str, str],
        category_counts: Dict[str, int],
        target_categories: int
    ) -> Dict[str, str]:
        """
        Fallback consolidation using simple heuristic (top-N by count).

        Used when LLM consolidation fails or produces invalid results.
        """
        logger.info(f"Using fallback consolidation: selecting top {target_categories} categories by count")

        # Sort by count descending
        sorted_by_count = sorted(
            category_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )

        # Take top N categories
        top_categories = sorted_by_count[:target_categories]

        # Build consolidated dict
        consolidated = {}
        for cat, count in top_categories:
            if cat in discovered_categories:
                consolidated[cat] = discovered_categories[cat]
            else:
                consolidated[cat] = f"Category with {count} emails"

        logger.info(f"Fallback consolidated to {len(consolidated)} categories (top by count)")
        for cat, count in top_categories[:5]:
            logger.info(f"  {cat}: {count} emails")

        return consolidated
    def _default_categories(self) -> Dict[str, Any]:
        """Return default categories."""
        return {