# EMAIL SORTER - BUILD INSTRUCTIONS

**Step-by-Step Implementation Guide**

Version: 1.0
Date: 2024-10-21

---

## PREREQUISITES
### Required Software

- Python 3.8 or higher
- Git
- Ollama (for local LLM)
- Text editor / IDE

### Required Accounts

- Gmail account (for testing)
- Google Cloud Console project (for Gmail API)

### Skills Needed

- Python programming
- Basic understanding of ML concepts
- Command line comfort
- OAuth 2.0 basics

---

## IMPLEMENTATION ORDER

Build in this exact order. Each phase depends on previous phases.

---
## PHASE 1: PROJECT SETUP

### Step 1: Initialize Git Repository

```bash
cd C:\Users\BrettFox\Documents\Claude\email-sorter
git init
git add .
git commit -m "Initial commit - project blueprint"
```

### Step 2: Create Virtual Environment

```bash
# Create venv
python -m venv venv

# Activate (Windows)
venv\Scripts\activate

# Activate (Linux/Mac)
source venv/bin/activate
```
### Step 3: Create requirements.txt

Already exists, but verify contents:

```txt
# Core
python-dotenv>=1.0.0
pyyaml>=6.0
pydantic>=2.0.0

# Email Providers
google-api-python-client>=2.100.0
google-auth-httplib2>=0.1.1
google-auth-oauthlib>=1.1.0
msal>=1.24.0
imapclient>=2.3.1

# Machine Learning
scikit-learn>=1.3.0
xgboost>=2.0.0
lightgbm>=4.0.0
pandas>=2.0.0
numpy>=1.24.0

# LLM Integration
ollama>=0.1.0

# Text Processing
nltk>=3.8
beautifulsoup4>=4.12.0
lxml>=4.9.0

# Utilities
tqdm>=4.66.0
click>=8.1.0
rich>=13.0.0
joblib>=1.3.0
tenacity>=8.2.0

# Testing
pytest>=7.4.0
pytest-cov>=4.1.0
pytest-mock>=3.11.0
```
### Step 4: Install Dependencies

```bash
pip install -r requirements.txt
```
### Step 5: Create .gitignore

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
*.egg-info/
dist/
build/

# Data and Models
data/training/
src/models/pretrained/*.pkl
*.h5
*.joblib

# Credentials
.env
credentials/
*.json
!config/*.json

# Logs
logs/*.log
*.log

# IDE
.vscode/
.idea/
*.swp

# OS
.DS_Store
Thumbs.db

# Checkpoints
checkpoints/
*.checkpoint

# Results
results/
output/
```
### Step 6: Create Directory Structure

```bash
# Create all directories
mkdir -p src/calibration
mkdir -p src/classification
mkdir -p src/models/pretrained
mkdir -p src/email_providers
mkdir -p src/processing
mkdir -p src/adjustment
mkdir -p src/export
mkdir -p src/utils
mkdir -p tests
mkdir -p prompts
mkdir -p scripts
mkdir -p data/samples
mkdir -p logs
mkdir -p config

# Create __init__.py files
touch src/__init__.py
touch src/calibration/__init__.py
touch src/classification/__init__.py
touch src/models/__init__.py
touch src/email_providers/__init__.py
touch src/processing/__init__.py
touch src/adjustment/__init__.py
touch src/export/__init__.py
touch src/utils/__init__.py
touch tests/__init__.py

# Windows equivalent:
# type nul > src\__init__.py
# (repeat for each)
```
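If you would rather not repeat `mkdir` and `touch` by hand (and `mkdir -p` / `touch` are not available in plain cmd.exe), a small cross-platform sketch such as the following hypothetical `scripts/scaffold.py` creates the same layout; the directory list simply mirrors the commands above.

```python
# scripts/scaffold.py - hypothetical helper, mirrors the mkdir/touch commands above
from pathlib import Path

PACKAGE_DIRS = [
    "src", "src/calibration", "src/classification", "src/models",
    "src/email_providers", "src/processing", "src/adjustment",
    "src/export", "src/utils", "tests",
]
PLAIN_DIRS = [
    "src/models/pretrained", "prompts", "scripts",
    "data/samples", "logs", "config",
]

# Create every directory (like mkdir -p)
for d in PACKAGE_DIRS + PLAIN_DIRS:
    Path(d).mkdir(parents=True, exist_ok=True)

# Drop an empty __init__.py into each Python package (like touch)
for d in PACKAGE_DIRS:
    (Path(d) / "__init__.py").touch(exist_ok=True)

print("Project skeleton created.")
```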
---

## PHASE 2: CORE INFRASTRUCTURE
### Step 7: Config System (src/utils/config.py)

Create the configuration loader:

```python
"""Configuration management."""
import yaml
from pathlib import Path
from typing import Dict, Any
from pydantic import BaseModel, ConfigDict


class Config(BaseModel):
    """Main configuration model."""
    # Pydantic v2 style (requirements.txt pins pydantic>=2.0.0); allow extra keys
    model_config = ConfigDict(extra="allow")

    version: str
    calibration: Dict[str, Any]
    processing: Dict[str, Any]
    classification: Dict[str, Any]
    llm: Dict[str, Any]
    email_providers: Dict[str, Any]
    features: Dict[str, Any]
    export: Dict[str, Any]
    logging: Dict[str, Any]
    cleanup: Dict[str, Any]


def load_config(config_path: str = "config/default_config.yaml") -> Config:
    """Load configuration from YAML file."""
    with open(config_path, 'r') as f:
        config_dict = yaml.safe_load(f)
    return Config(**config_dict)


def load_categories(categories_path: str = "config/categories.yaml") -> Dict[str, Dict]:
    """Load category definitions."""
    with open(categories_path, 'r') as f:
        data = yaml.safe_load(f)
    return data['categories']


def load_features(features_path: str = "config/features.yaml") -> Dict[str, Any]:
    """Load feature configuration."""
    with open(features_path, 'r') as f:
        return yaml.safe_load(f)
```

**Test:**
```bash
python -c "from src.utils.config import load_config; print(load_config())"
```
### Step 8: Logging System (src/utils/logging.py)

```python
"""Logging configuration."""
import logging
import sys
from pathlib import Path
from rich.logging import RichHandler


def setup_logging(config: dict):
    """Setup logging with console and file handlers."""
    log_level = config.get('level', 'INFO')
    log_file = config.get('file', 'logs/email-sorter.log')

    # Create logs directory
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    # Create logger
    logger = logging.getLogger()
    logger.setLevel(log_level)

    # Remove existing handlers
    logger.handlers = []

    # Console handler with rich formatting
    console_handler = RichHandler(
        rich_tracebacks=True,
        markup=True,
        show_time=True,
        show_path=False
    )
    console_handler.setLevel(log_level)
    console_formatter = logging.Formatter('%(message)s')
    console_handler.setFormatter(console_formatter)

    # File handler
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(log_level)
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(file_formatter)

    # Add handlers
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger
```
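**Test (assumes config/default_config.yaml from Step 10 already exists; `load_config().logging` is the plain-dict logging section):**
```bash
python -c "from src.utils.config import load_config; from src.utils.logging import setup_logging; setup_logging(load_config().logging).info('logging works')"
```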
### Step 9: Email Data Models (src/email_providers/base.py)

```python
"""Base email provider interface and data models."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional


@dataclass
class Email:
    """Unified email data model."""
    id: str
    subject: str
    sender: str
    sender_name: Optional[str] = None
    date: Optional[datetime] = None
    body: str = ""
    body_snippet: str = ""
    has_attachments: bool = False
    attachments: List[Dict] = field(default_factory=list)
    headers: Dict = field(default_factory=dict)
    labels: List[str] = field(default_factory=list)
    is_read: bool = False

    def __post_init__(self):
        """Generate body_snippet if not provided."""
        if not self.body_snippet and self.body:
            self.body_snippet = self.body[:500]


class BaseProvider(ABC):
    """Abstract base class for email providers."""

    @abstractmethod
    def connect(self, credentials: Dict) -> bool:
        """Establish connection to email provider."""
        pass

    @abstractmethod
    def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
        """Fetch emails from provider."""
        pass

    @abstractmethod
    def update_labels(self, email_id: str, labels: List[str]) -> bool:
        """Update email labels/folders."""
        pass

    @abstractmethod
    def batch_update(self, updates: List[Dict]) -> bool:
        """Batch update multiple emails."""
        pass

    @abstractmethod
    def disconnect(self):
        """Close connection."""
        pass
```

**Test:**
```bash
python -c "from src.email_providers.base import Email; e = Email(id='1', subject='Test', sender='test@test.com'); print(e)"
```
---

## PHASE 3: CONFIGURATION FILES
### Step 10: Create config/default_config.yaml

```yaml
version: "1.0.0"

calibration:
  sample_size: 1500
  sample_strategy: "stratified"
  validation_size: 300
  min_confidence: 0.6

processing:
  batch_size: 100
  llm_queue_size: 100
  parallel_workers: 4
  checkpoint_interval: 1000

classification:
  default_threshold: 0.75
  min_threshold: 0.60
  max_threshold: 0.90
  adjustment_step: 0.05
  adjustment_frequency: 1000
  category_thresholds:
    junk: 0.85
    auth: 0.80
    conversational: 0.65

llm:
  provider: "ollama"
  model: "qwen2.5:1.5b"
  base_url: "http://localhost:11434"
  temperature: 0.1
  max_tokens: 500
  timeout: 30
  retry_attempts: 3

email_providers:
  gmail:
    batch_size: 100
  microsoft:
    batch_size: 100
  imap:
    timeout: 30
    batch_size: 50

features:
  text_features:
    max_vocab_size: 10000
    ngram_range: [1, 2]
    min_df: 2
    max_df: 0.95

export:
  format: "json"
  include_confidence: true
  create_report: true

logging:
  level: "INFO"
  file: "logs/email-sorter.log"

cleanup:
  delete_temp_files: true
  delete_repo_after: false
```
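How `default_threshold` and `category_thresholds` are meant to combine is worth making explicit: the per-category value wins when present, otherwise the default applies. A minimal sketch (the helper name `resolve_threshold` is illustrative, not part of the codebase yet):

```python
# Illustrative only: per-category threshold with fallback to the default.
def resolve_threshold(classification_cfg: dict, category: str) -> float:
    overrides = classification_cfg.get('category_thresholds', {})
    return overrides.get(category, classification_cfg.get('default_threshold', 0.75))

# resolve_threshold(cfg['classification'], 'junk') -> 0.85
# resolve_threshold(cfg['classification'], 'work') -> 0.75 (falls back to default)
```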
### Step 11: Create config/categories.yaml

(See PROJECT_BLUEPRINT.md for full content)
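
The full category list lives in PROJECT_BLUEPRINT.md, but note what the code expects: `load_categories()` (Step 7) reads a top-level `categories` key, and the LLM classifier (Step 16) reads a `description` field for each entry. A minimal illustrative sketch of that shape (these names and descriptions are examples, not the authoritative definitions):

```yaml
categories:
  junk:
    description: "Spam, promotions, and bulk marketing mail"
  auth:
    description: "Verification codes, password resets, sign-in alerts"
  conversational:
    description: "Person-to-person correspondence"
  # ... remaining categories as defined in PROJECT_BLUEPRINT.md
```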
### Step 12: Create config/features.yaml

(See PROJECT_BLUEPRINT.md for full content)

**Test:**
```bash
python -c "from src.utils.config import load_config, load_categories; print(load_config()); print(load_categories())"
```

---
## PHASE 4: EMAIL PROVIDERS

### Step 13: Gmail Provider (src/email_providers/gmail.py)

```python
"""Gmail API provider implementation."""
import base64
import logging
from typing import List, Dict, Optional
from datetime import datetime
from email.utils import parsedate_to_datetime

from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from .base import BaseProvider, Email

logger = logging.getLogger(__name__)


class GmailProvider(BaseProvider):
    """Gmail API email provider."""

    SCOPES = [
        'https://www.googleapis.com/auth/gmail.readonly',
        'https://www.googleapis.com/auth/gmail.modify'
    ]

    def __init__(self):
        self.service = None
        self.user_id = 'me'

    def connect(self, credentials_path: str) -> bool:
        """Connect to Gmail API using OAuth credentials."""
        try:
            # For first-time auth: opens a browser window for user consent
            flow = InstalledAppFlow.from_client_secrets_file(
                credentials_path, self.SCOPES
            )
            creds = flow.run_local_server(port=0)

            self.service = build('gmail', 'v1', credentials=creds)
            logger.info("Connected to Gmail API")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to Gmail: {e}")
            return False

    def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
        """Fetch emails from Gmail."""
        emails = []

        try:
            # Build query
            query = filters.get('query', '') if filters else ''

            # Get message IDs (a single list call is capped at 500 results)
            results = self.service.users().messages().list(
                userId=self.user_id,
                q=query,
                maxResults=min(limit, 500) if limit else 500
            ).execute()

            messages = results.get('messages', [])

            # Fetch full messages
            for msg_info in messages:
                email = self._fetch_message(msg_info['id'])
                if email:
                    emails.append(email)
                    if limit and len(emails) >= limit:
                        break

            logger.info(f"Fetched {len(emails)} emails from Gmail")
            return emails

        except HttpError as e:
            logger.error(f"Gmail API error: {e}")
            return emails

    def _fetch_message(self, msg_id: str) -> Optional[Email]:
        """Fetch and parse a single message."""
        try:
            msg = self.service.users().messages().get(
                userId=self.user_id,
                id=msg_id,
                format='full'
            ).execute()

            return self._parse_message(msg)

        except Exception as e:
            logger.error(f"Error fetching message {msg_id}: {e}")
            return None

    def _parse_message(self, msg: Dict) -> Email:
        """Parse Gmail message into Email object."""
        headers = {h['name']: h['value'] for h in msg['payload']['headers']}

        # Extract body
        body = self._get_body(msg['payload'])

        # Parse date
        date = None
        if 'Date' in headers:
            try:
                date = parsedate_to_datetime(headers['Date'])
            except (TypeError, ValueError):
                pass

        # Check attachments
        has_attachments = False
        attachments = []
        if 'parts' in msg['payload']:
            for part in msg['payload']['parts']:
                if part.get('filename'):
                    has_attachments = True
                    attachments.append({
                        'filename': part['filename'],
                        'mime_type': part['mimeType'],
                        'size': part.get('body', {}).get('size', 0)
                    })

        return Email(
            id=msg['id'],
            subject=headers.get('Subject', 'No Subject'),
            sender=headers.get('From', ''),
            date=date,
            body=body,
            has_attachments=has_attachments,
            attachments=attachments,
            headers=headers,
            labels=msg.get('labelIds', []),
            is_read='UNREAD' not in msg.get('labelIds', [])
        )

    def _get_body(self, payload: Dict) -> str:
        """Extract email body from payload."""
        body = ""

        if 'body' in payload and 'data' in payload['body']:
            body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='ignore')
        elif 'parts' in payload:
            for part in payload['parts']:
                if part['mimeType'] == 'text/plain':
                    if 'data' in part['body']:
                        body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8', errors='ignore')
                    break

        return body

    def update_labels(self, email_id: str, labels: List[str]) -> bool:
        """Update labels for a single email.

        Note: addLabelIds expects Gmail label IDs, not display names.
        """
        try:
            self.service.users().messages().modify(
                userId=self.user_id,
                id=email_id,
                body={'addLabelIds': labels}
            ).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating labels: {e}")
            return False

    def batch_update(self, updates: List[Dict]) -> bool:
        """Batch update multiple emails."""
        try:
            batch_size = 100

            for i in range(0, len(updates), batch_size):
                batch = updates[i:i+batch_size]
                email_ids = [u['email_id'] for u in batch]
                labels = list(set([l for u in batch for l in u.get('labels', [])]))

                self.service.users().messages().batchModify(
                    userId=self.user_id,
                    body={
                        'ids': email_ids,
                        'addLabelIds': labels
                    }
                ).execute()

            logger.info(f"Batch updated {len(updates)} emails")
            return True

        except Exception as e:
            logger.error(f"Batch update error: {e}")
            return False

    def disconnect(self):
        """Close connection."""
        self.service = None
        logger.info("Disconnected from Gmail")
```

**Test (requires Gmail OAuth setup):**
```bash
# First: Set up OAuth in Google Cloud Console
# Download credentials.json
python -c "from src.email_providers.gmail import GmailProvider; p = GmailProvider(); p.connect('credentials.json'); emails = p.fetch_emails(limit=10); print(f'Fetched {len(emails)} emails')"
```
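As written, `connect()` runs the browser-based OAuth flow on every run. A common refinement is to cache the authorized user credentials and only fall back to the browser flow when no valid token exists; this is also where the imported `Credentials` and `Request` come into play. A sketch (the `token.json` path is an assumption, not something the project defines yet):

```python
# Sketch of a token-caching connect helper; token.json is a hypothetical cache location.
import os
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build


def connect_with_cache(credentials_path: str, scopes, token_path: str = "token.json"):
    creds = None
    if os.path.exists(token_path):
        # Reuse previously granted credentials
        creds = Credentials.from_authorized_user_file(token_path, scopes)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())  # silent refresh, no browser needed
        else:
            flow = InstalledAppFlow.from_client_secrets_file(credentials_path, scopes)
            creds = flow.run_local_server(port=0)  # one-time browser consent
        with open(token_path, "w") as f:
            f.write(creds.to_json())  # cache for the next run
    return build('gmail', 'v1', credentials=creds)
```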
---

## PHASE 5: FEATURE EXTRACTION
### Step 14: Feature Extractor (src/classification/feature_extractor.py)

```python
"""Feature extraction from emails."""
import re
import logging
from typing import Dict, List, Any
from datetime import datetime
from urllib.parse import urlparse

import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

from src.email_providers.base import Email

logger = logging.getLogger(__name__)


class FeatureExtractor:
    """Extract features from emails for classification."""

    def __init__(self, config: Dict = None):
        """Initialize with feature configuration."""
        self.config = config or {
            'text_features': {
                'max_features': 10000,
                'ngram_range': [1, 2],
                'min_df': 2,
                'max_df': 0.95
            }
        }

        self.text_vectorizer = None
        self._initialize_vectorizer()

    def _initialize_vectorizer(self):
        """Initialize TF-IDF vectorizer."""
        text_config = self.config.get('text_features', {})
        self.text_vectorizer = TfidfVectorizer(
            max_features=text_config.get('max_features', 10000),
            ngram_range=tuple(text_config.get('ngram_range', [1, 2])),
            min_df=text_config.get('min_df', 2),
            max_df=text_config.get('max_df', 0.95),
            sublinear_tf=True
        )

    def extract(self, email: Email) -> Dict[str, Any]:
        """
        Extract features from a single email.

        Args:
            email: Email object

        Returns:
            Dictionary of features
        """
        features = {}

        # Text for TF-IDF
        features['text'] = f"{email.subject} {email.body_snippet}"

        # Structural features
        features.update(self._extract_structural(email))

        # Sender features
        features.update(self._extract_sender(email))

        # Pattern features
        features.update(self._extract_patterns(email))

        return features

    def _extract_structural(self, email: Email) -> Dict[str, Any]:
        """Extract structural features."""
        features = {}

        # Attachments
        features['has_attachments'] = email.has_attachments
        features['attachment_count'] = len(email.attachments)

        # Links and images
        body = email.body or email.body_snippet
        features['link_count'] = len(re.findall(r'https?://', body))
        features['image_count'] = len(re.findall(r'<img', body, re.IGNORECASE))

        # Lengths
        features['body_length'] = len(body)
        features['subject_length'] = len(email.subject)

        # Reply/Forward
        features['has_reply_prefix'] = bool(re.match(r'^(Re:|Fwd:)', email.subject, re.IGNORECASE))

        # Time features
        if email.date:
            hour = email.date.hour
            if 0 <= hour < 6:
                features['time_of_day'] = 'night'
            elif 6 <= hour < 12:
                features['time_of_day'] = 'morning'
            elif 12 <= hour < 18:
                features['time_of_day'] = 'afternoon'
            else:
                features['time_of_day'] = 'evening'

            features['day_of_week'] = email.date.strftime('%A').lower()
        else:
            features['time_of_day'] = 'unknown'
            features['day_of_week'] = 'unknown'

        return features

    def _extract_sender(self, email: Email) -> Dict[str, Any]:
        """Extract sender-based features."""
        features = {}

        sender = email.sender
        if '@' in sender:
            # Extract domain
            domain = sender.split('@')[1].lower()
            features['sender_domain'] = domain

            # Domain type
            freemail_domains = {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'icloud.com'}
            noreply_patterns = ['noreply', 'no-reply', 'donotreply']
            marketing_patterns = ['marketing', 'newsletter', 'promo']

            if domain in freemail_domains:
                features['sender_domain_type'] = 'freemail'
            elif any(p in sender.lower() for p in noreply_patterns):
                features['sender_domain_type'] = 'noreply'
            elif any(p in sender.lower() for p in marketing_patterns):
                features['sender_domain_type'] = 'marketing'
            else:
                features['sender_domain_type'] = 'corporate'

            features['is_noreply'] = any(p in sender.lower() for p in noreply_patterns)
        else:
            features['sender_domain'] = 'unknown'
            features['sender_domain_type'] = 'unknown'
            features['is_noreply'] = False

        return features

    def _extract_patterns(self, email: Email) -> Dict[str, Any]:
        """Extract pattern-based features."""
        features = {}

        body = (email.body or email.body_snippet).lower()
        subject = email.subject.lower()
        combined = f"{subject} {body}"

        # Common patterns
        features['has_unsubscribe'] = 'unsubscribe' in combined
        features['has_otp_pattern'] = bool(re.search(r'\b\d{4,6}\b', combined))
        features['has_price'] = bool(re.search(r'\$\d+', combined))
        features['has_tracking_pattern'] = bool(re.search(r'tracking\s*(number|#)', combined))
        features['has_invoice_pattern'] = bool(re.search(r'(invoice|bill|receipt)\s*#?\d+', combined))
        features['has_meeting_pattern'] = bool(re.search(r'(meeting|call|zoom|teams)', combined))

        return features

    def extract_batch(self, emails: List[Email]) -> pd.DataFrame:
        """
        Extract features from batch of emails.

        Args:
            emails: List of Email objects

        Returns:
            DataFrame with all features
        """
        # Extract features for each email
        feature_dicts = [self.extract(email) for email in emails]

        # Convert to DataFrame
        df = pd.DataFrame(feature_dicts)

        # Transform text features if vectorizer is fitted
        if self.text_vectorizer and 'text' in df.columns:
            if hasattr(self.text_vectorizer, 'vocabulary_'):
                text_features = self.text_vectorizer.transform(df['text'])
                text_df = pd.DataFrame(
                    text_features.toarray(),
                    columns=[f"text_{i}" for i in range(text_features.shape[1])]
                )
                df = pd.concat([df.drop('text', axis=1), text_df], axis=1)
            else:
                df = df.drop('text', axis=1)

        return df

    def fit_text_vectorizer(self, emails: List[Email]):
        """Fit text vectorizer on corpus."""
        texts = [f"{e.subject} {e.body_snippet}" for e in emails]
        self.text_vectorizer.fit(texts)
        logger.info(f"Fitted vectorizer with {len(self.text_vectorizer.vocabulary_)} features")
```

**Test:**
```bash
# Create mock email and test
python -c "
from src.email_providers.base import Email
from src.classification.feature_extractor import FeatureExtractor
from datetime import datetime

email = Email(
    id='1',
    subject='Meeting at 3pm',
    sender='john@company.com',
    date=datetime.now(),
    body='Let us meet to discuss the project',
    has_attachments=True
)

extractor = FeatureExtractor()
features = extractor.extract(email)
print(features)
"
```
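Note that `extract_batch()` still leaves string-valued columns such as `time_of_day`, `day_of_week`, and `sender_domain_type` in the DataFrame, while the classifier in Step 15 expects a purely numeric matrix. How those categoricals get encoded is a design decision for the training script; one simple possibility (a sketch only, not the project's final encoding, using one-hot encoding and dropping the high-cardinality `sender_domain` column) looks like this:

```python
# Sketch: turn the mixed-type feature DataFrame into a numeric matrix.
import pandas as pd


def to_numeric_matrix(df: pd.DataFrame) -> pd.DataFrame:
    # sender_domain has too many unique values for one-hot encoding
    df = df.drop(columns=['sender_domain'], errors='ignore')
    categorical_cols = ['time_of_day', 'day_of_week', 'sender_domain_type']
    df = pd.get_dummies(df, columns=[c for c in categorical_cols if c in df.columns])
    return df.astype(float)  # booleans become 0.0 / 1.0
```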
---

## PHASE 6: ML CLASSIFIER (BLOCKER - NEED MODEL)
### Step 15: ML Classifier Wrapper (src/classification/ml_classifier.py)

```python
"""ML-based email classifier."""
import logging
import pickle
from typing import Dict, List, Any
import numpy as np
from pathlib import Path

logger = logging.getLogger(__name__)


class MLClassifier:
    """Wrapper for pre-trained ML classification model."""

    def __init__(self, model_path: str = "src/models/pretrained/classifier.pkl"):
        """Load pre-trained model."""
        self.model = None
        self.label_encoder = None
        self.categories = []
        self.feature_names = []

        self._load_model(model_path)

    def _load_model(self, model_path: str):
        """Load model from file."""
        try:
            with open(model_path, 'rb') as f:
                model_data = pickle.load(f)

            self.model = model_data['model']
            self.label_encoder = model_data.get('label_encoder')
            self.categories = model_data.get('categories', [])
            self.feature_names = model_data.get('feature_names', [])

            logger.info(f"Loaded ML model with {len(self.categories)} categories")

        except FileNotFoundError:
            logger.warning(f"Model file not found: {model_path}")
            logger.warning("Will need to train model or use alternative classification")
        except Exception as e:
            logger.error(f"Error loading model: {e}")

    def predict(self, features: np.ndarray) -> Dict[str, Any]:
        """
        Predict category for feature vector.

        Args:
            features: Feature vector or DataFrame row

        Returns:
            {
                'category': str,
                'confidence': float,
                'probabilities': Dict[str, float]
            }
        """
        if self.model is None:
            return {
                'category': 'unknown',
                'confidence': 0.0,
                'probabilities': {},
                'error': 'Model not loaded'
            }

        # Get probabilities
        probs = self.model.predict_proba([features])[0]

        # Get predicted class
        pred_class = np.argmax(probs)
        category = self.categories[pred_class]
        confidence = float(probs[pred_class])

        # All probabilities
        prob_dict = {
            self.categories[i]: float(probs[i])
            for i in range(len(self.categories))
        }

        return {
            'category': category,
            'confidence': confidence,
            'probabilities': prob_dict
        }

    def predict_batch(self, features: np.ndarray) -> List[Dict[str, Any]]:
        """Predict for batch of feature vectors."""
        return [self.predict(f) for f in features]
```
### ⚠️ CRITICAL: You need to do one of the following

**Option A: Create a placeholder model for testing**
```python
# scripts/create_mock_model.py
import pickle
from sklearn.ensemble import RandomForestClassifier
import numpy as np

# Create dummy model on 50 random features and 12 random labels.
# Note: the feature count must match whatever the feature extractor produces.
model = RandomForestClassifier(n_estimators=10)
X_dummy = np.random.rand(100, 50)
y_dummy = np.random.randint(0, 12, 100)
model.fit(X_dummy, y_dummy)

categories = [
    'junk', 'transactional', 'auth', 'newsletters',
    'social', 'automated', 'conversational', 'work',
    'personal', 'finance', 'travel', 'unknown'
]

model_data = {
    'model': model,
    'categories': categories,
    'feature_names': [f'feature_{i}' for i in range(50)]
}

with open('src/models/pretrained/classifier.pkl', 'wb') as f:
    pickle.dump(model_data, f)

print("Mock model created!")
```

**Option B: Train a real model (recommended)**
See scripts/train_model.py (to be created in the next phase); a rough sketch of its shape follows.
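
The training script does not exist yet, so the following is only a sketch of one plausible shape for `scripts/train_model.py`: it assumes a labeled dataset is already available as a numeric feature matrix plus category labels (how that dataset gets built is part of the remaining Phase 6 work), trains a LightGBM model, and pickles exactly the keys `MLClassifier._load_model()` expects.

```python
# scripts/train_model.py - SKETCH ONLY; assumes X (numeric features) and y (category names)
# have already been prepared, e.g. via FeatureExtractor plus an encoding step.
import pickle
import lightgbm as lgb
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report


def train(X: np.ndarray, y: list, feature_names: list,
          out_path: str = "src/models/pretrained/classifier.pkl"):
    encoder = LabelEncoder()
    y_enc = encoder.fit_transform(y)

    X_train, X_val, y_train, y_val = train_test_split(
        X, y_enc, test_size=0.2, random_state=42, stratify=y_enc
    )

    model = lgb.LGBMClassifier(n_estimators=300, learning_rate=0.05)
    model.fit(X_train, y_train)

    # Quick hold-out report before saving
    print(classification_report(y_val, model.predict(X_val), target_names=encoder.classes_))

    # Same dict layout that MLClassifier._load_model() reads
    model_data = {
        'model': model,
        'label_encoder': encoder,
        'categories': list(encoder.classes_),
        'feature_names': feature_names,
    }
    with open(out_path, 'wb') as f:
        pickle.dump(model_data, f)
```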
---

## PHASE 7: LLM INTEGRATION
### Step 16: LLM Classifier (src/classification/llm_classifier.py)

```python
"""LLM-based email classifier using Ollama."""
import logging
import json
import re
from typing import Dict, List, Any
from abc import ABC, abstractmethod

logger = logging.getLogger(__name__)


class BaseLLMProvider(ABC):
    """Abstract LLM provider."""

    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        pass

    @abstractmethod
    def test_connection(self) -> bool:
        pass


class OllamaProvider(BaseLLMProvider):
    """Ollama local LLM provider."""

    def __init__(self, model: str = "qwen2.5:1.5b", base_url: str = "http://localhost:11434"):
        try:
            import ollama
            self.client = ollama.Client(host=base_url)
            self.model = model
            logger.info(f"Initialized Ollama provider with model {model}")
        except ImportError:
            logger.error("ollama package not installed. Run: pip install ollama")
            self.client = None
        except Exception as e:
            logger.error(f"Failed to initialize Ollama: {e}")
            self.client = None

    def complete(self, prompt: str, **kwargs) -> str:
        if not self.client:
            raise RuntimeError("Ollama client not available")

        response = self.client.generate(
            model=self.model,
            prompt=prompt,
            options={
                'temperature': kwargs.get('temperature', 0.1),
                'num_predict': kwargs.get('max_tokens', 500)
            }
        )
        return response['response']

    def test_connection(self) -> bool:
        try:
            self.client.list()
            return True
        except Exception:
            return False


class LLMClassifier:
    """Email classifier using LLM."""

    def __init__(self, provider: BaseLLMProvider, categories: Dict[str, Dict], config: Dict):
        self.provider = provider
        self.categories = categories
        self.config = config
        self.classification_prompt = self._load_prompt_template()

    def _load_prompt_template(self) -> str:
        """Load or create classification prompt."""
        # Try to load from file first
        try:
            with open('prompts/classification.txt', 'r') as f:
                return f.read()
        except FileNotFoundError:
            # Use default prompt
            return """You are an expert email classifier.

CATEGORIES:
{categories}

EMAIL:
Subject: {subject}
From: {sender}
Has Attachments: {has_attachments}
Body Snippet: {body_snippet}

ML Prediction: {ml_prediction} (confidence: {ml_confidence})

Respond with JSON only:
{{
  "category": "chosen_category",
  "confidence": 0.85,
  "reasoning": "brief explanation"
}}
"""

    def classify(self, email: Dict[str, Any]) -> Dict[str, Any]:
        """Classify email using LLM."""
        # Build prompt
        categories_str = "\n".join([
            f"- {name}: {info['description']}"
            for name, info in self.categories.items()
        ])

        ml_pred = email.get('ml_prediction', {})

        prompt = self.classification_prompt.format(
            categories=categories_str,
            subject=email.get('subject', 'N/A'),
            sender=email.get('sender', 'N/A'),
            has_attachments=email.get('has_attachments', False),
            body_snippet=email.get('body_snippet', '')[:300],
            ml_prediction=ml_pred.get('category', 'unknown'),
            ml_confidence=ml_pred.get('confidence', 0.0)
        )

        try:
            # Get LLM response
            response = self.provider.complete(
                prompt,
                temperature=self.config['llm']['temperature'],
                max_tokens=self.config['llm']['max_tokens']
            )

            # Parse JSON response
            result = self._parse_response(response)
            return result

        except Exception as e:
            logger.error(f"LLM classification failed: {e}")
            return {
                'category': 'unknown',
                'confidence': 0.0,
                'reasoning': f'Error: {str(e)}',
                'error': True
            }

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM JSON response."""
        # Try to extract JSON
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group())
            except json.JSONDecodeError:
                pass

        # Fallback parsing
        return {
            'category': 'unknown',
            'confidence': 0.5,
            'reasoning': response[:200]
        }
```

**Test (requires Ollama running):**
```bash
# First: Install and start Ollama
# ollama pull qwen2.5:1.5b

python -c "
from src.classification.llm_classifier import OllamaProvider, LLMClassifier
from src.utils.config import load_categories, load_config

provider = OllamaProvider()
categories = load_categories()
config = load_config().model_dump()  # LLMClassifier expects a plain dict

classifier = LLMClassifier(provider, categories, config)

email = {
    'subject': 'Your verification code is 123456',
    'sender': 'noreply@bank.com',
    'has_attachments': False,
    'body_snippet': 'Your one-time password is 123456',
    'ml_prediction': {'category': 'auth', 'confidence': 0.65}
}

result = classifier.classify(email)
print(result)
"
```
---

## NEXT PHASES

Due to length limits, the remaining phases are:

### Phase 8: Adaptive Classifier
- Dynamic threshold adjustment (see the sketch after this list)
- Sender rule learning
- Classification orchestration

A sketch of the threshold-adjustment idea follows.
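
Phase 8 is not built yet; the following is only an illustrative placeholder for what dynamic threshold adjustment could look like, driven by the `adjustment_step`, `min_threshold`, and `max_threshold` values from config/default_config.yaml (the class and method names are hypothetical, not existing code).

```python
# Placeholder sketch of dynamic threshold adjustment; not an existing module.
class ThresholdAdjuster:
    def __init__(self, classification_cfg: dict):
        self.threshold = classification_cfg.get('default_threshold', 0.75)
        self.step = classification_cfg.get('adjustment_step', 0.05)
        self.min_t = classification_cfg.get('min_threshold', 0.60)
        self.max_t = classification_cfg.get('max_threshold', 0.90)

    def adjust(self, llm_agreement_rate: float) -> float:
        """Called every adjustment_frequency emails: if the LLM mostly agrees with
        the ML model on escalated emails, relax the threshold; if it often
        disagrees, tighten it. Always stay within [min_threshold, max_threshold]."""
        if llm_agreement_rate > 0.95:
            self.threshold = max(self.min_t, self.threshold - self.step)
        elif llm_agreement_rate < 0.80:
            self.threshold = min(self.max_t, self.threshold + self.step)
        return self.threshold
```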
### Phase 9: Processing Pipeline
- Bulk processor
- Queue management
- Checkpointing

### Phase 10: Calibration System
- Email sampling
- LLM calibration analysis
- Validation

### Phase 11: Export & Sync
- Results exporter
- Gmail sync
- Report generation

### Phase 12: Main CLI
- Click interface
- End-to-end orchestration

### Phase 13: Testing
- Unit tests
- Integration tests
- Full pipeline test on Marion's inbox

---
## TESTING STRATEGY

### Unit Testing
```bash
pytest tests/test_classification.py -v
```
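tests/test_classification.py does not exist yet; a starting point (a sketch that exercises only the feature extractor, since the ML model may not be trained at this stage) could look like:

```python
# tests/test_classification.py - starting-point sketch
from datetime import datetime

from src.email_providers.base import Email
from src.classification.feature_extractor import FeatureExtractor


def test_extract_basic_features():
    email = Email(
        id='1',
        subject='Re: Invoice #1234',
        sender='billing@vendor.com',
        date=datetime(2024, 10, 21, 9, 30),
        body='Please find the invoice attached. Total: $99',
        has_attachments=True,
    )
    features = FeatureExtractor().extract(email)

    # Structural, pattern, and sender features should all be present
    assert features['has_attachments'] is True
    assert features['has_reply_prefix'] is True
    assert features['has_invoice_pattern'] is True
    assert features['has_price'] is True
    assert features['sender_domain'] == 'vendor.com'
```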
### Integration Testing
```bash
# Test on 100 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 100

# Test on 1000 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 1000
```

### Full Pipeline
```bash
# Run on Marion's full inbox
python src/main.py --source gmail --credentials marion-creds.json --output results/
```

---

## CRITICAL NEXT STEPS

1. **DECIDE: ML Model Strategy**
   - Option A: Create mock model for immediate testing
   - Option B: Train real model (takes 1-2 days)

2. **Set up Gmail OAuth**
   - Google Cloud Console
   - Enable Gmail API
   - Download credentials.json

3. **Install and test Ollama**
   - Download from ollama.ai
   - Pull model: `ollama pull qwen2.5:1.5b`
   - Test: `ollama run qwen2.5:1.5b "test"`

4. **Continue building**
   - Next: Adaptive Classifier
   - Then: Processing Pipeline
   - Then: Full integration

---

**THIS IS THE ACTUAL BUILD GUIDE**

Everything in this document provides real, executable steps to build the system.