diff --git a/src/calibration/category_cache.py b/src/calibration/category_cache.py
index c059c8f..c738e8c 100644
--- a/src/calibration/category_cache.py
+++ b/src/calibration/category_cache.py
@@ -26,10 +26,11 @@ class CategoryCache:
     - Support for mailbox-specific overrides
     """

-    def __init__(self, cache_path: str = "src/models/category_cache.json", embedding_model=None):
+    def __init__(self, cache_path: str = "src/models/category_cache.json", embedding_model=None, llm_provider=None):
         self.cache_path = Path(cache_path)
         self.cache: Dict[str, dict] = {}
         self.embedding_model = embedding_model
+        self.llm_provider = llm_provider
         self.load()

     def load(self) -> None:
@@ -71,16 +72,25 @@ class CategoryCache:
         discovered: Dict[str, str],
         similarity_threshold: float = 0.7,
         allow_new: bool = True,
-        max_new: int = 3
+        max_new: int = 3,
+        llm_review_threshold: float = 0.5
     ) -> Tuple[Dict[str, str], Dict[str, str]]:
         """
-        Snap discovered categories to cached ones using semantic similarity.
+        Snap discovered categories to cached ones using multi-stage matching.
+
+        Matching Strategy:
+        1. Stage 1: Exact name match (score=1.0) → instant snap
+        2. Stage 2: Embedding similarity ≥ threshold → confident snap
+        3. Stage 3: Similarity < threshold but ≥ llm_review_threshold → LLM review
+        4. Stage 4: Very low similarity → auto-accept as new (if allowed)
+        5. Stage 5: Exceeded max_new → force snap to closest

         Args:
             discovered: Newly discovered categories {name: description}
-            similarity_threshold: Minimum similarity to match (0-1)
-            allow_new: Whether to allow new categories not in cache
-            max_new: Maximum new categories to add per run
+            similarity_threshold: Confident match threshold (default: 0.7)
+            allow_new: Whether to allow new categories
+            max_new: Maximum new categories per run
+            llm_review_threshold: Min score to trigger LLM review (default: 0.5)

         Returns:
             (snapped_categories, mapping) where:
@@ -88,50 +98,82 @@ class CategoryCache:
             - mapping: {discovered_name: final_name} for all discovered
         """
         if not self.cache:
-            # No cache yet, return discovered as-is
             logger.info("Empty cache, using all discovered categories")
             return discovered, {name: name for name in discovered}

         snapped = {}
         mapping = {}
         new_categories = []
+        ambiguous_cases = []  # For LLM review
         cached_cats = self.get_cached_categories()

+        # Stage 1-4: Classify each discovered category
         for disc_name, disc_desc in discovered.items():
-            # Try to find best match in cache
             best_match, best_score = self._find_best_match(
                 disc_name, disc_desc, cached_cats
             )

             if best_score >= similarity_threshold:
-                # Snap to cached category
+                # Stage 2: Confident match → snap immediately
                 mapping[disc_name] = best_match
                 if best_match not in snapped:
                     snapped[best_match] = cached_cats[best_match]
-                logger.debug(f"Snapped '{disc_name}' → '{best_match}' (similarity: {best_score:.2f})")
+                logger.debug(f"Confident snap '{disc_name}' → '{best_match}' (similarity: {best_score:.2f})")
+
+            elif best_score >= llm_review_threshold:
+                # Stage 3: Ambiguous → save for LLM review
+                ambiguous_cases.append((disc_name, disc_desc, best_match, best_score))
+                logger.info(f"Ambiguous: '{disc_name}' ~ '{best_match}' (similarity: {best_score:.2f}) → LLM review")
+
             else:
-                # No good match found
+                # Stage 4: Very low similarity → likely new category
                 if allow_new and len(new_categories) < max_new:
-                    # Allow as new category
                     new_categories.append((disc_name, disc_desc))
                     mapping[disc_name] = disc_name
                     snapped[disc_name] = disc_desc
-                    logger.info(f"New category: '{disc_name}' (no cache match, score: {best_score:.2f})")
{best_score:.2f})") + logger.info(f"New category: '{disc_name}' (low similarity: {best_score:.2f})") else: - # Force snap to best match even if below threshold - if best_match: - mapping[disc_name] = best_match - if best_match not in snapped: - snapped[best_match] = cached_cats[best_match] - logger.warning(f"Forced snap '{disc_name}' → '{best_match}' (low similarity: {best_score:.2f})") - else: - # Fallback to first cached category - fallback = list(cached_cats.keys())[0] - mapping[disc_name] = fallback - if fallback not in snapped: - snapped[fallback] = cached_cats[fallback] - logger.warning(f"Fallback: '{disc_name}' → '{fallback}'") + # Stage 5: Can't add new → force snap + ambiguous_cases.append((disc_name, disc_desc, best_match, best_score)) + logger.warning(f"Force review: '{disc_name}' (max_new exceeded, score: {best_score:.2f})") + + # LLM Review for ambiguous cases + if ambiguous_cases and self.llm_provider: + logger.info(f"Requesting LLM review for {len(ambiguous_cases)} ambiguous cases...") + llm_decisions = self._llm_review_ambiguous(ambiguous_cases, cached_cats, allow_new, len(new_categories), max_new) + + for disc_name, decision in llm_decisions.items(): + if decision['action'] == 'snap': + target = decision['target'] + mapping[disc_name] = target + if target not in snapped: + snapped[target] = cached_cats[target] + logger.info(f"LLM snap: '{disc_name}' → '{target}'") + elif decision['action'] == 'new': + # Find original description + disc_desc = next(desc for name, desc, _, _ in ambiguous_cases if name == disc_name) + new_categories.append((disc_name, disc_desc)) + mapping[disc_name] = disc_name + snapped[disc_name] = disc_desc + logger.info(f"LLM approved new: '{disc_name}'") + + elif ambiguous_cases: + # No LLM available → use heuristic fallback + logger.warning(f"No LLM for review, using heuristic for {len(ambiguous_cases)} ambiguous cases") + for disc_name, disc_desc, best_match, best_score in ambiguous_cases: + if best_score >= 0.6: # Medium-high score → snap + mapping[disc_name] = best_match + if best_match not in snapped: + snapped[best_match] = cached_cats[best_match] + elif allow_new and len(new_categories) < max_new: # Low score + space → new + new_categories.append((disc_name, disc_desc)) + mapping[disc_name] = disc_name + snapped[disc_name] = disc_desc + else: # Force snap + mapping[disc_name] = best_match + if best_match not in snapped: + snapped[best_match] = cached_cats[best_match] logger.info(f"Snapping result: {len(snapped)} final categories ({len(new_categories)} new)") return snapped, mapping @@ -215,6 +257,99 @@ class CategoryCache: return float(dot_product / (norm_a * norm_b)) + def _llm_review_ambiguous( + self, + ambiguous_cases: List[Tuple[str, str, str, float]], + cached_cats: Dict[str, str], + allow_new: bool, + current_new_count: int, + max_new: int + ) -> Dict[str, Dict]: + """ + Use LLM to review ambiguous category matches. 
+
+        Args:
+            ambiguous_cases: List of (disc_name, disc_desc, best_match, score)
+            cached_cats: All cached categories
+            allow_new: Whether new categories are allowed
+            current_new_count: How many new categories already accepted
+            max_new: Maximum new categories allowed
+
+        Returns:
+            Dict of {disc_name: {'action': 'snap'|'new', 'target': category_name}}
+        """
+        import json
+        import re
+
+        # Build ambiguous cases summary
+        cases_summary = "\n".join([
+            f"- \"{name}\": {desc}\n  Best cache match: \"{match}\" (similarity: {score:.2f})"
+            for name, desc, match, score in ambiguous_cases
+        ])
+
+        # Build cached categories list
+        cached_summary = "\n".join([
+            f"- \"{name}\": {desc}"
+            for name, desc in cached_cats.items()
+        ])
+
+        remaining_slots = max(0, max_new - current_new_count)
+
+        prompt = f"""You are reviewing ambiguous category matches for an email classification system.
+
+CACHED CATEGORIES (established):
+{cached_summary}
+
+AMBIGUOUS CASES (need decision):
+{cases_summary}
+
+CONTEXT:
+- Embedding similarity scores: 1.0=identical, 0.7+=very similar, 0.5-0.7=ambiguous, <0.5=different
+- Allow new categories: {allow_new}
+- Remaining slots for new categories: {remaining_slots}/{max_new}
+
+TASK:
+For each ambiguous case, decide:
+1. "snap" - If semantically similar enough to a cached category (even if not a perfect match)
+2. "new" - If genuinely distinct and worth adding (only if slots available)
+
+GUIDELINES:
+- PREFER snapping to maintain consistency across mailboxes
+- Only approve "new" if the category serves a clearly distinct purpose
+- Consider: Will users benefit from separating this vs merging with existing?
+- Be conservative with "new" - consolidation is better than fragmentation
+
+Return JSON:
+{{
+  "CategoryName": {{"action": "snap"|"new", "target": "CachedCategoryName"}},
+  ...
+}}
+
+For "snap": target = cached category to snap to
+For "new": target = same as CategoryName (keeps original)
+
+JSON:
+"""
+
+        try:
+            response = self.llm_provider.complete(prompt, temperature=0.1, max_tokens=2000)
+
+            # Parse JSON (strip any <think>...</think> reasoning block first)
+            cleaned = re.sub(r'<think>.*?</think>', '', response, flags=re.DOTALL)
+            json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
+
+            if json_match:
+                decisions = json.loads(json_match.group())
+                logger.info(f"LLM reviewed {len(decisions)} categories")
+                return decisions
+            else:
+                logger.error("LLM review: no JSON found in response")
+                return {}
+
+        except Exception as e:
+            logger.error(f"LLM review failed: {e}")
+            return {}
+
     def update_cache(
         self,
         categories: Dict[str, str],
diff --git a/src/calibration/llm_analyzer.py b/src/calibration/llm_analyzer.py
index 03d3c5a..e7bcd38 100644
--- a/src/calibration/llm_analyzer.py
+++ b/src/calibration/llm_analyzer.py
@@ -32,7 +32,7 @@ class CalibrationAnalyzer:

         # Initialize category cache for cross-mailbox consistency
         cache_path = config.get('category_cache_path', 'src/models/category_cache.json')
-        self.category_cache = CategoryCache(cache_path, embedding_model=embedding_model)
+        self.category_cache = CategoryCache(cache_path, embedding_model=embedding_model, llm_provider=llm_provider)

         if not self.llm_available:
             logger.warning("LLM not available for calibration analysis")
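Note: the sketch below shows how a provider stub could exercise the new review path and how its decisions fold back into the name mapping. Only the complete(prompt, temperature=..., max_tokens=...) call shape, the (name, description, best_match, score) tuple layout, and the {"action": "snap"|"new", "target": ...} decision format come from the patch above; StubProvider, the example categories, and the fallback-to-closest-match line are hypothetical illustration, not part of the codebase.

import json

class StubProvider:
    """Hypothetical stand-in for the llm_provider handed to CategoryCache."""

    def complete(self, prompt: str, temperature: float = 0.1, max_tokens: int = 2000) -> str:
        # Return decisions in the JSON shape _llm_review_ambiguous expects.
        return json.dumps({
            "Billing Notices": {"action": "snap", "target": "Finance"},
            "Travel Plans": {"action": "new", "target": "Travel Plans"},
        })

# Ambiguous cases as (disc_name, disc_desc, best_match, score), mirroring the
# tuples collected in Stage 3/5 above.
ambiguous = [
    ("Billing Notices", "Payment reminders and invoices", "Finance", 0.62),
    ("Travel Plans", "Flight and hotel confirmations", "Finance", 0.51),
]

decisions = json.loads(StubProvider().complete("prompt built from the cases"))

# Fold decisions back into a name mapping, mirroring the review loop above.
# The .get() fallback to the closest cached match is an extra safeguard in this
# sketch for cases the LLM skipped; the patch itself leaves such cases unmapped.
mapping = {}
for name, desc, best_match, score in ambiguous:
    decision = decisions.get(name, {"action": "snap", "target": best_match})
    mapping[name] = decision["target"] if decision["action"] == "snap" else name

print(mapping)  # {'Billing Notices': 'Finance', 'Travel Plans': 'Travel Plans'}

In the real pipeline the provider is not called directly; it is injected once via CategoryCache(cache_path, embedding_model=..., llm_provider=...), as in the llm_analyzer.py hunk above.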