- Applied Black formatter and isort across entire codebase for professional consistency - Moved implementation scripts (rag-mini.py, rag-tui.py) to bin/ directory for cleaner root - Updated shell scripts to reference new bin/ locations maintaining user compatibility - Added comprehensive linting configuration (.flake8, pyproject.toml) with dedicated .venv-linting - Removed development artifacts (commit_message.txt, GET_STARTED.md duplicate) from root - Consolidated documentation and fixed script references across all guides - Relocated test_fixes.py to proper tests/ directory - Enhanced project structure following Python packaging standards All user commands work identically while improving code organization and beginner accessibility.
453 lines
14 KiB
Python
453 lines
14 KiB
Python
"""
|
||
Integration test to verify all three agents' work integrates properly.
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import tempfile
|
||
from pathlib import Path
|
||
|
||
# Fix Windows encoding
|
||
if sys.platform == "win32":
|
||
os.environ["PYTHONUTF8"] = "1"
|
||
sys.stdout.reconfigure(encoding="utf-8")
|
||
|
||
from mini_rag.chunker import CodeChunker
|
||
from mini_rag.config import RAGConfig
|
||
from mini_rag.indexer import ProjectIndexer
|
||
from mini_rag.ollama_embeddings import OllamaEmbedder as CodeEmbedder
|
||
from mini_rag.query_expander import QueryExpander
|
||
from mini_rag.search import CodeSearcher
|
||
|
||
|
||
def test_chunker():
|
||
"""Test that chunker creates chunks with all required metadata."""
|
||
print("1. Testing Chunker...")
|
||
|
||
# Create test Python file with more substantial content
|
||
test_code = '''"""Test module for integration testing the chunker."""
|
||
|
||
import os
|
||
import sys
|
||
|
||
|
||
class TestClass:
|
||
"""A test class with multiple methods."""
|
||
|
||
def __init__(self):
|
||
"""Initialize the test class."""
|
||
self.value = 42
|
||
self.name = "test"
|
||
|
||
def method_one(self):
|
||
"""First method with some logic."""
|
||
result = self.value * 2
|
||
return result
|
||
|
||
def method_two(self, x):
|
||
"""Second method that takes a parameter."""
|
||
if x > 0:
|
||
return self.value + x
|
||
else:
|
||
return self.value - x
|
||
|
||
def method_three(self):
|
||
"""Third method for testing."""
|
||
data = []
|
||
for i in range(10):
|
||
data.append(i * self.value)
|
||
return data
|
||
|
||
|
||
class AnotherClass:
|
||
"""Another test class."""
|
||
|
||
def __init__(self, name):
|
||
"""Initialize with name."""
|
||
self.name = name
|
||
|
||
def process(self):
|
||
"""Process something."""
|
||
return f"Processing {self.name}"
|
||
|
||
def standalone_function(arg1, arg2):
|
||
"""A standalone function that does something."""
|
||
result = arg1 + arg2
|
||
return result * 2
|
||
|
||
|
||
def another_function():
|
||
"""Another standalone function."""
|
||
data = {"key": "value", "number": 123}
|
||
return data
|
||
'''
|
||
|
||
chunker = CodeChunker(min_chunk_size=1) # Use small chunk size for testing
|
||
chunks = chunker.chunk_file(Path("test.py"), test_code)
|
||
|
||
print(f" Created {len(chunks)} chunks")
|
||
|
||
# Debug: Show what chunks were created
|
||
print(" Chunks created:")
|
||
for chunk in chunks:
|
||
print(
|
||
f" - Type: {chunk.chunk_type}, Name: {chunk.name}, Lines: {chunk.start_line}-{chunk.end_line}"
|
||
)
|
||
|
||
# Check metadata
|
||
issues = []
|
||
for i, chunk in enumerate(chunks):
|
||
if chunk.chunk_index is None:
|
||
issues.append(f"Chunk {i} missing chunk_index")
|
||
if chunk.total_chunks is None:
|
||
issues.append(f"Chunk {i} missing total_chunks")
|
||
if chunk.file_lines is None:
|
||
issues.append(f"Chunk {i} missing file_lines")
|
||
|
||
# Check links (except first/last)
|
||
if i > 0 and chunk.prev_chunk_id is None:
|
||
issues.append(f"Chunk {i} missing prev_chunk_id")
|
||
if i < len(chunks) - 1 and chunk.next_chunk_id is None:
|
||
issues.append(f"Chunk {i} missing next_chunk_id")
|
||
|
||
# Check parent_class for methods
|
||
if chunk.chunk_type == "method" and chunk.parent_class is None:
|
||
issues.append(f"Method chunk {chunk.name} missing parent_class")
|
||
|
||
print(
|
||
f" - Chunk {i}: {chunk.chunk_type} '{chunk.name}' "
|
||
f"[{chunk.chunk_index}/{chunk.total_chunks}] "
|
||
f"prev={chunk.prev_chunk_id} next={chunk.next_chunk_id}"
|
||
)
|
||
|
||
if issues:
|
||
print(" Issues found:")
|
||
for issue in issues:
|
||
print(f" - {issue}")
|
||
else:
|
||
print(" All metadata present")
|
||
|
||
return len(issues) == 0
|
||
|
||
|
||
def test_indexer_storage():
|
||
"""Test that indexer stores the new metadata."""
|
||
print("\n2. Testing Indexer Storage...")
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
project_path = Path(tmpdir)
|
||
|
||
# Create test file
|
||
test_file = project_path / "test.py"
|
||
test_file.write_text(
|
||
"""
|
||
|
||
|
||
class MyClass:
|
||
|
||
def my_method(self):
|
||
return 42
|
||
"""
|
||
)
|
||
|
||
# Index the project with small chunk size for testing
|
||
from mini_rag.chunker import CodeChunker
|
||
|
||
chunker = CodeChunker(min_chunk_size=1)
|
||
indexer = ProjectIndexer(project_path, chunker=chunker)
|
||
stats = indexer.index_project()
|
||
|
||
print(f" Indexed {stats['chunks_created']} chunks")
|
||
|
||
# Check what was stored
|
||
if indexer.table:
|
||
df = indexer.table.to_pandas()
|
||
columns = df.columns.tolist()
|
||
|
||
required_fields = [
|
||
"chunk_id",
|
||
"prev_chunk_id",
|
||
"next_chunk_id",
|
||
"parent_class",
|
||
]
|
||
missing_fields = [f for f in required_fields if f not in columns]
|
||
|
||
if missing_fields:
|
||
print(f" Missing fields in database: {missing_fields}")
|
||
print(f" Current fields: {columns}")
|
||
return False
|
||
else:
|
||
print(" All required fields in database schema")
|
||
|
||
# Check if data is actually stored
|
||
sample = df.iloc[0] if len(df) > 0 else None
|
||
if sample is not None:
|
||
print(f" Sample chunk_id: {sample.get('chunk_id', 'MISSING')}")
|
||
print(f" Sample prev_chunk_id: {sample.get('prev_chunk_id', 'MISSING')}")
|
||
print(f" Sample next_chunk_id: {sample.get('next_chunk_id', 'MISSING')}")
|
||
print(f" Sample parent_class: {sample.get('parent_class', 'MISSING')}")
|
||
|
||
return len(missing_fields) == 0
|
||
|
||
|
||
def test_search_integration():
|
||
"""Test that search uses the new metadata."""
|
||
print("\n3. Testing Search Integration...")
|
||
|
||
with tempfile.TemporaryDirectory() as tmpdir:
|
||
project_path = Path(tmpdir)
|
||
|
||
# Create test files with proper content that will create multiple chunks
|
||
(project_path / "math_utils.py").write_text(
|
||
'''"""Math utilities module."""
|
||
|
||
import math
|
||
|
||
|
||
class Calculator:
|
||
"""A simple calculator class."""
|
||
|
||
def __init__(self):
|
||
"""Initialize calculator."""
|
||
self.result = 0
|
||
|
||
def add(self, a, b):
|
||
"""Add two numbers."""
|
||
self.result = a + b
|
||
return self.result
|
||
|
||
def multiply(self, a, b):
|
||
"""Multiply two numbers."""
|
||
self.result = a * b
|
||
return self.result
|
||
|
||
def divide(self, a, b):
|
||
"""Divide two numbers."""
|
||
if b == 0:
|
||
raise ValueError("Cannot divide by zero")
|
||
self.result = a / b
|
||
return self.result
|
||
|
||
|
||
class AdvancedCalculator(Calculator):
|
||
"""Advanced calculator with more operations."""
|
||
|
||
def power(self, a, b):
|
||
"""Raise a to power b."""
|
||
self.result = a ** b
|
||
return self.result
|
||
|
||
def sqrt(self, a):
|
||
"""Calculate square root."""
|
||
self.result = math.sqrt(a)
|
||
return self.result
|
||
|
||
def compute_average(numbers):
|
||
"""Compute average of a list."""
|
||
if not numbers:
|
||
return 0
|
||
return sum(numbers) / len(numbers)
|
||
|
||
|
||
def compute_median(numbers):
|
||
"""Compute median of a list."""
|
||
if not numbers:
|
||
return 0
|
||
sorted_nums = sorted(numbers)
|
||
n = len(sorted_nums)
|
||
if n % 2 == 0:
|
||
return (sorted_nums[n//2-1] + sorted_nums[n//2]) / 2
|
||
return sorted_nums[n//2]
|
||
'''
|
||
)
|
||
|
||
# Index with small chunk size for testing
|
||
chunker = CodeChunker(min_chunk_size=1)
|
||
indexer = ProjectIndexer(project_path, chunker=chunker)
|
||
indexer.index_project()
|
||
|
||
# Search
|
||
searcher = CodeSearcher(project_path)
|
||
|
||
# Test BM25 integration
|
||
results = searcher.search(
|
||
"multiply numbers", top_k=5, semantic_weight=0.3, bm25_weight=0.7
|
||
)
|
||
|
||
if results:
|
||
print(f" BM25 + semantic search returned {len(results)} results")
|
||
for r in results[:2]:
|
||
print(f" - {r.chunk_type} '{r.name}' score={r.score:.3f}")
|
||
else:
|
||
print(" No search results returned")
|
||
return False
|
||
|
||
# Test context retrieval
|
||
print("\n Testing context retrieval...")
|
||
if searcher.table:
|
||
df = searcher.table.to_pandas()
|
||
print(f" Total chunks in DB: {len(df)}")
|
||
|
||
# Find a method/function chunk to test parent context
|
||
method_chunks = df[df["chunk_type"].isin(["method", "function"])]
|
||
if len(method_chunks) > 0:
|
||
method_chunk_id = method_chunks.iloc[0]["chunk_id"]
|
||
context = searcher.get_chunk_context(method_chunk_id)
|
||
|
||
if context["chunk"]:
|
||
print(f" Got main chunk: {context['chunk'].name}")
|
||
if context["prev"]:
|
||
print(f" Got previous chunk: {context['prev'].name}")
|
||
else:
|
||
print(" - No previous chunk (might be first)")
|
||
if context["next"]:
|
||
print(f" Got next chunk: {context['next'].name}")
|
||
else:
|
||
print(" - No next chunk (might be last)")
|
||
if context["parent"]:
|
||
print(f" Got parent chunk: {context['parent'].name}")
|
||
else:
|
||
print(" - No parent chunk")
|
||
|
||
# Test include_context in search
|
||
results_with_context = searcher.search("add", include_context=True, top_k=2)
|
||
if results_with_context:
|
||
print(f" Found {len(results_with_context)} results with context")
|
||
for r in results_with_context:
|
||
# Check if result has context (unused variable removed)
|
||
print(
|
||
f" - {r.name}: context_before={bool(r.context_before)}, "
|
||
f"context_after={bool(r.context_after)}, parent={bool(r.parent_chunk)}"
|
||
)
|
||
|
||
# Check if at least one result has some context
|
||
if any(
|
||
r.context_before or r.context_after or r.parent_chunk
|
||
for r in results_with_context
|
||
):
|
||
print(" Search with context working")
|
||
return True
|
||
else:
|
||
print(" Search returned results but no context attached")
|
||
return False
|
||
else:
|
||
print(" No search results returned")
|
||
return False
|
||
else:
|
||
print(" No method chunks found in database")
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def test_server():
|
||
"""Test that server still works."""
|
||
print("\n4. Testing Server...")
|
||
|
||
# Just check if we can import and create server instance
|
||
try:
|
||
from mini_rag.server import RAGServer
|
||
|
||
# RAGServer(Path("."), port=7778) # Unused variable removed
|
||
print(" Server can be instantiated")
|
||
return True
|
||
except Exception as e:
|
||
print(f" Server error: {e}")
|
||
return False
|
||
|
||
|
||
def test_new_features():
|
||
"""Test new features: query expansion and smart ranking."""
|
||
print("\n5. Testing New Features (Query Expansion & Smart Ranking)...")
|
||
|
||
try:
|
||
# Test configuration loading
|
||
config = RAGConfig()
|
||
print(" ✅ Configuration loaded successfully")
|
||
print(f" Query expansion enabled: {config.search.expand_queries}")
|
||
print(f" Max expansion terms: {config.llm.max_expansion_terms}")
|
||
|
||
# Test query expander (will use mock if Ollama unavailable)
|
||
expander = QueryExpander(config)
|
||
test_query = "authentication"
|
||
|
||
if expander.is_available():
|
||
expanded = expander.expand_query(test_query)
|
||
print(f" ✅ Query expansion working: '{test_query}' → '{expanded}'")
|
||
else:
|
||
print(" ⚠️ Query expansion offline (Ollama not available)")
|
||
# Test that it still returns original query
|
||
expanded = expander.expand_query(test_query)
|
||
if expanded == test_query:
|
||
print(" ✅ Graceful degradation working: returns original query")
|
||
else:
|
||
print(" ❌ Error: should return original query when offline")
|
||
return False
|
||
|
||
# Test smart ranking (this always works as it's zero-overhead)
|
||
print(" 🧮 Testing smart ranking...")
|
||
|
||
# Create a simple test to verify the method exists and can be called
|
||
with tempfile.TemporaryDirectory() as temp_dir:
|
||
temp_path = Path(temp_dir)
|
||
|
||
# Create a simple test project
|
||
test_file = temp_path / "README.md"
|
||
test_file.write_text("# Test Project\nThis is a test README file.")
|
||
|
||
try:
|
||
searcher = CodeSearcher(temp_path)
|
||
# Test that the _smart_rerank method exists
|
||
if hasattr(searcher, "_smart_rerank"):
|
||
print(" ✅ Smart ranking method available")
|
||
return True
|
||
else:
|
||
print(" ❌ Smart ranking method not found")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f" ❌ Smart ranking test failed: {e}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
print(f" ❌ New features test failed: {e}")
|
||
return False
|
||
|
||
|
||
def main():
|
||
"""Run all integration tests."""
|
||
print("=" * 50)
|
||
print("RAG System Integration Check")
|
||
print("=" * 50)
|
||
|
||
results = {
|
||
"Chunker": test_chunker(),
|
||
"Indexer": test_indexer_storage(),
|
||
"Search": test_search_integration(),
|
||
"Server": test_server(),
|
||
"New Features": test_new_features(),
|
||
}
|
||
|
||
print("\n" + "=" * 50)
|
||
print("SUMMARY:")
|
||
print("=" * 50)
|
||
|
||
all_passed = True
|
||
for component, passed in results.items():
|
||
status = " PASS" if passed else " FAIL"
|
||
print(f"{component}: {status}")
|
||
if not passed:
|
||
all_passed = False
|
||
|
||
if all_passed:
|
||
print("\n All integration tests passed!")
|
||
else:
|
||
print("\n️ Some tests failed - fixes needed!")
|
||
|
||
return all_passed
|
||
|
||
|
||
if __name__ == "__main__":
|
||
success = main()
|
||
sys.exit(0 if success else 1)
|