Add local file provider for .msg and .eml email files

- Created LocalFileParser for parsing Outlook .msg and .eml files
- Created LocalFileProvider implementing BaseProvider interface
- Updated CLI to support --source local --directory path
- Supports recursive directory scanning
- Parses 952 emails in ~3 seconds

Enables classification of local email file archives without needing
email account credentials.
This commit is contained in:
FSSCoding 2025-11-14 17:13:10 +11:00
parent 10862583ad
commit 4eee962c09
4 changed files with 5738 additions and 1 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,266 @@
"""Parse local email files (.msg and .eml formats)."""
import logging
import email.message
import email.parser
from pathlib import Path
from typing import List, Optional
from datetime import datetime
from email.utils import parsedate_to_datetime
import extract_msg
from src.email_providers.base import Email, Attachment
logger = logging.getLogger(__name__)
class LocalFileParser:
    """
    Parse local email files in .msg (Outlook) and .eml formats.

    Supports:
    - Single directory with email files
    - Nested directory structure
    - Mixed .msg and .eml files

    Every parsed file becomes an ``Email`` whose ``headers`` carry the
    originating folder (``X-Folder``) and file path (``X-File``).
    """

    # Lower-cased file suffixes this parser knows how to read.
    SUPPORTED_SUFFIXES = ('.msg', '.eml')

    def __init__(self, directory_path: str):
        """
        Initialize local file parser.

        Args:
            directory_path: Directory that contains the email files.

        Raises:
            ValueError: If the path does not exist or is not a directory.
        """
        self.directory_path = Path(directory_path)
        if not self.directory_path.exists():
            raise ValueError(f"Directory path not found: {self.directory_path}")
        if not self.directory_path.is_dir():
            raise ValueError(f"Path is not a directory: {self.directory_path}")
        logger.info(f"Initialized local file parser: {self.directory_path}")

    def parse_emails(self, limit: Optional[int] = None) -> "List[Email]":
        """
        Parse emails from directory (including subdirectories).

        Files that cannot be parsed are skipped; a summary warning reports
        how many were dropped.

        Args:
            limit: Maximum number of emails to parse. ``None`` or ``0``
                means no limit.

        Returns:
            List of Email objects, in sorted file-path order.
        """
        emails = []
        failed = 0
        logger.info(f"Starting local file parsing (limit: {limit})")

        all_files = self._find_email_files()
        msg_total = sum(1 for path in all_files if path.suffix.lower() == '.msg')
        eml_total = len(all_files) - msg_total
        logger.info(f"Found {msg_total} .msg files and {eml_total} .eml files")

        for email_file in all_files:
            try:
                if email_file.suffix.lower() == '.msg':
                    parsed_email = self._parse_msg_file(email_file)
                else:
                    parsed_email = self._parse_eml_file(email_file)
            except Exception as e:
                # The per-format parsers already trap their own errors; this
                # guard only keeps one unexpected failure from aborting the run.
                logger.debug(f"Error parsing {email_file}: {e}")
                parsed_email = None

            if parsed_email is None:
                failed += 1
                continue

            emails.append(parsed_email)
            email_count = len(emails)
            if limit and email_count >= limit:
                logger.info(f"Reached limit: {email_count} emails parsed")
                break
            # Only report progress right after a successful parse, so a run of
            # failures never logs a misleading "0 emails parsed".
            if email_count % 100 == 0:
                logger.info(f"Progress: {email_count} emails parsed")
        else:
            logger.info(f"Parsing complete: {len(emails)} emails")

        if failed:
            logger.warning(
                f"Skipped {failed} files that could not be parsed "
                "(enable debug logging for details)"
            )
        return emails

    def _find_email_files(self) -> List[Path]:
        """
        Return all supported email files under the directory, sorted by path.

        The suffix is compared case-insensitively so that files such as
        ``MAIL.MSG`` are found on case-sensitive filesystems too.
        """
        return sorted(
            path
            for path in self.directory_path.rglob("*")
            if path.suffix.lower() in self.SUPPORTED_SUFFIXES and path.is_file()
        )

    def _folder_name(self, filepath: Path) -> str:
        """Return the file's folder relative to the root, or 'root' for top-level files."""
        parent = filepath.relative_to(self.directory_path).parent
        return str(parent) if parent != Path('.') else 'root'

    @staticmethod
    def _make_id(filepath: Path) -> str:
        """Build an email id from the file path with path separators flattened to '_'."""
        return str(filepath).replace('/', '_').replace('\\', '_')

    @staticmethod
    def _parse_date(value) -> Optional[datetime]:
        """
        Convert a date value to ``datetime``; return ``None`` if it cannot be parsed.

        ``datetime`` instances are returned unchanged; anything else is
        stringified and parsed as an RFC 2822 date.
        """
        if not value:
            return None
        if isinstance(value, datetime):
            return value
        try:
            return parsedate_to_datetime(str(value))
        except (TypeError, ValueError, IndexError, OverflowError):
            # A malformed date must not cause the whole email to be dropped.
            return None

    @staticmethod
    def _decode_header(value) -> str:
        """
        Decode RFC 2047 encoded-words (``=?utf-8?b?...?=``) in a header value.

        Returns an empty string for ``None`` and falls back to the raw text
        when the value cannot be decoded.
        """
        if value is None:
            return ""
        from email.header import decode_header, make_header
        try:
            return str(make_header(decode_header(value)))
        except Exception:
            # Best effort: unknown charsets or broken encodings keep the raw text.
            return str(value)

    @staticmethod
    def _decode_payload(part: email.message.Message) -> str:
        """Decode a message part's payload to text using its declared charset."""
        payload = part.get_payload(decode=True)
        if not payload:
            return ""
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='ignore')
        except (LookupError, UnicodeError):
            # Unknown or bogus charset label: fall back to UTF-8.
            return payload.decode('utf-8', errors='ignore')

    def _parse_msg_file(self, filepath: Path) -> "Optional[Email]":
        """
        Parse Outlook .msg file using extract-msg.

        Returns:
            The parsed Email, or ``None`` if the file could not be read.
        """
        msg = None
        try:
            msg = extract_msg.Message(str(filepath))

            # Extract basic info
            subject = msg.subject or 'No Subject'
            sender = msg.sender or ''
            sender_name = None  # extract-msg doesn't provide senderName attribute
            date = self._parse_date(msg.date)

            # Extract body
            body = msg.body or ""
            body_snippet = body[:500] if body else ""

            # Extract attachments (best effort: unreadable ones are skipped)
            attachments = []
            has_attachments = bool(msg.attachments)
            for att in msg.attachments or []:
                try:
                    attachments.append(Attachment(
                        filename=att.longFilename or att.shortFilename or "unknown",
                        mime_type=att.mimetype or "application/octet-stream",
                        size=len(att.data) if att.data else 0
                    ))
                except Exception as e:
                    logger.debug(f"Skipping unreadable attachment in {filepath}: {e}")

            return Email(
                id=self._make_id(filepath),
                subject=subject,
                sender=sender,
                sender_name=sender_name,
                date=date,
                body=body,
                body_snippet=body_snippet,
                has_attachments=has_attachments,
                attachments=attachments,
                provider='local_msg',
                headers={'X-Folder': self._folder_name(filepath), 'X-File': str(filepath)}
            )
        except Exception as e:
            logger.debug(f"Error parsing MSG file {filepath}: {e}")
            return None
        finally:
            # Always release the underlying file handle, including on failure.
            if msg is not None:
                try:
                    msg.close()
                except Exception as e:
                    logger.debug(f"Error closing MSG file {filepath}: {e}")

    def _parse_eml_file(self, filepath: Path) -> "Optional[Email]":
        """
        Parse .eml file using Python email library.

        Returns:
            The parsed Email, or ``None`` if the file could not be read.
        """
        from email.utils import parseaddr
        try:
            with open(filepath, 'rb') as f:
                msg = email.message_from_bytes(f.read())

            # Extract basic info; an empty Subject header counts as missing.
            subject = self._decode_header(msg.get('subject')) or 'No Subject'

            # Split "Name <addr>" on the raw header first, then decode the name,
            # so characters inside an encoded name cannot confuse address parsing.
            sender = str(msg.get('from', ''))
            sender_name = None
            if sender:
                try:
                    name, addr = parseaddr(sender)
                    if name:
                        sender_name = self._decode_header(name)
                    sender = addr
                except Exception as e:
                    logger.debug(f"Could not parse sender in {filepath}: {e}")

            date = self._parse_date(msg.get('date'))

            # Extract body
            body = self._extract_body(msg)
            body_snippet = body[:500] if body else ""

            # Extract attachments (best effort: unreadable ones are skipped)
            attachments = []
            has_attachments = self._has_attachments(msg)
            if has_attachments:
                for part in msg.walk():
                    if part.get_content_disposition() != 'attachment':
                        continue
                    filename = part.get_filename()
                    if not filename:
                        continue
                    try:
                        attachments.append(Attachment(
                            filename=self._decode_header(filename),
                            mime_type=part.get_content_type(),
                            size=len(part.get_payload(decode=True) or b'')
                        ))
                    except Exception as e:
                        logger.debug(f"Skipping unreadable attachment in {filepath}: {e}")

            return Email(
                id=self._make_id(filepath),
                subject=subject,
                sender=sender,
                sender_name=sender_name,
                date=date,
                body=body,
                body_snippet=body_snippet,
                has_attachments=has_attachments,
                attachments=attachments,
                provider='local_eml',
                headers={'X-Folder': self._folder_name(filepath), 'X-File': str(filepath)}
            )
        except Exception as e:
            logger.debug(f"Error parsing EML file {filepath}: {e}")
            return None

    def _extract_body(self, msg: email.message.Message) -> str:
        """
        Extract email body from EML message.

        For multipart messages the first non-empty ``text/plain`` part that is
        not an attachment is used. Payloads are decoded with the charset the
        part declares, falling back to UTF-8.
        """
        body = ""
        if msg.is_multipart():
            for part in msg.walk():
                if part.get_content_type() != 'text/plain':
                    continue
                # A text file sent as an attachment is not the message body.
                if part.get_content_disposition() == 'attachment':
                    continue
                body = self._decode_payload(part)
                if body:
                    break
        else:
            body = self._decode_payload(msg)
            if not body:
                # Nothing decodable: fall back to the raw payload.
                raw = msg.get_payload(decode=False)
                body = raw if isinstance(raw, str) else str(raw)
        return body.strip() if isinstance(body, str) else ""

    def _has_attachments(self, msg: email.message.Message) -> bool:
        """Check if EML message has at least one named attachment part."""
        if not msg.is_multipart():
            return False
        return any(
            part.get_content_disposition() == 'attachment' and bool(part.get_filename())
            for part in msg.walk()
        )

