"""End-to-end pipeline tests."""

import json
import tempfile
import time
from pathlib import Path

import pytest

from src.utils.config import load_config, load_categories
from src.email_providers.base import MockProvider
from src.orchestration import EmailSorterOrchestrator
from src.llm.ollama import OllamaProvider
from src.calibration.sampler import EmailSampler
from src.calibration.enron_parser import EnronParser
from src.processing.bulk_processor import BulkProcessor
from src.export.exporter import ResultsExporter


def _build_adaptive_classifier():
    """Build the AdaptiveClassifier shared by the classification tests.

    The classifier is wired from a fresh FeatureExtractor and MLClassifier,
    the loaded categories, and the loaded config converted with ``.dict()``.
    The third positional argument is ``None``.
    NOTE(review): that slot is presumably the LLM provider (the orchestrator
    test passes ``llm_provider=None``) -- confirm against AdaptiveClassifier.

    Returns:
        A newly constructed AdaptiveClassifier instance.
    """
    # Kept function-local, exactly as the individual tests imported them, so
    # that collecting this module does not import the classification package.
    from src.classification.feature_extractor import FeatureExtractor
    from src.classification.ml_classifier import MLClassifier
    from src.classification.adaptive_classifier import AdaptiveClassifier

    config = load_config()
    categories = load_categories()

    feature_extractor = FeatureExtractor()
    ml_classifier = MLClassifier()
    return AdaptiveClassifier(
        feature_extractor,
        ml_classifier,
        None,
        categories,
        config.dict(),
    )


def test_e2e_mock_pipeline(sample_emails, tmp_path):
    """Test full pipeline with mock provider and sample emails."""
    config = load_config()

    # Redirect all output into pytest's per-test temp directory.
    config.export.output_dir = str(tmp_path)
    config.processing.checkpoint_dir = str(tmp_path / "checkpoints")

    # Create orchestrator without LLM
    orchestrator = EmailSorterOrchestrator(config, llm_provider=None)

    result = orchestrator.run_full_pipeline(
        all_emails=sample_emails,
        sample_size=3,  # Small sample for testing
        resume=False,
    )

    # Verify the summary returned by the pipeline.
    assert result['success'] is True
    assert result['total_emails'] == len(sample_emails)
    assert result['results_processed'] > 0
    assert 'export_files' in result

    # Verify exported files exist
    assert (tmp_path / 'results.json').exists()
    assert (tmp_path / 'results.csv').exists()
    assert (tmp_path / 'report.txt').exists()

    # Verify JSON structure
    with open(tmp_path / 'results.json') as f:
        data = json.load(f)

    assert 'metadata' in data
    assert 'classifications' in data
    assert len(data['classifications']) > 0


def test_e2e_sampling_and_processing(sample_emails):
    """Test that stratified sampling splits the emails into sample and rest."""
    sampler = EmailSampler()
    sample, remaining = sampler.stratified_sample(sample_emails, 3)

    assert len(sample) == 3
    assert len(remaining) == len(sample_emails) - 3

    # Both partitions must consist only of emails from the input.
    assert all(e in sample_emails for e in sample)
    assert all(e in sample_emails for e in remaining)


def test_e2e_export_formats(sample_emails, tmp_path):
    """Test all export formats (JSON, CSV, per-category files)."""
    adaptive = _build_adaptive_classifier()

    # Classify sample emails
    results = adaptive.classify_batch(sample_emails)

    # Export
    exporter = ResultsExporter(str(tmp_path))
    json_file = exporter.export_json(results, {'test': True})
    csv_file = exporter.export_csv(results)
    category_dir = exporter.export_by_category(results)

    assert json_file.exists()
    assert csv_file.exists()
    assert category_dir.exists()

    # Verify JSON: one exported classification per result.
    with open(json_file) as f:
        data = json.load(f)
    assert len(data['classifications']) == len(results)

    # Verify CSV is non-empty.
    assert csv_file.stat().st_size > 0

    # Verify at least one per-category JSON file was written.
    category_files = list(category_dir.glob('*.json'))
    assert len(category_files) > 0


def test_e2e_checkpoint_resume(sample_emails, tmp_path):
    """Test checkpoint and resume functionality."""
    checkpoint_dir = str(tmp_path / "checkpoints")
    adaptive = _build_adaptive_classifier()

    # First run: process the emails from scratch, checkpointing as it goes.
    processor = BulkProcessor(
        adaptive,
        batch_size=2,
        checkpoint_dir=checkpoint_dir,
        checkpoint_interval=2,
    )
    results1, _ = processor.process(sample_emails, resume=False)
    assert len(results1) > 0

    # Second run: a new processor on the same checkpoint dir, resuming.
    processor2 = BulkProcessor(
        adaptive,
        batch_size=2,
        checkpoint_dir=checkpoint_dir,
        checkpoint_interval=2,
    )
    results2, _ = processor2.process(sample_emails, resume=True)

    # Should complete without errors and yield the same number of results.
    assert len(results2) == len(results1)


def test_e2e_enron_parsing():
    """Test Enron dataset parsing; skipped when the dataset is unavailable."""
    enron_path = Path("enron_mail_20150507")
    if not enron_path.exists():
        pytest.skip("Enron dataset not available")

    # Only construction/parsing is guarded: a ValueError here is treated as
    # an unusable dataset layout, while the assertions below run outside the
    # try so a failure in them is never mistaken for a skip condition.
    try:
        parser = EnronParser(str(enron_path))
        emails = parser.parse_emails(limit=100)
    except ValueError:
        pytest.skip("Enron dataset structure invalid")

    assert len(emails) > 0
    assert all(e.subject for e in emails)  # Should have subjects
    assert all(e.sender for e in emails)  # Should have senders


def test_e2e_hard_rules_accuracy(sample_emails):
    """Test that hard rules work correctly."""
    from src.email_providers.base import Email

    adaptive = _build_adaptive_classifier()

    # Test auth email hard rule
    auth_email = Email(
        id='test-auth',
        subject='Verify your account',
        sender='noreply@bank.com',
        body='Your verification code is 123456',
    )
    result = adaptive.classify(auth_email)

    # Should be caught by hard rules
    assert result.category == 'auth'
    assert result.method == 'rule'
    # approx() avoids a spurious failure from float representation error.
    assert result.confidence == pytest.approx(0.99)


def test_e2e_batch_processing_performance(sample_emails):
    """Test batch processing performance."""
    # Guard the per-email division below against an empty fixture.
    assert sample_emails, "sample_emails fixture must not be empty"

    adaptive = _build_adaptive_classifier()

    # Time batch processing with a monotonic, high-resolution clock.
    start = time.perf_counter()
    results = adaptive.classify_batch(sample_emails)
    elapsed = time.perf_counter() - start

    assert len(results) == len(sample_emails)
    assert elapsed < 60  # Should process sample in under 60s

    # Rough performance: average wall time per email.
    per_email = elapsed / len(sample_emails)
    print(f"Performance: {per_email*1000:.1f}ms per email")