#!/usr/bin/env python3
"""
Standalone vLLM Batch Email Classifier

PREREQUISITE: a vLLM server must be running at the configured endpoint.

This is a SEPARATE tool from the main ML classification pipeline.

Use this for:
- One-off batch questions ("find all emails about project X")
- Custom classification criteria not in the trained model
- Exploratory analysis with flexible prompts

Use RAG instead for:
- Searching across a large email corpus
- Finding specific topics/keywords
- Building knowledge from email content
"""

import time
import asyncio
import logging
import sys
from pathlib import Path
from typing import List, Dict, Any, Optional

import httpx
import click

# Server configuration
VLLM_CONFIG = {
    'base_url': 'https://rtx3090.bobai.com.au/v1',
    'api_key': 'rtx3090_foxadmin_10_8034ecb47841f45ba1d5f3f5d875c092',
    'model': 'qwen3-coder-30b',
    'batch_size': 4,  # Tested optimal - 100% success, proper batch pooling
    'temperature': 0.1,
    'max_tokens': 500
}


async def check_vllm_server(base_url: str, api_key: str, model: str) -> bool:
    """Check if the vLLM server is running and the model is loaded."""
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{base_url}/chat/completions",
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": "test"}],
                    "max_tokens": 5
                },
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json"
                },
                timeout=10.0
            )
            return response.status_code == 200
    except Exception as e:
        print(f"ERROR: vLLM server check failed: {e}")
        return False


async def classify_email_async(
    client: httpx.AsyncClient,
    email: Any,
    prompt_template: str,
    base_url: str,
    api_key: str,
    model: str,
    temperature: float,
    max_tokens: int
) -> Dict[str, Any]:
    """Classify a single email using an async HTTP request."""
    # No semaphore - proper batch pooling instead
    try:
        # Build prompt with email data
        prompt = prompt_template.format(
            subject=email.get('subject', 'N/A')[:100],
            sender=email.get('sender', 'N/A')[:50],
            body_snippet=email.get('body_snippet', '')[:500]
        )

        response = await client.post(
            f"{base_url}/chat/completions",
            json={
                "model": model,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": temperature,
                "max_tokens": max_tokens
            },
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            timeout=30.0
        )

        if response.status_code == 200:
            data = response.json()
            content = data['choices'][0]['message']['content']
            return {
                'email_id': email.get('id', 'unknown'),
                'subject': email.get('subject', 'N/A')[:60],
                'result': content.strip(),
                'success': True
            }

        return {
            'email_id': email.get('id', 'unknown'),
            'subject': email.get('subject', 'N/A')[:60],
            'result': f'HTTP {response.status_code}',
            'success': False
        }

    except Exception as e:
        return {
            'email_id': email.get('id', 'unknown'),
            'subject': email.get('subject', 'N/A')[:60],
            'result': f'Error: {str(e)[:100]}',
            'success': False
        }


async def classify_single_batch(
    client: httpx.AsyncClient,
    emails: List[Dict[str, Any]],
    prompt_template: str,
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Classify one batch of emails - send all at once, wait for completion."""
    tasks = [
        classify_email_async(
            client, email, prompt_template,
            config['base_url'], config['api_key'], config['model'],
            config['temperature'], config['max_tokens']
        )
        for email in emails
    ]
    results = await asyncio.gather(*tasks)
    return results


async def batch_classify_async(
    emails: List[Dict[str, Any]],
    prompt_template: str,
    config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Classify emails using proper batch pooling."""
    batch_size = config['batch_size']
    all_results = []

    async with httpx.AsyncClient() as client:
        # Process in batches - send batch, wait for all to complete, repeat
        for batch_start in range(0, len(emails), batch_size):
            batch_end = min(batch_start + batch_size, len(emails))
            batch_emails = emails[batch_start:batch_end]

            batch_results = await classify_single_batch(
                client, batch_emails, prompt_template, config
            )
            all_results.extend(batch_results)

    return all_results
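
# Illustrative sketch: batch_classify_async can also be driven programmatically,
# outside the CLI below. The email dicts and template here are made-up examples;
# only the dict keys (id, subject, sender, body_snippet) and the
# {subject}/{sender}/{body_snippet} placeholders are what the functions above
# actually expect.
#
#     sample_emails = [
#         {'id': 'msg-1', 'subject': 'Project X kickoff', 'sender': 'alice@example.com',
#          'body_snippet': 'Can we move the project X review to Friday?'},
#     ]
#     sample_template = (
#         "QUESTION: Is this email about project X?\n"
#         "Subject: {subject}\nFrom: {sender}\nBody: {body_snippet}\nANSWER:"
#     )
#     sample_results = asyncio.run(
#         batch_classify_async(sample_emails, sample_template, VLLM_CONFIG)
#     )
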

def load_emails_from_provider(provider_type: str, credentials: Optional[str], limit: int) -> List[Dict[str, Any]]:
    """Load emails from configured provider."""
    # Lazy import to avoid dependency issues
    if provider_type == 'enron':
        from src.email_providers.enron import EnronProvider

        provider = EnronProvider(maildir_path=".")
        provider.connect({})
        emails = provider.fetch_emails(limit=limit)
        provider.disconnect()

        # Convert to dict format
        return [
            {
                'id': e.id,
                'subject': e.subject,
                'sender': e.sender,
                'body_snippet': e.body_snippet
            }
            for e in emails
        ]

    elif provider_type == 'gmail':
        from src.email_providers.gmail import GmailProvider

        if not credentials:
            print("ERROR: Gmail requires --credentials path")
            sys.exit(1)

        provider = GmailProvider()
        provider.connect({'credentials_path': credentials})
        emails = provider.fetch_emails(limit=limit)
        provider.disconnect()

        return [
            {
                'id': e.id,
                'subject': e.subject,
                'sender': e.sender,
                'body_snippet': e.body_snippet
            }
            for e in emails
        ]

    else:
        print(f"ERROR: Unsupported provider: {provider_type}")
        sys.exit(1)
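
# Example invocations (the script filename is assumed; adjust to however this
# file is saved on disk):
#
#     python vllm_batch_classifier.py check
#     python vllm_batch_classifier.py ask --source enron --limit 50 \
#         --question "Is this email about project X?" --output project_x_results.txt
#     python vllm_batch_classifier.py ask --source gmail --credentials credentials.json \
#         --limit 20 --question "Does this email contain an action item?"
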

@click.group()
def cli():
    """vLLM Batch Email Classifier - Ask custom questions across email batches."""
    pass


@cli.command()
@click.option('--source', type=click.Choice(['gmail', 'enron']), default='enron',
              help='Email provider')
@click.option('--credentials', type=click.Path(exists=False),
              help='Path to credentials file (for Gmail)')
@click.option('--limit', type=int, default=50,
              help='Number of emails to process')
@click.option('--question', type=str, required=True,
              help='Question to ask about each email')
@click.option('--output', type=click.Path(), default='batch_results.txt',
              help='Output file for results')
def ask(source: str, credentials: Optional[str], limit: int, question: str, output: str):
    """Ask a custom question about a batch of emails."""
    print("=" * 80)
    print("vLLM BATCH EMAIL CLASSIFIER")
    print("=" * 80)
    print(f"Question: {question}")
    print(f"Source: {source}")
    print(f"Emails to process: {limit}")
    print("=" * 80)
    print()

    # Check vLLM server
    print("Checking vLLM server...")
    if not asyncio.run(check_vllm_server(
        VLLM_CONFIG['base_url'], VLLM_CONFIG['api_key'], VLLM_CONFIG['model']
    )):
        print()
        print("ERROR: vLLM server not available or not responding")
        print(f"Expected endpoint: {VLLM_CONFIG['base_url']}")
        print(f"Expected model: {VLLM_CONFIG['model']}")
        print()
        print("PREREQUISITE: Start the vLLM server before running this tool")
        sys.exit(1)

    print(f"✓ vLLM server running ({VLLM_CONFIG['model']})")
    print()

    # Load emails
    print(f"Loading {limit} emails from {source}...")
    emails = load_emails_from_provider(source, credentials, limit)
    print(f"✓ Loaded {len(emails)} emails")
    print()

    # Build prompt template (static instructions first, per-email fields last,
    # which is friendlier to prompt-prefix caching)
    prompt_template = f"""You are analyzing emails to answer specific questions.

INSTRUCTIONS:
- Read the email carefully
- Answer the question directly and concisely
- Provide reasoning if helpful
- If the email is not relevant, say "Not relevant"

QUESTION: {question}

EMAIL TO ANALYZE:
Subject: {{subject}}
From: {{sender}}
Body: {{body_snippet}}

ANSWER: """

    # Process in batches
    print(f"Processing {len(emails)} emails in batches of {VLLM_CONFIG['batch_size']}...")
    start_time = time.time()
    results = asyncio.run(batch_classify_async(emails, prompt_template, VLLM_CONFIG))
    end_time = time.time()
    total_time = end_time - start_time

    # Stats
    successful = sum(1 for r in results if r['success'])
    throughput = len(emails) / total_time

    print()
    print("=" * 80)
    print("RESULTS")
    print("=" * 80)
    print(f"Total emails: {len(emails)}")
    print(f"Successful: {successful}")
    print(f"Failed: {len(emails) - successful}")
    print(f"Time: {total_time:.1f}s")
    print(f"Throughput: {throughput:.2f} emails/sec")
    print("=" * 80)
    print()

    # Save results
    with open(output, 'w') as f:
        f.write(f"Question: {question}\n")
        f.write(f"Processed: {len(emails)} emails in {total_time:.1f}s\n")
        f.write("=" * 80 + "\n\n")

        for i, result in enumerate(results, 1):
            f.write(f"{i}. {result['subject']}\n")
            f.write(f"   Email ID: {result['email_id']}\n")
            f.write(f"   Answer: {result['result']}\n")
            f.write("\n")

    print(f"Results saved to: {output}")
    print()

    # Show sample
    print("SAMPLE RESULTS (first 5):")
    for i, result in enumerate(results[:5], 1):
        print(f"\n{i}. {result['subject']}")
        print(f"   {result['result'][:100]}...")


@cli.command()
def check():
    """Check if the vLLM server is running and ready."""
    print("Checking vLLM server...")
    print(f"Endpoint: {VLLM_CONFIG['base_url']}")
    print(f"Model: {VLLM_CONFIG['model']}")
    print()

    if asyncio.run(check_vllm_server(
        VLLM_CONFIG['base_url'], VLLM_CONFIG['api_key'], VLLM_CONFIG['model']
    )):
        print("✓ vLLM server is running and ready")
        print(f"✓ Batch size: {VLLM_CONFIG['batch_size']} concurrent requests per batch")
        print("✓ Estimated throughput: ~4.4 emails/sec")
    else:
        print("✗ vLLM server not available")
        print()
        print("Start the vLLM server before using this tool")
        sys.exit(1)


if __name__ == '__main__':
    cli()
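
# Illustrative shape of the generated results file (all values below are made up;
# the real contents depend on the question, the emails, and the model's answers):
#
#     Question: Is this email about project X?
#     Processed: 50 emails in 11.4s
#     ================================================================================
#
#     1. Re: project X kickoff
#        Email ID: <message-id>
#        Answer: Yes - the sender is scheduling the project X review meeting.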