diff --git a/config/default_config.yaml b/config/default_config.yaml index 4fdae8e..3ba518d 100644 --- a/config/default_config.yaml +++ b/config/default_config.yaml @@ -32,7 +32,8 @@ llm: ollama: base_url: "http://localhost:11434" - calibration_model: "qwen3:8b-q4_K_M" + calibration_model: "qwen3:1.7b" + consolidation_model: "qwen3:8b-q4_K_M" # Larger model needed for JSON consolidation classification_model: "qwen3:1.7b" temperature: 0.1 max_tokens: 2000 diff --git a/src/calibration/llm_analyzer.py b/src/calibration/llm_analyzer.py index dc03ff4..64f9983 100644 --- a/src/calibration/llm_analyzer.py +++ b/src/calibration/llm_analyzer.py @@ -90,8 +90,10 @@ class CalibrationAnalyzer: # Step 2: Consolidate overlapping/duplicate categories if len(discovered_categories) > 10: # Only consolidate if too many categories logger.info(f"Consolidating {len(discovered_categories)} categories...") - consolidated = self._consolidate_categories(discovered_categories, email_labels) - if len(consolidated) < len(discovered_categories): + # Use consolidation LLM if provided (larger model for structured output) + consolidation_llm = self.config.get('consolidation_llm', self.llm_provider) + consolidated = self._consolidate_categories(discovered_categories, email_labels, llm_provider=consolidation_llm) + if consolidated and len(consolidated) < len(discovered_categories): discovered_categories = consolidated logger.info(f"After consolidation: {len(discovered_categories)} categories") else: @@ -281,7 +283,8 @@ JSON: def _consolidate_categories( self, discovered_categories: Dict[str, str], - email_labels: List[Tuple[str, str]] + email_labels: List[Tuple[str, str]], + llm_provider=None ) -> Dict[str, str]: """ Consolidate overlapping/duplicate categories using LLM. @@ -430,7 +433,9 @@ JSON: """ try: - response = self.llm_provider.complete( + # Use provided LLM or fall back to self.llm_provider + provider = llm_provider or self.llm_provider + response = provider.complete( prompt, temperature=temperature, max_tokens=3000 diff --git a/src/calibration/trainer.py b/src/calibration/trainer.py index 62aa025..20df3ec 100644 --- a/src/calibration/trainer.py +++ b/src/calibration/trainer.py @@ -102,6 +102,7 @@ class ModelTrainer: # Optional validation data eval_set = None + val_names = None if validation_emails: logger.info(f"Preparing validation set with {len(validation_emails)} emails") X_val_list = [] @@ -120,7 +121,8 @@ class ModelTrainer: if X_val_list: X_val = np.array(X_val_list) y_val = np.array(y_val_list) - eval_set = [(lgb.Dataset(X_val, label=y_val, reference=train_data), 'valid')] + eval_set = [lgb.Dataset(X_val, label=y_val, reference=train_data)] + val_names = ['valid'] # Train model logger.info("Training LightGBM classifier...") @@ -144,9 +146,9 @@ class ModelTrainer: train_data, num_boost_round=n_estimators, valid_sets=eval_set, - valid_names=['valid'] if eval_set else None, + valid_names=val_names, callbacks=[ - lgb.log_evaluation(logger, period=50) if eval_set else None, + lgb.log_evaluation(period=50) ] if eval_set else None ) diff --git a/src/calibration/workflow.py b/src/calibration/workflow.py index 373a3c4..9460cb3 100644 --- a/src/calibration/workflow.py +++ b/src/calibration/workflow.py @@ -41,16 +41,22 @@ class CalibrationWorkflow: llm_provider: BaseLLMProvider, feature_extractor: FeatureExtractor, categories: Dict[str, Dict], - config: CalibrationConfig = None + config: CalibrationConfig = None, + consolidation_llm_provider: BaseLLMProvider = None ): """Initialize calibration workflow.""" self.llm_provider = llm_provider + self.consolidation_llm_provider = consolidation_llm_provider or llm_provider self.feature_extractor = feature_extractor self.categories = list(categories.keys()) self.config = config or CalibrationConfig() self.sampler = EmailSampler() - self.analyzer = CalibrationAnalyzer(llm_provider, {}, embedding_model=feature_extractor.embedder) + self.analyzer = CalibrationAnalyzer( + llm_provider, + {'consolidation_llm': self.consolidation_llm_provider}, + embedding_model=feature_extractor.embedder + ) self.trainer = ModelTrainer(feature_extractor, self.categories) self.results = {} @@ -98,8 +104,10 @@ class CalibrationWorkflow: # Create lookup for LLM labels label_map = {email_id: category for email_id, category in sample_labels} - # Update categories to include discovered ones - all_categories = list(set(self.categories) | set(discovered_categories.keys())) + # Update categories to include ALL categories from labels (not just discovered_categories dict) + # This ensures we include categories that were ambiguous and kept their original names + label_categories = set(category for _, category in sample_labels) + all_categories = list(set(self.categories) | set(discovered_categories.keys()) | label_categories) logger.info(f"Using categories: {all_categories}") # Update trainer with discovered categories diff --git a/src/classification/feature_extractor.py b/src/classification/feature_extractor.py index 098e337..0d9c6a7 100644 --- a/src/classification/feature_extractor.py +++ b/src/classification/feature_extractor.py @@ -230,6 +230,57 @@ class FeatureExtractor: return features + def extract_batch(self, emails: List[Email], batch_size: int = 512) -> List[Dict[str, Any]]: + """ + Extract features from multiple emails with batched embeddings. + + Much faster than calling extract() in a loop because embeddings are batched. + """ + if not emails: + return [] + + # Extract all non-embedding features first + all_features = [] + texts_to_embed = [] + + for email in emails: + features = {} + features['subject'] = email.subject + features['body_snippet'] = email.body_snippet + features['full_body'] = email.body + features.update(self._extract_structural(email)) + features.update(self._extract_sender(email)) + features.update(self._extract_patterns(email)) + all_features.append(features) + texts_to_embed.append(self._build_embedding_text(email)) + + # Batch embed all texts + if self.embedder: + try: + # Process in batches + embeddings = [] + for i in range(0, len(texts_to_embed), batch_size): + batch = texts_to_embed[i:i + batch_size] + response = self.embedder.embed( + model='all-minilm:l6-v2', + input=batch + ) + embeddings.extend(response['embeddings']) + + # Add embeddings to features + for features, embedding in zip(all_features, embeddings): + features['embedding'] = np.array(embedding, dtype=np.float32) + + except Exception as e: + logger.error(f"Batch embedding failed: {e}, falling back to zeros") + for features in all_features: + features['embedding'] = np.zeros(384) + else: + for features in all_features: + features['embedding'] = np.zeros(384) + + return all_features + def _extract_embedding(self, email: Email) -> np.ndarray: """ Generate semantic embedding for email using Ollama. @@ -244,12 +295,12 @@ class FeatureExtractor: # Build structured text for embedding text = self._build_embedding_text(email) - # Get embedding from Ollama - response = self.embedder.embeddings( + # Get embedding from Ollama (use new embed API) + response = self.embedder.embed( model='all-minilm:l6-v2', - prompt=text + input=text ) - embedding = np.array(response['embedding'], dtype=np.float32) + embedding = np.array(response['embeddings'][0], dtype=np.float32) return embedding except Exception as e: logger.error(f"Error generating embedding: {e}") @@ -281,27 +332,6 @@ body: {email.body_snippet[:300]} """ return text - def extract_batch(self, emails: List[Email]) -> Optional[Any]: - """Extract features from batch of emails.""" - if not pd: - logger.error("pandas not available for batch extraction") - return None - - try: - feature_dicts = [] - for email in emails: - features = self.extract(email) - feature_dicts.append(features) - - # Convert to DataFrame - df = pd.DataFrame(feature_dicts) - logger.info(f"Extracted features for {len(df)} emails ({df.shape[1]} features)") - return df - - except Exception as e: - logger.error(f"Error in batch extraction: {e}") - return None - def fit_text_vectorizer(self, emails: List[Email]) -> bool: """Fit TF-IDF vectorizer on email corpus.""" if not self.text_vectorizer: diff --git a/src/cli.py b/src/cli.py index fef4334..d12e029 100644 --- a/src/cli.py +++ b/src/cli.py @@ -146,7 +146,7 @@ def run( from src.calibration.workflow import CalibrationWorkflow, CalibrationConfig - # Create calibration LLM provider with larger model + # Create calibration LLM provider with smaller model calibration_llm = OllamaProvider( base_url=cfg.llm.ollama.base_url, model=cfg.llm.ollama.calibration_model, @@ -155,6 +155,16 @@ def run( ) logger.info(f"Using calibration model: {cfg.llm.ollama.calibration_model}") + # Create consolidation LLM provider with larger model (needs structured JSON output) + consolidation_model = getattr(cfg.llm.ollama, 'consolidation_model', cfg.llm.ollama.calibration_model) + consolidation_llm = OllamaProvider( + base_url=cfg.llm.ollama.base_url, + model=consolidation_model, + temperature=cfg.llm.ollama.temperature, + max_tokens=cfg.llm.ollama.max_tokens + ) + logger.info(f"Using consolidation model: {consolidation_model}") + calibration_config = CalibrationConfig( sample_size=min(1500, len(emails) // 2), # Use 1500 or half the emails validation_size=300, @@ -163,6 +173,7 @@ def run( calibration = CalibrationWorkflow( llm_provider=calibration_llm, + consolidation_llm_provider=consolidation_llm, feature_extractor=feature_extractor, categories=categories, config=calibration_config diff --git a/src/utils/config.py b/src/utils/config.py index c3c5d4c..f6d8581 100644 --- a/src/utils/config.py +++ b/src/utils/config.py @@ -39,7 +39,8 @@ class ClassificationConfig(BaseModel): class OllamaConfig(BaseModel): """Ollama LLM provider configuration.""" base_url: str = "http://localhost:11434" - calibration_model: str = "qwen3:4b" + calibration_model: str = "qwen3:1.7b" # Changed from 4b to 1.7b for speed testing + consolidation_model: str = "qwen3:8b-q4_K_M" # Larger model for structured JSON output classification_model: str = "qwen3:1.7b" temperature: float = 0.1 max_tokens: int = 500