Add intelligent multi-stage category matching with LLM review

Implements a 5-stage matching strategy for the category cache:

MATCHING PIPELINE:
1. Exact name match (1.0) → instant snap
2. High embedding similarity (≥0.7) → confident snap
3. Ambiguous similarity (0.5-0.7) → LLM review
4. Low similarity (<0.5) → accept as new (if slots available)
5. Exceeded max_new → force review/snap

LLM REVIEW FOR AMBIGUOUS CASES:
- Triggered when similarity scores are 0.5-0.7 (too low to snap, too high to ignore)
- LLM decides: snap to existing OR approve as new category (see the sketch below)
- Considers: semantic overlap, functional distinction, user value
- Conservative bias toward snapping (consistency > fragmentation)
- Respects max_new limit and remaining slots
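
For reference, the review step returns one decision per ambiguous
category in the shape documented on _llm_review_ambiguous below; the
category names in this sketch are hypothetical:

    # Sketch of the decision mapping returned by _llm_review_ambiguous().
    # "Receipts", "Order Updates", "Travel Plans" are made-up names.
    llm_decisions = {
        "Receipts":     {"action": "snap", "target": "Order Updates"},  # merge into cached category
        "Travel Plans": {"action": "new",  "target": "Travel Plans"},   # keep as a new category
    }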

HEURISTIC FALLBACK:
- If no LLM is available: scores ≥0.6 snap to the best match; lower scores
  become new categories (if slots remain) or force-snap to the best match
- Ensures system always produces valid category mapping

Configuration:
- similarity_threshold: 0.7 (confident match)
- llm_review_threshold: 0.5 (triggers LLM review)
- max_new: 3 (limits new categories per run)
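
A minimal calling sketch using these defaults (the constructor and
snap_to_cache signatures are taken from the diff below; the embedding
model, LLM provider, and discovered categories are placeholders):

    cache = CategoryCache(
        "src/models/category_cache.json",
        embedding_model=embedding_model,   # placeholder: model used by _find_best_match
        llm_provider=llm_provider,         # placeholder: object exposing .complete(...)
    )
    snapped, mapping = cache.snap_to_cache(
        discovered={"Billing": "Invoices and payment reminders"},  # placeholder input
        similarity_threshold=0.7,   # confident snap
        llm_review_threshold=0.5,   # 0.5-0.7 goes to LLM review
        allow_new=True,
        max_new=3,
    )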

This solves the key problem: embedding similarity alone cannot decide
edge cases (scores of 0.5-0.7). The LLM supplies the judgment for
ambiguous matches, accepting genuinely distinct new categories while
maintaining cross-mailbox consistency.
FSSCoding 2025-10-23 15:19:50 +11:00
parent 288b341f4e
commit eab378409e
2 changed files with 162 additions and 27 deletions


@@ -26,10 +26,11 @@ class CategoryCache:
     - Support for mailbox-specific overrides
     """
 
-    def __init__(self, cache_path: str = "src/models/category_cache.json", embedding_model=None):
+    def __init__(self, cache_path: str = "src/models/category_cache.json", embedding_model=None, llm_provider=None):
         self.cache_path = Path(cache_path)
         self.cache: Dict[str, dict] = {}
         self.embedding_model = embedding_model
+        self.llm_provider = llm_provider
         self.load()
 
     def load(self) -> None:
@@ -71,16 +72,25 @@ class CategoryCache:
         discovered: Dict[str, str],
         similarity_threshold: float = 0.7,
         allow_new: bool = True,
-        max_new: int = 3
+        max_new: int = 3,
+        llm_review_threshold: float = 0.5
     ) -> Tuple[Dict[str, str], Dict[str, str]]:
         """
-        Snap discovered categories to cached ones using semantic similarity.
+        Snap discovered categories to cached ones using multi-stage matching.
+
+        Matching Strategy:
+        1. Stage 1: Exact name match (score=1.0) → instant snap
+        2. Stage 2: Embedding similarity ≥ threshold → confident snap
+        3. Stage 3: Similarity < threshold but ≥ llm_review_threshold → LLM review
+        4. Stage 4: Very low similarity → auto-accept as new (if allowed)
+        5. Stage 5: Exceeded max_new → force snap to closest
 
         Args:
             discovered: Newly discovered categories {name: description}
-            similarity_threshold: Minimum similarity to match (0-1)
-            allow_new: Whether to allow new categories not in cache
-            max_new: Maximum new categories to add per run
+            similarity_threshold: Confident match threshold (default: 0.7)
+            allow_new: Whether to allow new categories
+            max_new: Maximum new categories per run
+            llm_review_threshold: Min score to trigger LLM review (default: 0.5)
 
         Returns:
             (snapped_categories, mapping) where:
@@ -88,50 +98,82 @@ class CategoryCache:
             - mapping: {discovered_name: final_name} for all discovered
         """
         if not self.cache:
             # No cache yet, return discovered as-is
             logger.info("Empty cache, using all discovered categories")
             return discovered, {name: name for name in discovered}
 
         snapped = {}
         mapping = {}
         new_categories = []
+        ambiguous_cases = []  # For LLM review
 
         cached_cats = self.get_cached_categories()
 
+        # Stage 1-4: Classify each discovered category
         for disc_name, disc_desc in discovered.items():
             # Try to find best match in cache
             best_match, best_score = self._find_best_match(
                 disc_name, disc_desc, cached_cats
             )
 
             if best_score >= similarity_threshold:
-                # Snap to cached category
+                # Stage 2: Confident match → snap immediately
                 mapping[disc_name] = best_match
                 if best_match not in snapped:
                     snapped[best_match] = cached_cats[best_match]
-                logger.debug(f"Snapped '{disc_name}' → '{best_match}' (similarity: {best_score:.2f})")
+                logger.debug(f"Confident snap '{disc_name}' → '{best_match}' (similarity: {best_score:.2f})")
+            elif best_score >= llm_review_threshold:
+                # Stage 3: Ambiguous → save for LLM review
+                ambiguous_cases.append((disc_name, disc_desc, best_match, best_score))
+                logger.info(f"Ambiguous: '{disc_name}' ~ '{best_match}' (similarity: {best_score:.2f}) → LLM review")
             else:
-                # No good match found
+                # Stage 4: Very low similarity → likely new category
                 if allow_new and len(new_categories) < max_new:
-                    # Allow as new category
                     new_categories.append((disc_name, disc_desc))
                     mapping[disc_name] = disc_name
                     snapped[disc_name] = disc_desc
-                    logger.info(f"New category: '{disc_name}' (no cache match, score: {best_score:.2f})")
+                    logger.info(f"New category: '{disc_name}' (low similarity: {best_score:.2f})")
                 else:
-                    # Force snap to best match even if below threshold
-                    if best_match:
-                        mapping[disc_name] = best_match
-                        if best_match not in snapped:
-                            snapped[best_match] = cached_cats[best_match]
-                        logger.warning(f"Forced snap '{disc_name}' → '{best_match}' (low similarity: {best_score:.2f})")
-                    else:
-                        # Fallback to first cached category
-                        fallback = list(cached_cats.keys())[0]
-                        mapping[disc_name] = fallback
-                        if fallback not in snapped:
-                            snapped[fallback] = cached_cats[fallback]
-                        logger.warning(f"Fallback: '{disc_name}' → '{fallback}'")
+                    # Stage 5: Can't add new → force snap
+                    ambiguous_cases.append((disc_name, disc_desc, best_match, best_score))
+                    logger.warning(f"Force review: '{disc_name}' (max_new exceeded, score: {best_score:.2f})")
+
+        # LLM Review for ambiguous cases
+        if ambiguous_cases and self.llm_provider:
+            logger.info(f"Requesting LLM review for {len(ambiguous_cases)} ambiguous cases...")
+            llm_decisions = self._llm_review_ambiguous(ambiguous_cases, cached_cats, allow_new, len(new_categories), max_new)
+
+            for disc_name, decision in llm_decisions.items():
+                if decision['action'] == 'snap':
+                    target = decision['target']
+                    mapping[disc_name] = target
+                    if target not in snapped:
+                        snapped[target] = cached_cats[target]
+                    logger.info(f"LLM snap: '{disc_name}' → '{target}'")
+                elif decision['action'] == 'new':
+                    # Find original description
+                    disc_desc = next(desc for name, desc, _, _ in ambiguous_cases if name == disc_name)
+                    new_categories.append((disc_name, disc_desc))
+                    mapping[disc_name] = disc_name
+                    snapped[disc_name] = disc_desc
+                    logger.info(f"LLM approved new: '{disc_name}'")
+        elif ambiguous_cases:
+            # No LLM available → use heuristic fallback
+            logger.warning(f"No LLM for review, using heuristic for {len(ambiguous_cases)} ambiguous cases")
+            for disc_name, disc_desc, best_match, best_score in ambiguous_cases:
+                if best_score >= 0.6:  # Medium-high score → snap
+                    mapping[disc_name] = best_match
+                    if best_match not in snapped:
+                        snapped[best_match] = cached_cats[best_match]
+                elif allow_new and len(new_categories) < max_new:  # Low score + space → new
+                    new_categories.append((disc_name, disc_desc))
+                    mapping[disc_name] = disc_name
+                    snapped[disc_name] = disc_desc
+                else:  # Force snap
+                    mapping[disc_name] = best_match
+                    if best_match not in snapped:
+                        snapped[best_match] = cached_cats[best_match]
+                    logger.warning(f"Forced snap '{disc_name}' → '{best_match}' (low similarity: {best_score:.2f})")
 
         logger.info(f"Snapping result: {len(snapped)} final categories ({len(new_categories)} new)")
         return snapped, mapping
@@ -215,6 +257,99 @@ class CategoryCache:
 
         return float(dot_product / (norm_a * norm_b))
 
+    def _llm_review_ambiguous(
+        self,
+        ambiguous_cases: List[Tuple[str, str, str, float]],
+        cached_cats: Dict[str, str],
+        allow_new: bool,
+        current_new_count: int,
+        max_new: int
+    ) -> Dict[str, Dict]:
+        """
+        Use LLM to review ambiguous category matches.
+
+        Args:
+            ambiguous_cases: List of (disc_name, disc_desc, best_match, score)
+            cached_cats: All cached categories
+            allow_new: Whether new categories are allowed
+            current_new_count: How many new categories already accepted
+            max_new: Maximum new categories allowed
+
+        Returns:
+            Dict of {disc_name: {'action': 'snap'|'new', 'target': category_name}}
+        """
+
+        import json
+        import re
+
+        # Build ambiguous cases summary
+        cases_summary = "\n".join([
+            f"- \"{name}\": {desc}\n  Best cache match: \"{match}\" (similarity: {score:.2f})"
+            for name, desc, match, score in ambiguous_cases
+        ])
+
+        # Build cached categories list
+        cached_summary = "\n".join([
+            f"- \"{name}\": {desc}"
+            for name, desc in cached_cats.items()
+        ])
+
+        remaining_slots = max(0, max_new - current_new_count)
+
+        prompt = f"""<no_think>You are reviewing ambiguous category matches for an email classification system.
+
+CACHED CATEGORIES (established):
+{cached_summary}
+
+AMBIGUOUS CASES (need decision):
+{cases_summary}
+
+CONTEXT:
+- Embedding similarity scores: 1.0=identical, 0.7+=very similar, 0.5-0.7=ambiguous, <0.5=different
+- Allow new categories: {allow_new}
+- Remaining slots for new categories: {remaining_slots}/{max_new}
+
+TASK:
+For each ambiguous case, decide:
+1. "snap" - If semantically similar enough to cached category (even if not perfect match)
+2. "new" - If genuinely distinct and worth adding (only if slots available)
+
+GUIDELINES:
+- PREFER snapping to maintain consistency across mailboxes
+- Only approve "new" if category serves a clearly distinct purpose
+- Consider: Will users benefit from separating this vs merging with existing?
+- Be conservative with "new" - consolidation is better than fragmentation
+
+Return JSON:
+{{
+  "CategoryName": {{"action": "snap"|"new", "target": "CachedCategoryName"}},
+  ...
+}}
+
+For "snap": target = cached category to snap to
+For "new": target = same as CategoryName (keeps original)
+
+JSON:
+"""
+
+        try:
+            response = self.llm_provider.complete(prompt, temperature=0.1, max_tokens=2000)
+
+            # Parse JSON
+            cleaned = re.sub(r'<think>.*?</think>', '', response, flags=re.DOTALL)
+            json_match = re.search(r'\{.*\}', cleaned, re.DOTALL)
+
+            if json_match:
+                decisions = json.loads(json_match.group())
+                logger.info(f"LLM reviewed {len(decisions)} categories")
+                return decisions
+            else:
+                logger.error("LLM review: no JSON found in response")
+                return {}
+
+        except Exception as e:
+            logger.error(f"LLM review failed: {e}")
+            return {}
+
     def update_cache(
         self,
         categories: Dict[str, str],


@@ -32,7 +32,7 @@ class CalibrationAnalyzer:
 
         # Initialize category cache for cross-mailbox consistency
         cache_path = config.get('category_cache_path', 'src/models/category_cache.json')
-        self.category_cache = CategoryCache(cache_path, embedding_model=embedding_model)
+        self.category_cache = CategoryCache(cache_path, embedding_model=embedding_model, llm_provider=llm_provider)
 
         if not self.llm_available:
             logger.warning("LLM not available for calibration analysis")