4
4
from tokencost import count_string_tokens
5
5
from typing import Dict , List , Union
6
6
7
+ MAX_DIRECTORY_DEPTH = 10 # Deepest recursion level scan_directory descends to; deeper directories are skipped
8
+ MAX_FILES = 10000 # Cap on the number of files counted across one whole scan (shared via the stats dict)
9
+ MAX_TOTAL_SIZE_BYTES = 100 * 1024 * 1024 # 100 MB cap on the combined size of all files counted in one scan
10
+
7
11
def should_ignore (path : str , base_path : str , ignore_patterns : List [str ]) -> bool :
8
12
"""Checks if a file or directory should be ignored based on patterns."""
9
13
name = os .path .basename (path )
@@ -15,6 +19,19 @@ def should_ignore(path: str, base_path: str, ignore_patterns: List[str]) -> bool
15
19
return True
16
20
return False
17
21
22
def is_safe_symlink(symlink_path: str, base_path: str) -> bool:
    """Check whether a symlink resolves to a location inside the base directory.

    Both paths are fully resolved with ``os.path.realpath`` before they are
    compared, so chained symlinks and ``..`` components are taken into account.

    Args:
        symlink_path: Path of the symlink to check.
        base_path: Directory the resolved target must be located in.

    Returns:
        True if the resolved target is the base directory itself or lies
        beneath it. False otherwise, including when the paths cannot be
        resolved or compared (``OSError`` / ``ValueError``).
    """
    try:
        target_path = os.path.realpath(symlink_path)
        resolved_base = os.path.realpath(base_path)
        # The target is contained in the base exactly when the longest common
        # sub-path of the two IS the base. Comparing against the target instead
        # would accept ancestors of the base (e.g. a link to "/") and reject
        # every link that points at something inside the base.
        common = os.path.commonpath([resolved_base, target_path])
        return common == os.path.commonpath([resolved_base])
    except (OSError, ValueError):
        # Unresolvable or incomparable paths (e.g. different drives) are
        # treated as unsafe.
        return False
34
+
18
35
def is_text_file (file_path : str ) -> bool :
19
36
"""Determines if a file is likely a text file based on its content."""
20
37
try :
@@ -32,8 +49,34 @@ def read_file_content(file_path: str) -> str:
32
49
except Exception as e :
33
50
return f"Error reading file: { str (e )} "
34
51
35
- def scan_directory (path : str , ignore_patterns : List [str ], base_path : str ) -> Dict :
36
- """Recursively analyzes a directory and its contents."""
52
+ def scan_directory (path : str , ignore_patterns : List [str ], base_path : str , seen_paths : set = None , depth : int = 0 , stats : Dict = None ) -> Dict :
53
+ """Recursively analyzes a directory and its contents with safety limits."""
54
+ if seen_paths is None :
55
+ seen_paths = set ()
56
+ if stats is None :
57
+ stats = {"total_files" : 0 , "total_size" : 0 }
58
+
59
+ # Check depth limit
60
+ if depth > MAX_DIRECTORY_DEPTH :
61
+ print (f"Skipping deep directory: { path } (max depth { MAX_DIRECTORY_DEPTH } reached)" )
62
+ return None
63
+
64
+ # Check total files limit
65
+ if stats ["total_files" ] >= MAX_FILES :
66
+ print (f"Skipping further processing: maximum file limit ({ MAX_FILES } ) reached" )
67
+ return None
68
+
69
+ # Check total size limit
70
+ if stats ["total_size" ] >= MAX_TOTAL_SIZE_BYTES :
71
+ print (f"Skipping further processing: maximum total size ({ MAX_TOTAL_SIZE_BYTES / 1024 / 1024 :.1f} MB) reached" )
72
+ return None
73
+
74
+ real_path = os .path .realpath (path )
75
+ if real_path in seen_paths :
76
+ print (f"Skipping already visited path: { path } " )
77
+ return None
78
+ seen_paths .add (real_path )
79
+
37
80
result = {
38
81
"name" : os .path .basename (path ),
39
82
"type" : "directory" ,
@@ -51,8 +94,69 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic
51
94
if should_ignore (item_path , base_path , ignore_patterns ):
52
95
continue
53
96
97
+ # Handle symlinks
98
+ if os .path .islink (item_path ):
99
+ if not is_safe_symlink (item_path , base_path ):
100
+ print (f"Skipping symlink that points outside base directory: { item_path } " )
101
+ continue
102
+ real_path = os .path .realpath (item_path )
103
+ if real_path in seen_paths :
104
+ print (f"Skipping already visited symlink target: { item_path } " )
105
+ continue
106
+
107
+ if os .path .isfile (real_path ):
108
+ file_size = os .path .getsize (real_path )
109
+ # Check if adding this file would exceed total size limit
110
+ if stats ["total_size" ] + file_size > MAX_TOTAL_SIZE_BYTES :
111
+ print (f"Skipping file { item_path } : would exceed total size limit" )
112
+ continue
113
+
114
+ stats ["total_files" ] += 1
115
+ stats ["total_size" ] += file_size
116
+
117
+ if stats ["total_files" ] > MAX_FILES :
118
+ print (f"Maximum file limit ({ MAX_FILES } ) reached" )
119
+ return result
120
+
121
+ is_text = is_text_file (real_path )
122
+ content = read_file_content (real_path ) if is_text else "[Non-text file]"
123
+
124
+ child = {
125
+ "name" : item ,
126
+ "type" : "file" ,
127
+ "size" : file_size ,
128
+ "content" : content ,
129
+ "path" : item_path
130
+ }
131
+ result ["children" ].append (child )
132
+ result ["size" ] += file_size
133
+ result ["file_count" ] += 1
134
+
135
+ elif os .path .isdir (real_path ):
136
+ subdir = scan_directory (real_path , ignore_patterns , base_path , seen_paths , depth + 1 , stats )
137
+ if subdir :
138
+ subdir ["name" ] = item
139
+ subdir ["path" ] = item_path
140
+ result ["children" ].append (subdir )
141
+ result ["size" ] += subdir ["size" ]
142
+ result ["file_count" ] += subdir ["file_count" ]
143
+ result ["dir_count" ] += 1 + subdir ["dir_count" ]
144
+ continue
145
+
54
146
if os .path .isfile (item_path ):
55
147
file_size = os .path .getsize (item_path )
148
+ # Check if adding this file would exceed total size limit
149
+ if stats ["total_size" ] + file_size > MAX_TOTAL_SIZE_BYTES :
150
+ print (f"Skipping file { item_path } : would exceed total size limit" )
151
+ continue
152
+
153
+ stats ["total_files" ] += 1
154
+ stats ["total_size" ] += file_size
155
+
156
+ if stats ["total_files" ] > MAX_FILES :
157
+ print (f"Maximum file limit ({ MAX_FILES } ) reached" )
158
+ return result
159
+
56
160
is_text = is_text_file (item_path )
57
161
content = read_file_content (item_path ) if is_text else "[Non-text file]"
58
162
@@ -68,7 +172,7 @@ def scan_directory(path: str, ignore_patterns: List[str], base_path: str) -> Dic
68
172
result ["file_count" ] += 1
69
173
70
174
elif os .path .isdir (item_path ):
71
- subdir = scan_directory (item_path , ignore_patterns , base_path )
175
+ subdir = scan_directory (item_path , ignore_patterns , base_path , seen_paths , depth + 1 , stats )
72
176
if subdir :
73
177
result ["children" ].append (subdir )
74
178
result ["size" ] += subdir ["size" ]
0 commit comments