View File

@ -13,6 +13,7 @@ from src.email_providers.gmail import GmailProvider
from src.email_providers.imap import IMAPProvider from src.email_providers.imap import IMAPProvider
from src.email_providers.enron import EnronProvider from src.email_providers.enron import EnronProvider
from src.email_providers.outlook import OutlookProvider from src.email_providers.outlook import OutlookProvider
from src.email_providers.local_file import LocalFileProvider
from src.classification.feature_extractor import FeatureExtractor from src.classification.feature_extractor import FeatureExtractor
from src.classification.ml_classifier import MLClassifier from src.classification.ml_classifier import MLClassifier
from src.classification.llm_classifier import LLMClassifier from src.classification.llm_classifier import LLMClassifier
@ -28,10 +29,12 @@ def cli():
@cli.command() @cli.command()
@click.option('--source', type=click.Choice(['gmail', 'outlook', 'imap', 'mock', 'enron']), default='mock', @click.option('--source', type=click.Choice(['gmail', 'outlook', 'imap', 'mock', 'enron', 'local']), default='mock',
help='Email provider') help='Email provider')
@click.option('--credentials', type=click.Path(exists=False), @click.option('--credentials', type=click.Path(exists=False),
help='Path to credentials file') help='Path to credentials file')
@click.option('--directory', type=click.Path(exists=True),
help='Directory path for local file provider (.msg/.eml files)')
@click.option('--output', type=click.Path(), default='results/', @click.option('--output', type=click.Path(), default='results/',
help='Output directory') help='Output directory')
@click.option('--config', type=click.Path(exists=False), default='config/default_config.yaml', @click.option('--config', type=click.Path(exists=False), default='config/default_config.yaml',
@ -53,6 +56,7 @@ def cli():
def run( def run(
source: str, source: str,
credentials: Optional[str], credentials: Optional[str],
directory: Optional[str],
output: str, output: str,
config: str, config: str,
limit: Optional[int], limit: Optional[int],
@ -99,6 +103,12 @@ def run(
elif source == 'enron': elif source == 'enron':
provider = EnronProvider(maildir_path=".") provider = EnronProvider(maildir_path=".")
credentials = None credentials = None
elif source == 'local':
if not directory:
logger.error("Local file provider requires --directory")
sys.exit(1)
provider = LocalFileProvider(directory_path=directory)
credentials = None
else: # mock else: # mock
logger.warning("Using MOCK provider for testing") logger.warning("Using MOCK provider for testing")
provider = MockProvider() provider = MockProvider()

View File

@ -0,0 +1,104 @@
"""Local file provider - for .msg and .eml files."""
import logging
from typing import List, Dict, Optional
from .base import BaseProvider, Email
from src.calibration.local_file_parser import LocalFileParser
logger = logging.getLogger(__name__)
class LocalFileProvider(BaseProvider):
    """
    Email provider backed by .msg and .eml files on the local disk.

    Handles:
    - A flat directory of email files
    - Nested sub-directories
    - A mix of Outlook .msg and standard .eml files

    All parsing is delegated to ``LocalFileParser``; this class only adapts
    it to the ``BaseProvider`` interface. Label and batch updates are
    reported as unsupported.
    """

    def __init__(self, directory_path: str):
        """
        Create the provider and its file parser.

        Args:
            directory_path: Directory holding the email files to read.
        """
        super().__init__(name="local_file")
        self.parser = LocalFileParser(directory_path)
        # Starts disconnected; fetch_emails() returns nothing until connect().
        self.connected = False

    def connect(self, credentials: Dict = None) -> bool:
        """
        Mark the provider as connected.

        Args:
            credentials: Ignored; local files need no authentication.

        Returns:
            True, unconditionally.
        """
        self.connected = True
        logger.info("Connected to local file provider")
        return True

    def disconnect(self) -> bool:
        """Mark the provider as disconnected and return True."""
        self.connected = False
        logger.info("Disconnected from local file provider")
        return True

    def fetch_emails(self, limit: int = None, filters: Dict = None) -> List[Email]:
        """
        Read emails from the configured directory.

        Args:
            limit: Upper bound on the number of emails returned.
            filters: Accepted for interface compatibility; not applied.

        Returns:
            The parsed Email objects, or an empty list when the provider
            has not been connected.
        """
        if self.connected:
            logger.info(f"Fetching up to {limit or 'all'} emails from local files")
            fetched = self.parser.parse_emails(limit=limit)
            logger.info(f"Fetched {len(fetched)} emails")
            return fetched

        logger.warning("Not connected to local file provider")
        return []

    def update_labels(self, email_id: str, labels: List[str]) -> bool:
        """
        Report that labels cannot be written back to local files.

        Args:
            email_id: Email ID (unused).
            labels: Labels that would have been added (unused).

        Returns:
            False, unconditionally.
        """
        logger.warning("Label updates not supported for local file provider")
        return False

    def batch_update(self, updates: List[Dict]) -> bool:
        """
        Report that batch updates cannot be applied to local files.

        Args:
            updates: Update operations that would have been applied (unused).

        Returns:
            False, unconditionally.
        """
        logger.warning("Batch updates not supported for local file provider")
        return False

    def is_connected(self) -> bool:
        """Return whether connect() has been called without a later disconnect()."""
        return self.connected