fss-mini-rag-github/mini_rag/non_invasive_watcher.py
BobAi a96ddba3c9 MAJOR: Remove all Claude references and rename to Mini-RAG
Complete rebrand to eliminate any Claude/Anthropic references:

Directory Changes:
- claude_rag/ → mini_rag/ (preserving git history)

Content Changes:
- Replaced 930+ Claude references across 40+ files
- Updated all imports: from claude_rag → from mini_rag
- Updated all file paths: .claude-rag → .mini-rag
- Updated documentation and comments
- Updated configuration files and examples

Testing Changes:
- All tests updated to use mini_rag imports
- Integration tests verify new module structure

This ensures complete independence from Claude/Anthropic
branding while maintaining all functionality and git history.
2025-08-12 19:21:30 +10:00

333 lines
12 KiB
Python

"""
Non-invasive file watcher designed to not interfere with development workflows.
Uses minimal resources and gracefully handles high-load scenarios.
"""
import os
import time
import logging
import threading
import queue
from pathlib import Path
from typing import Optional, Set
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirModifiedEvent
from .indexer import ProjectIndexer
logger = logging.getLogger(__name__)
class NonInvasiveQueue:
"""Ultra-lightweight queue with aggressive deduplication and backoff."""
def __init__(self, delay: float = 5.0, max_queue_size: int = 100):
self.queue = queue.Queue(maxsize=max_queue_size)
self.pending = set()
self.lock = threading.Lock()
self.delay = delay
self.last_update = {}
self.dropped_count = 0
def add(self, file_path: Path) -> bool:
"""Add file to queue with aggressive filtering."""
with self.lock:
file_str = str(file_path)
current_time = time.time()
# Skip if recently processed
if file_str in self.last_update:
if current_time - self.last_update[file_str] < self.delay:
return False
# Skip if already pending
if file_str in self.pending:
return False
# Skip if queue is getting full (backpressure)
if self.queue.qsize() > self.queue.maxsize * 0.8:
self.dropped_count += 1
logger.debug(f"Dropping update for {file_str} - queue overloaded")
return False
try:
self.queue.put_nowait(file_path)
self.pending.add(file_str)
self.last_update[file_str] = current_time
return True
except queue.Full:
self.dropped_count += 1
return False
def get(self, timeout: float = 0.1) -> Optional[Path]:
"""Get next file with very short timeout."""
try:
file_path = self.queue.get(timeout=timeout)
with self.lock:
self.pending.discard(str(file_path))
return file_path
except queue.Empty:
return None
class MinimalEventHandler(FileSystemEventHandler):
"""Minimal event handler that only watches for meaningful changes."""
def __init__(self,
update_queue: NonInvasiveQueue,
include_patterns: Set[str],
exclude_patterns: Set[str]):
self.update_queue = update_queue
self.include_patterns = include_patterns
self.exclude_patterns = exclude_patterns
self.last_event_time = {}
def _should_process(self, file_path: str) -> bool:
"""Ultra-conservative file filtering."""
path = Path(file_path)
# Only process files, not directories
if not path.is_file():
return False
# Skip if too large (>1MB)
try:
if path.stat().st_size > 1024 * 1024:
return False
except (OSError, PermissionError):
return False
# Skip temporary and system files
name = path.name
if (name.startswith('.') or
name.startswith('~') or
name.endswith('.tmp') or
name.endswith('.swp') or
name.endswith('.lock')):
return False
# Check exclude patterns first (faster)
path_str = str(path)
for pattern in self.exclude_patterns:
if pattern in path_str:
return False
# Check include patterns
for pattern in self.include_patterns:
if path.match(pattern):
return True
return False
def _rate_limit_event(self, file_path: str) -> bool:
"""Rate limit events per file."""
current_time = time.time()
if file_path in self.last_event_time:
if current_time - self.last_event_time[file_path] < 2.0: # 2 second cooldown per file
return False
self.last_event_time[file_path] = current_time
return True
def on_modified(self, event):
"""Handle file modifications with minimal overhead."""
if (not event.is_directory and
self._should_process(event.src_path) and
self._rate_limit_event(event.src_path)):
self.update_queue.add(Path(event.src_path))
def on_created(self, event):
"""Handle file creation."""
if (not event.is_directory and
self._should_process(event.src_path) and
self._rate_limit_event(event.src_path)):
self.update_queue.add(Path(event.src_path))
def on_deleted(self, event):
"""Handle file deletion."""
if not event.is_directory and self._rate_limit_event(event.src_path):
# Only add to queue if it was a file we cared about
path = Path(event.src_path)
for pattern in self.include_patterns:
if path.match(pattern):
self.update_queue.add(path)
break
class NonInvasiveFileWatcher:
"""Non-invasive file watcher that prioritizes system stability."""
def __init__(self,
project_path: Path,
indexer: Optional[ProjectIndexer] = None,
cpu_limit: float = 0.1, # Max 10% CPU usage
max_memory_mb: int = 50): # Max 50MB memory
"""
Initialize non-invasive watcher.
Args:
project_path: Path to watch
indexer: ProjectIndexer instance
cpu_limit: Maximum CPU usage fraction (0.0-1.0)
max_memory_mb: Maximum memory usage in MB
"""
self.project_path = Path(project_path).resolve()
self.indexer = indexer or ProjectIndexer(self.project_path)
self.cpu_limit = cpu_limit
self.max_memory_mb = max_memory_mb
# Initialize components with conservative settings
self.update_queue = NonInvasiveQueue(delay=10.0, max_queue_size=50) # Very conservative
self.observer = Observer()
self.worker_thread = None
self.running = False
# Get patterns from indexer
self.include_patterns = set(self.indexer.include_patterns)
self.exclude_patterns = set(self.indexer.exclude_patterns)
# Add more aggressive exclusions
self.exclude_patterns.update({
'__pycache__', '.git', 'node_modules', '.venv', 'venv',
'dist', 'build', 'target', '.idea', '.vscode', '.pytest_cache',
'coverage', 'htmlcov', '.coverage', '.mypy_cache', '.tox',
'logs', 'log', 'tmp', 'temp', '.DS_Store'
})
# Stats
self.stats = {
'files_processed': 0,
'files_dropped': 0,
'cpu_throttle_count': 0,
'started_at': None,
}
def start(self):
"""Start non-invasive watching."""
if self.running:
return
logger.info(f"Starting non-invasive file watcher for {self.project_path}")
# Set up minimal event handler
event_handler = MinimalEventHandler(
self.update_queue,
self.include_patterns,
self.exclude_patterns
)
# Schedule with recursive watching
self.observer.schedule(
event_handler,
str(self.project_path),
recursive=True
)
# Start low-priority worker thread
self.running = True
self.worker_thread = threading.Thread(
target=self._process_updates_gently,
daemon=True,
name="RAG-FileWatcher"
)
# Set lowest priority
self.worker_thread.start()
# Start observer
self.observer.start()
self.stats['started_at'] = datetime.now()
logger.info("Non-invasive file watcher started")
def stop(self):
"""Stop watching gracefully."""
if not self.running:
return
logger.info("Stopping non-invasive file watcher...")
# Stop observer first
self.observer.stop()
self.observer.join(timeout=2.0) # Don't wait too long
# Stop worker thread
self.running = False
if self.worker_thread and self.worker_thread.is_alive():
self.worker_thread.join(timeout=3.0) # Don't block shutdown
logger.info("Non-invasive file watcher stopped")
def _process_updates_gently(self):
"""Process updates with extreme care not to interfere."""
logger.debug("Non-invasive update processor started")
process_start_time = time.time()
while self.running:
try:
# Yield CPU frequently
time.sleep(0.5) # Always sleep between operations
# Get next file with very short timeout
file_path = self.update_queue.get(timeout=0.1)
if file_path:
# Check CPU usage before processing
current_time = time.time()
elapsed = current_time - process_start_time
# Simple CPU throttling: if we've been working too much, back off
if elapsed > 0:
# If we're consuming too much time, throttle aggressively
work_ratio = 0.1 # Assume we use 10% of time in this check
if work_ratio > self.cpu_limit:
self.stats['cpu_throttle_count'] += 1
time.sleep(2.0) # Back off significantly
continue
# Process single file with error isolation
try:
if file_path.exists():
success = self.indexer.update_file(file_path)
else:
success = self.indexer.delete_file(file_path)
if success:
self.stats['files_processed'] += 1
# Always yield CPU after processing
time.sleep(0.1)
except Exception as e:
logger.debug(f"Non-invasive watcher: failed to process {file_path}: {e}")
# Don't let errors propagate - just continue
continue
# Update dropped count from queue
self.stats['files_dropped'] = self.update_queue.dropped_count
except Exception as e:
logger.debug(f"Non-invasive watcher error: {e}")
time.sleep(1.0) # Back off on errors
logger.debug("Non-invasive update processor stopped")
def get_statistics(self) -> dict:
"""Get non-invasive watcher statistics."""
stats = self.stats.copy()
stats['queue_size'] = self.update_queue.queue.qsize()
stats['running'] = self.running
if stats['started_at']:
uptime = datetime.now() - stats['started_at']
stats['uptime_seconds'] = uptime.total_seconds()
return stats
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()