diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml
new file mode 100644
index 0000000..36e2643
--- /dev/null
+++ b/.github/workflows/python_test.yml
@@ -0,0 +1,37 @@
+name: Python Unit Tests
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+  workflow_dispatch:
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: ['3.10', '3.11', '3.12', '3.13']
+      fail-fast: false
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v5
+        with:
+          python-version: ${{ matrix.python-version }}
+
+      - name: Install uv
+        run: |
+          curl -LsSf https://astral.sh/uv/install.sh | sh
+          echo "$HOME/.cargo/bin" >> $GITHUB_PATH
+
+      - name: Install dependencies
+        run: |
+          uv sync
+
+      - name: Run tests
+        run: |
+          uv run -m pytest src -v
diff --git a/README.md b/README.md
index 4505872..6dc5dae 100644
--- a/README.md
+++ b/README.md
@@ -314,3 +314,8 @@ It's also possible to pass the result of SQL query as a CSV file, e.g., an Athen
 ...
 ```
 
+## Part Row Group Test
+
+This repository also includes a tool to check whether the row groups in a given table part parquet file have strictly increasing min/max values - see `src/util/are_part_min_max_increasing.py`.
+
+Note that this tool only checks that, within a single `.parquet` file, each row group's `.min` is greater than the previous row group's `.max`; ordering across part files - and thus across the table as a whole - is not checked.
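For illustration, a minimal sketch of driving the check described above from Python rather than through the command-line entry point. It assumes `src` is on `sys.path` (as it is when the tests run under pytest); the part-file paths are placeholders:

```python
# Hypothetical usage sketch; the paths below are placeholders.
from util.are_part_min_max_increasing import are_all_parts_min_max_ordered

parts = [
    "table/part-00000.parquet",
    "table/part-00001.parquet",
]

# Returns True only if, within every listed file, no row group's min on the
# sort column falls below the previous row group's max.
ok = are_all_parts_min_max_ordered(parts, sort_column_name="url_surtkey")
print("sorted" if ok else "not sorted")
```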
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..57b29b1
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,11 @@
+[project]
+name = "cc-index-table"
+version = "0.1.0"
+description = "Tools for working with Common Crawl index tables."
+requires-python = ">=3.10"
+dependencies = [
+    "boto3>=1.40.61",
+    "pyarrow>=22.0.0",
+    "pytest>=8.4.2",
+    "tqdm>=4.67.1",
+]
diff --git a/src/util/__init__.py b/src/util/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/util/are_part_min_max_increasing.py b/src/util/are_part_min_max_increasing.py
new file mode 100644
index 0000000..2741b89
--- /dev/null
+++ b/src/util/are_part_min_max_increasing.py
@@ -0,0 +1,87 @@
+from collections import defaultdict
+
+import pyarrow.parquet as pq
+import argparse
+
+from urllib.parse import urlparse
+import boto3
+import gzip
+from tqdm.auto import tqdm
+
+
+def are_parquet_file_row_groups_min_max_ordered(pf: pq.ParquetFile, column_name: str) -> bool:
+    sort_column_index = next(i for i, name in enumerate(pf.schema.names)
+                             if name == column_name)
+
+    # keep track of min/max in this ParquetFile
+    whole_min = None
+    whole_max = None
+    prev_max = None
+    for row_group_index in range(pf.num_row_groups):
+        row_group = pf.metadata.row_group(row_group_index)
+        column = row_group.column(sort_column_index)
+        if prev_max is not None and prev_max > column.statistics.min:
+            print(f"row group {row_group_index} min is not strictly increasing w.r.t. previous row group max on {column_name}: '{column.statistics.min}' < '{prev_max}' ; stopping")
+            return False
+        whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min)
+        whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max)
+        prev_max = column.statistics.max
+    return True
+
+
+def are_all_parts_min_max_ordered(file_or_s3_url_list: list[str], sort_column_name: str) -> bool:
+    is_sorted = True
+    status = defaultdict(int)
+    with tqdm(file_or_s3_url_list) as pbar:
+        for file_or_url in pbar:
+            pf = pq.ParquetFile(file_or_url)
+            this_is_sorted = are_parquet_file_row_groups_min_max_ordered(pf, column_name=sort_column_name)
+            if not this_is_sorted:
+                print(
+                    f"Row groups are *internally* not ordered by min/max in file {file_or_url}"
+                )
+                is_sorted = False
+                status['internally_unsorted'] += 1
+
+            pbar.set_postfix(status)
+    return is_sorted
+
+
+def is_gzip(content: bytes) -> bool:
+    return content[:2] == b'\x1f\x8b'
+
+
+def read_file_list(path_or_url: str, prefix: str) -> list[str]:
+    parsed = urlparse(path_or_url)
+    if parsed.scheme == "s3":
+        s3 = boto3.client("s3")
+        bucket = parsed.netloc
+        key = parsed.path.lstrip("/")
+        obj = s3.get_object(Bucket=bucket, Key=key)
+        content = obj["Body"].read()
+    else:
+        with open(path_or_url, "rb") as f:
+            content = f.read()
+
+    if is_gzip(content):
+        content = gzip.decompress(content)
+    lines = content.decode("utf-8").split("\n")
+    return [prefix + line.strip() for line in lines if len(line.strip()) > 0]
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Check if row groups within parquet files have strictly increasing non-overlapping min/max ranges. Exit code is 0 if sorted, 1 if not sorted.")
+    parser.add_argument("files_or_s3_urls_file", type=str, help="Path or s3:// URI to a text file containing the list of paths to check; used in combination with --prefix to recover individual file paths.")
+    parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')")
+    parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check against (default: 'url_surtkey')")
+
+    args = parser.parse_args()
+
+    files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix)
+    is_sorted = are_all_parts_min_max_ordered(files, sort_column_name=args.column)
+    if is_sorted:
+        print("✅ Files are sorted")
+        exit(0)
+    else:
+        print("❌ Files are NOT sorted")
+        exit(1)
diff --git a/src/util/test/__init__.py b/src/util/test/__init__.py
new file mode 100644
index 0000000..e69de29
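For reference, a small standalone sketch (not part of this change) of the pyarrow row-group statistics interface that the script above reads; the file name and column values are invented for the example:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Write a tiny table as two row groups so each row group gets its own statistics.
table = pa.table({"url_surtkey": ["com,example)/a", "com,example)/b",
                                  "org,example)/a", "org,example)/b"]})
pq.write_table(table, "example_part.parquet", row_group_size=2)

pf = pq.ParquetFile("example_part.parquet")
col = pf.schema.names.index("url_surtkey")
for i in range(pf.num_row_groups):
    stats = pf.metadata.row_group(i).column(col).statistics
    print(i, stats.min, stats.max)  # per-row-group min/max read from the footer
```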
diff --git a/src/util/test/test_are_part_min_max_increasing.py b/src/util/test/test_are_part_min_max_increasing.py
new file mode 100644
index 0000000..08a6ad3
--- /dev/null
+++ b/src/util/test/test_are_part_min_max_increasing.py
@@ -0,0 +1,93 @@
+import random
+from unittest.mock import MagicMock, patch
+
+from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered, are_all_parts_min_max_ordered
+
+
+def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]):
+    mock_pf = MagicMock()
+    mock_pf.schema.names = [column_name]
+    mock_pf.num_row_groups = len(row_groups_stats)
+
+    mock_row_groups = []
+    for min_val, max_val in row_groups_stats:
+        mock_row_group = MagicMock()
+        mock_column = MagicMock()
+        mock_column.statistics.min = min_val
+        mock_column.statistics.max = max_val
+        mock_row_group.column.return_value = mock_column
+        mock_row_groups.append(mock_row_group)
+
+    mock_pf.metadata.row_group.side_effect = lambda i: mock_row_groups[i]
+    return mock_pf
+
+
+def test_single_row_group_sorted():
+    mock_pf = _create_mock_parquet_file('url_surtkey', [('a', 'b')])
+    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
+    assert is_sorted
+
+
+def test_row_groups_sorted():
+    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
+    for n in range(1, len(all_row_groups_stats)):
+        row_groups_stats = all_row_groups_stats[:n]
+        mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
+        is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
+        assert is_sorted
+
+
+def test_row_groups_unsorted():
+    all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')]
+    count = 0
+    while count < 100:
+        for n in range(2, len(all_row_groups_stats)):
+            row_groups_stats = all_row_groups_stats[:n].copy()
+            random.shuffle(row_groups_stats)
+            if row_groups_stats == all_row_groups_stats[:n]:
+                # shuffle resulted in same order, try again
+                continue
+
+            mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats)
+            is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
+            assert not is_sorted
+
+            count += 1
+
+
+def test_row_groups_overlapping():
+    row_groups = [('a', 'c'), ('b', 'd')]
+    mock_pf = _create_mock_parquet_file('url_surtkey', row_groups)
+    is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey')
+    assert not is_sorted
+
+
+def test_ordered_files_sorted():
+    files_config = {
+        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
+        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
+        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
+    }
+
+    def mock_parquet_file(path):
+        return _create_mock_parquet_file('url_surtkey', files_config[path])
+
+    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
+        result = are_all_parts_min_max_ordered(['/data/a', '/data/b', '/data/c'], 'url_surtkey')
+        assert result
+
+
+def test_ordered_files_unsorted():
+    files_config = {
+        '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')],
+        '/data/b': [('ccd', 'ddd'), ('dde', 'eee')],
+        '/data/c': [('eef', 'fff'), ('ffg', 'ggg')],
+    }
+
+    def mock_parquet_file(path):
+        return _create_mock_parquet_file('url_surtkey', files_config[path])
+
+    with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file):
+        result = are_all_parts_min_max_ordered(['/data/a', '/data/c', '/data/b'], 'url_surtkey')
+        assert result  # we don't care about the order of files
+
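A possible follow-up not included in this diff: an end-to-end test that runs the check against a real parquet file written to a temporary directory rather than against mocks. A sketch, under the same import-path assumptions as the existing tests:

```python
import pyarrow as pa
import pyarrow.parquet as pq

from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered


def test_real_parquet_file_row_groups_sorted(tmp_path):
    # Two row groups whose url_surtkey ranges increase and do not overlap.
    path = tmp_path / "part.parquet"
    table = pa.table({"url_surtkey": ["aa", "ab", "ba", "bb"]})
    pq.write_table(table, path, row_group_size=2)

    pf = pq.ParquetFile(path)
    assert are_parquet_file_row_groups_min_max_ordered(pf, column_name="url_surtkey")
```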