Complete rebrand to eliminate any Claude/Anthropic references

Directory changes:
- claude_rag/ → mini_rag/ (preserving git history)

Content changes:
- Replaced 930+ Claude references across 40+ files
- Updated all imports: from claude_rag → from mini_rag
- Updated all file paths: .claude-rag → .mini-rag
- Updated documentation and comments
- Updated configuration files and examples

Testing changes:
- All tests updated to use mini_rag imports
- Integration tests verify the new module structure

This ensures complete independence from Claude/Anthropic branding while
maintaining all functionality and git history.
"""
|
|
Fast semantic search using LanceDB.
|
|
Optimized for code search with relevance scoring.
|
|
"""
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
import numpy as np
|
|
import pandas as pd
|
|
import lancedb
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
from rich.syntax import Syntax
|
|
from rank_bm25 import BM25Okapi
|
|
from collections import defaultdict
|
|
|
|
from .ollama_embeddings import OllamaEmbedder as CodeEmbedder
|
|
from .path_handler import display_path
|
|
from .query_expander import QueryExpander
|
|
from .config import ConfigManager
|
|
from datetime import datetime, timedelta
|
|
|
|
logger = logging.getLogger(__name__)
|
|
console = Console()
|
|
|
|
|
|
class SearchResult:
    """Represents a single search result."""

    def __init__(self,
                 file_path: str,
                 content: str,
                 score: float,
                 start_line: int,
                 end_line: int,
                 chunk_type: str,
                 name: str,
                 language: str,
                 context_before: Optional[str] = None,
                 context_after: Optional[str] = None,
                 parent_chunk: Optional['SearchResult'] = None):
        self.file_path = file_path
        self.content = content
        self.score = score
        self.start_line = start_line
        self.end_line = end_line
        self.chunk_type = chunk_type
        self.name = name
        self.language = language
        self.context_before = context_before
        self.context_after = context_after
        self.parent_chunk = parent_chunk

    def __repr__(self):
        return f"SearchResult({self.file_path}:{self.start_line}-{self.end_line}, score={self.score:.3f})"

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary."""
        return {
            'file_path': self.file_path,
            'content': self.content,
            'score': self.score,
            'start_line': self.start_line,
            'end_line': self.end_line,
            'chunk_type': self.chunk_type,
            'name': self.name,
            'language': self.language,
            'context_before': self.context_before,
            'context_after': self.context_after,
            'parent_chunk': self.parent_chunk.to_dict() if self.parent_chunk else None,
        }

    def format_for_display(self, max_lines: int = 10) -> str:
        """Truncate content for display, keeping the first and last lines."""
        lines = self.content.splitlines()
        if len(lines) > max_lines:
            # Show the first and last few lines with an ellipsis between
            half = max_lines // 2
            lines = lines[:half] + ['...'] + lines[-half:]

        return '\n'.join(lines)
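
# A minimal usage sketch of SearchResult; the values below are illustrative,
# not drawn from a real index:
#
#     r = SearchResult(file_path="src/app.py", content="def main():\n    ...",
#                      score=0.82, start_line=10, end_line=14,
#                      chunk_type="function", name="main", language="python")
#     r.to_dict()["score"]            # 0.82
#     print(r.format_for_display(4))  # first/last lines with '...' between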


class CodeSearcher:
    """Semantic code search using vector similarity."""

    def __init__(self,
                 project_path: Path,
                 embedder: Optional[CodeEmbedder] = None):
        """
        Initialize the searcher.

        Args:
            project_path: Path to the project
            embedder: CodeEmbedder instance (creates one if not provided)
        """
        self.project_path = Path(project_path).resolve()
        self.rag_dir = self.project_path / '.mini-rag'
        self.embedder = embedder or CodeEmbedder()

        # Load configuration and initialize the query expander
        config_manager = ConfigManager(project_path)
        self.config = config_manager.load_config()
        self.query_expander = QueryExpander(self.config)

        # Initialize the database connection and keyword index
        self.db = None
        self.table = None
        self.bm25 = None
        self.chunk_texts = []
        self.chunk_ids = []
        self._connect()
        self._build_bm25_index()

    def _connect(self):
        """Connect to the LanceDB database."""
        try:
            if not self.rag_dir.exists():
                raise FileNotFoundError(f"No RAG index found at {self.rag_dir}")

            self.db = lancedb.connect(self.rag_dir)

            if "code_vectors" not in self.db.table_names():
                raise ValueError("No code_vectors table found. Run indexing first.")

            self.table = self.db.open_table("code_vectors")

        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            raise

    def _build_bm25_index(self):
        """Build a BM25 index from all chunks in the database."""
        if not self.table:
            return

        try:
            # Load all chunks into memory for BM25
            df = self.table.to_pandas()

            # Prepare texts for BM25 by combining content with metadata
            self.chunk_texts = []
            self.chunk_ids = []

            for _, row in df.iterrows():
                # Create searchable text combining content, name, and type
                searchable_text = f"{row['content']} {row['name'] or ''} {row['chunk_type']}"

                # Tokenize for BM25 (simple whitespace splitting)
                tokens = searchable_text.lower().split()

                self.chunk_texts.append(tokens)
                # Key corpus positions by chunk_id so search results (whose
                # DataFrame index is unrelated to this one) can be matched back
                self.chunk_ids.append(row['chunk_id'])

            # Build the BM25 index
            self.bm25 = BM25Okapi(self.chunk_texts)
            logger.info(f"Built BM25 index with {len(self.chunk_texts)} chunks")

        except Exception as e:
            logger.error(f"Failed to build BM25 index: {e}")
            self.bm25 = None
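
    # A small sketch of the keyword side of the hybrid search, using
    # rank_bm25 directly; the two-chunk corpus is illustrative:
    #
    #     from rank_bm25 import BM25Okapi
    #     corpus = [["def", "connect", "database"], ["class", "parser"]]
    #     bm25 = BM25Okapi(corpus)
    #     bm25.get_scores(["database", "connect"])  # one score per corpus chunk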

    def get_chunk_context(self, chunk_id: str, include_adjacent: bool = True,
                          include_parent: bool = True) -> Dict[str, Any]:
        """
        Get context for a specific chunk, including adjacent and parent chunks.

        Args:
            chunk_id: The ID of the chunk to get context for
            include_adjacent: Whether to include previous and next chunks
            include_parent: Whether to include the parent class chunk for methods

        Returns:
            Dictionary with 'chunk', 'prev', 'next', and 'parent' SearchResults
        """
        if not self.table:
            raise RuntimeError("Database not connected")

        try:
            # Get the main chunk by ID
            df = self.table.to_pandas()
            chunk_rows = df[df['chunk_id'] == chunk_id]

            if chunk_rows.empty:
                return {'chunk': None, 'prev': None, 'next': None, 'parent': None}

            chunk_row = chunk_rows.iloc[0]
            context = {'chunk': self._row_to_search_result(chunk_row, score=1.0),
                       'prev': None, 'next': None, 'parent': None}

            # Get adjacent chunks if requested
            if include_adjacent:
                if pd.notna(chunk_row.get('prev_chunk_id')):
                    prev_rows = df[df['chunk_id'] == chunk_row['prev_chunk_id']]
                    if not prev_rows.empty:
                        context['prev'] = self._row_to_search_result(prev_rows.iloc[0], score=1.0)

                if pd.notna(chunk_row.get('next_chunk_id')):
                    next_rows = df[df['chunk_id'] == chunk_row['next_chunk_id']]
                    if not next_rows.empty:
                        context['next'] = self._row_to_search_result(next_rows.iloc[0], score=1.0)

            # Get the parent class chunk if requested and applicable
            if include_parent and pd.notna(chunk_row.get('parent_class')):
                parent_rows = df[(df['name'] == chunk_row['parent_class']) &
                                 (df['chunk_type'] == 'class') &
                                 (df['file_path'] == chunk_row['file_path'])]
                if not parent_rows.empty:
                    context['parent'] = self._row_to_search_result(parent_rows.iloc[0], score=1.0)

            return context

        except Exception as e:
            logger.error(f"Failed to get chunk context: {e}")
            return {'chunk': None, 'prev': None, 'next': None, 'parent': None}
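
    # Hedged usage sketch, given a CodeSearcher instance `searcher`; "abc123"
    # stands in for a real chunk_id from the index:
    #
    #     ctx = searcher.get_chunk_context("abc123")
    #     if ctx['chunk']:
    #         print(ctx['chunk'].name)
    #         if ctx['parent']:
    #             print("defined in class", ctx['parent'].name)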

    def _row_to_search_result(self, row: pd.Series, score: float) -> SearchResult:
        """Convert a DataFrame row to a SearchResult."""
        return SearchResult(
            file_path=display_path(row['file_path']),
            content=row['content'],
            score=score,
            start_line=row['start_line'],
            end_line=row['end_line'],
            chunk_type=row['chunk_type'],
            name=row['name'],
            language=row['language']
        )

    def search(self,
               query: str,
               top_k: int = 10,
               chunk_types: Optional[List[str]] = None,
               languages: Optional[List[str]] = None,
               file_pattern: Optional[str] = None,
               semantic_weight: float = 0.7,
               bm25_weight: float = 0.3,
               include_context: bool = False) -> List[SearchResult]:
        """
        Hybrid search for code similar to the query, combining semantic and BM25 scores.

        Args:
            query: Natural language search query
            top_k: Maximum number of results to return
            chunk_types: Filter by chunk types (e.g., ['function', 'class'])
            languages: Filter by languages (e.g., ['python', 'javascript'])
            file_pattern: Filter by file path pattern (e.g., '**/test_*.py')
            semantic_weight: Weight for semantic similarity (default 0.7)
            bm25_weight: Weight for the BM25 keyword score (default 0.3)
            include_context: Whether to include adjacent and parent chunks for each result

        Returns:
            List of SearchResult objects, sorted by combined relevance
        """
        if not self.table:
            raise RuntimeError("Database not connected")

        # Expand the query for better recall (if enabled)
        expanded_query = self.query_expander.expand_query(query)

        # Use the original query for display but the expanded query for search
        search_query = expanded_query if expanded_query != query else query

        # Embed the expanded query for semantic search
        query_embedding = self.embedder.embed_query(search_query)

        # Ensure the query embedding is a float32 numpy array
        if not isinstance(query_embedding, np.ndarray):
            query_embedding = np.array(query_embedding, dtype=np.float32)
        else:
            query_embedding = query_embedding.astype(np.float32)

        # Get extra results for filtering and diversity
        results_df = (
            self.table.search(query_embedding)
            .limit(top_k * 4)
            .to_pandas()
        )

        if results_df.empty:
            return []

        # Apply filters first
        if chunk_types:
            results_df = results_df[results_df['chunk_type'].isin(chunk_types)]

        if languages:
            results_df = results_df[results_df['language'].isin(languages)]

        if file_pattern:
            import fnmatch
            mask = results_df['file_path'].apply(
                lambda x: fnmatch.fnmatch(x, file_pattern)
            )
            results_df = results_df[mask]

        # Calculate BM25 scores if the keyword index is available
        if self.bm25:
            # Tokenize the expanded query and score the whole corpus once,
            # rather than re-scoring it for every result row
            query_tokens = search_query.lower().split()
            all_bm25_scores = self.bm25.get_scores(query_tokens)

            # Match result rows to corpus positions by chunk_id
            bm25_scores = {}
            for idx, row in results_df.iterrows():
                try:
                    chunk_idx = self.chunk_ids.index(row['chunk_id'])
                    # Normalize the BM25 score to the 0-1 range
                    bm25_scores[idx] = min(all_bm25_scores[chunk_idx] / 10.0, 1.0)
                except ValueError:
                    bm25_scores[idx] = 0.0
        else:
            bm25_scores = {idx: 0.0 for idx in results_df.index}

        # Calculate hybrid scores
        hybrid_results = []
        for idx, row in results_df.iterrows():
            # Semantic score (convert distance to similarity)
            distance = row['_distance']
            semantic_score = 1 / (1 + distance)

            # BM25 keyword score
            bm25_score = bm25_scores.get(idx, 0.0)

            # Combined score
            combined_score = (semantic_weight * semantic_score +
                              bm25_weight * bm25_score)

            result = SearchResult(
                file_path=display_path(row['file_path']),
                content=row['content'],
                score=combined_score,
                start_line=row['start_line'],
                end_line=row['end_line'],
                chunk_type=row['chunk_type'],
                name=row['name'],
                language=row['language']
            )
            hybrid_results.append(result)

        # Apply smart re-ranking for better quality
        hybrid_results = self._smart_rerank(hybrid_results)

        # Apply diversity constraints
        diverse_results = self._apply_diversity_constraints(hybrid_results, top_k)

        # Add context if requested
        if include_context:
            diverse_results = self._add_context_to_results(diverse_results, results_df)

        return diverse_results
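
    # Worked example of the hybrid score with the default weights: a chunk at
    # vector distance 0.5 has semantic score 1 / (1 + 0.5) ≈ 0.667; with a
    # normalized BM25 score of 0.4, the combined score is
    # 0.7 * 0.667 + 0.3 * 0.4 ≈ 0.587.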

    def _smart_rerank(self, results: List[SearchResult]) -> List[SearchResult]:
        """
        Smart result re-ranking for better quality at minimal cost.

        Boosts scores based on:
        - File importance (README, main files, configs)
        - Content freshness (recently modified files)
        - File type relevance
        """
        now = datetime.now()

        for result in results:
            # File importance boost (20% for important files)
            file_path_lower = str(result.file_path).lower()
            important_patterns = [
                'readme', 'main.', 'index.', '__init__', 'config',
                'setup', 'install', 'getting', 'started', 'docs/',
                'documentation', 'guide', 'tutorial', 'example'
            ]

            if any(pattern in file_path_lower for pattern in important_patterns):
                result.score *= 1.2
                logger.debug(f"Important file boost: {result.file_path}")

            # Recency boost (10% for files modified in the last week);
            # uses the file's modification time, which is a cheap stat call
            try:
                file_mtime = Path(result.file_path).stat().st_mtime
                modified_date = datetime.fromtimestamp(file_mtime)
                days_old = (now - modified_date).days

                if days_old <= 7:  # Modified in the last week
                    result.score *= 1.1
                    logger.debug(f"Recent file boost: {result.file_path} ({days_old} days old)")
                elif days_old <= 30:  # Modified in the last month
                    result.score *= 1.05

            except (OSError, ValueError):
                # File doesn't exist or stats are unavailable - no boost
                pass

            # Content type relevance boost
            if result.chunk_type in ['function', 'class', 'method']:
                # Code definitions are usually more valuable
                result.score *= 1.1
            elif result.chunk_type in ['comment', 'docstring']:
                # Documentation is valuable for understanding
                result.score *= 1.05

            # Penalize very short content (likely not useful)
            if len(result.content.strip()) < 50:
                result.score *= 0.9

            # Small boost for well-structured content (several substantive lines)
            lines = result.content.strip().split('\n')
            if len(lines) >= 3 and any(len(line.strip()) > 10 for line in lines):
                result.score *= 1.02

        # Sort by the updated scores
        return sorted(results, key=lambda x: x.score, reverse=True)
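
    # The boosts compound multiplicatively: a function chunk (* 1.1) in a
    # README-like file (* 1.2) modified yesterday (* 1.1) ends up at roughly
    # 1.45x its original hybrid score before re-sorting.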

    def _apply_diversity_constraints(self, results: List[SearchResult], top_k: int) -> List[SearchResult]:
        """
        Apply diversity constraints to search results.

        - Max 2 chunks per file
        - Prefer different chunk types
        - Deduplicate overlapping content
        """
        final_results = []
        file_counts = defaultdict(int)
        seen_content_hashes = set()
        chunk_type_counts = defaultdict(int)

        for result in results:
            # Check the per-file limit
            if file_counts[result.file_path] >= 2:
                continue

            # Check for duplicate/overlapping content (hash the first 200 chars)
            content_hash = hash(result.content.strip()[:200])
            if content_hash in seen_content_hashes:
                continue

            # Prefer diverse chunk types: once half the slots are filled,
            # skip types that already occupy more than a third of them
            if len(final_results) >= top_k // 2 and chunk_type_counts[result.chunk_type] > top_k // 3:
                continue

            # Accept the result
            final_results.append(result)
            file_counts[result.file_path] += 1
            seen_content_hashes.add(content_hash)
            chunk_type_counts[result.chunk_type] += 1

            if len(final_results) >= top_k:
                break

        return final_results
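
    # Concrete example: with top_k=10 the type cap activates once 5 results
    # are accepted; any chunk type already holding more than 3 slots is
    # skipped from that point on.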

    def _add_context_to_results(self, results: List[SearchResult], search_df: pd.DataFrame) -> List[SearchResult]:
        """
        Add context (adjacent and parent chunks) to search results.

        Args:
            results: List of search results to add context to
            search_df: DataFrame from the initial search (for finding chunk_id)

        Returns:
            List of SearchResult objects with context added
        """
        # Get the full dataframe for context lookups
        full_df = self.table.to_pandas()

        # Map each result back to its chunk_id via the search DataFrame
        result_to_chunk_id = {}
        for result in results:
            matching_rows = search_df[
                (search_df['file_path'] == result.file_path) &
                (search_df['start_line'] == result.start_line) &
                (search_df['end_line'] == result.end_line)
            ]
            if not matching_rows.empty:
                result_to_chunk_id[result] = matching_rows.iloc[0]['chunk_id']

        # Add context to each result
        for result in results:
            chunk_id = result_to_chunk_id.get(result)
            if chunk_id is None:
                continue

            # Get the row for this chunk
            chunk_rows = full_df[full_df['chunk_id'] == chunk_id]
            if chunk_rows.empty:
                continue

            chunk_row = chunk_rows.iloc[0]

            # Add adjacent chunks as context
            if pd.notna(chunk_row.get('prev_chunk_id')):
                prev_rows = full_df[full_df['chunk_id'] == chunk_row['prev_chunk_id']]
                if not prev_rows.empty:
                    result.context_before = prev_rows.iloc[0]['content']

            if pd.notna(chunk_row.get('next_chunk_id')):
                next_rows = full_df[full_df['chunk_id'] == chunk_row['next_chunk_id']]
                if not next_rows.empty:
                    result.context_after = next_rows.iloc[0]['content']

            # Add the parent class chunk if applicable
            if pd.notna(chunk_row.get('parent_class')):
                parent_rows = full_df[
                    (full_df['name'] == chunk_row['parent_class']) &
                    (full_df['chunk_type'] == 'class') &
                    (full_df['file_path'] == chunk_row['file_path'])
                ]
                if not parent_rows.empty:
                    result.parent_chunk = self._row_to_search_result(parent_rows.iloc[0], score=1.0)

        return results

    def search_similar_code(self,
                            code_snippet: str,
                            top_k: int = 10,
                            exclude_self: bool = True) -> List[SearchResult]:
        """
        Find code similar to a given snippet using hybrid search.

        Args:
            code_snippet: Code to find similar matches for
            top_k: Maximum number of results
            exclude_self: Whether to exclude exact matches

        Returns:
            List of similar code chunks
        """
        # Use the code snippet itself as the query; this combines semantic
        # similarity with keyword matching
        results = self.search(
            query=code_snippet,
            top_k=top_k * 2 if exclude_self else top_k,
            semantic_weight=0.8,  # Higher semantic weight for code similarity
            bm25_weight=0.2
        )

        if exclude_self:
            # Filter out exact matches
            filtered_results = []
            for result in results:
                if result.content.strip() != code_snippet.strip():
                    filtered_results.append(result)
                    if len(filtered_results) >= top_k:
                        break
            return filtered_results

        return results[:top_k]
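
    # Hedged usage sketch, given a CodeSearcher instance `searcher`; the
    # snippet is illustrative:
    #
    #     snippet = "def connect(self):\n    return lancedb.connect(self.path)"
    #     for r in searcher.search_similar_code(snippet, top_k=5):
    #         print(r)  # near-matches, exact copies of the snippet excluded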

    def get_function(self, function_name: str, top_k: int = 5) -> List[SearchResult]:
        """
        Search for a specific function by name.

        Args:
            function_name: Name of the function to find
            top_k: Maximum number of results

        Returns:
            List of matching functions
        """
        # Create a targeted query
        query = f"function {function_name} implementation definition"

        # Search with type filters
        results = self.search(
            query,
            top_k=top_k * 2,
            chunk_types=['function', 'method']
        )

        # Further filter by name
        filtered = [r for r in results
                    if r.name and function_name.lower() in r.name.lower()]

        return filtered[:top_k]

    def get_class(self, class_name: str, top_k: int = 5) -> List[SearchResult]:
        """
        Search for a specific class by name.

        Args:
            class_name: Name of the class to find
            top_k: Maximum number of results

        Returns:
            List of matching classes
        """
        # Create a targeted query
        query = f"class {class_name} definition implementation"

        # Search with type filters
        results = self.search(
            query,
            top_k=top_k * 2,
            chunk_types=['class']
        )

        # Further filter by name
        filtered = [r for r in results
                    if r.name and class_name.lower() in r.name.lower()]

        return filtered[:top_k]
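
    # Hedged usage sketch for the name-based lookups, given a CodeSearcher
    # instance `searcher`; the names below assume this module's own index:
    #
    #     searcher.get_function("embed_query")  # functions/methods matching the name
    #     searcher.get_class("CodeSearcher")    # classes matching the name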

    def explain_code(self, query: str, top_k: int = 5) -> List[SearchResult]:
        """
        Find code that helps explain a concept.

        Args:
            query: Concept to explain (e.g., "how to connect to database")
            top_k: Maximum number of examples

        Returns:
            List of relevant code examples
        """
        # Enhance the query for explanation-oriented results
        enhanced_query = f"example implementation {query}"

        return self.search(enhanced_query, top_k=top_k)

    def find_usage(self, identifier: str, top_k: int = 10) -> List[SearchResult]:
        """
        Find usage examples of an identifier (function, class, or variable).

        Args:
            identifier: The identifier to find usage for
            top_k: Maximum number of results

        Returns:
            List of usage examples
        """
        # Search for common usage patterns
        query = f"using {identifier} calling {identifier} import {identifier}"

        results = self.search(query, top_k=top_k * 2)

        # Keep only results where the identifier actually appears
        filtered = [r for r in results if identifier in r.content]

        return filtered[:top_k]

    def display_results(self,
                        results: List[SearchResult],
                        show_content: bool = True,
                        max_content_lines: int = 10):
        """
        Display search results in a formatted table.

        Args:
            results: List of search results
            show_content: Whether to show code content
            max_content_lines: Maximum lines of content to show
        """
        if not results:
            console.print("[yellow]No results found[/yellow]")
            return

        # Create the summary table
        table = Table(title=f"Search Results ({len(results)} matches)")
        table.add_column("Score", style="cyan", width=6)
        table.add_column("File", style="blue")
        table.add_column("Type", style="green", width=10)
        table.add_column("Name", style="magenta")
        table.add_column("Lines", style="yellow", width=10)

        for result in results:
            table.add_row(
                f"{result.score:.3f}",
                result.file_path,
                result.chunk_type,
                result.name or "-",
                f"{result.start_line}-{result.end_line}"
            )

        console.print(table)

        # Show content if requested
        if show_content and results:
            console.print("\n[bold]Top Results:[/bold]\n")

            for i, result in enumerate(results[:3], 1):
                console.print(f"[bold cyan]#{i}[/bold cyan] {result.file_path}:{result.start_line}")
                console.print(f"[dim]Type: {result.chunk_type} | Name: {result.name}[/dim]")

                # Display the code with syntax highlighting
                syntax = Syntax(
                    result.format_for_display(max_content_lines),
                    result.language,
                    theme="monokai",
                    line_numbers=True,
                    start_line=result.start_line
                )
                console.print(syntax)
                console.print()

    def get_statistics(self) -> Dict[str, Any]:
        """Get search index statistics."""
        if not self.table:
            return {'error': 'Database not connected'}

        try:
            # Load the table once and derive all statistics from it
            df = self.table.to_pandas()
            num_rows = len(df)

            # Unique files
            unique_files = df['file_path'].nunique()

            # Chunk type distribution
            chunk_types = df['chunk_type'].value_counts().to_dict()

            # Language distribution
            languages = df['language'].value_counts().to_dict()

            return {
                'total_chunks': num_rows,
                'unique_files': unique_files,
                'chunk_types': chunk_types,
                'languages': languages,
                'index_ready': True,
            }

        except Exception as e:
            logger.error(f"Failed to get statistics: {e}")
            return {'error': str(e)}
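
    # Shape of the returned statistics (values are illustrative):
    #
    #     {'total_chunks': 1523, 'unique_files': 87,
    #      'chunk_types': {'function': 640, 'class': 120},
    #      'languages': {'python': 1400, 'markdown': 123},
    #      'index_ready': True}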


# Convenience functions

def search_code(project_path: Path, query: str, top_k: int = 10) -> List[SearchResult]:
    """
    Quick search function.

    Args:
        project_path: Path to the project
        query: Search query
        top_k: Maximum results

    Returns:
        List of search results
    """
    searcher = CodeSearcher(project_path)
    return searcher.search(query, top_k=top_k)
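
# Hedged end-to-end sketch; assumes the current directory has already been
# indexed into a .mini-rag directory:
#
#     if __name__ == "__main__":
#         searcher = CodeSearcher(Path("."))
#         searcher.display_results(searcher.search("database connection", top_k=5))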