Add category caching system and analytical data to prompts

Category Cache System (src/calibration/category_cache.py):
- Persistent storage of discovered categories across mailbox runs
- Semantic matching to snap new categories to existing ones
- Usage tracking for category popularity
- Configurable similarity threshold and new category limits
- JSON-based cache with metadata (created, last_seen, email counts)
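For reference, the on-disk layout written by CategoryCache.save() (field names taken from the code in this commit; the single category entry is purely illustrative):

{
  "version": "1.0",
  "updated": "2025-10-23T14:25:41",
  "categories": {
    "Newsletters": {
      "description": "Bulk mailing-list and marketing email",
      "created": "2025-10-20T09:12:00",
      "last_seen": "2025-10-23T14:25:41",
      "total_emails": 128
    }
  }
}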

Discovery Improvements (src/calibration/llm_analyzer.py):
- Calculate batch statistics: sender domains, recipient counts,
  attachments, subject lengths, common keywords
- Add statistics to LLM discovery prompt for better decisions
- Integrate CategoryCache into CalibrationAnalyzer
- 3-step workflow: Discover → Consolidate → Snap to Cache
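Sketched end to end, the workflow looks roughly like the following; the two LLM-backed steps are stand-in stubs for illustration only, and just the CategoryCache calls mirror the code added in this commit:

from src.calibration.category_cache import CategoryCache

def llm_discover(emails):
    # Stub for step 1: batched LLM discovery, with batch statistics in the prompt.
    return {"Newsletters": "Bulk mailing-list and marketing email"}, [("msg-1", "Newsletters")]

def llm_consolidate(categories):
    # Stub for step 2: the LLM merges overlapping categories, nudged by cached hints.
    return categories

cache = CategoryCache()
categories, labels = llm_discover([])
categories = llm_consolidate(categories)

# Step 3: snap to the persistent cache, relabel emails, record usage.
final_cats, mapping = cache.snap_to_cache(categories, similarity_threshold=0.7,
                                          allow_new=True, max_new=3)
labels = [(eid, mapping.get(cat, cat)) for eid, cat in labels]
counts = {}
for _, cat in labels:
    counts[cat] = counts.get(cat, 0) + 1
cache.update_cache(final_cats, counts)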

Consolidation Improvements:
- Add cached categories as hints in consolidation prompt
- LLM prefers snapping to established categories
- Maintains cross-mailbox consistency while allowing new categories
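The hint block prepended to the consolidation prompt has this shape (wording taken from the diff below; the count and the two entries are placeholders):

CACHED CATEGORIES (N total, showing top 15):
These are established categories from previous mailboxes. PREFER consolidating to these
when semantically appropriate to maintain cross-mailbox consistency.
  - Newsletters: Bulk mailing-list and marketing email
  - Receipts: Order confirmations and payment receipts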

Configuration Parameters:
- use_category_cache: Enable/disable caching (default: true)
- cache_similarity_threshold: Min similarity for snap (default: 0.7)
- cache_allow_new: Allow new categories (default: true)
- cache_max_new: Max new categories per run (default: 3)
- category_cache_path: Custom cache location
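These keys are read with config.get() inside CalibrationAnalyzer; shown here as a plain Python dict since the commit does not show where the configuration is loaded from:

calibration_config = {
    "use_category_cache": True,
    "cache_similarity_threshold": 0.7,   # minimum similarity before snapping to a cached category
    "cache_allow_new": True,
    "cache_max_new": 3,                  # cap on brand-new categories per run
    "category_cache_path": "src/models/category_cache.json",
}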

Result: Consistent category sets across different mailboxes
with intelligent discovery of new categories when appropriate.
FSSCoding 2025-10-23 14:25:41 +11:00
parent 183b12c9b4
commit 874caf38bc
2 changed files with 325 additions and 0 deletions

src/calibration/category_cache.py

@@ -0,0 +1,231 @@
"""
Category cache system for consistent categorization across mailboxes.
Stores discovered categories and provides semantic matching to snap
new discoveries to existing categories for cross-mailbox consistency.
"""
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime
logger = logging.getLogger(__name__)
class CategoryCache:
"""
Manages cached categories for consistent email classification.
Features:
- Persistent storage of discovered categories
- Semantic matching to snap new categories to cached ones
- Usage tracking for category popularity
- Support for mailbox-specific overrides
"""
def __init__(self, cache_path: str = "src/models/category_cache.json"):
self.cache_path = Path(cache_path)
self.cache: Dict[str, dict] = {}
self.load()
def load(self) -> None:
"""Load category cache from disk."""
if self.cache_path.exists():
try:
with open(self.cache_path, 'r') as f:
data = json.load(f)
self.cache = data.get('categories', {})
logger.info(f"Loaded {len(self.cache)} cached categories from {self.cache_path}")
except Exception as e:
logger.error(f"Failed to load category cache: {e}")
self.cache = {}
else:
logger.info("No category cache found, starting fresh")
self.cache = {}
def save(self) -> None:
"""Save category cache to disk."""
try:
self.cache_path.parent.mkdir(parents=True, exist_ok=True)
data = {
'version': '1.0',
'updated': datetime.now().isoformat(),
'categories': self.cache
}
with open(self.cache_path, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved {len(self.cache)} categories to cache")
except Exception as e:
logger.error(f"Failed to save category cache: {e}")
def get_cached_categories(self) -> Dict[str, str]:
"""Get all cached categories as {name: description}."""
return {name: info['description'] for name, info in self.cache.items()}
def snap_to_cache(
self,
discovered: Dict[str, str],
similarity_threshold: float = 0.7,
allow_new: bool = True,
max_new: int = 3
) -> Tuple[Dict[str, str], Dict[str, str]]:
"""
Snap discovered categories to cached ones using semantic similarity.
Args:
discovered: Newly discovered categories {name: description}
similarity_threshold: Minimum similarity to match (0-1)
allow_new: Whether to allow new categories not in cache
max_new: Maximum new categories to add per run
Returns:
(snapped_categories, mapping) where:
- snapped_categories: Final category set (from cache + new)
- mapping: {discovered_name: final_name} for all discovered
"""
if not self.cache:
# No cache yet, return discovered as-is
logger.info("Empty cache, using all discovered categories")
return discovered, {name: name for name in discovered}
snapped = {}
mapping = {}
new_categories = []
cached_cats = self.get_cached_categories()
for disc_name, disc_desc in discovered.items():
# Try to find best match in cache
best_match, best_score = self._find_best_match(
disc_name, disc_desc, cached_cats
)
if best_score >= similarity_threshold:
# Snap to cached category
mapping[disc_name] = best_match
if best_match not in snapped:
snapped[best_match] = cached_cats[best_match]
logger.debug(f"Snapped '{disc_name}''{best_match}' (similarity: {best_score:.2f})")
else:
# No good match found
if allow_new and len(new_categories) < max_new:
# Allow as new category
new_categories.append((disc_name, disc_desc))
mapping[disc_name] = disc_name
snapped[disc_name] = disc_desc
logger.info(f"New category: '{disc_name}' (no cache match, score: {best_score:.2f})")
else:
# Force snap to best match even if below threshold
if best_match:
mapping[disc_name] = best_match
if best_match not in snapped:
snapped[best_match] = cached_cats[best_match]
logger.warning(f"Forced snap '{disc_name}''{best_match}' (low similarity: {best_score:.2f})")
else:
# Fallback to first cached category
fallback = list(cached_cats.keys())[0]
mapping[disc_name] = fallback
if fallback not in snapped:
snapped[fallback] = cached_cats[fallback]
logger.warning(f"Fallback: '{disc_name}''{fallback}'")
logger.info(f"Snapping result: {len(snapped)} final categories ({len(new_categories)} new)")
return snapped, mapping
def _find_best_match(
self,
name: str,
description: str,
cached: Dict[str, str]
) -> Tuple[Optional[str], float]:
"""
Find best matching cached category using simple similarity.
Uses exact name match, keyword overlap, and description similarity.
Returns (best_category_name, similarity_score).
"""
if not cached:
return None, 0.0
name_lower = name.lower()
desc_words = set(description.lower().split())
best_match = None
best_score = 0.0
for cached_name, cached_desc in cached.items():
score = 0.0
# Exact name match
if name_lower == cached_name.lower():
score = 1.0
# Partial name match
elif name_lower in cached_name.lower() or cached_name.lower() in name_lower:
score = 0.8
# Keyword overlap
else:
cached_words = set(cached_desc.lower().split())
common_words = desc_words & cached_words
if desc_words:
overlap = len(common_words) / len(desc_words)
score = overlap * 0.6 # Max 0.6 from keyword overlap
if score > best_score:
best_score = score
best_match = cached_name
return best_match, best_score
def update_cache(
self,
categories: Dict[str, str],
usage_count: Optional[Dict[str, int]] = None
) -> None:
"""
Update cache with new/refined categories.
Args:
categories: Categories to add/update {name: description}
usage_count: Optional email counts per category
"""
for name, desc in categories.items():
if name in self.cache:
# Update existing
self.cache[name]['description'] = desc
self.cache[name]['last_seen'] = datetime.now().isoformat()
if usage_count and name in usage_count:
self.cache[name]['total_emails'] = self.cache[name].get('total_emails', 0) + usage_count[name]
else:
# Add new
self.cache[name] = {
'description': desc,
'created': datetime.now().isoformat(),
'last_seen': datetime.now().isoformat(),
'total_emails': usage_count.get(name, 0) if usage_count else 0
}
self.save()
logger.info(f"Updated cache with {len(categories)} categories")
def get_stats(self) -> Dict:
"""Get cache statistics."""
if not self.cache:
return {'total_categories': 0}
total_emails = sum(info.get('total_emails', 0) for info in self.cache.values())
sorted_by_usage = sorted(
self.cache.items(),
key=lambda x: x[1].get('total_emails', 0),
reverse=True
)
return {
'total_categories': len(self.cache),
'total_emails_classified': total_emails,
'top_categories': [
(name, info.get('total_emails', 0))
for name, info in sorted_by_usage[:10]
]
}
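A short, hypothetical usage example of the class above, showing one discovered name snapping onto an established category and another being kept as new:

cache = CategoryCache("/tmp/category_cache.json")   # throwaway path for the example
cache.update_cache({"Work": "Internal project and team email",
                    "Receipts": "Order confirmations and payment receipts"})

discovered = {"Work Projects": "Emails about internal project work",
              "Travel": "Flight and hotel bookings"}
snapped, mapping = cache.snap_to_cache(discovered, similarity_threshold=0.7,
                                       allow_new=True, max_new=3)
# "Work Projects" contains the cached name "Work", so the partial-name rule
# scores it 0.8 and it snaps to "Work"; "Travel" has no match above 0.7 and
# is kept as a new category because allow_new is True.
print(mapping)        # {'Work Projects': 'Work', 'Travel': 'Travel'}
print(len(snapped))   # 2: 'Work' (from cache) and 'Travel' (new)
# Note: snapping alone does not modify the cache; a follow-up update_cache()
# call is what records 'Travel' and the usage counts.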

src/calibration/llm_analyzer.py

@@ -6,6 +6,7 @@ from typing import List, Dict, Any, Optional, Tuple
from src.email_providers.base import Email
from src.llm.base import BaseLLMProvider
from src.calibration.category_cache import CategoryCache
logger = logging.getLogger(__name__)
@@ -28,6 +29,10 @@ class CalibrationAnalyzer:
self.config = config
self.llm_available = llm_provider.is_available()
# Initialize category cache for cross-mailbox consistency
cache_path = config.get('category_cache_path', 'src/models/category_cache.json')
self.category_cache = CategoryCache(cache_path)
if not self.llm_available:
logger.warning("LLM not available for calibration analysis")
@@ -91,10 +96,82 @@ class CalibrationAnalyzer:
else:
logger.warning("Consolidation didn't reduce categories, keeping original")
# Step 3: Snap to cached categories for cross-mailbox consistency
use_cache = self.config.get('use_category_cache', True)
if use_cache and self.category_cache:
similarity_threshold = self.config.get('cache_similarity_threshold', 0.7)
allow_new = self.config.get('cache_allow_new', True)
max_new = self.config.get('cache_max_new', 3)
logger.info(f"Snapping to cached categories (threshold={similarity_threshold}, allow_new={allow_new}, max_new={max_new})")
final_categories, snap_mapping = self.category_cache.snap_to_cache(
discovered_categories,
similarity_threshold=similarity_threshold,
allow_new=allow_new,
max_new=max_new
)
# Update email labels with snapped categories
for i, (email_id, old_cat) in enumerate(email_labels):
if old_cat in snap_mapping:
email_labels[i] = (email_id, snap_mapping[old_cat])
logger.info(f"After cache snap: {len(final_categories)} categories")
discovered_categories = final_categories
# Update cache with usage counts
category_counts = {}
for _, cat in email_labels:
category_counts[cat] = category_counts.get(cat, 0) + 1
self.category_cache.update_cache(discovered_categories, category_counts)
return discovered_categories, email_labels
def _analyze_batch(self, batch: List[Email], batch_idx: int = 0) -> Dict[str, Any]:
"""Analyze single batch of emails."""
# Calculate analytical patterns
sender_domains = {}
recipients_count = []
has_attachments = 0
avg_subject_len = 0
common_keywords = {}
for e in batch:
# Domain analysis
if '@' in e.sender:
domain = e.sender.split('@')[1].lower()
sender_domains[domain] = sender_domains.get(domain, 0) + 1
# Recipient count
recipient_count = len(e.recipients) if hasattr(e, 'recipients') else 1
recipients_count.append(recipient_count)
# Attachments
if hasattr(e, 'has_attachments') and e.has_attachments:
has_attachments += 1
# Subject length
avg_subject_len += len(e.subject)
# Extract keywords from subject (simple word frequency)
words = e.subject.lower().split()
for word in words:
if len(word) > 3: # Skip short words
common_keywords[word] = common_keywords.get(word, 0) + 1
# Build statistics summary
top_domains = sorted(sender_domains.items(), key=lambda x: x[1], reverse=True)[:5]
top_keywords = sorted(common_keywords.items(), key=lambda x: x[1], reverse=True)[:10]
avg_recipients = sum(recipients_count) / len(recipients_count) if recipients_count else 0
avg_subject_len = avg_subject_len / len(batch) if batch else 0
stats_summary = f"""BATCH STATISTICS ({len(batch)} emails):
- Top sender domains: {', '.join([f'{d} ({c})' for d, c in top_domains])}
- Avg recipients per email: {avg_recipients:.1f}
- Emails with attachments: {has_attachments}/{len(batch)}
- Avg subject length: {avg_subject_len:.0f} chars
- Common keywords: {', '.join([f'{w}({c})' for w, c in top_keywords[:5]])}"""
# Build email summary with actual IDs
email_list = []
for i, e in enumerate(batch):
@@ -116,6 +193,8 @@ GUIDELINES FOR GOOD CATEGORIES:
- FUNCTIONAL: Each category serves a distinct purpose
- 3-10 categories ideal: Too many = noise, too few = useless
{stats_summary}
EMAILS TO ANALYZE:
{email_summary}
@@ -251,6 +330,21 @@ JSON:
# Build context section
context_parts = []
# Add cached categories as consolidation hints
if self.category_cache:
cached_cats = self.category_cache.get_cached_categories()
if cached_cats:
cache_stats = self.category_cache.get_stats()
cache_list = "\n".join([
f" - {name}: {desc}"
for name, desc in list(cached_cats.items())[:15] # Show top 15
])
context_parts.append(f"""CACHED CATEGORIES ({cache_stats['total_categories']} total, showing top 15):
These are established categories from previous mailboxes. PREFER consolidating to these
when semantically appropriate to maintain cross-mailbox consistency.
{cache_list}""")
if inbox_context:
context_parts.append(f"INBOX CONTEXT: {inbox_context}")
context_parts.append(f"INBOX CONTEXT: {inbox_context}") context_parts.append(f"INBOX CONTEXT: {inbox_context}")