
EMAIL SORTER - BUILD INSTRUCTIONS

Step-by-Step Implementation Guide

Version: 1.0 Date: 2024-10-21


PREREQUISITES

Required Software

  • Python 3.8 or higher
  • Git
  • Ollama (for local LLM)
  • Text editor / IDE

Required Accounts

  • Gmail account (for testing)
  • Google Cloud Console project (for Gmail API)

Skills Needed

  • Python programming
  • Basic understanding of ML concepts
  • Command line comfort
  • OAuth 2.0 basics

IMPLEMENTATION ORDER

Build in this exact order. Each phase depends on previous phases.


PHASE 1: PROJECT SETUP

Step 1: Initialize Git Repository

cd C:\Users\BrettFox\Documents\Claude\email-sorter
git init
git add .
git commit -m "Initial commit - project blueprint"

Step 2: Create Virtual Environment

# Create venv
python -m venv venv

# Activate (Windows)
venv\Scripts\activate

# Activate (Linux/Mac)
source venv/bin/activate

Step 3: Create requirements.txt

Already exists, but verify contents:

# Core
python-dotenv>=1.0.0
pyyaml>=6.0
pydantic>=2.0.0

# Email Providers
google-api-python-client>=2.100.0
google-auth-httplib2>=0.1.1
google-auth-oauthlib>=1.1.0
msal>=1.24.0
imapclient>=2.3.1

# Machine Learning
scikit-learn>=1.3.0
xgboost>=2.0.0
lightgbm>=4.0.0
pandas>=2.0.0
numpy>=1.24.0

# LLM Integration
ollama>=0.1.0

# Text Processing
nltk>=3.8
beautifulsoup4>=4.12.0
lxml>=4.9.0

# Utilities
tqdm>=4.66.0
click>=8.1.0
rich>=13.0.0
joblib>=1.3.0
tenacity>=8.2.0

# Testing
pytest>=7.4.0
pytest-cov>=4.1.0
pytest-mock>=3.11.0

Step 4: Install Dependencies

pip install -r requirements.txt
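
Quick sanity check that the core imports resolve (some import names differ from the PyPI package names, e.g. scikit-learn imports as sklearn and google-api-python-client as googleapiclient):

python -c "import sklearn, xgboost, lightgbm, pandas, numpy, ollama, googleapiclient, yaml, click, rich; print('dependencies OK')"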

Step 5: Create .gitignore

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
*.egg-info/
dist/
build/

# Data and Models
data/training/
src/models/pretrained/*.pkl
*.h5
*.joblib

# Credentials
.env
credentials/
*.json
!config/*.json

# Logs
logs/*.log
*.log

# IDE
.vscode/
.idea/
*.swp

# OS
.DS_Store
Thumbs.db

# Checkpoints
checkpoints/
*.checkpoint

# Results
results/
output/

Step 6: Create Directory Structure

# Create all directories
mkdir -p src/calibration
mkdir -p src/classification
mkdir -p src/models/pretrained
mkdir -p src/email_providers
mkdir -p src/processing
mkdir -p src/adjustment
mkdir -p src/export
mkdir -p src/utils
mkdir -p tests
mkdir -p prompts
mkdir -p scripts
mkdir -p data/samples
mkdir -p logs
mkdir -p config

# Create __init__.py files
touch src/__init__.py
touch src/calibration/__init__.py
touch src/classification/__init__.py
touch src/models/__init__.py
touch src/email_providers/__init__.py
touch src/processing/__init__.py
touch src/adjustment/__init__.py
touch src/export/__init__.py
touch src/utils/__init__.py
touch tests/__init__.py

# Windows equivalent:
# type nul > src\__init__.py
# (repeat for each)
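
If creating each file by hand on Windows is tedious, a short cross-platform alternative (a sketch; scripts/create_init_files.py is an assumed filename, and the package list should match Step 6) creates the same markers with pathlib:

# scripts/create_init_files.py (sketch)
from pathlib import Path

packages = [
    "src", "src/calibration", "src/classification", "src/models",
    "src/email_providers", "src/processing", "src/adjustment",
    "src/export", "src/utils", "tests",
]

for pkg in packages:
    init_file = Path(pkg) / "__init__.py"
    init_file.parent.mkdir(parents=True, exist_ok=True)  # ensure the package directory exists
    init_file.touch(exist_ok=True)                        # create an empty __init__.py
    print(f"created {init_file}")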

PHASE 2: CORE INFRASTRUCTURE

Step 7: Config System (src/utils/config.py)

Create the configuration loader:

"""Configuration management."""
import yaml
from pathlib import Path
from typing import Dict, Any
from pydantic import BaseModel, ConfigDict


class Config(BaseModel):
    """Main configuration model."""
    version: str
    calibration: Dict[str, Any]
    processing: Dict[str, Any]
    classification: Dict[str, Any]
    llm: Dict[str, Any]
    email_providers: Dict[str, Any]
    features: Dict[str, Any]
    export: Dict[str, Any]
    logging: Dict[str, Any]
    cleanup: Dict[str, Any]

    model_config = ConfigDict(extra="allow")


def load_config(config_path: str = "config/default_config.yaml") -> Config:
    """Load configuration from YAML file."""
    with open(config_path, 'r') as f:
        config_dict = yaml.safe_load(f)
    return Config(**config_dict)


def load_categories(categories_path: str = "config/categories.yaml") -> Dict[str, Dict]:
    """Load category definitions."""
    with open(categories_path, 'r') as f:
        data = yaml.safe_load(f)
    return data['categories']


def load_features(features_path: str = "config/features.yaml") -> Dict[str, Any]:
    """Load feature configuration."""
    with open(features_path, 'r') as f:
        return yaml.safe_load(f)

Test:

python -c "from src.utils.config import load_config; print(load_config())"

Step 8: Logging System (src/utils/logging.py)

"""Logging configuration."""
import logging
import sys
from pathlib import Path
from rich.logging import RichHandler


def setup_logging(config: dict):
    """Setup logging with console and file handlers."""
    log_level = config.get('level', 'INFO')
    log_file = config.get('file', 'logs/email-sorter.log')

    # Create logs directory
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    # Create logger
    logger = logging.getLogger()
    logger.setLevel(log_level)

    # Remove existing handlers
    logger.handlers = []

    # Console handler with rich formatting
    console_handler = RichHandler(
        rich_tracebacks=True,
        markup=True,
        show_time=True,
        show_path=False
    )
    console_handler.setLevel(log_level)
    console_formatter = logging.Formatter('%(message)s')
    console_handler.setFormatter(console_formatter)

    # File handler
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(log_level)
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(file_formatter)

    # Add handlers
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger
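
Test (a quick smoke check; it should print a styled console line and create logs/test.log):

python -c "from src.utils.logging import setup_logging; import logging; setup_logging({'level': 'INFO', 'file': 'logs/test.log'}); logging.getLogger(__name__).info('logging works')"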

Step 9: Email Data Models (src/email_providers/base.py)

"""Base email provider interface and data models."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional


@dataclass
class Email:
    """Unified email data model."""
    id: str
    subject: str
    sender: str
    sender_name: Optional[str] = None
    date: Optional[datetime] = None
    body: str = ""
    body_snippet: str = ""
    has_attachments: bool = False
    attachments: List[Dict] = field(default_factory=list)
    headers: Dict = field(default_factory=dict)
    labels: List[str] = field(default_factory=list)
    is_read: bool = False

    def __post_init__(self):
        """Generate body_snippet if not provided."""
        if not self.body_snippet and self.body:
            self.body_snippet = self.body[:500]


class BaseProvider(ABC):
    """Abstract base class for email providers."""

    @abstractmethod
    def connect(self, credentials: Dict) -> bool:
        """Establish connection to email provider."""
        pass

    @abstractmethod
    def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
        """Fetch emails from provider."""
        pass

    @abstractmethod
    def update_labels(self, email_id: str, labels: List[str]) -> bool:
        """Update email labels/folders."""
        pass

    @abstractmethod
    def batch_update(self, updates: List[Dict]) -> bool:
        """Batch update multiple emails."""
        pass

    @abstractmethod
    def disconnect(self):
        """Close connection."""
        pass

Test:

python -c "from src.email_providers.base import Email; e = Email(id='1', subject='Test', sender='test@test.com'); print(e)"

PHASE 3: CONFIGURATION FILES

Step 10: Create config/default_config.yaml

version: "1.0.0"

calibration:
  sample_size: 1500
  sample_strategy: "stratified"
  validation_size: 300
  min_confidence: 0.6

processing:
  batch_size: 100
  llm_queue_size: 100
  parallel_workers: 4
  checkpoint_interval: 1000

classification:
  default_threshold: 0.75
  min_threshold: 0.60
  max_threshold: 0.90
  adjustment_step: 0.05
  adjustment_frequency: 1000
  category_thresholds:
    junk: 0.85
    auth: 0.80
    conversational: 0.65

llm:
  provider: "ollama"
  model: "qwen2.5:1.5b"
  base_url: "http://localhost:11434"
  temperature: 0.1
  max_tokens: 500
  timeout: 30
  retry_attempts: 3

email_providers:
  gmail:
    batch_size: 100
  microsoft:
    batch_size: 100
  imap:
    timeout: 30
    batch_size: 50

features:
  text_features:
    max_vocab_size: 10000
    ngram_range: [1, 2]
    min_df: 2
    max_df: 0.95

export:
  format: "json"
  include_confidence: true
  create_report: true

logging:
  level: "INFO"
  file: "logs/email-sorter.log"

cleanup:
  delete_temp_files: true
  delete_repo_after: false

Step 11: Create config/categories.yaml

(See PROJECT_BLUEPRINT.md for full content)
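
The blueprint holds the real definitions. Purely to illustrate the shape load_categories expects (a top-level categories key with one entry per category, each carrying at least a description, which the LLM classifier in Phase 7 reads), here is a sketch using names borrowed from the mock-model script in Phase 6 and placeholder wording:

categories:
  junk:
    description: "Bulk promotional and other low-value mail"
  auth:
    description: "Verification codes, password resets, security alerts"
  conversational:
    description: "Person-to-person correspondence"
  # ... remaining categories (transactional, newsletters, social, automated,
  # work, personal, finance, travel, unknown) follow the same pattern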

Step 12: Create config/features.yaml

(See PROJECT_BLUEPRINT.md for full content)
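
Again, defer to the blueprint for the real contents. A minimal sketch matching what FeatureExtractor reads in Phase 5 (it looks for text_features.max_features; note that default_config.yaml uses max_vocab_size, so keep the key names consistent with whichever file is actually loaded):

text_features:
  max_features: 10000
  ngram_range: [1, 2]
  min_df: 2
  max_df: 0.95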

Test:

python -c "from src.utils.config import load_config, load_categories; print(load_config()); print(load_categories())"

PHASE 4: EMAIL PROVIDERS

Step 13: Gmail Provider (src/email_providers/gmail.py)

"""Gmail API provider implementation."""
import base64
import logging
from typing import List, Dict, Optional
from datetime import datetime
from email.utils import parsedate_to_datetime

from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from .base import BaseProvider, Email

logger = logging.getLogger(__name__)


class GmailProvider(BaseProvider):
    """Gmail API email provider."""

    SCOPES = [
        'https://www.googleapis.com/auth/gmail.readonly',
        'https://www.googleapis.com/auth/gmail.modify'
    ]

    def __init__(self):
        self.service = None
        self.user_id = 'me'

    def connect(self, credentials_path: str) -> bool:
        """Connect to Gmail API using OAuth credentials."""
        try:
            # For first-time auth
            flow = InstalledAppFlow.from_client_secrets_file(
                credentials_path, self.SCOPES
            )
            creds = flow.run_local_server(port=0)

            self.service = build('gmail', 'v1', credentials=creds)
            logger.info("Connected to Gmail API")
            return True

        except Exception as e:
            logger.error(f"Failed to connect to Gmail: {e}")
            return False

    def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
        """Fetch emails from Gmail."""
        emails = []

        try:
            # Build query
            query = filters.get('query', '') if filters else ''

            # Get message IDs
            results = self.service.users().messages().list(
                userId=self.user_id,
                q=query,
                maxResults=min(limit, 500) if limit else 500
            ).execute()

            messages = results.get('messages', [])

            # Fetch full messages
            for msg_info in messages:
                email = self._fetch_message(msg_info['id'])
                if email:
                    emails.append(email)
                    if limit and len(emails) >= limit:
                        break

            logger.info(f"Fetched {len(emails)} emails from Gmail")
            return emails

        except HttpError as e:
            logger.error(f"Gmail API error: {e}")
            return emails

    def _fetch_message(self, msg_id: str) -> Optional[Email]:
        """Fetch and parse a single message."""
        try:
            msg = self.service.users().messages().get(
                userId=self.user_id,
                id=msg_id,
                format='full'
            ).execute()

            return self._parse_message(msg)

        except Exception as e:
            logger.error(f"Error fetching message {msg_id}: {e}")
            return None

    def _parse_message(self, msg: Dict) -> Email:
        """Parse Gmail message into Email object."""
        headers = {h['name']: h['value'] for h in msg['payload']['headers']}

        # Extract body
        body = self._get_body(msg['payload'])

        # Parse date
        date = None
        if 'Date' in headers:
            try:
                date = parsedate_to_datetime(headers['Date'])
            except Exception:
                pass  # leave date as None if the header cannot be parsed

        # Check attachments
        has_attachments = False
        attachments = []
        if 'parts' in msg['payload']:
            for part in msg['payload']['parts']:
                if part.get('filename'):
                    has_attachments = True
                    attachments.append({
                        'filename': part['filename'],
                        'mime_type': part['mimeType'],
                        'size': part.get('body', {}).get('size', 0)
                    })

        return Email(
            id=msg['id'],
            subject=headers.get('Subject', 'No Subject'),
            sender=headers.get('From', ''),
            date=date,
            body=body,
            has_attachments=has_attachments,
            attachments=attachments,
            headers=headers,
            labels=msg.get('labelIds', []),
            is_read='UNREAD' not in msg.get('labelIds', [])
        )

    def _get_body(self, payload: Dict) -> str:
        """Extract email body from payload."""
        body = ""

        if 'body' in payload and 'data' in payload['body']:
            body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='ignore')
        elif 'parts' in payload:
            for part in payload['parts']:
                if part['mimeType'] == 'text/plain':
                    if 'data' in part['body']:
                        body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8', errors='ignore')
                        break

        return body

    def update_labels(self, email_id: str, labels: List[str]) -> bool:
        """Update labels for a single email."""
        try:
            self.service.users().messages().modify(
                userId=self.user_id,
                id=email_id,
                body={'addLabelIds': labels}
            ).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating labels: {e}")
            return False

    def batch_update(self, updates: List[Dict]) -> bool:
        """Batch update multiple emails."""
        try:
            batch_size = 100

            for i in range(0, len(updates), batch_size):
                batch = updates[i:i+batch_size]
                email_ids = [u['email_id'] for u in batch]
                labels = list(set([l for u in batch for l in u.get('labels', [])]))

                self.service.users().messages().batchModify(
                    userId=self.user_id,
                    body={
                        'ids': email_ids,
                        'addLabelIds': labels
                    }
                ).execute()

            logger.info(f"Batch updated {len(updates)} emails")
            return True

        except Exception as e:
            logger.error(f"Batch update error: {e}")
            return False

    def disconnect(self):
        """Close connection."""
        self.service = None
        logger.info("Disconnected from Gmail")

Test (requires Gmail OAuth setup):

# First: Set up OAuth in Google Cloud Console
# Download credentials.json
python -c "from src.email_providers.gmail import GmailProvider; p = GmailProvider(); p.connect('credentials.json'); emails = p.fetch_emails(limit=10); print(f'Fetched {len(emails)} emails')"

PHASE 5: FEATURE EXTRACTION

Step 14: Feature Extractor (src/classification/feature_extractor.py)

"""Feature extraction from emails."""
import re
import logging
from typing import Dict, List, Any
from datetime import datetime
from urllib.parse import urlparse

import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

from src.email_providers.base import Email

logger = logging.getLogger(__name__)


class FeatureExtractor:
    """Extract features from emails for classification."""

    def __init__(self, config: Dict = None):
        """Initialize with feature configuration."""
        self.config = config or {
            'text_features': {
                'max_features': 10000,
                'ngram_range': [1, 2],
                'min_df': 2,
                'max_df': 0.95
            }
        }

        self.text_vectorizer = None
        self._initialize_vectorizer()

    def _initialize_vectorizer(self):
        """Initialize TF-IDF vectorizer."""
        text_config = self.config.get('text_features', {})
        self.text_vectorizer = TfidfVectorizer(
            max_features=text_config.get('max_features', 10000),
            ngram_range=tuple(text_config.get('ngram_range', [1, 2])),
            min_df=text_config.get('min_df', 2),
            max_df=text_config.get('max_df', 0.95),
            sublinear_tf=True
        )

    def extract(self, email: Email) -> Dict[str, Any]:
        """
        Extract features from a single email.

        Args:
            email: Email object

        Returns:
            Dictionary of features
        """
        features = {}

        # Text for TF-IDF
        features['text'] = f"{email.subject} {email.body_snippet}"

        # Structural features
        features.update(self._extract_structural(email))

        # Sender features
        features.update(self._extract_sender(email))

        # Pattern features
        features.update(self._extract_patterns(email))

        return features

    def _extract_structural(self, email: Email) -> Dict[str, Any]:
        """Extract structural features."""
        features = {}

        # Attachments
        features['has_attachments'] = email.has_attachments
        features['attachment_count'] = len(email.attachments)

        # Links and images
        body = email.body or email.body_snippet
        features['link_count'] = len(re.findall(r'https?://', body))
        features['image_count'] = len(re.findall(r'<img', body, re.IGNORECASE))

        # Lengths
        features['body_length'] = len(body)
        features['subject_length'] = len(email.subject)

        # Reply/Forward
        features['has_reply_prefix'] = bool(re.match(r'^(Re:|Fwd:)', email.subject, re.IGNORECASE))

        # Time features
        if email.date:
            hour = email.date.hour
            if 0 <= hour < 6:
                features['time_of_day'] = 'night'
            elif 6 <= hour < 12:
                features['time_of_day'] = 'morning'
            elif 12 <= hour < 18:
                features['time_of_day'] = 'afternoon'
            else:
                features['time_of_day'] = 'evening'

            features['day_of_week'] = email.date.strftime('%A').lower()
        else:
            features['time_of_day'] = 'unknown'
            features['day_of_week'] = 'unknown'

        return features

    def _extract_sender(self, email: Email) -> Dict[str, Any]:
        """Extract sender-based features."""
        features = {}

        sender = email.sender
        if '@' in sender:
            # Extract domain
            domain = sender.split('@')[1].lower()
            features['sender_domain'] = domain

            # Domain type
            freemail_domains = {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'icloud.com'}
            noreply_patterns = ['noreply', 'no-reply', 'donotreply']
            marketing_patterns = ['marketing', 'newsletter', 'promo']

            if domain in freemail_domains:
                features['sender_domain_type'] = 'freemail'
            elif any(p in sender.lower() for p in noreply_patterns):
                features['sender_domain_type'] = 'noreply'
            elif any(p in sender.lower() for p in marketing_patterns):
                features['sender_domain_type'] = 'marketing'
            else:
                features['sender_domain_type'] = 'corporate'

            features['is_noreply'] = any(p in sender.lower() for p in noreply_patterns)
        else:
            features['sender_domain'] = 'unknown'
            features['sender_domain_type'] = 'unknown'
            features['is_noreply'] = False

        return features

    def _extract_patterns(self, email: Email) -> Dict[str, Any]:
        """Extract pattern-based features."""
        features = {}

        body = (email.body or email.body_snippet).lower()
        subject = email.subject.lower()
        combined = f"{subject} {body}"

        # Common patterns
        features['has_unsubscribe'] = 'unsubscribe' in combined
        features['has_otp_pattern'] = bool(re.search(r'\b\d{4,6}\b', combined))
        features['has_price'] = bool(re.search(r'\$\d+', combined))
        features['has_tracking_pattern'] = bool(re.search(r'tracking\s*(number|#)', combined))
        features['has_invoice_pattern'] = bool(re.search(r'(invoice|bill|receipt)\s*#?\d+', combined))
        features['has_meeting_pattern'] = bool(re.search(r'(meeting|call|zoom|teams)', combined))

        return features

    def extract_batch(self, emails: List[Email]) -> pd.DataFrame:
        """
        Extract features from batch of emails.

        Args:
            emails: List of Email objects

        Returns:
            DataFrame with all features
        """
        # Extract features for each email
        feature_dicts = [self.extract(email) for email in emails]

        # Convert to DataFrame
        df = pd.DataFrame(feature_dicts)

        # Transform text features if vectorizer is fitted
        if self.text_vectorizer and 'text' in df.columns:
            if hasattr(self.text_vectorizer, 'vocabulary_'):
                text_features = self.text_vectorizer.transform(df['text'])
                text_df = pd.DataFrame(
                    text_features.toarray(),
                    columns=[f"text_{i}" for i in range(text_features.shape[1])]
                )
                df = pd.concat([df.drop('text', axis=1), text_df], axis=1)
            else:
                df = df.drop('text', axis=1)

        return df

    def fit_text_vectorizer(self, emails: List[Email]):
        """Fit text vectorizer on corpus."""
        texts = [f"{e.subject} {e.body_snippet}" for e in emails]
        self.text_vectorizer.fit(texts)
        logger.info(f"Fitted vectorizer with {len(self.text_vectorizer.vocabulary_)} features")

Test:

# Create mock email and test
python -c "
from src.email_providers.base import Email
from src.classification.feature_extractor import FeatureExtractor
from datetime import datetime

email = Email(
    id='1',
    subject='Meeting at 3pm',
    sender='john@company.com',
    date=datetime.now(),
    body='Let us meet to discuss the project',
    has_attachments=True
)

extractor = FeatureExtractor()
features = extractor.extract(email)
print(features)
"

PHASE 6: ML CLASSIFIER (BLOCKER - NEED MODEL)

Step 15: ML Classifier Wrapper (src/classification/ml_classifier.py)

"""ML-based email classifier."""
import logging
import pickle
from typing import Dict, List, Any
import numpy as np
from pathlib import Path

logger = logging.getLogger(__name__)


class MLClassifier:
    """Wrapper for pre-trained ML classification model."""

    def __init__(self, model_path: str = "src/models/pretrained/classifier.pkl"):
        """Load pre-trained model."""
        self.model = None
        self.label_encoder = None
        self.categories = []
        self.feature_names = []

        self._load_model(model_path)

    def _load_model(self, model_path: str):
        """Load model from file."""
        try:
            with open(model_path, 'rb') as f:
                model_data = pickle.load(f)

            self.model = model_data['model']
            self.label_encoder = model_data.get('label_encoder')
            self.categories = model_data.get('categories', [])
            self.feature_names = model_data.get('feature_names', [])

            logger.info(f"Loaded ML model with {len(self.categories)} categories")

        except FileNotFoundError:
            logger.warning(f"Model file not found: {model_path}")
            logger.warning("Will need to train model or use alternative classification")
        except Exception as e:
            logger.error(f"Error loading model: {e}")

    def predict(self, features: np.ndarray) -> Dict[str, Any]:
        """
        Predict category for feature vector.

        Args:
            features: Feature vector or DataFrame row

        Returns:
            {
                'category': str,
                'confidence': float,
                'probabilities': Dict[str, float]
            }
        """
        if self.model is None:
            return {
                'category': 'unknown',
                'confidence': 0.0,
                'probabilities': {},
                'error': 'Model not loaded'
            }

        # Get probabilities
        probs = self.model.predict_proba([features])[0]

        # Get predicted class
        pred_class = np.argmax(probs)
        category = self.categories[pred_class]
        confidence = float(probs[pred_class])

        # All probabilities
        prob_dict = {
            self.categories[i]: float(probs[i])
            for i in range(len(self.categories))
        }

        return {
            'category': category,
            'confidence': confidence,
            'probabilities': prob_dict
        }

    def predict_batch(self, features: np.ndarray) -> List[Dict[str, Any]]:
        """Predict for batch of feature vectors."""
        return [self.predict(f) for f in features]

⚠️ CRITICAL: You need to either:

Option A: Create a placeholder model for testing

# scripts/create_mock_model.py
import pickle
from sklearn.ensemble import RandomForestClassifier
import numpy as np

# Create dummy model
model = RandomForestClassifier(n_estimators=10)
X_dummy = np.random.rand(100, 50)
y_dummy = np.random.randint(0, 12, 100)
model.fit(X_dummy, y_dummy)

categories = [
    'junk', 'transactional', 'auth', 'newsletters',
    'social', 'automated', 'conversational', 'work',
    'personal', 'finance', 'travel', 'unknown'
]

model_data = {
    'model': model,
    'categories': categories,
    'feature_names': [f'feature_{i}' for i in range(50)]
}

with open('src/models/pretrained/classifier.pkl', 'wb') as f:
    pickle.dump(model_data, f)

print("Mock model created!")

Option B: Train a real model (recommended). See scripts/train_model.py (to be created in a later phase); a sketch of its expected output format follows.
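
Whatever form the real training script takes, it must save a pickle with the keys MLClassifier._load_model expects (model, categories, feature_names, and optionally label_encoder), with categories index-aligned to the model's predict_proba columns. A minimal sketch, with dummy X and y standing in for real labeled features:

# scripts/train_model.py (sketch -- replace the dummy X, y with real labeled feature data)
import pickle
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder

# X: feature matrix (e.g. from FeatureExtractor.extract_batch), y: category name per email
X = np.random.rand(500, 50)                            # placeholder features
y = np.random.choice(['junk', 'auth', 'work'], 500)    # placeholder labels

encoder = LabelEncoder()
y_enc = encoder.fit_transform(y)

X_train, X_test, y_train, y_test = train_test_split(X, y_enc, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=200, random_state=42)
model.fit(X_train, y_train)
print(f"held-out accuracy: {model.score(X_test, y_test):.3f}")

model_data = {
    'model': model,
    'label_encoder': encoder,
    'categories': list(encoder.classes_),   # same order as predict_proba columns
    'feature_names': [f'feature_{i}' for i in range(X.shape[1])],
}

with open('src/models/pretrained/classifier.pkl', 'wb') as f:
    pickle.dump(model_data, f)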


PHASE 7: LLM INTEGRATION

Step 16: LLM Classifier (src/classification/llm_classifier.py)

"""LLM-based email classifier using Ollama."""
import logging
import json
import re
from typing import Dict, List, Any
from abc import ABC, abstractmethod

logger = logging.getLogger(__name__)


class BaseLLMProvider(ABC):
    """Abstract LLM provider."""

    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        pass

    @abstractmethod
    def test_connection(self) -> bool:
        pass


class OllamaProvider(BaseLLMProvider):
    """Ollama local LLM provider."""

    def __init__(self, model: str = "qwen2.5:1.5b", base_url: str = "http://localhost:11434"):
        try:
            import ollama
            self.client = ollama.Client(host=base_url)
            self.model = model
            logger.info(f"Initialized Ollama provider with model {model}")
        except ImportError:
            logger.error("ollama package not installed. Run: pip install ollama")
            self.client = None
        except Exception as e:
            logger.error(f"Failed to initialize Ollama: {e}")
            self.client = None

    def complete(self, prompt: str, **kwargs) -> str:
        if not self.client:
            raise RuntimeError("Ollama client not available")

        response = self.client.generate(
            model=self.model,
            prompt=prompt,
            options={
                'temperature': kwargs.get('temperature', 0.1),
                'num_predict': kwargs.get('max_tokens', 500)
            }
        )
        return response['response']

    def test_connection(self) -> bool:
        try:
            self.client.list()
            return True
        except Exception:
            return False


class LLMClassifier:
    """Email classifier using LLM."""

    def __init__(self, provider: BaseLLMProvider, categories: Dict[str, Dict], config: Dict):
        self.provider = provider
        self.categories = categories
        self.config = config
        self.classification_prompt = self._load_prompt_template()

    def _load_prompt_template(self) -> str:
        """Load or create classification prompt."""
        # Try to load from file first
        try:
            with open('prompts/classification.txt', 'r') as f:
                return f.read()
        except FileNotFoundError:
            # Use default prompt
            return """You are an expert email classifier.

CATEGORIES:
{categories}

EMAIL:
Subject: {subject}
From: {sender}
Has Attachments: {has_attachments}
Body Snippet: {body_snippet}

ML Prediction: {ml_prediction} (confidence: {ml_confidence})

Respond with JSON only:
{{
  "category": "chosen_category",
  "confidence": 0.85,
  "reasoning": "brief explanation"
}}
"""

    def classify(self, email: Dict[str, Any]) -> Dict[str, Any]:
        """Classify email using LLM."""
        # Build prompt
        categories_str = "\n".join([
            f"- {name}: {info['description']}"
            for name, info in self.categories.items()
        ])

        ml_pred = email.get('ml_prediction', {})

        prompt = self.classification_prompt.format(
            categories=categories_str,
            subject=email.get('subject', 'N/A'),
            sender=email.get('sender', 'N/A'),
            has_attachments=email.get('has_attachments', False),
            body_snippet=email.get('body_snippet', '')[:300],
            ml_prediction=ml_pred.get('category', 'unknown'),
            ml_confidence=ml_pred.get('confidence', 0.0)
        )

        try:
            # Get LLM response
            response = self.provider.complete(
                prompt,
                temperature=self.config['llm']['temperature'],
                max_tokens=self.config['llm']['max_tokens']
            )

            # Parse JSON response
            result = self._parse_response(response)
            return result

        except Exception as e:
            logger.error(f"LLM classification failed: {e}")
            return {
                'category': 'unknown',
                'confidence': 0.0,
                'reasoning': f'Error: {str(e)}',
                'error': True
            }

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM JSON response."""
        # Try to extract JSON
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group())
            except json.JSONDecodeError:
                pass

        # Fallback parsing
        return {
            'category': 'unknown',
            'confidence': 0.5,
            'reasoning': response[:200]
        }

Test (requires Ollama running):

# First: Install and start Ollama
# ollama pull qwen2.5:1.5b

python -c "
from src.classification.llm_classifier import OllamaProvider, LLMClassifier
from src.utils.config import load_categories, load_config

provider = OllamaProvider()
categories = load_categories()
config = load_config()

classifier = LLMClassifier(provider, categories, config)

email = {
    'subject': 'Your verification code is 123456',
    'sender': 'noreply@bank.com',
    'has_attachments': False,
    'body_snippet': 'Your one-time password is 123456',
    'ml_prediction': {'category': 'auth', 'confidence': 0.65}
}

result = classifier.classify(email)
print(result)
"

NEXT PHASES

Due to length limits, the remaining phases are:

Phase 8: Adaptive Classifier

  • Dynamic threshold adjustment
  • Sender rule learning
  • Classification orchestration
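
Phase 8 is not written yet, but the routing it has to implement is already implied by the classification config (per-category thresholds with a default_threshold fallback) and the two classifiers built above. A minimal sketch of that orchestration, assuming low-confidence ML predictions fall through to the LLM (function and argument names are placeholders):

# Sketch of ML -> LLM routing for the adaptive classifier (names are placeholders)
def classify_email(email_features, email_dict, ml_classifier, llm_classifier, config):
    ml_result = ml_classifier.predict(email_features)

    thresholds = config.classification.get('category_thresholds', {})
    threshold = thresholds.get(ml_result['category'],
                               config.classification.get('default_threshold', 0.75))

    if ml_result['confidence'] >= threshold:
        return {**ml_result, 'method': 'ml'}        # confident: accept the ML prediction

    # Low confidence: hand off to the LLM with the ML prediction as context
    email_dict['ml_prediction'] = ml_result
    llm_result = llm_classifier.classify(email_dict)
    return {**llm_result, 'method': 'llm'}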

Phase 9: Processing Pipeline

  • Bulk processor
  • Queue management
  • Checkpointing
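
Checkpointing, likewise, is sketchable from the config (processing.checkpoint_interval: 1000) and the checkpoints/ entry already in .gitignore. Roughly, and with placeholder names:

# Sketch: checkpoint/resume helpers for the bulk processor (paths and names are placeholders)
import json
from pathlib import Path

CHECKPOINT = Path("checkpoints/progress.checkpoint")

def save_checkpoint(processed_ids, results):
    CHECKPOINT.parent.mkdir(parents=True, exist_ok=True)
    CHECKPOINT.write_text(json.dumps({'processed_ids': processed_ids, 'results': results}))

def load_checkpoint():
    if CHECKPOINT.exists():
        return json.loads(CHECKPOINT.read_text())
    return {'processed_ids': [], 'results': []}

# Inside the bulk loop, roughly:
#   if len(processed_ids) % config.processing['checkpoint_interval'] == 0:
#       save_checkpoint(processed_ids, results)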

Phase 10: Calibration System

  • Email sampling
  • LLM calibration analysis
  • Validation

Phase 11: Export & Sync

  • Results exporter
  • Gmail sync
  • Report generation

Phase 12: Main CLI

  • Click interface
  • End-to-end orchestration

Phase 13: Testing

  • Unit tests
  • Integration tests
  • Full pipeline test on Marion's inbox

TESTING STRATEGY

Unit Testing

pytest tests/test_classification.py -v

Integration Testing

# Test on 100 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 100

# Test on 1000 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 1000

Full Pipeline

# Run on Marion's full inbox
python src/main.py --source gmail --credentials marion-creds.json --output results/

CRITICAL NEXT STEPS

  1. DECIDE: ML Model Strategy

    • Option A: Create mock model for immediate testing
    • Option B: Train real model (takes 1-2 days)
  2. Set up Gmail OAuth

    • Google Cloud Console
    • Enable Gmail API
    • Download credentials.json
  3. Install and test Ollama

    • Download from ollama.ai
    • Pull model: ollama pull qwen2.5:1.5b
    • Test: ollama run qwen2.5:1.5b "test"
  4. Continue building

    • Next: Adaptive Classifier
    • Then: Processing Pipeline
    • Then: Full integration

THIS IS THE ACTUAL BUILD GUIDE

Every step in this document is real and executable; work through the phases in order to build the system.