Context window001 #95

Open · wants to merge 2 commits into base: main
77 changes: 53 additions & 24 deletions nodes.py
@@ -5,6 +5,7 @@
 from utils.crawl_github_files import crawl_github_files
 from utils.call_llm import call_llm
 from utils.crawl_local_files import crawl_local_files
+from utils.token_manager import TokenManager


 # Helper to get content for specific file indices
@@ -26,13 +27,17 @@ def prep(self, shared):
         project_name = shared.get("project_name")

         if not project_name:
-            # Basic name derivation from URL or directory
+            # Extract project name from repo URL or directory
             if repo_url:
-                project_name = repo_url.split("/")[-1].replace(".git", "")
+                project_name = repo_url.rstrip("/").split("/")[-1].replace(".git", "")
             else:
                 project_name = os.path.basename(os.path.abspath(local_dir))
             shared["project_name"] = project_name

+        # Initialize token manager
+        self.token_manager = TokenManager()
+        shared["token_manager"] = self.token_manager
+
         # Get file patterns directly from shared
         include_patterns = shared["include_patterns"]
         exclude_patterns = shared["exclude_patterns"]
@@ -74,46 +79,70 @@ def exec(self, prep_res):
         files_list = list(result.get("files", {}).items())
         if len(files_list) == 0:
             raise (ValueError("Failed to fetch files"))
+
+        # Create hierarchical context using token manager
+        context = self.token_manager.create_hierarchical_context(files_list)
+
         print(f"Fetched {len(files_list)} files.")
-        return files_list
+        print(f"Created hierarchical context with {len(context['levels'])} levels")
+        print(f"Available tokens: {self.token_manager.get_available_tokens()}")
+
+        # Store both full files list and hierarchical context
+        return {
+            "files": files_list,
+            "hierarchical_context": context
+        }

     def post(self, shared, prep_res, exec_res):
-        shared["files"] = exec_res  # List of (path, content) tuples
+        shared["files"] = exec_res["files"]
+        shared["hierarchical_context"] = exec_res["hierarchical_context"]


 class IdentifyAbstractions(Node):
     def prep(self, shared):
         files_data = shared["files"]
-        project_name = shared["project_name"]  # Get project name
-        language = shared.get("language", "english")  # Get language
-        use_cache = shared.get("use_cache", True)  # Get use_cache flag, default to True
-        max_abstraction_num = shared.get("max_abstraction_num", 10)  # Get max_abstraction_num, default to 10
-
-        # Helper to create context from files, respecting limits (basic example)
-        def create_llm_context(files_data):
-            context = ""
-            file_info = []  # Store tuples of (index, path)
-            for i, (path, content) in enumerate(files_data):
-                entry = f"--- File Index {i}: {path} ---\n{content}\n\n"
-                context += entry
-                file_info.append((i, path))
-
-            return context, file_info  # file_info is list of (index, path)
-
-        context, file_info = create_llm_context(files_data)
-        # Format file info for the prompt (comment is just a hint for LLM)
+        hierarchical_context = shared["hierarchical_context"]
+        project_name = shared["project_name"]
+        language = shared.get("language", "english")
+        use_cache = shared.get("use_cache", True)
+        max_abstraction_num = shared.get("max_abstraction_num", 10)
+        token_manager = shared["token_manager"]
+
+        # Create context from hierarchical structure
+        def create_llm_context():
+            context_parts = []
+            file_info = []
+            idx = 0
+
+            # Process each level in the hierarchy
+            for depth, level_files in sorted(hierarchical_context["levels"].items()):
+                context_parts.append(f"\n--- Level {depth} Files ---\n")
+
+                for file_data in level_files:
+                    path = file_data["path"]
+                    content = file_data["content"]
+                    content_type = file_data["type"]
+
+                    context_parts.append(f"File {idx} ({content_type}): {path}\n{content}\n")
+                    file_info.append((idx, path))
+                    idx += 1
+
+            return "\n".join(context_parts), file_info
+
+        context, file_info = create_llm_context()
         file_listing_for_prompt = "\n".join(
             [f"- {idx} # {path}" for idx, path in file_info]
        )

         return (
             context,
             file_listing_for_prompt,
             len(files_data),
             project_name,
             language,
             use_cache,
-            max_abstraction_num,
-        )  # Return all parameters
+            max_abstraction_num
+        )

     def exec(self, prep_res):
         (
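For context, a minimal sketch of the structure the new create_llm_context consumes. The paths and contents here are hypothetical; the shape matches what TokenManager.create_hierarchical_context returns (levels keyed by directory depth, each entry carrying path, type, and content):

# Hypothetical data, shaped like create_hierarchical_context's return value.
hierarchical_context = {
    "levels": {
        1: [  # files at directory depth 1
            {"path": "main.py", "type": "full", "content": "print('hi')\n"},
        ],
        2: [  # deeper files; this one fell back to a summary
            {"path": "utils/helpers.py", "type": "summary",
             "content": "File: utils/helpers.py\nSize: 54321 chars\nPreview:\n..."},
        ],
    },
    "file_summaries": {},
    "total_files": 2,
}

# Flattening mirrors the loop in prep(): indices stay stable across levels,
# so the LLM can refer back to files by number.
idx = 0
for depth, level_files in sorted(hierarchical_context["levels"].items()):
    for file_data in level_files:
        print(f"File {idx} ({file_data['type']}): {file_data['path']}")
        idx += 1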
1 change: 1 addition & 0 deletions requirements.txt
@@ -6,3 +6,4 @@
 google-cloud-aiplatform>=1.25.0
 google-genai>=1.9.0
 python-dotenv>=1.0.0
 pathspec>=0.11.0
+tiktoken>=0.9.0
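The new tiktoken dependency is what TokenManager uses for token counting. A minimal sketch of the one call it relies on; the model name is the PR's default, and exact counts depend on the tokenizer:

import tiktoken

# encoding_for_model maps a model name to its tokenizer (cl100k_base for gpt-4)
encoder = tiktoken.encoding_for_model("gpt-4")
tokens = encoder.encode("Hello, world")
print(len(tokens))  # number of tokens in the string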
116 changes: 116 additions & 0 deletions utils/token_manager.py
@@ -0,0 +1,116 @@
import tiktoken
from typing import Dict, List, Tuple, Any
import os


class TokenManager:
    def __init__(self, model_name: str = "gpt-4", max_tokens: int = 128000):
        """Initialize the token manager.

        Args:
            model_name: The name of the model to use for token counting
            max_tokens: Maximum tokens allowed in context
        """
        self.encoder = tiktoken.encoding_for_model(model_name)
        self.max_tokens = max_tokens
        self.current_tokens = 0
        self.content_tokens: Dict[str, int] = {}

    def count_tokens(self, text: str) -> int:
        """Count the number of tokens in a text."""
        return len(self.encoder.encode(text))

    def add_content(self, key: str, content: str) -> bool:
        """Add content to the token manager.

        Returns:
            bool: True if content was added, False if it would exceed the token limit
        """
        tokens = self.count_tokens(content)
        if self.current_tokens + tokens > self.max_tokens:
            return False

        self.content_tokens[key] = tokens
        self.current_tokens += tokens
        return True

    def remove_content(self, key: str) -> None:
        """Remove content from the token manager."""
        if key in self.content_tokens:
            self.current_tokens -= self.content_tokens[key]
            del self.content_tokens[key]

    def get_available_tokens(self) -> int:
        """Get the number of tokens still available."""
        return self.max_tokens - self.current_tokens

    def create_hierarchical_context(self, files_data: List[Tuple[str, str]],
                                    max_files_per_level: int = 50) -> Dict[str, Any]:
        """Create a hierarchical context from files data.

        Args:
            files_data: List of (path, content) tuples
            max_files_per_level: Maximum number of files to include at each level

        Returns:
            Dict containing hierarchical context information
        """
        # Group files by directory depth (number of path components)
        hierarchy: Dict[int, List[Tuple[str, str]]] = {}

        for path, content in files_data:
            depth = len(os.path.normpath(path).split(os.sep))
            if depth not in hierarchy:
                hierarchy[depth] = []
            hierarchy[depth].append((path, content))

        # Process each level
        context: Dict[str, Any] = {
            "levels": {},
            "file_summaries": {},
            "total_files": len(files_data)
        }

        for depth in sorted(hierarchy.keys()):
            level_files = hierarchy[depth]

            # Sort files by size and importance (e.g., prioritize non-test files)
            level_files.sort(key=lambda x: (
                "test" in x[0].lower(),  # Deprioritize test files
                -len(x[1])  # Prioritize larger files
            ))

            # Take top N files for this level
            selected_files = level_files[:max_files_per_level]

            level_context = []
            for path, content in selected_files:
                # Try to add full content
                if self.add_content(f"full_{path}", content):
                    level_context.append({
                        "path": path,
                        "type": "full",
                        "content": content
                    })
                else:
                    # If full content doesn't fit, add a summary
                    summary = self._create_file_summary(path, content)
                    if self.add_content(f"summary_{path}", summary):
                        level_context.append({
                            "path": path,
                            "type": "summary",
                            "content": summary
                        })

            if level_context:
                context["levels"][depth] = level_context

        return context

    def _create_file_summary(self, path: str, content: str) -> str:
        """Create a summary of a file's content."""
        # Basic summary: first few lines and size info
        lines = content.split('\n')[:10]  # First 10 lines
        summary = f"File: {path}\n"
        summary += f"Size: {len(content)} chars\n"
        summary += "Preview:\n" + "\n".join(lines) + "\n..."
        return summary
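A usage sketch under made-up inputs: the 200-token budget is deliberately tiny so the oversized file falls back to a summary.

from utils.token_manager import TokenManager

# Hypothetical demo of the full/summary fallback behavior.
tm = TokenManager(model_name="gpt-4", max_tokens=200)

files = [
    ("main.py", "def main():\n    print('hello')\n"),
    ("utils/big_module.py", "x = 0\n" * 500),   # too large for the budget
    ("tests/test_main.py", "def test_main():\n    assert True\n"),
]

ctx = tm.create_hierarchical_context(files, max_files_per_level=50)

for depth, level in sorted(ctx["levels"].items()):
    for f in level:
        # "full" if the whole file fit the budget, "summary" otherwise
        print(depth, f["type"], f["path"])
print("remaining budget:", tm.get_available_tokens())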