Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,18 @@ FROM python:3.12

WORKDIR /app

# Create a non-root user (UID 1000, with a home directory) to run the app as
RUN useradd -m -u 1000 appuser

# Copy the application source and the dependency list into the working directory
COPY src/ ./
COPY requirements.txt ./

# Install Python dependencies; this still runs as root because USER is only set further down
RUN pip install -r requirements.txt

# Change ownership of the application files so appuser owns everything under /app
RUN chown -R appuser:appuser /app

# Switch to non-root user; the CMD below runs as appuser
USER appuser

# Serve the `app` object from module `main` with uvicorn.
# NOTE(review): --reload is uvicorn's development auto-reload flag, and no --host is
# passed, so the server binds to uvicorn's default address - confirm both are
# intended for a containerized deployment.
CMD ["uvicorn", "main:app", "--reload"]
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
fastapi[standard]
uvicorn
fastapi-analytics
slowapi
slowapi
tokencost
110 changes: 107 additions & 3 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
from tokencost import count_string_tokens
from typing import Dict, List, Union

# Safety limits enforced by scan_directory while it walks a directory tree.
# Directories whose recursion depth is greater than this value are skipped
# (the starting directory is depth 0).
MAX_DIRECTORY_DEPTH = 10
# Upper bound on the running count of files processed across the whole scan.
MAX_FILES = 10000
# Upper bound on the cumulative size in bytes of processed files (100 MB);
# a file that would push the running total past this is skipped.
MAX_TOTAL_SIZE_BYTES = 100 * 1024 * 1024

def should_ignore(path: str, base_path: str, ignore_patterns: List[str]) -> bool:
"""Checks if a file or directory should be ignored based on patterns."""
name = os.path.basename(path)
Expand All @@ -15,6 +19,19 @@ def should_ignore(path: str, base_path: str, ignore_patterns: List[str]) -> bool
return True
return False

def is_safe_symlink(symlink_path: str, base_path: str) -> bool:
    """Return True if a symlink resolves to a location inside the base directory.

    Both paths are fully resolved with ``os.path.realpath`` (following any
    chain of symlinks) before they are compared, so the check applies to the
    final target rather than to the link's own location.

    Args:
        symlink_path: Path of the symlink (or any path) to check.
        base_path: Directory the resolved target must stay within.

    Returns:
        True when the resolved target is the base directory itself or lies
        beneath it; False otherwise, including when the paths cannot be
        resolved or compared.
    """
    try:
        target_path = os.path.realpath(symlink_path)
        resolved_base = os.path.realpath(base_path)
        # The target is contained in the base only when the longest common
        # path of the two IS the base. Comparing the common path against the
        # target instead would accept ancestors of the base (such as "/") and
        # reject every path that is actually inside it.
        return os.path.commonpath([target_path, resolved_base]) == resolved_base
    except (OSError, ValueError):
        # commonpath raises ValueError for paths that cannot be compared
        # (e.g. different drives); treat any failure as unsafe.
        return False

def is_text_file(file_path: str) -> bool:
"""Determines if a file is likely a text file based on its content."""
try:
Expand All @@ -32,8 +49,34 @@ def read_file_content(file_path: str) -> str:
except Exception as e:
return f"Error reading file: {str(e)}"

def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dict:
"""Recursively analyzes a directory and its contents."""
def scan_directory(path: str, ignore_patterns: List[str], base_path: str, seen_paths: set = None, depth: int = 0, stats: Dict = None) -> Dict:
"""Recursively analyzes a directory and its contents with safety limits."""
if seen_paths is None:
seen_paths = set()
if stats is None:
stats = {"total_files": 0, "total_size": 0}

# Check depth limit
if depth > MAX_DIRECTORY_DEPTH:
print(f"Skipping deep directory: {path} (max depth {MAX_DIRECTORY_DEPTH} reached)")
return None

# Check total files limit
if stats["total_files"] >= MAX_FILES:
print(f"Skipping further processing: maximum file limit ({MAX_FILES}) reached")
return None

# Check total size limit
if stats["total_size"] >= MAX_TOTAL_SIZE_BYTES:
print(f"Skipping further processing: maximum total size ({MAX_TOTAL_SIZE_BYTES/1024/1024:.1f}MB) reached")
return None

real_path = os.path.realpath(path)
if real_path in seen_paths:
print(f"Skipping already visited path: {path}")
return None
seen_paths.add(real_path)

result = {
"name": os.path.basename(path),
"type": "directory",
Expand All @@ -51,8 +94,69 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic
if should_ignore(item_path, base_path, ignore_patterns):
continue

# Handle symlinks
if os.path.islink(item_path):
if not is_safe_symlink(item_path, base_path):
print(f"Skipping symlink that points outside base directory: {item_path}")
continue
real_path = os.path.realpath(item_path)
if real_path in seen_paths:
print(f"Skipping already visited symlink target: {item_path}")
continue

if os.path.isfile(real_path):
file_size = os.path.getsize(real_path)
# Check if adding this file would exceed total size limit
if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES:
print(f"Skipping file {item_path}: would exceed total size limit")
continue

stats["total_files"] += 1
stats["total_size"] += file_size

if stats["total_files"] > MAX_FILES:
print(f"Maximum file limit ({MAX_FILES}) reached")
return result

is_text = is_text_file(real_path)
content = read_file_content(real_path) if is_text else "[Non-text file]"

child = {
"name": item,
"type": "file",
"size": file_size,
"content": content,
"path": item_path
}
result["children"].append(child)
result["size"] += file_size
result["file_count"] += 1

elif os.path.isdir(real_path):
subdir = scan_directory(real_path, ignore_patterns, base_path, seen_paths, depth + 1, stats)
if subdir:
subdir["name"] = item
subdir["path"] = item_path
result["children"].append(subdir)
result["size"] += subdir["size"]
result["file_count"] += subdir["file_count"]
result["dir_count"] += 1 + subdir["dir_count"]
continue

if os.path.isfile(item_path):
file_size = os.path.getsize(item_path)
# Check if adding this file would exceed total size limit
if stats["total_size"] + file_size > MAX_TOTAL_SIZE_BYTES:
print(f"Skipping file {item_path}: would exceed total size limit")
continue

stats["total_files"] += 1
stats["total_size"] += file_size

if stats["total_files"] > MAX_FILES:
print(f"Maximum file limit ({MAX_FILES}) reached")
return result

is_text = is_text_file(item_path)
content = read_file_content(item_path) if is_text else "[Non-text file]"

Expand All @@ -68,7 +172,7 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic
result["file_count"] += 1

elif os.path.isdir(item_path):
subdir = scan_directory(item_path, ignore_patterns, base_path)
subdir = scan_directory(item_path, ignore_patterns, base_path, seen_paths, depth + 1, stats)
if subdir:
result["children"].append(subdir)
result["size"] += subdir["size"]
Expand Down