Add GitLab Support #129
Replies: 2 comments · 17 replies
-
Yeah, would appreciate a PR. The main thing to extend is: https://github.com/The-Pocket/PocketFlow-Tutorial-Codebase-Knowledge/blob/main/utils/crawl_github_files.py
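For anyone picking this up, a rough sketch of the kind of entry-point change being suggested (the dispatcher and crawl_gitlab_files below are placeholders, not the project's actual API); a full version is posted later in this thread:

from urllib.parse import urlparse

def crawl_repo_files(repo_url, **kwargs):
    """Route to a GitHub or GitLab crawler based on the URL host (sketch only)."""
    host = urlparse(repo_url).netloc.lower()
    if "gitlab" in host:
        return crawl_gitlab_files(repo_url, **kwargs)   # hypothetical GitLab variant
    return crawl_github_files(repo_url, **kwargs)       # existing GitHub crawler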
-
How well does Claude perform?
-
Traceback (most recent call last): it doesn't work 😟
-
Asked Claude to fix it for me and this seems to work:

import requests
import urllib.parse
import base64
import os
import tempfile
import git
import time
import fnmatch
from typing import Union, Set, List, Dict, Tuple, Any
from urllib.parse import urlparse
def crawl_github_files(
    repo_url,
    token=None,
    max_file_size: int = 1 * 1024 * 1024,  # 1 MB
    use_relative_paths: bool = False,
    include_patterns: Union[str, Set[str]] = None,
    exclude_patterns: Union[str, Set[str]] = None
):
    """
    Crawl files from a specific path in a GitHub or GitLab repository at a specific commit.

    Args:
        repo_url (str): URL of the GitHub or GitLab repository with specific path and commit
            Examples:
            - GitHub: 'https://github.com/microsoft/autogen/tree/e45a15766746d95f8cfaaa705b0371267bec812e/python/packages/autogen-core/src/autogen_core'
            - GitLab: 'https://gitlab.com/gitlab-org/gitlab/-/tree/master/app/models'
        token (str, optional): **GitHub personal access token or GitLab access token.**
            - **Required for private repositories.**
            - **Recommended for public repos to avoid rate limits.**
            - Can be passed explicitly or set via the `GITHUB_TOKEN` or `GITLAB_TOKEN` environment variable.
        max_file_size (int, optional): Maximum file size in bytes to download (default: 1 MB)
        use_relative_paths (bool, optional): If True, file paths will be relative to the specified subdirectory
        include_patterns (str or set of str, optional): Pattern or set of patterns specifying which files to include (e.g., "*.py", {"*.md", "*.txt"}).
            If None, all files are included.
        exclude_patterns (str or set of str, optional): Pattern or set of patterns specifying which files to exclude.
            If None, no files are excluded.

    Returns:
        dict: Dictionary with files and statistics
    """
    # Convert single pattern to set
    if include_patterns and isinstance(include_patterns, str):
        include_patterns = {include_patterns}
    if exclude_patterns and isinstance(exclude_patterns, str):
        exclude_patterns = {exclude_patterns}

    def should_include_file(file_path: str, file_name: str) -> bool:
        """Determine if a file should be included based on patterns"""
        # If no include patterns are specified, include all files
        if not include_patterns:
            include_file = True
        else:
            # Check if file matches any include pattern
            include_file = any(fnmatch.fnmatch(file_name, pattern) for pattern in include_patterns)

        # If exclude patterns are specified, check if file should be excluded
        if exclude_patterns and include_file:
            # Exclude if file matches any exclude pattern
            exclude_file = any(fnmatch.fnmatch(file_path, pattern) for pattern in exclude_patterns)
            return not exclude_file

        return include_file

    # Detect SSH URL (git@ or .git suffix)
    is_ssh_url = repo_url.startswith("git@") or repo_url.endswith(".git")

    if is_ssh_url:
        # Clone repo via SSH to temp dir
        with tempfile.TemporaryDirectory() as tmpdirname:
            print(f"Cloning SSH repo {repo_url} to temp dir {tmpdirname} ...")
            try:
                repo = git.Repo.clone_from(repo_url, tmpdirname)
            except Exception as e:
                print(f"Error cloning repo: {e}")
                return {"files": {}, "stats": {"error": str(e)}}

            # Walk directory
            files = {}
            skipped_files = []
            for root, dirs, filenames in os.walk(tmpdirname):
                for filename in filenames:
                    abs_path = os.path.join(root, filename)
                    rel_path = os.path.relpath(abs_path, tmpdirname)

                    # Check file size
                    try:
                        file_size = os.path.getsize(abs_path)
                    except OSError:
                        continue
                    if file_size > max_file_size:
                        skipped_files.append((rel_path, file_size))
                        print(f"Skipping {rel_path}: size {file_size} exceeds limit {max_file_size}")
                        continue

                    # Check include/exclude patterns
                    if not should_include_file(rel_path, filename):
                        print(f"Skipping {rel_path}: does not match include/exclude patterns")
                        continue

                    # Read content
                    try:
                        with open(abs_path, "r", encoding="utf-8-sig") as f:
                            content = f.read()
                        files[rel_path] = content
                        print(f"Added {rel_path} ({file_size} bytes)")
                    except Exception as e:
                        print(f"Failed to read {rel_path}: {e}")

            return {
                "files": files,
                "stats": {
                    "downloaded_count": len(files),
                    "skipped_count": len(skipped_files),
                    "skipped_files": skipped_files,
                    "base_path": None,
                    "include_patterns": include_patterns,
                    "exclude_patterns": exclude_patterns,
                    "source": "ssh_clone"
                }
            }

    # Detect platform (GitHub or GitLab)
    parsed_url = urlparse(repo_url)
    is_gitlab = 'gitlab' in parsed_url.netloc.lower()

    if is_gitlab:
        return _crawl_gitlab_files(repo_url, token, max_file_size, use_relative_paths, include_patterns, exclude_patterns, should_include_file)
    else:
        return _crawl_github_files(repo_url, token, max_file_size, use_relative_paths, include_patterns, exclude_patterns, should_include_file)


def _crawl_github_files(repo_url, token, max_file_size, use_relative_paths, include_patterns, exclude_patterns, should_include_file):
    """Handle GitHub repository crawling"""
    # Parse GitHub URL to extract owner, repo, commit/branch, and path
    parsed_url = urlparse(repo_url)
    path_parts = parsed_url.path.strip('/').split('/')
    if len(path_parts) < 2:
        raise ValueError(f"Invalid GitHub URL: {repo_url}")

    # Extract the basic components
    owner = path_parts[0]
    repo = path_parts[1]

    # Setup for GitHub API
    headers = {"Accept": "application/vnd.github.v3+json"}
    if token:
        headers["Authorization"] = f"token {token}"

    def fetch_branches(owner: str, repo: str):
        """Get branches of the repository"""
        url = f"https://api.github.com/repos/{owner}/{repo}/branches"
        response = requests.get(url, headers=headers)

        if response.status_code == 404:
            if not token:
                print(f"Error 404: Repository not found or is private.\n"
                      f"If this is a private repository, please provide a valid GitHub token via the 'token' argument or set the GITHUB_TOKEN environment variable.")
            else:
                print(f"Error 404: Repository not found or insufficient permissions with the provided token.\n"
                      f"Please verify the repository exists and the token has access to this repository.")
            return []

        if response.status_code != 200:
            print(f"Error fetching the branches of {owner}/{repo}: {response.status_code} - {response.text}")
            return []

        return response.json()

    def check_tree(owner: str, repo: str, tree: str):
        """Check whether the repository has the given tree"""
        url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{tree}"
        response = requests.get(url, headers=headers)
        return response.status_code == 200

    # Check if URL contains a specific branch/commit
    if len(path_parts) > 2 and 'tree' == path_parts[2]:
        join_parts = lambda i: '/'.join(path_parts[i:])

        branches = fetch_branches(owner, repo)
        branch_names = map(lambda branch: branch.get("name"), branches)

        # Fetching branches was not successful
        if len(branches) == 0:
            return

        # Check the branch name
        relevant_path = join_parts(3)

        # Find a match with the relevant path and get the branch name
        filter_gen = (name for name in branch_names if relevant_path.startswith(name))
        ref = next(filter_gen, None)

        # If no match is found, check whether it is a tree
        if ref is None:
            tree = path_parts[3]
            ref = tree if check_tree(owner, repo, tree) else None

        # If it is neither a tree nor a branch name
        if ref is None:
            print(f"The given path does not match any branch or tree in the repository.\n"
                  f"Please verify that the path exists.")
            return

        # Combine all parts after the ref as the path
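        # Note: a branch name containing '/' (e.g. 'release/1.0') occupies two URL segments, so the sub-path starts one segment later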
        part_index = 5 if '/' in ref else 4
        specific_path = join_parts(part_index) if part_index < len(path_parts) else ""
    else:
        # Don't put the ref param in the query
        # and let GitHub decide the default branch
        ref = None
        specific_path = ""

    # Dictionary to store path -> content mapping
    files = {}
    skipped_files = []

    def fetch_contents(path):
        """Fetch contents of the repository at a specific path and commit"""
        url = f"https://api.github.com/repos/{owner}/{repo}/contents/{path}"
        params = {"ref": ref} if ref is not None else {}
        response = requests.get(url, headers=headers, params=params)

        if response.status_code == 403 and 'rate limit exceeded' in response.text.lower():
            reset_time = int(response.headers.get('X-RateLimit-Reset', 0))
            wait_time = max(reset_time - time.time(), 0) + 1
            print(f"Rate limit exceeded. Waiting for {wait_time:.0f} seconds...")
            time.sleep(wait_time)
            return fetch_contents(path)

        if response.status_code == 404:
            if not token:
                print(f"Error 404: Repository not found or is private.\n"
                      f"If this is a private repository, please provide a valid GitHub token via the 'token' argument or set the GITHUB_TOKEN environment variable.")
            elif not path and ref == 'main':
                print(f"Error 404: Repository not found. Check if the default branch is not 'main'\n"
                      f"Try adding the branch name to the request, i.e. python main.py --repo https://github.com/username/repo/tree/master")
            else:
                print(f"Error 404: Path '{path}' not found in repository or insufficient permissions with the provided token.\n"
                      f"Please verify the token has access to this repository and the path exists.")
            return

        if response.status_code != 200:
            print(f"Error fetching {path}: {response.status_code} - {response.text}")
            return

        contents = response.json()

        # Handle both single file and directory responses
        if not isinstance(contents, list):
            contents = [contents]

        for item in contents:
            item_path = item["path"]

            # Calculate relative path if requested
            if use_relative_paths and specific_path:
                # Make sure the path is relative to the specified subdirectory
                if item_path.startswith(specific_path):
                    rel_path = item_path[len(specific_path):].lstrip('/')
                else:
                    rel_path = item_path
            else:
                rel_path = item_path

            if item["type"] == "file":
                # Check if file should be included based on patterns
                if not should_include_file(rel_path, item["name"]):
                    print(f"Skipping {rel_path}: Does not match include/exclude patterns")
                    continue

                # Check file size if available
                file_size = item.get("size", 0)
                if file_size > max_file_size:
                    skipped_files.append((item_path, file_size))
                    print(f"Skipping {rel_path}: File size ({file_size} bytes) exceeds limit ({max_file_size} bytes)")
                    continue

                # For files, get raw content
                if "download_url" in item and item["download_url"]:
                    file_url = item["download_url"]
                    file_response = requests.get(file_url, headers=headers)

                    # Final size check in case content-length header is available but differs from metadata
                    content_length = int(file_response.headers.get('content-length', 0))
                    if content_length > max_file_size:
                        skipped_files.append((item_path, content_length))
                        print(f"Skipping {rel_path}: Content length ({content_length} bytes) exceeds limit ({max_file_size} bytes)")
                        continue

                    if file_response.status_code == 200:
                        files[rel_path] = file_response.text
                        print(f"Downloaded: {rel_path} ({file_size} bytes)")
                    else:
                        print(f"Failed to download {rel_path}: {file_response.status_code}")
                else:
                    # Alternative method if download_url is not available
                    content_response = requests.get(item["url"], headers=headers)
                    if content_response.status_code == 200:
                        content_data = content_response.json()
                        if content_data.get("encoding") == "base64" and "content" in content_data:
                            # Check size of base64 content before decoding
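                            # base64 stores 3 bytes of data in 4 characters, so the decoded size is roughly 0.75x the encoded length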
                            if len(content_data["content"]) * 0.75 > max_file_size:  # Approximate size calculation
                                estimated_size = int(len(content_data["content"]) * 0.75)
                                skipped_files.append((item_path, estimated_size))
                                print(f"Skipping {rel_path}: Encoded content exceeds size limit")
                                continue

                            file_content = base64.b64decode(content_data["content"]).decode('utf-8')
                            files[rel_path] = file_content
                            print(f"Downloaded: {rel_path} ({file_size} bytes)")
                        else:
                            print(f"Unexpected content format for {rel_path}")
                    else:
                        print(f"Failed to get content for {rel_path}: {content_response.status_code}")

            elif item["type"] == "dir":
                # Recursively process subdirectories
                fetch_contents(item_path)

    # Start crawling from the specified path
    fetch_contents(specific_path)

    return {
        "files": files,
        "stats": {
            "downloaded_count": len(files),
            "skipped_count": len(skipped_files),
            "skipped_files": skipped_files,
            "base_path": specific_path if use_relative_paths else None,
            "include_patterns": include_patterns,
            "exclude_patterns": exclude_patterns,
            "source": "github_api"
        }
    }


def _crawl_gitlab_files(repo_url, token, max_file_size, use_relative_paths, include_patterns, exclude_patterns, should_include_file):
    """Handle GitLab repository crawling with fixed recursive tree traversal"""
    # Parse GitLab URL to extract project ID/path, ref, and path
    parsed_url = urlparse(repo_url)

    # Remove leading/trailing slashes and split
    path_parts = [part for part in parsed_url.path.strip('/').split('/') if part]
    if len(path_parts) < 2:
        raise ValueError(f"Invalid GitLab URL: {repo_url}")

    # Find the tree separator in GitLab URLs (/-/tree/)
    tree_index = None
    for i, part in enumerate(path_parts):
        if part == '-' and i + 1 < len(path_parts) and path_parts[i + 1] == 'tree':
            tree_index = i
            break

    if tree_index is not None:
        # Extract project path (everything before /-/tree/)
        project_path = '/'.join(path_parts[:tree_index])

        # Extract ref (branch/commit after tree)
        if tree_index + 2 < len(path_parts):
            ref = path_parts[tree_index + 2]

            # Extract file path (everything after ref)
            if tree_index + 3 < len(path_parts):
                specific_path = '/'.join(path_parts[tree_index + 3:])
            else:
                specific_path = ""
        else:
            ref = "main"  # Default branch
            specific_path = ""
    else:
        # No tree structure, assume it's just the project path
        project_path = '/'.join(path_parts)
        ref = "main"  # Default branch
        specific_path = ""

    # URL encode the project path for GitLab API
    project_id = urllib.parse.quote(project_path, safe='')

    # GitLab API base URL
    gitlab_host = parsed_url.netloc
    base_url = f"https://{gitlab_host}/api/v4"

    # Setup headers
    headers = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    # Dictionary to store path -> content mapping
    files = {}
    skipped_files = []

    def fetch_tree_non_recursive(path=""):
        """
        Fetch repository tree from GitLab API without using recursive=true
        (due to GitLab API bug where recursive=true only returns directories)
        """
        url = f"{base_url}/projects/{project_id}/repository/tree"
        params = {
            "ref": ref,
            "per_page": 100,  # Increase per_page to reduce API calls
        }
        if path:
            params["path"] = path

        all_items = []
        page = 1

        while True:
            params["page"] = page
            response = requests.get(url, headers=headers, params=params)

            if response.status_code == 401:
                print(f"Error 401: Unauthorized access to GitLab repository.\n"
                      f"Please provide a valid GitLab access token via the 'token' argument or set the GITLAB_TOKEN environment variable.")
                return []
            elif response.status_code == 404:
                print(f"Error 404: GitLab repository not found or path doesn't exist.\n"
                      f"Please verify the repository URL and path are correct.")
                return []
            elif response.status_code != 200:
                print(f"Error fetching GitLab tree: {response.status_code} - {response.text}")
                return []

            page_items = response.json()
            if not page_items:  # No more items
                break

            all_items.extend(page_items)

            # Check if we have more pages
            if len(page_items) < params["per_page"]:
                break
            page += 1

        return all_items

    def fetch_file_content(file_path):
        """Fetch file content from GitLab API"""
        encoded_path = urllib.parse.quote(file_path, safe='')
        url = f"{base_url}/projects/{project_id}/repository/files/{encoded_path}/raw"
        params = {"ref": ref}
        response = requests.get(url, headers=headers, params=params)

        if response.status_code == 200:
            return response.text
        else:
            print(f"Failed to fetch content for {file_path}: {response.status_code}")
            return None

    def get_file_size(file_path):
        """Get file size from GitLab API"""
        encoded_path = urllib.parse.quote(file_path, safe='')
        url = f"{base_url}/projects/{project_id}/repository/files/{encoded_path}"
        params = {"ref": ref}
        response = requests.get(url, headers=headers, params=params)

        if response.status_code == 200:
            file_info = response.json()
            return file_info.get("size", 0)
        return 0

    def process_directory_recursively(dir_path=""):
        """Recursively process directories to get all files"""
        print(f"Processing directory: {dir_path if dir_path else 'root'}")

        tree_items = fetch_tree_non_recursive(dir_path)

        for item in tree_items:
            item_path = item["path"]
            item_name = item["name"]
            item_type = item["type"]

            # Calculate relative path if requested
            if use_relative_paths and specific_path:
                if item_path.startswith(specific_path):
                    rel_path = item_path[len(specific_path):].lstrip('/')
                else:
                    rel_path = item_path
            else:
                rel_path = item_path

            if item_type == "blob":  # File
                # Check if file should be included based on patterns
                if not should_include_file(rel_path, item_name):
                    print(f"Skipping {rel_path}: Does not match include/exclude patterns")
                    continue

                # Check file size
                file_size = get_file_size(item_path)
                if file_size > max_file_size:
                    skipped_files.append((item_path, file_size))
                    print(f"Skipping {rel_path}: File size ({file_size} bytes) exceeds limit ({max_file_size} bytes)")
                    continue

                # Fetch file content
                content = fetch_file_content(item_path)
                if content is not None:
                    files[rel_path] = content
                    print(f"Downloaded: {rel_path} ({file_size} bytes)")

            elif item_type == "tree":  # Directory
                # Recursively process subdirectory
                process_directory_recursively(item_path)

    # Start processing from the specified path
    process_directory_recursively(specific_path)

    return {
        "files": files,
        "stats": {
            "downloaded_count": len(files),
            "skipped_count": len(skipped_files),
            "skipped_files": skipped_files,
            "base_path": specific_path if use_relative_paths else None,
            "include_patterns": include_patterns,
            "exclude_patterns": exclude_patterns,
            "source": "gitlab_api_fixed"
        }
    }


# Example usage
if __name__ == "__main__":
    # Get tokens from environment variables
    gitlab_token = os.environ.get("GITLAB_TOKEN")

    # Example GitLab URL - testing with a smaller project
    gitlab_url = "https://gitlab.com/gitlab-org/gitlab-runner/-/tree/main/docs"

    print("Testing GitLab repository with fixed crawler...")
    result = crawl_github_files(
        gitlab_url,
        token=gitlab_token,
        max_file_size=1 * 1024 * 1024,  # 1 MB in bytes
        use_relative_paths=True,
        include_patterns={"*.md"},  # Only markdown files for testing
    )

    print(f"GitLab - Downloaded {result['stats']['downloaded_count']} files.")
    print(f"GitLab - Skipped {result['stats']['skipped_count']} files.")
    print(f"GitLab - Source: {result['stats']['source']}")

    # Display sample files
    if result["files"]:
        print("\nSample files:")
        for i, file_path in enumerate(sorted(result["files"].keys())[:5]):
            content_preview = result["files"][file_path][:100].replace('\n', ' ')
            print(f"  {file_path}: {content_preview}...")
    else:
        print("\nNo files downloaded. Check the repository URL and token.")
-
Great news!
This little detail makes everything work smoothly and without any hiccups. Thank you 🙏
-
Could you please include this code in the next release?
-
I would also like to know if it is possible to add an option to choose the language for generating the documentation?
-
You can choose the language by using the --language option. For example:
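A minimal illustration (the repository URL and language value below are placeholders; it assumes the same CLI invocation shown earlier in this thread):

python main.py --repo https://github.com/username/repo --language "Chinese"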
-
Thanks a lot for the quick and clear response! I’ll definitely try using the --language option for generating documentation in different languages. That’s exactly what I needed. 😊
-
It would be super helpful if there was an option to select the language directly in the interface instead of using the terminal. 😊
-
Currently, PocketFlow-Tutorial-Codebase-Knowledge only works with GitHub repositories.
It would be very useful to add support for GitLab as well, so users can generate tutorials and analyze discussions from both platforms.
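For reference, a minimal sketch of the GitLab API call that such support would build on, mirroring the patched crawler posted above (the project path, branch, and sub-path are illustrative only):

import os
import urllib.parse
import requests

# List files in a GitLab project directory via the v4 repository tree endpoint
project_id = urllib.parse.quote("gitlab-org/gitlab-runner", safe="")  # URL-encoded project path
token = os.environ.get("GITLAB_TOKEN")
headers = {"Authorization": f"Bearer {token}"} if token else {}

resp = requests.get(
    f"https://gitlab.com/api/v4/projects/{project_id}/repository/tree",
    headers=headers,
    params={"ref": "main", "path": "docs", "per_page": 100},
)
for item in resp.json():
    print(item["type"], item["path"])  # 'blob' for files, 'tree' for directories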