# EMAIL SORTER - BUILD INSTRUCTIONS

**Step-by-Step Implementation Guide**

Version: 1.0
Date: 2024-10-21

---

## PREREQUISITES

### Required Software
- Python 3.8 or higher
- Git
- Ollama (for local LLM)
- Text editor / IDE

### Required Accounts
- Gmail account (for testing)
- Google Cloud Console project (for Gmail API)

### Skills Needed
- Python programming
- Basic understanding of ML concepts
- Command line comfort
- OAuth 2.0 basics

---

## IMPLEMENTATION ORDER

Build in this exact order. Each phase depends on previous phases.

---

## PHASE 1: PROJECT SETUP

### Step 1: Initialize Git Repository

```bash
cd C:\Users\BrettFox\Documents\Claude\email-sorter
git init
git add .
git commit -m "Initial commit - project blueprint"
```

### Step 2: Create Virtual Environment

```bash
# Create venv
python -m venv venv

# Activate (Windows)
venv\Scripts\activate

# Activate (Linux/Mac)
source venv/bin/activate
```

### Step 3: Create requirements.txt

Already exists, but verify contents:

```txt
# Core
python-dotenv>=1.0.0
pyyaml>=6.0
pydantic>=2.0.0

# Email Providers
google-api-python-client>=2.100.0
google-auth-httplib2>=0.1.1
google-auth-oauthlib>=1.1.0
msal>=1.24.0
imapclient>=2.3.1

# Machine Learning
scikit-learn>=1.3.0
xgboost>=2.0.0
lightgbm>=4.0.0
pandas>=2.0.0
numpy>=1.24.0

# LLM Integration
ollama>=0.1.0

# Text Processing
nltk>=3.8
beautifulsoup4>=4.12.0
lxml>=4.9.0

# Utilities
tqdm>=4.66.0
click>=8.1.0
rich>=13.0.0
joblib>=1.3.0
tenacity>=8.2.0

# Testing
pytest>=7.4.0
pytest-cov>=4.1.0
pytest-mock>=3.11.0
```

### Step 4: Install Dependencies

```bash
pip install -r requirements.txt
```

### Step 5: Create .gitignore

```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
env/
venv/
*.egg-info/
dist/
build/

# Data and Models
data/training/
src/models/pretrained/*.pkl
*.h5
*.joblib

# Credentials
.env
credentials/
*.json
!config/*.json

# Logs
logs/*.log
*.log

# IDE
.vscode/
.idea/
*.swp

# OS
.DS_Store
Thumbs.db

# Checkpoints
checkpoints/
*.checkpoint

# Results
results/
output/
```

### Step 6: Create Directory Structure

```bash
# Create all directories
mkdir -p src/calibration
mkdir -p src/classification
mkdir -p src/models/pretrained
mkdir -p src/email_providers
mkdir -p src/processing
mkdir -p src/adjustment
mkdir -p src/export
mkdir -p src/utils
mkdir -p tests
mkdir -p prompts
mkdir -p scripts
mkdir -p data/samples
mkdir -p logs
mkdir -p config

# Create __init__.py files
touch src/__init__.py
touch src/calibration/__init__.py
touch src/classification/__init__.py
touch src/models/__init__.py
touch src/email_providers/__init__.py
touch src/processing/__init__.py
touch src/adjustment/__init__.py
touch src/export/__init__.py
touch src/utils/__init__.py
touch tests/__init__.py

# Windows equivalent:
# type nul > src\__init__.py
# (repeat for each)
```
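The commands above assume a POSIX-style shell such as Git Bash; `mkdir -p` and `touch` are not available in plain cmd.exe even though the project path is on Windows. If you prefer, the same structure can be created from Python. A minimal cross-platform sketch (the script name is an assumption; it can also be pasted into a REPL from the project root):

```python
# scripts/bootstrap_dirs.py -- hypothetical helper, not part of the blueprint
from pathlib import Path

PACKAGE_DIRS = [
    "src", "src/calibration", "src/classification", "src/models",
    "src/email_providers", "src/processing", "src/adjustment",
    "src/export", "src/utils", "tests",
]
PLAIN_DIRS = [
    "src/models/pretrained", "prompts", "scripts", "data/samples",
    "logs", "config",
]

for d in PACKAGE_DIRS + PLAIN_DIRS:
    Path(d).mkdir(parents=True, exist_ok=True)   # equivalent of mkdir -p

for d in PACKAGE_DIRS:
    (Path(d) / "__init__.py").touch(exist_ok=True)  # equivalent of touch
```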
---

## PHASE 2: CORE INFRASTRUCTURE

### Step 7: Config System (src/utils/config.py)

Create the configuration loader:

```python
"""Configuration management."""
import yaml
from pathlib import Path
from typing import Dict, Any
from pydantic import BaseModel


class Config(BaseModel):
    """Main configuration model."""
    version: str
    calibration: Dict[str, Any]
    processing: Dict[str, Any]
    classification: Dict[str, Any]
    llm: Dict[str, Any]
    email_providers: Dict[str, Any]
    features: Dict[str, Any]
    export: Dict[str, Any]
    logging: Dict[str, Any]
    cleanup: Dict[str, Any]

    # Pydantic v2 style (requirements pin pydantic>=2.0.0)
    model_config = {"extra": "allow"}


def load_config(config_path: str = "config/default_config.yaml") -> Config:
    """Load configuration from YAML file."""
    with open(config_path, 'r') as f:
        config_dict = yaml.safe_load(f)
    return Config(**config_dict)


def load_categories(categories_path: str = "config/categories.yaml") -> Dict[str, Dict]:
    """Load category definitions."""
    with open(categories_path, 'r') as f:
        data = yaml.safe_load(f)
    return data['categories']


def load_features(features_path: str = "config/features.yaml") -> Dict[str, Any]:
    """Load feature configuration."""
    with open(features_path, 'r') as f:
        return yaml.safe_load(f)
```

**Test:**

```bash
python -c "from src.utils.config import load_config; print(load_config())"
```

### Step 8: Logging System (src/utils/logging.py)

```python
"""Logging configuration."""
import logging
import sys
from pathlib import Path
from rich.logging import RichHandler


def setup_logging(config: dict):
    """Setup logging with console and file handlers."""
    log_level = config.get('level', 'INFO')
    log_file = config.get('file', 'logs/email-sorter.log')

    # Create logs directory
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    # Create logger
    logger = logging.getLogger()
    logger.setLevel(log_level)

    # Remove existing handlers
    logger.handlers = []

    # Console handler with rich formatting
    console_handler = RichHandler(
        rich_tracebacks=True,
        markup=True,
        show_time=True,
        show_path=False
    )
    console_handler.setLevel(log_level)
    console_formatter = logging.Formatter('%(message)s')
    console_handler.setFormatter(console_formatter)

    # File handler
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(log_level)
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    file_handler.setFormatter(file_formatter)

    # Add handlers
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger
```

### Step 9: Email Data Models (src/email_providers/base.py)

```python
"""Base email provider interface and data models."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any, Optional


@dataclass
class Email:
    """Unified email data model."""
    id: str
    subject: str
    sender: str
    sender_name: Optional[str] = None
    date: Optional[datetime] = None
    body: str = ""
    body_snippet: str = ""
    has_attachments: bool = False
    attachments: List[Dict] = field(default_factory=list)
    headers: Dict = field(default_factory=dict)
    labels: List[str] = field(default_factory=list)
    is_read: bool = False

    def __post_init__(self):
        """Generate body_snippet if not provided."""
        if not self.body_snippet and self.body:
            self.body_snippet = self.body[:500]


class BaseProvider(ABC):
    """Abstract base class for email providers."""

    @abstractmethod
    def connect(self, credentials: Dict) -> bool:
        """Establish connection to email provider."""
        pass

    @abstractmethod
    def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
        """Fetch emails from provider."""
        pass

    @abstractmethod
    def update_labels(self, email_id: str, labels: List[str]) -> bool:
        """Update email labels/folders."""
        pass

    @abstractmethod
    def batch_update(self, updates: List[Dict]) -> bool:
        """Batch update multiple emails."""
        pass

    @abstractmethod
    def disconnect(self):
        """Close connection."""
        pass
```

**Test:**

```bash
python -c "from src.email_providers.base import Email; e = Email(id='1', subject='Test', sender='test@test.com'); print(e)"
```
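Formal testing is Phase 13, but a first unit test for the data model costs nothing now. A minimal sketch (the file name and assertions are illustrative, not part of the blueprint):

```python
# tests/test_base.py -- minimal sketch; run with: pytest tests/test_base.py -v
from src.email_providers.base import Email


def test_body_snippet_defaults_to_body_prefix():
    email = Email(id="1", subject="Hi", sender="a@b.com", body="x" * 1000)
    assert email.body_snippet == "x" * 500  # __post_init__ truncates to 500 chars


def test_explicit_snippet_is_preserved():
    email = Email(id="2", subject="Hi", sender="a@b.com",
                  body="full body", body_snippet="short")
    assert email.body_snippet == "short"
```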
---

## PHASE 3: CONFIGURATION FILES

### Step 10: Create config/default_config.yaml

```yaml
version: "1.0.0"

calibration:
  sample_size: 1500
  sample_strategy: "stratified"
  validation_size: 300
  min_confidence: 0.6

processing:
  batch_size: 100
  llm_queue_size: 100
  parallel_workers: 4
  checkpoint_interval: 1000

classification:
  default_threshold: 0.75
  min_threshold: 0.60
  max_threshold: 0.90
  adjustment_step: 0.05
  adjustment_frequency: 1000
  category_thresholds:
    junk: 0.85
    auth: 0.80
    conversational: 0.65

llm:
  provider: "ollama"
  model: "qwen2.5:1.5b"
  base_url: "http://localhost:11434"
  temperature: 0.1
  max_tokens: 500
  timeout: 30
  retry_attempts: 3

email_providers:
  gmail:
    batch_size: 100
  microsoft:
    batch_size: 100
  imap:
    timeout: 30
    batch_size: 50

features:
  text_features:
    max_features: 10000   # key name matches the FeatureExtractor in Step 14
    ngram_range: [1, 2]
    min_df: 2
    max_df: 0.95

export:
  format: "json"
  include_confidence: true
  create_report: true

logging:
  level: "INFO"
  file: "logs/email-sorter.log"

cleanup:
  delete_temp_files: true
  delete_repo_after: false
```

### Step 11: Create config/categories.yaml

(See PROJECT_BLUEPRINT.md for full content)

### Step 12: Create config/features.yaml

(See PROJECT_BLUEPRINT.md for full content)
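If PROJECT_BLUEPRINT.md is not at hand, note that `load_categories` only requires a top-level `categories:` map, and the LLM prompt in Phase 7 reads a `description` field from each entry. A sketch of the expected shape (entries are illustrative; the category names match the list used by the mock model in Phase 6):

```yaml
# config/categories.yaml -- illustrative sketch; the real definitions live in PROJECT_BLUEPRINT.md
categories:
  junk:
    description: "Spam, cold outreach, and unwanted promotional mail"
  auth:
    description: "One-time passwords, verification codes, login alerts"
  conversational:
    description: "Human-to-human correspondence"
  # ...remaining categories (transactional, newsletters, social, automated,
  # work, personal, finance, travel, unknown) follow the same shape
```

`config/features.yaml` is loaded verbatim by `load_features` and handed to the FeatureExtractor, so mirroring the `text_features` keys used in Step 14 (`max_features`, `ngram_range`, `min_df`, `max_df`) is enough to start.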
**Test:**

```bash
python -c "from src.utils.config import load_config, load_categories; print(load_config()); print(load_categories())"
```

---

## PHASE 4: EMAIL PROVIDERS

### Step 13: Gmail Provider (src/email_providers/gmail.py)

```python
"""Gmail API provider implementation."""
import base64
import logging
from typing import List, Dict, Optional
from datetime import datetime
from email.utils import parsedate_to_datetime

from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from .base import BaseProvider, Email

logger = logging.getLogger(__name__)


class GmailProvider(BaseProvider):
    """Gmail API email provider."""

    SCOPES = [
        'https://www.googleapis.com/auth/gmail.readonly',
        'https://www.googleapis.com/auth/gmail.modify'
    ]

    def __init__(self):
        self.service = None
        self.user_id = 'me'

    def connect(self, credentials_path: str) -> bool:
        """Connect to Gmail API using OAuth credentials."""
        try:
            # For first-time auth
            flow = InstalledAppFlow.from_client_secrets_file(
                credentials_path, self.SCOPES
            )
            creds = flow.run_local_server(port=0)
            self.service = build('gmail', 'v1', credentials=creds)
            logger.info("Connected to Gmail API")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to Gmail: {e}")
            return False

    def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
        """Fetch emails from Gmail."""
        emails = []
        try:
            # Build query
            query = filters.get('query', '') if filters else ''

            # Get message IDs
            results = self.service.users().messages().list(
                userId=self.user_id,
                q=query,
                maxResults=min(limit, 500) if limit else 500
            ).execute()
            messages = results.get('messages', [])

            # Fetch full messages
            for msg_info in messages:
                email = self._fetch_message(msg_info['id'])
                if email:
                    emails.append(email)
                if limit and len(emails) >= limit:
                    break

            logger.info(f"Fetched {len(emails)} emails from Gmail")
            return emails
        except HttpError as e:
            logger.error(f"Gmail API error: {e}")
            return emails

    def _fetch_message(self, msg_id: str) -> Optional[Email]:
        """Fetch and parse a single message."""
        try:
            msg = self.service.users().messages().get(
                userId=self.user_id,
                id=msg_id,
                format='full'
            ).execute()
            return self._parse_message(msg)
        except Exception as e:
            logger.error(f"Error fetching message {msg_id}: {e}")
            return None

    def _parse_message(self, msg: Dict) -> Email:
        """Parse Gmail message into Email object."""
        headers = {h['name']: h['value'] for h in msg['payload']['headers']}

        # Extract body
        body = self._get_body(msg['payload'])

        # Parse date
        date = None
        if 'Date' in headers:
            try:
                date = parsedate_to_datetime(headers['Date'])
            except (TypeError, ValueError):
                pass

        # Check attachments
        has_attachments = False
        attachments = []
        if 'parts' in msg['payload']:
            for part in msg['payload']['parts']:
                if part.get('filename'):
                    has_attachments = True
                    attachments.append({
                        'filename': part['filename'],
                        'mime_type': part['mimeType'],
                        'size': part.get('body', {}).get('size', 0)
                    })

        return Email(
            id=msg['id'],
            subject=headers.get('Subject', 'No Subject'),
            sender=headers.get('From', ''),
            date=date,
            body=body,
            has_attachments=has_attachments,
            attachments=attachments,
            headers=headers,
            labels=msg.get('labelIds', []),
            is_read='UNREAD' not in msg.get('labelIds', [])
        )

    def _get_body(self, payload: Dict) -> str:
        """Extract email body from payload."""
        body = ""
        if 'body' in payload and 'data' in payload['body']:
            body = base64.urlsafe_b64decode(payload['body']['data']).decode('utf-8', errors='ignore')
        elif 'parts' in payload:
            for part in payload['parts']:
                if part['mimeType'] == 'text/plain':
                    if 'data' in part['body']:
                        body = base64.urlsafe_b64decode(part['body']['data']).decode('utf-8', errors='ignore')
                    break
        return body

    def update_labels(self, email_id: str, labels: List[str]) -> bool:
        """Update labels for a single email."""
        try:
            self.service.users().messages().modify(
                userId=self.user_id,
                id=email_id,
                body={'addLabelIds': labels}
            ).execute()
            return True
        except Exception as e:
            logger.error(f"Error updating labels: {e}")
            return False

    def batch_update(self, updates: List[Dict]) -> bool:
        """Batch update multiple emails."""
        try:
            batch_size = 100
            for i in range(0, len(updates), batch_size):
                batch = updates[i:i+batch_size]
                email_ids = [u['email_id'] for u in batch]
                labels = list(set([l for u in batch for l in u.get('labels', [])]))

                self.service.users().messages().batchModify(
                    userId=self.user_id,
                    body={
                        'ids': email_ids,
                        'addLabelIds': labels
                    }
                ).execute()

            logger.info(f"Batch updated {len(updates)} emails")
            return True
        except Exception as e:
            logger.error(f"Batch update error: {e}")
            return False

    def disconnect(self):
        """Close connection."""
        self.service = None
        logger.info("Disconnected from Gmail")
```

**Test (requires Gmail OAuth setup):**

```bash
# First: Set up OAuth in Google Cloud Console
# Download credentials.json
python -c "from src.email_providers.gmail import GmailProvider; p = GmailProvider(); p.connect('credentials.json'); emails = p.fetch_emails(limit=10); print(f'Fetched {len(emails)} emails')"
```
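As written, `connect()` launches the browser OAuth flow on every run. If that gets tedious during testing, a common pattern is to cache the authorized token and refresh it silently; a sketch (the `token.json` path is an assumption, and it is already ignored by the `*.json` rule in .gitignore):

```python
# Hypothetical token-caching helper; connect() could call this instead of running the flow directly
import os
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow


def load_or_create_creds(credentials_path: str, scopes, token_path: str = "token.json"):
    creds = None
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, scopes)
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())                       # silent refresh, no browser
    elif not creds or not creds.valid:
        flow = InstalledAppFlow.from_client_secrets_file(credentials_path, scopes)
        creds = flow.run_local_server(port=0)          # interactive fallback
    with open(token_path, "w") as f:
        f.write(creds.to_json())                       # cache for the next run
    return creds
```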
---

## PHASE 5: FEATURE EXTRACTION

### Step 14: Feature Extractor (src/classification/feature_extractor.py)

```python
"""Feature extraction from emails."""
import re
import logging
from typing import Dict, List, Any
from datetime import datetime
from urllib.parse import urlparse

import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer

from src.email_providers.base import Email

logger = logging.getLogger(__name__)


class FeatureExtractor:
    """Extract features from emails for classification."""

    def __init__(self, config: Dict = None):
        """Initialize with feature configuration."""
        self.config = config or {
            'text_features': {
                'max_features': 10000,
                'ngram_range': [1, 2],
                'min_df': 2,
                'max_df': 0.95
            }
        }
        self.text_vectorizer = None
        self._initialize_vectorizer()

    def _initialize_vectorizer(self):
        """Initialize TF-IDF vectorizer."""
        text_config = self.config.get('text_features', {})
        self.text_vectorizer = TfidfVectorizer(
            max_features=text_config.get('max_features', 10000),
            ngram_range=tuple(text_config.get('ngram_range', [1, 2])),
            min_df=text_config.get('min_df', 2),
            max_df=text_config.get('max_df', 0.95),
            sublinear_tf=True
        )

    def extract(self, email: Email) -> Dict[str, Any]:
        """
        Extract features from a single email.

        Args:
            email: Email object

        Returns:
            Dictionary of features
        """
        features = {}

        # Text for TF-IDF
        features['text'] = f"{email.subject} {email.body_snippet}"

        # Structural features
        features.update(self._extract_structural(email))

        # Sender features
        features.update(self._extract_sender(email))

        # Pattern features
        features.update(self._extract_patterns(email))

        return features

    def _extract_structural(self, email: Email) -> Dict[str, Any]:
        """Extract structural features."""
        features = {}

        # Attachments
        features['has_attachments'] = email.has_attachments
        features['attachment_count'] = len(email.attachments)

        # Links and images
        body = email.body or email.body_snippet
        features['link_count'] = len(re.findall(r'https?://', body))
        features['image_count'] = len(re.findall(r'<img', body))  # count inline image tags

        return features

    def _extract_sender(self, email: Email) -> Dict[str, Any]:
        """Extract sender-based features."""
        features = {}
        sender = email.sender

        if '@' in sender:
            # Extract domain
            domain = sender.split('@')[1].lower()
            features['sender_domain'] = domain

            # Domain type
            freemail_domains = {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'icloud.com'}
            noreply_patterns = ['noreply', 'no-reply', 'donotreply']
            marketing_patterns = ['marketing', 'newsletter', 'promo']

            if domain in freemail_domains:
                features['sender_domain_type'] = 'freemail'
            elif any(p in sender.lower() for p in noreply_patterns):
                features['sender_domain_type'] = 'noreply'
            elif any(p in sender.lower() for p in marketing_patterns):
                features['sender_domain_type'] = 'marketing'
            else:
                features['sender_domain_type'] = 'corporate'

            features['is_noreply'] = any(p in sender.lower() for p in noreply_patterns)
        else:
            features['sender_domain'] = 'unknown'
            features['sender_domain_type'] = 'unknown'
            features['is_noreply'] = False

        return features

    def _extract_patterns(self, email: Email) -> Dict[str, Any]:
        """Extract pattern-based features."""
        features = {}
        body = (email.body or email.body_snippet).lower()
        subject = email.subject.lower()
        combined = f"{subject} {body}"

        # Common patterns
        features['has_unsubscribe'] = 'unsubscribe' in combined
        features['has_otp_pattern'] = bool(re.search(r'\b\d{4,6}\b', combined))
        features['has_price'] = bool(re.search(r'\$\d+', combined))
        features['has_tracking_pattern'] = bool(re.search(r'tracking\s*(number|#)', combined))
        features['has_invoice_pattern'] = bool(re.search(r'(invoice|bill|receipt)\s*#?\d+', combined))
        features['has_meeting_pattern'] = bool(re.search(r'(meeting|call|zoom|teams)', combined))

        return features

    def extract_batch(self, emails: List[Email]) -> pd.DataFrame:
        """
        Extract features from batch of emails.

        Args:
            emails: List of Email objects

        Returns:
            DataFrame with all features
        """
        # Extract features for each email
        feature_dicts = [self.extract(email) for email in emails]

        # Convert to DataFrame
        df = pd.DataFrame(feature_dicts)

        # Transform text features if vectorizer is fitted
        if self.text_vectorizer and 'text' in df.columns:
            if hasattr(self.text_vectorizer, 'vocabulary_'):
                text_features = self.text_vectorizer.transform(df['text'])
                text_df = pd.DataFrame(
                    text_features.toarray(),
                    columns=[f"text_{i}" for i in range(text_features.shape[1])]
                )
                df = pd.concat([df.drop('text', axis=1), text_df], axis=1)
            else:
                df = df.drop('text', axis=1)

        return df

    def fit_text_vectorizer(self, emails: List[Email]):
        """Fit text vectorizer on corpus."""
        texts = [f"{e.subject} {e.body_snippet}" for e in emails]
        self.text_vectorizer.fit(texts)
        logger.info(f"Fitted vectorizer with {len(self.text_vectorizer.vocabulary_)} features")
```

**Test:**

```bash
# Create mock email and test
python -c "
from src.email_providers.base import Email
from src.classification.feature_extractor import FeatureExtractor
from datetime import datetime

email = Email(
    id='1',
    subject='Meeting at 3pm',
    sender='john@company.com',
    date=datetime.now(),
    body='Let us meet to discuss the project',
    has_attachments=True
)

extractor = FeatureExtractor()
features = extractor.extract(email)
print(features)
"
```
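Note that `extract_batch` only produces TF-IDF columns after the vectorizer has been fitted; otherwise the `text` column is silently dropped. A short sketch of the intended two-step flow (the relaxed `min_df`/`max_df` values are only there so the toy two-email corpus fits; with a real calibration sample the defaults apply):

```python
# Sketch: fit the vectorizer on a sample, then vectorize a batch
from src.classification.feature_extractor import FeatureExtractor
from src.email_providers.base import Email

emails = [
    Email(id="1", subject="Meeting at 3pm", sender="john@company.com",
          body="Let us meet to discuss the project"),
    Email(id="2", subject="Your receipt #12345", sender="noreply@shop.com",
          body="Thanks for your order. Invoice #12345 attached."),
]

# min_df=1 / max_df=1.0 only because this demo corpus has two documents
extractor = FeatureExtractor(config={"text_features": {"max_features": 5000,
                                                       "ngram_range": [1, 2],
                                                       "min_df": 1, "max_df": 1.0}})
extractor.fit_text_vectorizer(emails)   # in the real pipeline: the calibration sample
df = extractor.extract_batch(emails)    # structural + sender + pattern + text_N columns
print(df.shape)
```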
---

## PHASE 6: ML CLASSIFIER (BLOCKER - NEED MODEL)

### Step 15: ML Classifier Wrapper (src/classification/ml_classifier.py)

```python
"""ML-based email classifier."""
import logging
import pickle
from typing import Dict, List, Any

import numpy as np
from pathlib import Path

logger = logging.getLogger(__name__)


class MLClassifier:
    """Wrapper for pre-trained ML classification model."""

    def __init__(self, model_path: str = "src/models/pretrained/classifier.pkl"):
        """Load pre-trained model."""
        self.model = None
        self.label_encoder = None
        self.categories = []
        self.feature_names = []
        self._load_model(model_path)

    def _load_model(self, model_path: str):
        """Load model from file."""
        try:
            with open(model_path, 'rb') as f:
                model_data = pickle.load(f)

            self.model = model_data['model']
            self.label_encoder = model_data.get('label_encoder')
            self.categories = model_data.get('categories', [])
            self.feature_names = model_data.get('feature_names', [])

            logger.info(f"Loaded ML model with {len(self.categories)} categories")
        except FileNotFoundError:
            logger.warning(f"Model file not found: {model_path}")
            logger.warning("Will need to train model or use alternative classification")
        except Exception as e:
            logger.error(f"Error loading model: {e}")

    def predict(self, features: np.ndarray) -> Dict[str, Any]:
        """
        Predict category for feature vector.

        Args:
            features: Feature vector or DataFrame row

        Returns:
            {
                'category': str,
                'confidence': float,
                'probabilities': Dict[str, float]
            }
        """
        if self.model is None:
            return {
                'category': 'unknown',
                'confidence': 0.0,
                'probabilities': {},
                'error': 'Model not loaded'
            }

        # Get probabilities
        probs = self.model.predict_proba([features])[0]

        # Get predicted class
        pred_class = np.argmax(probs)
        category = self.categories[pred_class]
        confidence = float(probs[pred_class])

        # All probabilities
        prob_dict = {
            self.categories[i]: float(probs[i])
            for i in range(len(self.categories))
        }

        return {
            'category': category,
            'confidence': confidence,
            'probabilities': prob_dict
        }

    def predict_batch(self, features: np.ndarray) -> List[Dict[str, Any]]:
        """Predict for batch of feature vectors."""
        return [self.predict(f) for f in features]
```

### ⚠️ CRITICAL: You need to either:

**Option A: Create a placeholder model for testing**

```python
# scripts/create_mock_model.py
import pickle
from sklearn.ensemble import RandomForestClassifier
import numpy as np

# Create dummy model
model = RandomForestClassifier(n_estimators=10)
X_dummy = np.random.rand(100, 50)
y_dummy = np.random.randint(0, 12, 100)
model.fit(X_dummy, y_dummy)

categories = [
    'junk', 'transactional', 'auth', 'newsletters',
    'social', 'automated', 'conversational', 'work',
    'personal', 'finance', 'travel', 'unknown'
]

model_data = {
    'model': model,
    'categories': categories,
    'feature_names': [f'feature_{i}' for i in range(50)]
}

with open('src/models/pretrained/classifier.pkl', 'wb') as f:
    pickle.dump(model_data, f)

print("Mock model created!")
```

**Option B: Train a real model (recommended)**

See scripts/train_model.py (to be created in next phase); a rough sketch follows below.
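Option B is deliberately left open here. Purely as a starting point, a hedged sketch of what scripts/train_model.py could look like, assuming a labeled CSV (path and column names are assumptions) and reusing the pickle layout MLClassifier._load_model expects. Note the real open question: the feature space used at training time must match what the pipeline feeds MLClassifier.predict at runtime (TF-IDF here, versus the richer FeatureExtractor output), a wiring decision to settle in Phases 8-9.

```python
# scripts/train_model.py -- sketch only; data path, columns, and model choice are assumptions
import pickle
from pathlib import Path

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

# Expected input: one row per email, with a raw text column and a category label
df = pd.read_csv("data/training/labeled_emails.csv")   # assumed columns: text, category

vectorizer = TfidfVectorizer(max_features=10000, ngram_range=(1, 2), min_df=2, max_df=0.95)
X = vectorizer.fit_transform(df["text"])
categories = sorted(df["category"].unique())
y = df["category"].map({c: i for i, c in enumerate(categories)})

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
model = RandomForestClassifier(n_estimators=200, n_jobs=-1, random_state=42)
model.fit(X_train, y_train)
print(classification_report(
    y_test, model.predict(X_test),
    labels=list(range(len(categories))), target_names=categories
))

# Persist in the exact dict layout MLClassifier._load_model expects
model_data = {
    "model": model,
    "categories": categories,
    "feature_names": vectorizer.get_feature_names_out().tolist(),
    "vectorizer": vectorizer,   # extra key, ignored by MLClassifier but useful to keep together
}
Path("src/models/pretrained").mkdir(parents=True, exist_ok=True)
with open("src/models/pretrained/classifier.pkl", "wb") as f:
    pickle.dump(model_data, f)
print("Saved src/models/pretrained/classifier.pkl")
```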
---

## PHASE 7: LLM INTEGRATION

### Step 16: LLM Classifier (src/classification/llm_classifier.py)

```python
"""LLM-based email classifier using Ollama."""
import logging
import json
import re
from typing import Dict, List, Any
from abc import ABC, abstractmethod

logger = logging.getLogger(__name__)


class BaseLLMProvider(ABC):
    """Abstract LLM provider."""

    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        pass

    @abstractmethod
    def test_connection(self) -> bool:
        pass


class OllamaProvider(BaseLLMProvider):
    """Ollama local LLM provider."""

    def __init__(self, model: str = "qwen2.5:1.5b", base_url: str = "http://localhost:11434"):
        try:
            import ollama
            self.client = ollama.Client(host=base_url)
            self.model = model
            logger.info(f"Initialized Ollama provider with model {model}")
        except ImportError:
            logger.error("ollama package not installed. Run: pip install ollama")
            self.client = None
        except Exception as e:
            logger.error(f"Failed to initialize Ollama: {e}")
            self.client = None

    def complete(self, prompt: str, **kwargs) -> str:
        if not self.client:
            raise RuntimeError("Ollama client not available")

        response = self.client.generate(
            model=self.model,
            prompt=prompt,
            options={
                'temperature': kwargs.get('temperature', 0.1),
                'num_predict': kwargs.get('max_tokens', 500)
            }
        )
        return response['response']

    def test_connection(self) -> bool:
        try:
            self.client.list()
            return True
        except Exception:
            return False


class LLMClassifier:
    """Email classifier using LLM."""

    def __init__(self, provider: BaseLLMProvider, categories: Dict[str, Dict], config: Dict):
        self.provider = provider
        self.categories = categories
        self.config = config
        self.classification_prompt = self._load_prompt_template()

    def _load_prompt_template(self) -> str:
        """Load or create classification prompt."""
        # Try to load from file first
        try:
            with open('prompts/classification.txt', 'r') as f:
                return f.read()
        except FileNotFoundError:
            # Use default prompt
            return """You are an expert email classifier.

CATEGORIES:
{categories}

EMAIL:
Subject: {subject}
From: {sender}
Has Attachments: {has_attachments}
Body Snippet: {body_snippet}

ML Prediction: {ml_prediction} (confidence: {ml_confidence})

Respond with JSON only:
{{
    "category": "chosen_category",
    "confidence": 0.85,
    "reasoning": "brief explanation"
}}
"""

    def classify(self, email: Dict[str, Any]) -> Dict[str, Any]:
        """Classify email using LLM."""
        # Build prompt
        categories_str = "\n".join([
            f"- {name}: {info['description']}"
            for name, info in self.categories.items()
        ])

        ml_pred = email.get('ml_prediction', {})

        prompt = self.classification_prompt.format(
            categories=categories_str,
            subject=email.get('subject', 'N/A'),
            sender=email.get('sender', 'N/A'),
            has_attachments=email.get('has_attachments', False),
            body_snippet=email.get('body_snippet', '')[:300],
            ml_prediction=ml_pred.get('category', 'unknown'),
            ml_confidence=ml_pred.get('confidence', 0.0)
        )

        try:
            # Get LLM response
            response = self.provider.complete(
                prompt,
                temperature=self.config['llm']['temperature'],
                max_tokens=self.config['llm']['max_tokens']
            )

            # Parse JSON response
            result = self._parse_response(response)
            return result
        except Exception as e:
            logger.error(f"LLM classification failed: {e}")
            return {
                'category': 'unknown',
                'confidence': 0.0,
                'reasoning': f'Error: {str(e)}',
                'error': True
            }

    def _parse_response(self, response: str) -> Dict[str, Any]:
        """Parse LLM JSON response."""
        # Try to extract JSON
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        if json_match:
            try:
                return json.loads(json_match.group())
            except json.JSONDecodeError:
                pass

        # Fallback parsing
        return {
            'category': 'unknown',
            'confidence': 0.5,
            'reasoning': response[:200]
        }
```

**Test (requires Ollama running):**

```bash
# First: Install and start Ollama
# ollama pull qwen2.5:1.5b

python -c "
from src.classification.llm_classifier import OllamaProvider, LLMClassifier
from src.utils.config import load_categories, load_config

provider = OllamaProvider()
categories = load_categories()
config = load_config().model_dump()  # LLMClassifier indexes config as a plain dict

classifier = LLMClassifier(provider, categories, config)

email = {
    'subject': 'Your verification code is 123456',
    'sender': 'noreply@bank.com',
    'has_attachments': False,
    'body_snippet': 'Your one-time password is 123456',
    'ml_prediction': {'category': 'auth', 'confidence': 0.65}
}

result = classifier.classify(email)
print(result)
"
```
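Phase 8 wires the two classifiers together; the basic routing rule implied by the Step 10 thresholds (per-category threshold, falling back to `default_threshold`) can already be sketched. This is an interpretation, not the final orchestration code:

```python
# Sketch of the ML -> LLM routing rule implied by config/default_config.yaml
from typing import Dict, Any


def needs_llm_review(ml_result: Dict[str, Any], classification_cfg: Dict[str, Any]) -> bool:
    """Return True when the ML prediction is not confident enough to stand on its own."""
    thresholds = classification_cfg.get("category_thresholds", {})
    default = classification_cfg.get("default_threshold", 0.75)
    threshold = thresholds.get(ml_result["category"], default)
    return ml_result["confidence"] < threshold


# Example: junk requires 0.85, so a 0.80 junk prediction is queued for the LLM
cfg = {"default_threshold": 0.75,
       "category_thresholds": {"junk": 0.85, "auth": 0.80, "conversational": 0.65}}
print(needs_llm_review({"category": "junk", "confidence": 0.80}, cfg))   # True
print(needs_llm_review({"category": "work", "confidence": 0.80}, cfg))   # False
```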
---

## NEXT PHASES

Due to length limits, the remaining phases are:

### Phase 8: Adaptive Classifier
- Dynamic threshold adjustment (sketched below)
- Sender rule learning
- Classification orchestration
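Phase 8 is only outlined here. Purely as one possible reading of "dynamic threshold adjustment", using the `adjustment_step`, `min_threshold`, `max_threshold`, and `adjustment_frequency` values from Step 10 (the 10%/2% cutoffs are assumptions):

```python
# Sketch of dynamic threshold adjustment -- an interpretation of Phase 8, not the final design
class ThresholdAdjuster:
    """Nudges a per-category threshold based on how often the LLM overrides the ML call."""

    def __init__(self, threshold: float, step: float = 0.05,
                 min_t: float = 0.60, max_t: float = 0.90, window: int = 1000):
        self.threshold = threshold
        self.step, self.min_t, self.max_t, self.window = step, min_t, max_t, window
        self.seen = 0
        self.overrides = 0   # LLM disagreed with an above-threshold ML prediction

    def record(self, ml_category: str, llm_category: str) -> None:
        self.seen += 1
        if llm_category != ml_category:
            self.overrides += 1
        if self.seen >= self.window:
            self._adjust()

    def _adjust(self) -> None:
        error_rate = self.overrides / self.seen
        if error_rate > 0.10:       # ML wrong too often -> raise the bar, send more to the LLM
            self.threshold = min(self.max_t, self.threshold + self.step)
        elif error_rate < 0.02:     # ML reliable -> lower the bar, send less to the LLM
            self.threshold = max(self.min_t, self.threshold - self.step)
        self.seen = self.overrides = 0
```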
### Phase 9: Processing Pipeline
- Bulk processor
- Queue management
- Checkpointing

### Phase 10: Calibration System
- Email sampling
- LLM calibration analysis
- Validation

### Phase 11: Export & Sync
- Results exporter
- Gmail sync
- Report generation

### Phase 12: Main CLI
- Click interface
- End-to-end orchestration

### Phase 13: Testing
- Unit tests
- Integration tests
- Full pipeline test on Marion's inbox

---

## TESTING STRATEGY

### Unit Testing

```bash
pytest tests/test_classification.py -v
```

### Integration Testing

```bash
# Test on 100 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 100

# Test on 1000 emails
python src/main.py --source gmail --credentials creds.json --output test/ --limit 1000
```

### Full Pipeline

```bash
# Run on Marion's full inbox
python src/main.py --source gmail --credentials marion-creds.json --output results/
```

---

## CRITICAL NEXT STEPS

1. **DECIDE: ML Model Strategy**
   - Option A: Create mock model for immediate testing
   - Option B: Train real model (takes 1-2 days)

2. **Set up Gmail OAuth**
   - Google Cloud Console
   - Enable Gmail API
   - Download credentials.json

3. **Install and test Ollama**
   - Download from ollama.ai
   - Pull model: `ollama pull qwen2.5:1.5b`
   - Test: `ollama run qwen2.5:1.5b "test"`

4. **Continue building**
   - Next: Adaptive Classifier
   - Then: Processing Pipeline
   - Then: Full integration

---

**THIS IS THE ACTUAL BUILD GUIDE**

Everything in this document provides real, executable steps to build the system.