From 5120e06a7898fb79b5bb7a2347a7fbd6388af52d Mon Sep 17 00:00:00 2001
From: "Andres D. Molins"
Date: Mon, 15 Sep 2025 12:22:49 +0200
Subject: [PATCH 1/2] Feature: Implement experimental IPFS pin cleaner.

---
 deployment/docker-build/cleaner.dockerfile |  51 +++++
 deployment/scripts/ipfs_pin_cleaner.py     | 232 +++++++++++++++++++++
 2 files changed, 283 insertions(+)
 create mode 100644 deployment/docker-build/cleaner.dockerfile
 create mode 100644 deployment/scripts/ipfs_pin_cleaner.py

diff --git a/deployment/docker-build/cleaner.dockerfile b/deployment/docker-build/cleaner.dockerfile
new file mode 100644
index 000000000..9d7b573b5
--- /dev/null
+++ b/deployment/docker-build/cleaner.dockerfile
@@ -0,0 +1,51 @@
+FROM ubuntu:24.04 AS base
+
+ENV DEBIAN_FRONTEND=noninteractive
+
+RUN apt-get update && apt-get -y upgrade && apt-get install -y software-properties-common
+RUN add-apt-repository -y ppa:deadsnakes/ppa
+
+# Runtime + build packages
+RUN apt-get update && apt-get -y upgrade && apt-get install -y \
+    libpq5 \
+    python3.12
+
+FROM base AS builder
+
+# Build-only packages
+RUN apt-get update && apt-get install -y \
+    build-essential \
+    python3.12-dev \
+    python3.12-venv \
+    libpq-dev
+
+# Create virtualenv
+RUN python3.12 -m venv /opt/venv
+
+# Upgrade pip and wheel inside the virtualenv
+ENV PIP_NO_CACHE_DIR=yes
+RUN /opt/venv/bin/python3.12 -m pip install --upgrade pip wheel
+ENV PATH="/opt/venv/bin:${PATH}"
+
+# Install only the minimal dependencies needed for ipfs_pin_cleaner.py
+RUN pip install \
+    aioipfs~=0.7.1 \
+    asyncpg==0.30
+
+FROM base
+
+RUN groupadd -g 1000 -o aleph
+RUN useradd -s /bin/bash -u 1000 -g 1000 -o aleph
+
+COPY --from=builder --chown=aleph /opt/venv /opt/venv
+
+# Copy only the ipfs_pin_cleaner.py script
+RUN mkdir -p /opt/cleaner/deployment/scripts
+COPY --chown=aleph ./ipfs_pin_cleaner.py /opt/cleaner/deployment/scripts/
+
+ENV PATH="/opt/venv/bin:${PATH}"
+WORKDIR /opt/cleaner
+USER aleph
+
+# Default entrypoint to run the cleaner script
+ENTRYPOINT ["python3.12", "deployment/scripts/ipfs_pin_cleaner.py"]
diff --git a/deployment/scripts/ipfs_pin_cleaner.py b/deployment/scripts/ipfs_pin_cleaner.py
new file mode 100644
index 000000000..8c4d12b3b
--- /dev/null
+++ b/deployment/scripts/ipfs_pin_cleaner.py
@@ -0,0 +1,232 @@
+import argparse
+import asyncio
+import logging
+import os
+
+import aioipfs
+import asyncpg
+
+# Configure logging to provide clear output
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+
+# Default connection settings; both can be overridden via environment
+# variables or command-line arguments (see main()).
+DATABASE_DSN = "postgres://USERNAME:PASSWORD@localhost:8080/aleph"
+IPFS_API = "/ip4/127.0.0.1/tcp/5001"
+
+
+async def get_ipfs_pins(api_addr: str) -> set:
+    """
+    Connects to an IPFS instance and retrieves a set of recursively pinned CIDs.
+
+    Args:
+        api_addr: The multiaddress of the IPFS API endpoint (e.g., '/ip4/127.0.0.1/tcp/5001').
+
+    Returns:
+        A set of recursively pinned CIDs (as strings).
+    """
+    logging.info(f"Connecting to IPFS API at {api_addr}...")
+    client = None
+    try:
+        client = aioipfs.AsyncIPFS(maddr=api_addr)
+        pins = set()
+        # The 'pintype' argument filters for recursive pins directly in the API call.
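+        # The awaited call returns a mapping of pinned CIDs. The expected shape,
+        # mirroring the go-ipfs /api/v0/pin/ls response (an assumption about this
+        # aioipfs version), is {'Keys': {'<cid>': {'Type': 'recursive'}, ...}}.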
+        pin_list = await client.pin.ls(pintype='recursive', quiet=True)
+        pins.update(pin_list['Keys'].keys())
+        logging.info(f"Found {len(pins)} recursively pinned files in IPFS.")
+        return pins
+    except Exception as e:
+        logging.error(f"Failed to connect or retrieve pins from IPFS: {e}")
+        return set()
+    finally:
+        if client:
+            await client.close()
+            logging.info("IPFS client connection closed.")
+
+
+async def get_database_hashes(dsn: str) -> set:
+    """
+    Connects to a PostgreSQL database and retrieves a set of file hashes that should be pinned.
+
+    Args:
+        dsn: The PostgreSQL connection string.
+
+    Returns:
+        A set of file hashes (as strings) that should be pinned.
+    """
+    logging.info("Connecting to PostgreSQL database...")
+    conn = None
+    try:
+        conn = await asyncpg.connect(dsn)
+        # A stricter variant that only keeps hashes referenced by IPFS STORE
+        # messages is kept here for reference:
+        # query = """
+        #     SELECT f.hash FROM file_pins fp
+        #     INNER JOIN files f ON f.hash = fp.file_hash
+        #     INNER JOIN messages m ON m.item_hash = fp.item_hash
+        #     WHERE m."type" = 'STORE' and m."content"->>'item_type' = 'ipfs'
+        # """
+        # Match CIDv0 hashes ('Qm...') and base32 CIDv1 hashes ('bafkrei...').
+        query = """
+            SELECT f.hash FROM files f
+            WHERE f.hash LIKE 'Qm%' OR f.hash LIKE 'bafkrei%'
+        """
+        rows = await conn.fetch(query)
+        hashes = {row['hash'] for row in rows}
+        logging.info(f"Found {len(hashes)} files that should be pinned in the database.")
+        return hashes
+    except Exception as e:
+        logging.error(f"Failed to connect or query the database: {e}")
+        return set()
+    finally:
+        if conn:
+            await conn.close()
+            logging.info("Database connection closed.")
+
+
+async def unpin_files(api_addr: str, cids_to_unpin: list):
+    """
+    Removes pins for a given list of CIDs from the IPFS node.
+
+    Args:
+        api_addr: The multiaddress of the IPFS API endpoint.
+        cids_to_unpin: A list of CID strings to unpin.
+    """
+    if not cids_to_unpin:
+        logging.info("No files to unpin.")
+        return
+
+    logging.info(f"Connecting to IPFS API at {api_addr} to unpin files...")
+    client = None
+    try:
+        client = aioipfs.AsyncIPFS(maddr=api_addr)
+        for cid in cids_to_unpin:
+            try:
+                logging.warning(f"Unpinning {cid}...")
+                await client.pin.rm(cid)
+                logging.info(f"Successfully unpinned {cid}.")
+            except Exception as e:
+                logging.error(f"Failed to unpin {cid}: {e}")
+    except Exception as e:
+        logging.error(f"Failed to connect to IPFS for unpinning: {e}")
+    finally:
+        if client:
+            await client.close()
+            logging.info("IPFS client connection closed after unpinning.")
+
+
+async def pin_files(api_addr: str, cids_to_pin: list):
+    """
+    Pins a given list of CIDs to the IPFS node.
+
+    Args:
+        api_addr: The multiaddress of the IPFS API endpoint.
+        cids_to_pin: A list of CID strings to pin.
+    """
+    if not cids_to_pin:
+        logging.info("No files to pin.")
+        return
+
+    logging.info(f"Connecting to IPFS API at {api_addr} to pin files...")
+    client = None
+    try:
+        client = aioipfs.AsyncIPFS(maddr=api_addr)
+        for cid in cids_to_pin:
+            try:
+                logging.info(f"Pinning {cid}...")
+                # The 'add' method pins recursively by default.
+                async for status in client.pin.add(cid):
+                    # Progress information is only present in some responses.
+                    progress = status.get('Progress')
+                    if progress is not None:
+                        print('Pin progress', progress)
+                logging.info(f"Successfully pinned {cid}.")
+            except Exception as e:
+                logging.error(f"Failed to pin {cid}: {e}")
+    except Exception as e:
+        logging.error(f"Failed to connect to IPFS for pinning: {e}")
+    finally:
+        if client:
+            await client.close()
+            logging.info("IPFS client connection closed after pinning.")
+
+
+async def main():
+    """
+    Main function to orchestrate the IPFS pin synchronization process.
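+
+    Example invocation (hypothetical credentials; substitute your own DSN and
+    API address). Without --pin/--unpin the script performs a dry run and only
+    reports the differences it finds:
+
+        python3 ipfs_pin_cleaner.py --db-dsn 'postgres://aleph:secret@localhost:5432/aleph' --unpin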
+ """ + parser = argparse.ArgumentParser( + description="Compares IPFS pins with a database record and optionally syncs the state." + ) + # IPFS arguments + parser.add_argument( + '--ipfs-api', + default=os.getenv('IPFS_API', IPFS_API), + help="IPFS API multiaddress (default: /ip4/127.0.0.1/tcp/5001)" + ) + # PostgreSQL arguments from environment variables for security + parser.add_argument( + '--db-dsn', + default=os.getenv('DATABASE_DSN', DATABASE_DSN), + help="PostgreSQL DSN (e.g., 'postgres://user:pass@host:port/dbname'). " + "Can also be set via DATABASE_DSN environment variable." + ) + # Action arguments + parser.add_argument( + '--unpin', + action='store_true', + help="Actually perform the unpinning of files. Default is a dry run." + ) + parser.add_argument( + '--pin', + action='store_true', + help="Actually perform the pinning of missing files. Default is a dry run." + ) + args = parser.parse_args() + + if not args.db_dsn: + logging.error("Database DSN must be provided via --db-dsn argument or DATABASE_DSN environment variable.") + return + + # Get the two sets of hashes/CIDs + ipfs_pins = await get_ipfs_pins(args.ipfs_api) + db_hashes = await get_database_hashes(args.db_dsn) + + if not ipfs_pins and not db_hashes: + logging.warning("Both IPFS and database checks returned empty sets. Exiting.") + return + + # --- 1. Check for files in IPFS that should be UNPINNED --- + pins_to_remove = ipfs_pins - db_hashes + + if not pins_to_remove: + logging.info("All pinned files are correctly referenced in the database.") + else: + logging.warning(f"Found {len(pins_to_remove)} files to UNPIN (in IPFS, not in DB):") + for cid in pins_to_remove: + print(f" - {cid}") + + if args.unpin: + logging.info("--- UNPINNING ENABLED ---") + await unpin_files(args.ipfs_api, list(pins_to_remove)) + logging.info("--- UNPINNING PROCESS COMPLETE ---") + else: + logging.info("-> This was a dry run. Use --unpin to remove them.") + + print("-" * 50) + + # --- 2. Check for files in DB that should be PINNED --- + hashes_to_add = db_hashes - ipfs_pins + + if not hashes_to_add: + logging.info("All necessary files from the database are already pinned in IPFS.") + else: + for cid in hashes_to_add: + print(f" + {cid}") + + if args.pin: + logging.info("--- PINNING ENABLED ---") + await pin_files(args.ipfs_api, list(hashes_to_add)) + logging.info("--- PINNING PROCESS COMPLETE ---") + else: + logging.info("-> This was a dry run. Use --pin to add them.") + + logging.warning(f"Found {len(pins_to_remove)} files to UNPIN (in IPFS, not in DB):") + logging.warning(f"Found {len(hashes_to_add)} files to PIN (in DB, not in IPFS):") + + +if __name__ == "__main__": + asyncio.run(main()) From dafb57d18e29c751ad276efec6769a52aac78b6f Mon Sep 17 00:00:00 2001 From: "Andres D. Molins" Date: Mon, 15 Sep 2025 13:38:07 +0200 Subject: [PATCH 2/2] Feature: Added script to automatically execute all commands. --- deployment/scripts/run_ipfs_cleanup.sh | 102 +++++++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 deployment/scripts/run_ipfs_cleanup.sh diff --git a/deployment/scripts/run_ipfs_cleanup.sh b/deployment/scripts/run_ipfs_cleanup.sh new file mode 100644 index 000000000..fd3d3fefb --- /dev/null +++ b/deployment/scripts/run_ipfs_cleanup.sh @@ -0,0 +1,102 @@ +#!/bin/bash + +# Exit immediately if a command exits with a non-zero status. +set -e + +# --- Initial Setup & User Input --- +echo "This script will clean unpinned IPFS objects and reclaim disk space." 
+echo "--------------------------------------------------------------------" +read -p "Enter your DATABASE_USERNAME (default: aleph): " DB_USER +DB_USER=${DB_USER:-aleph} + +read -s -p "Enter your DATABASE_PASSWORD: " DB_PASS +echo +if [ -z "$DB_PASS" ]; then + echo "โŒ Error: Database password cannot be empty." + exit 1 +fi + +# --- 1. Measure Initial Space Usage --- +echo -e "\n๐Ÿ“Š Checking initial disk space usage..." +IPFS_CONTAINER=$(docker ps -a --format "{{.Names}}" | grep ipfs | head -n 1) +if [ -z "$IPFS_CONTAINER" ]; then + echo "โŒ Error: Could not find the IPFS container." + exit 1 +fi + +# Find the volume name mounted to the IPFS container +IPFS_VOLUME=$(docker inspect -f '{{range .Mounts}}{{if eq .Destination "/data/ipfs"}}{{.Name}}{{end}}{{end}}' "$IPFS_CONTAINER") +if [ -z "$IPFS_VOLUME" ]; then + echo "โŒ Error: Could not find the IPFS data volume for container '$IPFS_CONTAINER'." + exit 1 +fi + +# Use 'docker system df -v' to find the volume's reported size +# We grep for the volume name and get the last column containing the size +INITIAL_SIZE_HR=$(docker system df -v | grep -A 9999 "VOLUME" | grep -w "$IPFS_VOLUME" | awk '{print $NF}') + +if [ -z "$INITIAL_SIZE_HR" ]; then + echo " - โš ๏ธ Warning: Could not determine initial size from 'docker system df'." + INITIAL_SIZE_HR="N/A" +fi +echo " - IPFS Volume: $IPFS_VOLUME" +echo " - Initial Size (from docker df): $INITIAL_SIZE_HR" + +# --- 2. Download Files --- +echo -e "\nโฌ‡๏ธ Downloading Dockerfile and cleaner script..." +wget -q --show-progress -O cleaner.dockerfile "https://raw.githubusercontent.com/aleph-im/pyaleph/refs/heads/andres-feature-implement_experimental_ipfs_pin_cleaner/deployment/docker-build/cleaner.dockerfile" +wget -q --show-progress -O ipfs_pin_cleaner.py "https://raw.githubusercontent.com/aleph-im/pyaleph/refs/heads/andres-feature-implement_experimental_ipfs_pin_cleaner/deployment/scripts/ipfs_pin_cleaner.py" + +# --- 3. Build Docker Image --- +echo -e "\n๐Ÿ› ๏ธ Building 'ipfs-pin-cleaner' Docker image..." +docker build -f cleaner.dockerfile -t ipfs-pin-cleaner . > /dev/null +echo " - Image built successfully." + +# --- 4. Stop Containers --- +echo -e "\n๐Ÿ›‘ Stopping non-essential containers..." +docker-compose stop pyaleph pyaleph-api p2p-service rabbitmq redis + +# --- 5. Get Network and IPFS Info --- +echo -e "\n๐Ÿ”Ž Identifying network and IPFS container details..." +PYALEPH_NETWORK=$(docker network list --format "{{.Name}}" | grep pyaleph | head -n 1) +IPFS_IP=$(docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$IPFS_CONTAINER") +echo " - Network: $PYALEPH_NETWORK, IPFS IP: $IPFS_IP" + +# --- 6. Run IPFS Pin Cleaner --- +echo -e "\n๐Ÿงน Running the IPFS pin cleaner (this may take a while)..." +docker run --rm --network "$PYALEPH_NETWORK" \ + -e DATABASE_DSN="postgres://${DB_USER}:${DB_PASS}@postgres:5432/aleph" \ + -e IPFS_API="/ip4/${IPFS_IP}/tcp/5001" \ + ipfs-pin-cleaner --unpin + +# --- 7. Execute IPFS Garbage Collector --- +echo -e "\n๐Ÿ—‘๏ธ Executing IPFS garbage collector..." +docker exec -it "$IPFS_CONTAINER" ipfs repo gc + +# --- 8. Measure Final Space --- +echo -e "\n๐Ÿ“Š Checking final disk space usage..." +# A small sleep can give Docker's daemon time to update its disk usage stats +sleep 5 +FINAL_SIZE_HR=$(docker system df -v | grep -A 9999 "VOLUME" | grep -w "$IPFS_VOLUME" | awk '{print $NF}') + +if [ -z "$FINAL_SIZE_HR" ]; then + echo " - โš ๏ธ Warning: Could not determine final size from 'docker system df'." 
+    FINAL_SIZE_HR="N/A"
+fi
+echo " - Final Size (from docker df): $FINAL_SIZE_HR"
+
+# --- 9. Restart All Containers ---
+echo -e "\n🚀 Starting all services..."
+docker-compose up -d
+
+# --- 10. Cleanup ---
+echo -e "\n✨ Cleaning up temporary files..."
+rm cleaner.dockerfile ipfs_pin_cleaner.py
+
+# --- Final Summary ---
+echo -e "\n------------------- SUMMARY -------------------"
+echo -e "Initial size reported: \033[1;31m$INITIAL_SIZE_HR\033[0m"
+echo -e "Final size reported:   \033[1;32m$FINAL_SIZE_HR\033[0m"
+echo -e "\nℹ️ Compare the values above to see the reclaimed space."
+echo "-----------------------------------------------"
+echo "✅ All tasks finished successfully!"