email-sorter/tools/batch_llm_classifier.py
#!/usr/bin/env python3
"""
Standalone vLLM Batch Email Classifier
PREREQUISITE: vLLM server must be running at configured endpoint
This is a SEPARATE tool from the main ML classification pipeline.
Use this for:
- One-off batch questions ("find all emails about project X")
- Custom classification criteria not in trained model
- Exploratory analysis with flexible prompts
Use RAG instead for:
- Searching across large email corpus
- Finding specific topics/keywords
- Building knowledge from email content
"""
import os
import time
import asyncio
import sys
from typing import List, Dict, Any, Optional

import httpx
import click

# Server configuration (base URL and API key can be overridden via environment)
VLLM_CONFIG = {
    'base_url': os.environ.get('VLLM_BASE_URL', 'https://rtx3090.bobai.com.au/v1'),
    # Never commit secrets to source control -- read the API key from the environment
    'api_key': os.environ.get('VLLM_API_KEY', ''),
    'model': 'qwen3-coder-30b',
    'batch_size': 4,  # Tested optimal: 100% success with proper batch pooling
    'temperature': 0.1,
    'max_tokens': 500
}
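
# PREREQUISITE reminder: an OpenAI-compatible vLLM server must already be serving
# the model named above. A typical launch looks something like the following
# (a sketch -- substitute the real model weights and port for your deployment):
#   vllm serve <model-weights> --served-model-name qwen3-coder-30b --port 8000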

async def check_vllm_server(base_url: str, api_key: str, model: str) -> bool:
"""Check if vLLM server is running and model is loaded."""
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{base_url}/chat/completions",
json={
"model": model,
"messages": [{"role": "user", "content": "test"}],
"max_tokens": 5
},
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
timeout=10.0
)
return response.status_code == 200
except Exception as e:
print(f"ERROR: vLLM server check failed: {e}")
return False
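
# Rough manual equivalent of the check above, for debugging outside Python
# (a sketch -- assumes VLLM_BASE_URL and VLLM_API_KEY are set in your shell):
#   curl -s "$VLLM_BASE_URL/chat/completions" \
#       -H "Authorization: Bearer $VLLM_API_KEY" -H "Content-Type: application/json" \
#       -d '{"model": "qwen3-coder-30b", "messages": [{"role": "user", "content": "test"}], "max_tokens": 5}'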

async def classify_email_async(
client: httpx.AsyncClient,
email: Any,
prompt_template: str,
base_url: str,
api_key: str,
model: str,
temperature: float,
max_tokens: int
) -> Dict[str, Any]:
"""Classify single email using async HTTP request."""
# No semaphore - proper batch pooling instead
try:
# Build prompt with email data
prompt = prompt_template.format(
subject=email.get('subject', 'N/A')[:100],
sender=email.get('sender', 'N/A')[:50],
body_snippet=email.get('body_snippet', '')[:500]
)
response = await client.post(
f"{base_url}/chat/completions",
json={
"model": model,
"messages": [{"role": "user", "content": prompt}],
"temperature": temperature,
"max_tokens": max_tokens
},
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
timeout=30.0
)
if response.status_code == 200:
data = response.json()
content = data['choices'][0]['message']['content']
return {
'email_id': email.get('id', 'unknown'),
'subject': email.get('subject', 'N/A')[:60],
'result': content.strip(),
'success': True
}
return {
'email_id': email.get('id', 'unknown'),
'subject': email.get('subject', 'N/A')[:60],
'result': f'HTTP {response.status_code}',
'success': False
}
except Exception as e:
return {
'email_id': email.get('id', 'unknown'),
'subject': email.get('subject', 'N/A')[:60],
'result': f'Error: {str(e)[:100]}',
'success': False
}

async def classify_single_batch(
client: httpx.AsyncClient,
emails: List[Dict[str, Any]],
prompt_template: str,
config: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Classify one batch of emails - send all at once, wait for completion."""
tasks = [
classify_email_async(
client, email, prompt_template,
config['base_url'], config['api_key'], config['model'],
config['temperature'], config['max_tokens']
)
for email in emails
]
results = await asyncio.gather(*tasks)
return results

async def batch_classify_async(
emails: List[Dict[str, Any]],
prompt_template: str,
config: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Classify emails using proper batch pooling."""
batch_size = config['batch_size']
all_results = []
async with httpx.AsyncClient() as client:
# Process in batches - send batch, wait for all to complete, repeat
for batch_start in range(0, len(emails), batch_size):
batch_end = min(batch_start + batch_size, len(emails))
batch_emails = emails[batch_start:batch_end]
batch_results = await classify_single_batch(
client, batch_emails, prompt_template, config
)
all_results.extend(batch_results)
return all_results
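
# Standalone usage sketch for the batch helpers above (assumes `emails` is a list
# of dicts with 'id', 'subject', 'sender' and 'body_snippet' keys, as produced by
# load_emails_from_provider below):
#   template = "Answer YES or NO.\nSubject: {subject}\nFrom: {sender}\nBody: {body_snippet}"
#   results = asyncio.run(batch_classify_async(emails, template, VLLM_CONFIG))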

def load_emails_from_provider(provider_type: str, credentials: Optional[str], limit: int) -> List[Dict[str, Any]]:
    """Load emails from the configured provider."""
    # Lazy imports so a missing provider dependency only matters when that provider is used
    if provider_type == 'enron':
        from src.email_providers.enron import EnronProvider
        provider = EnronProvider(maildir_path=".")
        connect_config: Dict[str, Any] = {}
    elif provider_type == 'gmail':
        from src.email_providers.gmail import GmailProvider
        if not credentials:
            print("ERROR: Gmail requires --credentials path")
            sys.exit(1)
        provider = GmailProvider()
        connect_config = {'credentials_path': credentials}
    else:
        print(f"ERROR: Unsupported provider: {provider_type}")
        sys.exit(1)
    provider.connect(connect_config)
    emails = provider.fetch_emails(limit=limit)
    provider.disconnect()
    # Convert provider objects to the plain dict format used by the classifier
    return [
        {
            'id': e.id,
            'subject': e.subject,
            'sender': e.sender,
            'body_snippet': e.body_snippet
        }
        for e in emails
    ]

@click.group()
def cli():
"""vLLM Batch Email Classifier - Ask custom questions across email batches."""
pass

@cli.command()
@click.option('--source', type=click.Choice(['gmail', 'enron']), default='enron',
help='Email provider')
@click.option('--credentials', type=click.Path(exists=False),
help='Path to credentials file (for Gmail)')
@click.option('--limit', type=int, default=50,
help='Number of emails to process')
@click.option('--question', type=str, required=True,
help='Question to ask about each email')
@click.option('--output', type=click.Path(), default='batch_results.txt',
help='Output file for results')
def ask(source: str, credentials: Optional[str], limit: int, question: str, output: str):
"""Ask a custom question about a batch of emails."""
print("=" * 80)
print("vLLM BATCH EMAIL CLASSIFIER")
print("=" * 80)
print(f"Question: {question}")
print(f"Source: {source}")
print(f"Batch size: {limit}")
print("=" * 80)
print()
# Check vLLM server
print("Checking vLLM server...")
if not asyncio.run(check_vllm_server(
VLLM_CONFIG['base_url'],
VLLM_CONFIG['api_key'],
VLLM_CONFIG['model']
)):
print()
print("ERROR: vLLM server not available or not responding")
print(f"Expected endpoint: {VLLM_CONFIG['base_url']}")
print(f"Expected model: {VLLM_CONFIG['model']}")
print()
print("PREREQUISITE: Start vLLM server before running this tool")
sys.exit(1)
print(f"✓ vLLM server running ({VLLM_CONFIG['model']})")
print()
# Load emails
print(f"Loading {limit} emails from {source}...")
emails = load_emails_from_provider(source, credentials, limit)
print(f"✓ Loaded {len(emails)} emails")
print()
    # Build prompt template (static instructions and question first, per-email
    # fields last, so vLLM prefix caching can reuse the shared prefix)
prompt_template = f"""You are analyzing emails to answer specific questions.
INSTRUCTIONS:
- Read the email carefully
- Answer the question directly and concisely
- Provide reasoning if helpful
- If the email is not relevant, say "Not relevant"
QUESTION:
{question}
EMAIL TO ANALYZE:
Subject: {{subject}}
From: {{sender}}
Body: {{body_snippet}}
ANSWER:
"""
    # Process in batches
    print(f"Processing {len(emails)} emails in batches of {VLLM_CONFIG['batch_size']}...")
start_time = time.time()
results = asyncio.run(batch_classify_async(emails, prompt_template, VLLM_CONFIG))
end_time = time.time()
total_time = end_time - start_time
# Stats
successful = sum(1 for r in results if r['success'])
throughput = len(emails) / total_time
print()
print("=" * 80)
print("RESULTS")
print("=" * 80)
print(f"Total emails: {len(emails)}")
print(f"Successful: {successful}")
print(f"Failed: {len(emails) - successful}")
print(f"Time: {total_time:.1f}s")
print(f"Throughput: {throughput:.2f} emails/sec")
print("=" * 80)
print()
# Save results
    with open(output, 'w', encoding='utf-8') as f:
f.write(f"Question: {question}\n")
f.write(f"Processed: {len(emails)} emails in {total_time:.1f}s\n")
f.write("=" * 80 + "\n\n")
for i, result in enumerate(results, 1):
f.write(f"{i}. {result['subject']}\n")
f.write(f" Email ID: {result['email_id']}\n")
f.write(f" Answer: {result['result']}\n")
f.write("\n")
print(f"Results saved to: {output}")
print()
# Show sample
print("SAMPLE RESULTS (first 5):")
for i, result in enumerate(results[:5], 1):
print(f"\n{i}. {result['subject']}")
print(f" {result['result'][:100]}...")

@cli.command()
def check():
"""Check if vLLM server is running and ready."""
print("Checking vLLM server...")
print(f"Endpoint: {VLLM_CONFIG['base_url']}")
print(f"Model: {VLLM_CONFIG['model']}")
print()
if asyncio.run(check_vllm_server(
VLLM_CONFIG['base_url'],
VLLM_CONFIG['api_key'],
VLLM_CONFIG['model']
)):
print("✓ vLLM server is running and ready")
print(f"✓ Max concurrent requests: {VLLM_CONFIG['max_concurrent']}")
print(f"✓ Estimated throughput: ~4.4 emails/sec")
else:
print("✗ vLLM server not available")
print()
print("Start vLLM server before using this tool")
sys.exit(1)

if __name__ == '__main__':
cli()