# EMAIL SORTER - BUILD INSTRUCTIONS
**Step-by-Step Implementation Guide**
Version: 1.0
Date: 2024-10-21
---
## PREREQUISITES
### Required Software
- Python 3.8 or higher
- Git
- Ollama (for local LLM)
- Text editor / IDE
### Required Accounts
- Gmail account (for testing)
- Google Cloud Console project (for Gmail API)
### Skills Needed
- Python programming
- Basic understanding of ML concepts
- Command line comfort
- OAuth 2.0 basics
---
## IMPLEMENTATION ORDER
Build in this exact order. Each phase depends on previous phases.
---
## PHASE 1: PROJECT SETUP
### Step 1: Initialize Git Repository
```bash
cd C:\Users\BrettFox\Documents\Claude\email-sorter
git init
git add .
git commit -m "Initial commit - project blueprint"
```
### Step 2: Create Virtual Environment
```bash
# Create venv
python -m venv venv
# Activate (Windows)
venv\Scripts\activate
# Activate (Linux/Mac)
source venv/bin/activate
```
### Step 3: Create requirements.txt
Already exists, but verify contents:
```txt
# Core
python-dotenv>=1.0.0
pyyaml>=6.0
pydantic>=2.0.0
# Email Providers
google-api-python-client>=2.100.0
google-auth-httplib2>=0.1.1
google-auth-oauthlib>=1.1.0
msal>=1.24.0
imapclient>=2.3.1
# Machine Learning
scikit-learn>=1.3.0
xgboost>=2.0.0
lightgbm>=4.0.0
pandas>=2.0.0
numpy>=1.24.0
# LLM Integration
ollama>=0.1.0
# Text Processing
nltk>=3.8
beautifulsoup4>=4.12.0
lxml>=4.9.0
# Utilities
tqdm>=4.66.0
click>=8.1.0
rich>=13.0.0
joblib>=1.3.0
tenacity>=8.2.0
# Testing
pytest>=7.4.0
pytest-cov>=4.1.0
pytest-mock>=3.11.0
```
### Step 4: Install Dependencies
```bash
pip install -r requirements.txt
```
### Step 5: Create .gitignore
```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
*.egg-info/
dist/
build/
# Data and Models
data/training/
src/models/pretrained/*.pkl
*.h5
*.joblib
# Credentials
.env
credentials/
*.json
!config/*.json
# Logs
logs/*.log
*.log
# IDE
.vscode/
.idea/
*.swp
# OS
.DS_Store
Thumbs.db
# Checkpoints
checkpoints/
*.checkpoint
# Results
results/
output/
```
### Step 6: Create Directory Structure
```bash
# Create all directories
mkdir -p src/calibration
mkdir -p src/classification
mkdir -p src/models/pretrained
mkdir -p src/email_providers
mkdir -p src/processing
mkdir -p src/adjustment
mkdir -p src/export
mkdir -p src/utils
mkdir -p tests
mkdir -p prompts
mkdir -p scripts
mkdir -p data/samples
mkdir -p logs
mkdir -p config
# Create __init__.py files
touch src/__init__.py
touch src/calibration/__init__.py
touch src/classification/__init__.py
touch src/models/__init__.py
touch src/email_providers/__init__.py
touch src/processing/__init__.py
touch src/adjustment/__init__.py
touch src/export/__init__.py
touch src/utils/__init__.py
touch tests/__init__.py
# Windows equivalent:
# type nul > src\__init__.py
# (repeat for each)
```
---
## PHASE 2: CORE INFRASTRUCTURE
### Step 7: Config System (src/utils/config.py)
Create the configuration loader:
```python
"""Configuration management."""
import yaml
from pathlib import Path
from typing import Dict, Any
from pydantic import BaseModel
class Config(BaseModel):
"""Main configuration model."""
version: str
calibration: Dict[str, Any]
processing: Dict[str, Any]
classification: Dict[str, Any]
llm: Dict[str, Any]
email_providers: Dict[str, Any]
features: Dict[str, Any]
export: Dict[str, Any]
logging: Dict[str, Any]
cleanup: Dict[str, Any]
class Config:
extra = "allow"
def load_config(config_path: str = "config/default_config.yaml") -> Config:
"""Load configuration from YAML file."""
with open(config_path, 'r') as f:
config_dict = yaml.safe_load(f)
return Config(**config_dict)
def load_categories(categories_path: str = "config/categories.yaml") -> Dict[str, Dict]:
"""Load category definitions."""
with open(categories_path, 'r') as f:
data = yaml.safe_load(f)
return data['categories']
def load_features(features_path: str = "config/features.yaml") -> Dict[str, Any]:
"""Load feature configuration."""
with open(features_path, 'r') as f:
return yaml.safe_load(f)
```
**Test:**
```bash
python -c "from src.utils.config import load_config; print(load_config())"
```
### Step 8: Logging System (src/utils/logging.py)
```python
"""Logging configuration."""
import logging
import sys
from pathlib import Path
from rich.logging import RichHandler
def setup_logging(config: dict):
"""Setup logging with console and file handlers."""
log_level = config.get('level', 'INFO')
log_file = config.get('file', 'logs/email-sorter.log')
# Create logs directory
Path(log_file).parent.mkdir(parents=True, exist_ok=True)
# Create logger
logger = logging.getLogger()
logger.setLevel(log_level)
# Remove existing handlers
logger.handlers = []
# Console handler with rich formatting
console_handler = RichHandler(
rich_tracebacks=True,
markup=True,
show_time=True,
show_path=False
)
console_handler.setLevel(log_level)
console_formatter = logging.Formatter('%(message)s')
console_handler.setFormatter(console_formatter)
# File handler
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(log_level)
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(file_formatter)
# Add handlers
logger.addHandler(console_handler)
logger.addHandler(file_handler)
return logger
```
### Step 9: Email Data Models (src/email_providers/base.py)
```python
"""Base email provider interface and data models."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional
@dataclass
class Email:
"""Unified email data model."""
id: str
subject: str
sender: str
sender_name: Optional[str] = None
date: Optional[datetime] = None
body: str = ""
body_snippet: str = ""
has_attachments: bool = False
attachments: List[Dict] = field(default_factory=list)
headers: Dict = field(default_factory=dict)
labels: List[str] = field(default_factory=list)
is_read: bool = False
def __post_init__(self):
"""Generate body_snippet if not provided."""
if not self.body_snippet and self.body:
self.body_snippet = self.body[:500]
class BaseProvider(ABC):
"""Abstract base class for email providers."""
@abstractmethod
def connect(self, credentials: Dict) -> bool:
"""Establish connection to email provider."""
pass
@abstractmethod
def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
"""Fetch emails from provider."""
pass
@abstractmethod
def update_labels(self, email_id: str, labels: List[str]) -> bool:
"""Update email labels/folders."""
pass
@abstractmethod
def batch_update(self, updates: List[Dict]) -> bool:
"""Batch update multiple emails."""
pass
@abstractmethod
def disconnect(self):
"""Close connection."""
pass
```
**Test:**
```bash
python -c "from src.email_providers.base import Email; e = Email(id='1', subject='Test', sender='test@test.com'); print(e)"
```
---
## PHASE 3: CONFIGURATION FILES
### Step 10: Create config/default_config.yaml
```yaml
version: "1.0.0"
calibration:
sample_size: 1500
sample_strategy: "stratified"
validation_size: 300
min_confidence: 0.6
processing:
batch_size: 100
llm_queue_size: 100
parallel_workers: 4
checkpoint_interval: 1000
classification:
default_threshold: 0.75
min_threshold: 0.60
max_threshold: 0.90
adjustment_step: 0.05
adjustment_frequency: 1000
category_thresholds:
junk: 0.85
auth: 0.80
conversational: 0.65
llm:
provider: "ollama"
model: "qwen2.5:1.5b"
base_url: "http://localhost:11434"
temperature: 0.1
max_tokens: 500
timeout: 30
retry_attempts: 3
email_providers:
gmail:
batch_size: 100
microsoft:
batch_size: 100
imap:
timeout: 30
batch_size: 50
features:
text_features:
max_vocab_size: 10000
ngram_range: [1, 2]
min_df: 2
max_df: 0.95
export:
format: "json"
include_confidence: true
create_report: true
logging:
level: "INFO"
file: "logs/email-sorter.log"
cleanup:
delete_temp_files: true
delete_repo_after: false
```
### Step 11: Create config/categories.yaml
(See PROJECT_BLUEPRINT.md for full content)
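If you do not want to copy the full file yet, a minimal sketch is enough to make `load_categories()` and the LLM classifier's `info['description']` lookup (Phase 7) work. The category names below match the mock model in Phase 6; the descriptions are placeholders, not the blueprint's definitions.
```yaml
# config/categories.yaml -- minimal sketch; see PROJECT_BLUEPRINT.md for the real definitions
categories:
  junk:
    description: "Spam, promotions, and other low-value mail"
  auth:
    description: "One-time passwords, verification codes, security alerts"
  conversational:
    description: "Person-to-person correspondence"
  unknown:
    description: "Anything that does not fit another category"
```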
### Step 12: Create config/features.yaml
(See PROJECT_BLUEPRINT.md for full content)
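Similarly, a minimal placeholder keeps `load_features()` working. The keys mirror the `text_features` settings the feature extractor in Phase 5 reads; this structure is an assumption, and the authoritative version is in PROJECT_BLUEPRINT.md.
```yaml
# config/features.yaml -- minimal sketch
text_features:
  max_features: 10000
  ngram_range: [1, 2]
  min_df: 2
  max_df: 0.95
```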
**Test:**
```bash
python -c "from src.utils.config import load_config, load_categories; print(load_config()); print(load_categories())"
```
---
## PHASE 4: EMAIL PROVIDERS
### Step 13: Gmail Provider (src/email_providers/gmail.py)
```python
"""Gmail API provider implementation."""
import base64
import logging
from typing import List, Dict, Optional
from datetime import datetime
from email.utils import parsedate_to_datetime
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from .base import BaseProvider, Email
logger = logging.getLogger(__name__)
class GmailProvider(BaseProvider):
"""Gmail API email provider."""
SCOPES = [
'https://www.googleapis.com/auth/gmail.readonly',
'https://www.googleapis.com/auth/gmail.modify'
]
def __init__(self):
self.service = None
self.user_id = 'me'
def connect(self, credentials_path: str) -> bool:
"""Connect to Gmail API using OAuth credentials."""
try:
# For first-time auth
flow = InstalledAppFlow.from_client_secrets_file(
credentials_path, self.SCOPES
)
creds = flow.run_local_server(port=0)
self.service = build('gmail', 'v1', credentials=creds)
logger.info("Connected to Gmail API")
return True
except Exception as e:
logger.error(f"Failed to connect to Gmail: {e}")
return False
def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
"""Fetch emails from Gmail."""
emails = []
try:
# Build query
query = filters.get('query', '') if filters else ''
# Get message IDs
results = self.service.users().messages().list(
userId=self.user_id,
q=query,
maxResults=min(limit or 500, 500) if limit else 500
).execute()
messages = results.get('messages', [])
# Fetch full messages
for msg_info in messages:
email = self._fetch_message(msg_info['id'])
if email:
emails.append(email)
if limit and len(emails) >= limit:
break
logger.info(f"Fetched {len(emails)} emails from Gmail")
return emails
except HttpError as e:
logger.error(f"Gmail API error: {e}")
return emails
def _fetch_message(self, msg_id: str) -> Optional[Email]:
"""Fetch and parse a single message."""
try:
msg = self.service.users().messages().get(
userId=self.user_id,
id=msg_id,
format='full'
).execute()
return self._parse_message(msg)
except Exception as e:
logger.error(f"Error fetching message {msg_id}: {e}")
return None
def _parse_message(self, msg: Dict) -> Email:
"""Parse Gmail message into Email object."""
headers = {h['name']: h['value'] for h in msg['payload']['headers']}
# Extract body
body = self._get_body(msg['payload'])
# Parse date
date = None
if 'Date' in headers:
try:
date = parsedate_to_datetime(headers['Date'])
except:
pass
# Check attachments
has_attachments = False
attachments = []
if 'parts' in msg['payload']:
for part in msg['payload']['parts']:
if part.get('filename'):
has_attachments = True
attachments.append({
'filename': part['filename'],
'mime_type': part['mimeType'],
'size': part.get('body', {}).get('size', 0)
})
return Email(
id=msg['id'],
subject=headers.get('Subject', 'No Subject'),
sender=headers.get('From', ''),
date=date,
body=body,
has_attachments=has_attachments,
attachments=attachments,
headers=headers,
labels=msg.get('labelIds', []),
is_read='UNREAD' not in msg.get('labelIds', [])
)
def _get_body(self, payload: Dict) -> str:
"""Extract email body from payload."""
body = ""
if 'body' in payload and 'data' in payload['body']:
body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='ignore')
elif 'parts' in payload:
for part in payload['parts']:
if part['mimeType'] == 'text/plain':
if 'data' in part['body']:
body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8', errors='ignore')
break
return body
def update_labels(self, email_id: str, labels: List[str]) -> bool:
"""Update labels for a single email."""
try:
self.service.users().messages().modify(
userId=self.user_id,
id=email_id,
body={'addLabelIds': labels}
).execute()
return True
except Exception as e:
logger.error(f"Error updating labels: {e}")
return False
def batch_update(self, updates: List[Dict]) -> bool:
"""Batch update multiple emails."""
try:
batch_size = 100
for i in range(0, len(updates), batch_size):
batch = updates[i:i+batch_size]
email_ids = [u['email_id'] for u in batch]
labels = list(set([l for u in batch for l in u.get('labels', [])]))
self.service.users().messages().batchModify(
userId=self.user_id,
body={
'ids': email_ids,
'addLabelIds': labels
}
).execute()
logger.info(f"Batch updated {len(updates)} emails")
return True
except Exception as e:
logger.error(f"Batch update error: {e}")
return False
def disconnect(self):
"""Close connection."""
self.service = None
logger.info("Disconnected from Gmail")
```
**Test (requires Gmail OAuth setup):**
```bash
# First: Set up OAuth in Google Cloud Console
# Download credentials.json
python -c "from src.email_providers.gmail import GmailProvider; p = GmailProvider(); p.connect('credentials.json'); emails = p.fetch_emails(limit=10); print(f'Fetched {len(emails)} emails')"
```
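The `connect()` method above launches a browser-based OAuth flow on every run. A common refinement is to cache the authorized token and refresh it silently on later runs. Here is a sketch; the `token.json` path and helper name are assumptions, not part of the current code.
```python
# Hypothetical helper: persist and reuse Gmail OAuth tokens between runs.
from pathlib import Path
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow

SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.modify'
]


def get_gmail_credentials(credentials_path: str, token_path: str = "token.json") -> Credentials:
    creds = None
    if Path(token_path).exists():
        # Reuse the token saved by a previous run
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())  # silent refresh, no browser needed
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, SCOPES)
            creds = flow.run_local_server(port=0)  # interactive browser flow
        Path(token_path).write_text(creds.to_json())
    return creds
```
Inside `GmailProvider.connect()` you would then call `build('gmail', 'v1', credentials=get_gmail_credentials(credentials_path))`. Note that `*.json` is already gitignored, which conveniently covers `token.json`.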
---
## PHASE 5: FEATURE EXTRACTION
### Step 14: Feature Extractor (src/classification/feature_extractor.py)
```python
"""Feature extraction from emails."""
import re
import logging
from typing import Dict, List, Any
from datetime import datetime
from urllib.parse import urlparse
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from src.email_providers.base import Email
logger = logging.getLogger(__name__)
class FeatureExtractor:
"""Extract features from emails for classification."""
def __init__(self, config: Dict = None):
"""Initialize with feature configuration."""
self.config = config or {
'text_features': {
'max_features': 10000,
'ngram_range': [1, 2],
'min_df': 2,
'max_df': 0.95
}
}
self.text_vectorizer = None
self._initialize_vectorizer()
def _initialize_vectorizer(self):
"""Initialize TF-IDF vectorizer."""
text_config = self.config.get('text_features', {})
self.text_vectorizer = TfidfVectorizer(
max_features=text_config.get('max_features', 10000),
ngram_range=tuple(text_config.get('ngram_range', [1, 2])),
min_df=text_config.get('min_df', 2),
max_df=text_config.get('max_df', 0.95),
sublinear_tf=True
)
def extract(self, email: Email) -> Dict[str, Any]:
"""
Extract features from a single email.
Args:
email: Email object
Returns:
Dictionary of features
"""
features = {}
# Text for TF-IDF
features['text'] = f"{email.subject} {email.body_snippet}"
# Structural features
features.update(self._extract_structural(email))
# Sender features
features.update(self._extract_sender(email))
# Pattern features
features.update(self._extract_patterns(email))
return features
def _extract_structural(self, email: Email) -> Dict[str, Any]:
"""Extract structural features."""
features = {}
# Attachments
features['has_attachments'] = email.has_attachments
features['attachment_count'] = len(email.attachments)
# Links and images
body = email.body or email.body_snippet
features['link_count'] = len(re.findall(r'https?://', body))
features['image_count'] = len(re.findall(r'<img', body, re.IGNORECASE))
# Lengths
features['body_length'] = len(body)
features['subject_length'] = len(email.subject)
# Reply/Forward
features['has_reply_prefix'] = bool(re.match(r'^(Re:|Fwd:)', email.subject, re.IGNORECASE))
# Time features
if email.date:
hour = email.date.hour
if 0 <= hour < 6:
features['time_of_day'] = 'night'
elif 6 <= hour < 12:
features['time_of_day'] = 'morning'
elif 12 <= hour < 18:
features['time_of_day'] = 'afternoon'
else:
features['time_of_day'] = 'evening'
features['day_of_week'] = email.date.strftime('%A').lower()
else:
features['time_of_day'] = 'unknown'
features['day_of_week'] = 'unknown'
return features
def _extract_sender(self, email: Email) -> Dict[str, Any]:
"""Extract sender-based features."""
features = {}
sender = email.sender
if '@' in sender:
# Extract domain
domain = sender.split('@')[1].lower()
features['sender_domain'] = domain
# Domain type
freemail_domains = {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'icloud.com'}
noreply_patterns = ['noreply', 'no-reply', 'donotreply']
marketing_patterns = ['marketing', 'newsletter', 'promo']
if domain in freemail_domains:
features['sender_domain_type'] = 'freemail'
elif any(p in sender.lower() for p in noreply_patterns):
features['sender_domain_type'] = 'noreply'
elif any(p in sender.lower() for p in marketing_patterns):
features['sender_domain_type'] = 'marketing'
else:
features['sender_domain_type'] = 'corporate'
features['is_noreply'] = any(p in sender.lower() for p in noreply_patterns)
else:
features['sender_domain'] = 'unknown'
features['sender_domain_type'] = 'unknown'
features['is_noreply'] = False
return features
def _extract_patterns(self, email: Email) -> Dict[str, Any]:
"""Extract pattern-based features."""
features = {}
body = (email.body or email.body_snippet).lower()
subject = email.subject.lower()
combined = f"{subject} {body}"
# Common patterns
features['has_unsubscribe'] = 'unsubscribe' in combined
features['has_otp_pattern'] = bool(re.search(r'\b\d{4,6}\b', combined))
features['has_price'] = bool(re.search(r'\$\d+', combined))
features['has_tracking_pattern'] = bool(re.search(r'tracking\s*(number|#)', combined))
features['has_invoice_pattern'] = bool(re.search(r'(invoice|bill|receipt)\s*#?\d+', combined))
features['has_meeting_pattern'] = bool(re.search(r'(meeting|call|zoom|teams)', combined))
return features
def extract_batch(self, emails: List[Email]) -> pd.DataFrame:
"""
Extract features from batch of emails.
Args:
emails: List of Email objects
Returns:
DataFrame with all features
"""
# Extract features for each email
feature_dicts = [self.extract(email) for email in emails]
# Convert to DataFrame
df = pd.DataFrame(feature_dicts)
# Transform text features if vectorizer is fitted
if self.text_vectorizer and 'text' in df.columns:
if hasattr(self.text_vectorizer, 'vocabulary_'):
text_features = self.text_vectorizer.transform(df['text'])
text_df = pd.DataFrame(
text_features.toarray(),
columns=[f"text_{i}" for i in range(text_features.shape[1])]
)
df = pd.concat([df.drop('text', axis=1), text_df], axis=1)
else:
df = df.drop('text', axis=1)
return df
def fit_text_vectorizer(self, emails: List[Email]):
"""Fit text vectorizer on corpus."""
texts = [f"{e.subject} {e.body_snippet}" for e in emails]
self.text_vectorizer.fit(texts)
logger.info(f"Fitted vectorizer with {len(self.text_vectorizer.vocabulary_)} features")
```
**Test:**
```bash
# Create mock email and test
python -c "
from src.email_providers.base import Email
from src.classification.feature_extractor import FeatureExtractor
from datetime import datetime
email = Email(
id='1',
subject='Meeting at 3pm',
sender='john@company.com',
date=datetime.now(),
body='Let us meet to discuss the project',
has_attachments=True
)
extractor = FeatureExtractor()
features = extractor.extract(email)
print(features)
"
```
---
## PHASE 6: ML CLASSIFIER (BLOCKER - NEED MODEL)
### Step 15: ML Classifier Wrapper (src/classification/ml_classifier.py)
```python
"""ML-based email classifier."""
import logging
import pickle
from typing import Dict, List, Any
import numpy as np
from pathlib import Path
logger = logging.getLogger(__name__)
class MLClassifier:
"""Wrapper for pre-trained ML classification model."""
def __init__(self, model_path: str = "src/models/pretrained/classifier.pkl"):
"""Load pre-trained model."""
self.model = None
self.label_encoder = None
self.categories = []
self.feature_names = []
self._load_model(model_path)
def _load_model(self, model_path: str):
"""Load model from file."""
try:
with open(model_path, 'rb') as f:
model_data = pickle.load(f)
self.model = model_data['model']
self.label_encoder = model_data.get('label_encoder')
self.categories = model_data.get('categories', [])
self.feature_names = model_data.get('feature_names', [])
logger.info(f"Loaded ML model with {len(self.categories)} categories")
except FileNotFoundError:
logger.warning(f"Model file not found: {model_path}")
logger.warning("Will need to train model or use alternative classification")
except Exception as e:
logger.error(f"Error loading model: {e}")
def predict(self, features: np.ndarray) -> Dict[str, Any]:
"""
Predict category for feature vector.
Args:
features: Feature vector or DataFrame row
Returns:
{
'category': str,
'confidence': float,
'probabilities': Dict[str, float]
}
"""
if self.model is None:
return {
'category': 'unknown',
'confidence': 0.0,
'probabilities': {},
'error': 'Model not loaded'
}
# Get probabilities
probs = self.model.predict_proba([features])[0]
# Get predicted class
pred_class = np.argmax(probs)
category = self.categories[pred_class]
confidence = float(probs[pred_class])
# All probabilities
prob_dict = {
self.categories[i]: float(probs[i])
for i in range(len(self.categories))
}
return {
'category': category,
'confidence': confidence,
'probabilities': prob_dict
}
def predict_batch(self, features: np.ndarray) -> List[Dict[str, Any]]:
"""Predict for batch of feature vectors."""
return [self.predict(f) for f in features]
```
### ⚠️ CRITICAL: Choose one of the following options
**Option A: Create a placeholder model for testing**
```python
# scripts/create_mock_model.py
import pickle
from sklearn.ensemble import RandomForestClassifier
import numpy as np

# Create dummy model trained on random data
model = RandomForestClassifier(n_estimators=10)
X_dummy = np.random.rand(100, 50)
y_dummy = np.random.randint(0, 12, 100)
model.fit(X_dummy, y_dummy)

categories = [
    'junk', 'transactional', 'auth', 'newsletters',
    'social', 'automated', 'conversational', 'work',
    'personal', 'finance', 'travel', 'unknown'
]

model_data = {
    'model': model,
    'categories': categories,
    'feature_names': [f'feature_{i}' for i in range(50)]
}

with open('src/models/pretrained/classifier.pkl', 'wb') as f:
    pickle.dump(model_data, f)

print("Mock model created!")
```
**Option B: Train a real model (recommended)**
See scripts/train_model.py (to be created in next phase)
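The real script is written in the next phase; as a placeholder, here is a rough sketch of what `scripts/train_model.py` could look like. It assumes a labeled CSV at `data/training/emails.csv` with `subject`, `body`, and `category` columns (the path and column names are assumptions), trains on TF-IDF text only, and writes the same pickle layout that `MLClassifier._load_model()` reads. The production version should instead build its training matrix with the same `FeatureExtractor` used at inference time.
```python
# scripts/train_model.py -- rough sketch, not the final training pipeline
import pickle

import pandas as pd
from lightgbm import LGBMClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Placeholder dataset: subject, body, category columns
df = pd.read_csv("data/training/emails.csv")
texts = df["subject"].fillna("") + " " + df["body"].fillna("")

# Text-only features for the sketch; the real pipeline should reuse FeatureExtractor
vectorizer = TfidfVectorizer(max_features=10000, ngram_range=(1, 2),
                             min_df=2, max_df=0.95, sublinear_tf=True)
X = vectorizer.fit_transform(texts)

encoder = LabelEncoder()
y = encoder.fit_transform(df["category"])

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

model = LGBMClassifier(n_estimators=300, learning_rate=0.1)
model.fit(X_train, y_train)
print(classification_report(y_test, model.predict(X_test), target_names=encoder.classes_))

# Same pickle layout that MLClassifier._load_model() expects
model_data = {
    "model": model,
    "label_encoder": encoder,
    "categories": list(encoder.classes_),
    "feature_names": list(vectorizer.get_feature_names_out()),
}
with open("src/models/pretrained/classifier.pkl", "wb") as f:
    pickle.dump(model_data, f)
print("Model saved to src/models/pretrained/classifier.pkl")
```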
---
## PHASE 7: LLM INTEGRATION
### Step 16: LLM Classifier (src/classification/llm_classifier.py)
```python
"""LLM-based email classifier using Ollama."""
import logging
import json
import re
from typing import Dict, List, Any
from abc import ABC, abstractmethod
logger = logging.getLogger(__name__)
class BaseLLMProvider(ABC):
"""Abstract LLM provider."""
@abstractmethod
def complete(self, prompt: str, **kwargs) -> str:
pass
@abstractmethod
def test_connection(self) -> bool:
pass
class OllamaProvider(BaseLLMProvider):
"""Ollama local LLM provider."""
def __init__(self, model: str = "qwen2.5:1.5b", base_url: str = "http://localhost:11434"):
try:
import ollama
self.client = ollama.Client(host=base_url)
self.model = model
logger.info(f"Initialized Ollama provider with model {model}")
except ImportError:
logger.error("ollama package not installed. Run: pip install ollama")
self.client = None
except Exception as e:
logger.error(f"Failed to initialize Ollama: {e}")
self.client = None
def complete(self, prompt: str, **kwargs) -> str:
if not self.client:
raise RuntimeError("Ollama client not available")
response = self.client.generate(
model=self.model,
prompt=prompt,
options={
'temperature': kwargs.get('temperature', 0.1),
'num_predict': kwargs.get('max_tokens', 500)
}
)
return response['response']
def test_connection(self) -> bool:
try:
self.client.list()
return True
except:
return False
class LLMClassifier:
"""Email classifier using LLM."""
def __init__(self, provider: BaseLLMProvider, categories: Dict[str, Dict], config: Dict):
self.provider = provider
self.categories = categories
self.config = config
self.classification_prompt = self._load_prompt_template()
def _load_prompt_template(self) -> str:
"""Load or create classification prompt."""
# Try to load from file first
try:
with open('prompts/classification.txt', 'r') as f:
return f.read()
except FileNotFoundError:
# Use default prompt
return """You are an expert email classifier.
CATEGORIES:
{categories}
EMAIL:
Subject: {subject}
From: {sender}
Has Attachments: {has_attachments}
Body Snippet: {body_snippet}
ML Prediction: {ml_prediction} (confidence: {ml_confidence})
Respond with JSON only:
{{
"category": "chosen_category",
"confidence": 0.85,
"reasoning": "brief explanation"
}}
"""
def classify(self, email: Dict[str, Any]) -> Dict[str, Any]:
"""Classify email using LLM."""
# Build prompt
categories_str = "\n".join([
f"- {name}: {info['description']}"
for name, info in self.categories.items()
])
ml_pred = email.get('ml_prediction', {})
prompt = self.classification_prompt.format(
categories=categories_str,
subject=email.get('subject', 'N/A'),
sender=email.get('sender', 'N/A'),
has_attachments=email.get('has_attachments', False),
body_snippet=email.get('body_snippet', '')[:300],
ml_prediction=ml_pred.get('category', 'unknown'),
ml_confidence=ml_pred.get('confidence', 0.0)
)
try:
# Get LLM response
response = self.provider.complete(
prompt,
temperature=self.config['llm']['temperature'],
max_tokens=self.config['llm']['max_tokens']
)
# Parse JSON response
result = self._parse_response(response)
return result
except Exception as e:
logger.error(f"LLM classification failed: {e}")
return {
'category': 'unknown',
'confidence': 0.0,
'reasoning': f'Error: {str(e)}',
'error': True
}
def _parse_response(self, response: str) -> Dict[str, Any]:
"""Parse LLM JSON response."""
# Try to extract JSON
json_match = re.search(r'\{.*\}', response, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group())
except json.JSONDecodeError:
pass
# Fallback parsing
return {
'category': 'unknown',
'confidence': 0.5,
'reasoning': response[:200]
}
```
**Test (requires Ollama running):**
```bash
# First: Install and start Ollama
# ollama pull qwen2.5:1.5b
python -c "
from src.classification.llm_classifier import OllamaProvider, LLMClassifier
from src.utils.config import load_categories, load_config
provider = OllamaProvider()
categories = load_categories()
config = load_config()
classifier = LLMClassifier(provider, categories, config.model_dump())
email = {
'subject': 'Your verification code is 123456',
'sender': 'noreply@bank.com',
'has_attachments': False,
'body_snippet': 'Your one-time password is 123456',
'ml_prediction': {'category': 'auth', 'confidence': 0.65}
}
result = classifier.classify(email)
print(result)
"
```
---
## NEXT PHASES
Due to length limits, the remaining phases are only summarized here:
### Phase 8: Adaptive Classifier
- Dynamic threshold adjustment
- Sender rule learning
- Classification orchestration
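As a preview, here is a rough sketch of how the dynamic threshold adjustment could work. The class and method names are placeholders, the numbers come from the `classification` section of `config/default_config.yaml`, and the 0.95 / 0.80 agreement cutoffs are illustrative assumptions.
```python
# Sketch: nudge per-category confidence thresholds every N classifications.
class ThresholdManager:
    def __init__(self, config: dict):
        cls_cfg = config["classification"]
        self.default = cls_cfg["default_threshold"]       # 0.75
        self.min = cls_cfg["min_threshold"]                # 0.60
        self.max = cls_cfg["max_threshold"]                # 0.90
        self.step = cls_cfg["adjustment_step"]             # 0.05
        self.frequency = cls_cfg["adjustment_frequency"]   # 1000
        self.thresholds = dict(cls_cfg.get("category_thresholds", {}))
        self.stats = {}   # category -> {"llm_agreed": int, "llm_overrode": int}
        self.seen = 0

    def threshold_for(self, category: str) -> float:
        return self.thresholds.get(category, self.default)

    def record_outcome(self, category: str, llm_agreed: bool):
        """Call whenever the LLM reviews a low-confidence ML prediction."""
        s = self.stats.setdefault(category, {"llm_agreed": 0, "llm_overrode": 0})
        s["llm_agreed" if llm_agreed else "llm_overrode"] += 1
        self.seen += 1
        if self.seen % self.frequency == 0:
            self._adjust()

    def _adjust(self):
        for category, s in self.stats.items():
            total = s["llm_agreed"] + s["llm_overrode"]
            if total == 0:
                continue
            agree_rate = s["llm_agreed"] / total
            current = self.threshold_for(category)
            if agree_rate > 0.95:      # ML is reliable here: trust it more
                current -= self.step
            elif agree_rate < 0.80:    # ML is unreliable: route more to the LLM
                current += self.step
            self.thresholds[category] = min(self.max, max(self.min, current))
        self.stats = {}
```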
### Phase 9: Processing Pipeline
- Bulk processor
- Queue management
- Checkpointing
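For the checkpointing piece, a minimal sketch follows. The file name and state layout are assumptions; `checkpoint_interval` comes from the `processing` section of the config, and `checkpoints/` plus `*.checkpoint` are already covered by the .gitignore.
```python
# Sketch: save/resume state for the bulk processor.
import json
from pathlib import Path


class Checkpointer:
    def __init__(self, path: str = "checkpoints/run.checkpoint", interval: int = 1000):
        self.path = Path(path)
        self.interval = interval
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def load(self) -> dict:
        """Return the last saved state, or a fresh one on first run."""
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {"processed_ids": [], "results": []}

    def maybe_save(self, state: dict) -> None:
        """Persist state every `interval` processed emails (save once more at the end)."""
        count = len(state["processed_ids"])
        if count and count % self.interval == 0:
            self.path.write_text(json.dumps(state))
```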
### Phase 10: Calibration System
- Email sampling
- LLM calibration analysis
- Validation
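The `calibration` section of the config already fixes the parameters (1,500 stratified samples, 300 held out for validation). A sketch of the sampler, with sender domain as the stratification key (the key choice and helper name are assumptions):
```python
# Sketch: stratified sampling for calibration, keyed on sender domain.
import random
from collections import defaultdict
from typing import List

from src.email_providers.base import Email


def stratified_sample(emails: List[Email], sample_size: int = 1500, seed: int = 42) -> List[Email]:
    """Spread the sample across sender domains so one mailing list can't dominate."""
    random.seed(seed)
    by_domain = defaultdict(list)
    for e in emails:
        domain = e.sender.split("@")[-1].lower() if "@" in e.sender else "unknown"
        by_domain[domain].append(e)

    per_domain = max(1, sample_size // max(1, len(by_domain)))
    sample = []
    for group in by_domain.values():
        sample.extend(random.sample(group, min(per_domain, len(group))))

    random.shuffle(sample)
    return sample[:sample_size]
```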
### Phase 11: Export & Sync
- Results exporter
- Gmail sync
- Report generation
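A sketch of the exporter, assuming each classification result is a dict with `email_id`, `category`, and `confidence` keys (the record schema and output file name are assumptions; `format` and `include_confidence` mirror the `export` section of the config):
```python
# Sketch: write classification results to JSON.
import json
from pathlib import Path
from typing import Dict, List


def export_results(results: List[Dict], output_dir: str, include_confidence: bool = True) -> Path:
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)

    records = []
    for r in results:
        record = {"email_id": r["email_id"], "category": r["category"]}
        if include_confidence:
            record["confidence"] = r.get("confidence")
        records.append(record)

    path = out / "classifications.json"
    path.write_text(json.dumps(records, indent=2))
    return path
```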
### Phase 12: Main CLI
- Click interface
- End-to-end orchestration
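The option names below mirror the commands in the Testing Strategy section; everything inside `main()` is placeholder wiring, not the final orchestration.
```python
# src/main.py -- sketch of the Click entry point.
import click


@click.command()
@click.option("--source", type=click.Choice(["gmail", "microsoft", "imap"]), required=True)
@click.option("--credentials", type=click.Path(exists=True), required=True)
@click.option("--output", type=click.Path(), default="results/", show_default=True)
@click.option("--limit", type=int, default=None, help="Maximum number of emails to process.")
def main(source, credentials, output, limit):
    """Fetch, classify, and export emails."""
    click.echo(f"source={source} limit={limit} output={output}")
    # 1. connect provider  2. fetch emails  3. calibrate  4. classify  5. export


if __name__ == "__main__":
    main()
```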
### Phase 13: Testing
- Unit tests
- Integration tests
- Full pipeline test on Marion's inbox
---
## TESTING STRATEGY
### Unit Testing
```bash
pytest tests/test_classification.py -v
```
### Integration Testing
```bash
# Test on 100 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 100
# Test on 1000 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 1000
```
### Full Pipeline
```bash
# Run on Marion's full inbox
python src/main.py --source gmail --credentials marion-creds.json --output results/
```
---
## CRITICAL NEXT STEPS
1. **DECIDE: ML Model Strategy**
- Option A: Create mock model for immediate testing
- Option B: Train real model (takes 1-2 days)
2. **Set up Gmail OAuth**
- Google Cloud Console
- Enable Gmail API
- Download credentials.json
3. **Install and test Ollama**
- Download from ollama.ai
- Pull model: `ollama pull qwen2.5:1.5b`
- Test: `ollama run qwen2.5:1.5b "test"`
4. **Continue building**
- Next: Adaptive Classifier
- Then: Processing Pipeline
- Then: Full integration
---
**THIS IS THE ACTUAL BUILD GUIDE**
Every step in this document is a real, executable action toward building the system.