From 4d4038cf05bed64a35398f21d1bda237dcf06058 Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 14:23:09 +0100 Subject: [PATCH 1/9] is_table_sorted initial implementation --- pyproject.toml | 11 ++++ src/script/is_table_sorted.py | 98 ++++++++++++++++++++++++++++++++ src/test/test_is_table_sorted.py | 3 + 3 files changed, 112 insertions(+) create mode 100644 pyproject.toml create mode 100644 src/script/is_table_sorted.py create mode 100644 src/test/test_is_table_sorted.py diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..e9132f4 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "cc-index-table" +version = "0.1.0" +description = "Add your description here" +requires-python = ">=3.12" +dependencies = [ + "boto3>=1.40.61", + "pyarrow>=22.0.0", + "pytest>=8.4.2", + "tqdm>=4.67.1", +] diff --git a/src/script/is_table_sorted.py b/src/script/is_table_sorted.py new file mode 100644 index 0000000..7b16766 --- /dev/null +++ b/src/script/is_table_sorted.py @@ -0,0 +1,98 @@ +from collections import defaultdict + +import pyarrow.parquet as pq +import argparse + +from urllib.parse import urlparse +from urllib.request import urlopen +import boto3 +import gzip +from tqdm.auto import tqdm + + +def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> bool: + sort_column_index = next(i for i, name in enumerate(pf.schema.names) + if name == column_name) + + # keep track of min/max in this ParquetFile + whole_min = None + whole_max = None + prev_max = None + for row_group_index in range(pf.num_row_groups): + row_group = pf.metadata.row_group(row_group_index) + column = row_group.column(sort_column_index) + if prev_max is not None and prev_max > column.statistics.min: + # internally unsorted + print(f"row group {row_group_index} is not sorted on {column_name}: '{column.statistics.min}' <= '{prev_max}' ; stopping") + return False, None, None + whole_min = column.statistics.min if whole_min is None else column.statistics.min + whole_max = column.statistics.max if whole_max is None else column.statistics.max + return True, whole_min, whole_max + + +def is_full_table_sorted(file_or_s3_url_list_ordered: list[str], sort_column_name: str) -> bool: + is_sorted = True + prev_max = None + prev_file_or_url = None + status = defaultdict(int) + with tqdm(file_or_s3_url_list_ordered) as pbar: + for file_or_url in pbar: + pf = pq.ParquetFile(file_or_url) + this_is_sorted, pf_min, pf_max = are_parquet_file_row_groups_sorted(pf, column_name=sort_column_name) + if not this_is_sorted: + print( + f"Row groups are *internally* not sorted in file {file_or_url}" + ) + is_sorted = False + status['internally_unsorted'] += 1 + + if prev_max is not None and prev_max > pf_min: + print(f"{prev_file_or_url} is not sorted with respect to {file_or_url}: '{prev_max}' > '{pf_min}'") + status['filewise_unsorted'] += 1 + pbar.set_postfix(status) + prev_max = pf_max + prev_file_or_url = file_or_url + return is_sorted + + +def is_gzip(content: bytes) -> bool: + return content[:2] == b'\x1f\x8b' + + +def read_file_list(path_or_url: str, prefix: str) -> list[str]: + parsed = urlparse(path_or_url) + if parsed.scheme == "s3": + s3 = boto3.client("s3") + bucket = parsed.netloc + key = parsed.path.lstrip("/") + obj = s3.get_object(Bucket=bucket, Key=key) + content = obj["Body"].read() + elif parsed.scheme in ("http", "https"): + with urlopen(path_or_url) as f: + content = f.read() + else: + with open(path_or_url, "r") as f: + content = f.read() + + if is_gzip(content): + content = gzip.decompress(content) + lines = content.decode("utf-8").split("\n") + return [prefix + line.strip() for line in lines if len(line.strip()) > 0] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Check if a collection of Parquet files, considered as a whole, is sorted. Exit code is 0 if sorted, 1 if not sorted.") + parser.add_argument("files_or_s3_urls_file", type=str, help="URI or path to a text file containing a list of paths or S3 URLs, one per line, in the expected sorted order.") + parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')") + parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check sorting against (default: 'url_surtkey')") + + args = parser.parse_args() + + files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix) + is_sorted = is_full_table_sorted(files, sort_column_name=args.column) + if is_sorted: + print("✅ Files are sorted") + exit(0) + else: + print("❌ Files are NOT sorted") + exit(1) diff --git a/src/test/test_is_table_sorted.py b/src/test/test_is_table_sorted.py new file mode 100644 index 0000000..1210010 --- /dev/null +++ b/src/test/test_is_table_sorted.py @@ -0,0 +1,3 @@ +import pytest + + From 0b229d9a0a943f1aa333bfc78a3f58cd2ab11e40 Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 14:56:18 +0100 Subject: [PATCH 2/9] wip tests --- src/script/is_table_sorted.py | 4 +- src/test/test_is_table_sorted.py | 209 ++++++++++++++++++++++++++++++- 2 files changed, 210 insertions(+), 3 deletions(-) diff --git a/src/script/is_table_sorted.py b/src/script/is_table_sorted.py index 7b16766..0358d58 100644 --- a/src/script/is_table_sorted.py +++ b/src/script/is_table_sorted.py @@ -25,8 +25,8 @@ def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> # internally unsorted print(f"row group {row_group_index} is not sorted on {column_name}: '{column.statistics.min}' <= '{prev_max}' ; stopping") return False, None, None - whole_min = column.statistics.min if whole_min is None else column.statistics.min - whole_max = column.statistics.max if whole_max is None else column.statistics.max + whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min) + whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max) return True, whole_min, whole_max diff --git a/src/test/test_is_table_sorted.py b/src/test/test_is_table_sorted.py index 1210010..ee58eeb 100644 --- a/src/test/test_is_table_sorted.py +++ b/src/test/test_is_table_sorted.py @@ -1,3 +1,210 @@ -import pytest +from unittest.mock import MagicMock +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'script')) +from is_table_sorted import are_parquet_file_row_groups_sorted + + +def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple]): + """ + Helper to create a mock ParquetFile with specified row group statistics. + + Args: + column_name: Name of the column to sort by + row_groups_stats: List of (min, max) tuples for each row group + """ + mock_pf = MagicMock() + mock_pf.schema.names = [column_name, 'data'] + mock_pf.num_row_groups = len(row_groups_stats) + + mock_row_groups = [] + for min_val, max_val in row_groups_stats: + mock_row_group = MagicMock() + mock_column = MagicMock() + mock_column.statistics.min = min_val + mock_column.statistics.max = max_val + mock_row_group.column.return_value = mock_column + mock_row_groups.append(mock_row_group) + + mock_pf.metadata.row_group.side_effect = lambda i: mock_row_groups[i] + return mock_pf + + +# Tests for sorted row groups + +def test_single_row_group_sorted(): + """Test with a single row group (trivially sorted)""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [('com,example)/page1', 'com,example)/page3')] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is True + assert min_val is not None + assert max_val is not None + + +def test_multiple_row_groups_strictly_increasing(): + """Test with multiple row groups in strictly increasing order""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [ + ('com,aaa)/', 'com,bbb)/'), + ('com,ccc)/', 'com,ddd)/'), + ('com,eee)/', 'com,fff)/') + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is True + assert min_val is not None + assert max_val is not None + + +def test_boundary_case_adjacent_values(): + """Test with row groups that have adjacent but non-overlapping values""" + mock_pf = _create_mock_parquet_file( + 'url', + [ + ('com,example)/a', 'com,example)/z'), + ('com,example,aaa)/', 'com,example,zzz)/') + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url') + + assert is_sorted is True + assert min_val is not None + assert max_val is not None + + +def test_two_row_groups_strictly_increasing_strings(): + """Test with two row groups with string values in strictly increasing order""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [ + ('com,apple)/', 'com,banana)/'), + ('com,cherry)/', 'com,date)/') + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is True + assert min_val is not None + assert max_val is not None + + +def test_many_row_groups_strictly_increasing(): + """Test with many row groups, all strictly increasing""" + row_groups = [ + ('com,aaa)/', 'com,aaa,zzz)/'), + ('com,bbb)/', 'com,bbb,zzz)/'), + ('com,ccc)/', 'com,ccc,zzz)/'), + ('com,ddd)/', 'com,ddd,zzz)/'), + ('com,eee)/', 'com,eee,zzz)/'), + ] + mock_pf = _create_mock_parquet_file('url_surtkey', row_groups) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is True + assert min_val is not None + assert max_val is not None + + +# Tests for non-sorted row groups + +def test_two_row_groups_overlapping(): + """Test with two row groups where second min is less than first max (overlapping)""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [ + ('a', 'd'), + ('b', 'e') + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is False + assert min_val is None + assert max_val is None + + +def test_row_groups_completely_out_of_order(): + """Test with row groups in descending order""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [ + ('z', 'zz'), + ('a', 'b') # completely before the first group + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is False + assert min_val is None + assert max_val is None + + +def test_multiple_row_groups_with_middle_unsorted(): + """Test with multiple row groups where the middle one breaks the sort order""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [ + ('a', 'b'), + ('z', 'zz'), # correctly sorted so far + ('c', 'd') # breaks ordering (min 'c' < previous max 'zz') + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is False + assert min_val is None + assert max_val is None + + +def test_row_groups_equal_boundary_allowed(): + """Test that row groups where second min equals first max are allowed (>= not >)""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [ + ('a', 'b'), + ('b', 'c') # min equals prev_max - this is allowed + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is True + assert min_val is not None + assert max_val is not None + + +def test_slight_overlap_in_middle(): + """Test detecting overlap in the middle of many row groups""" + mock_pf = _create_mock_parquet_file( + 'url_surtkey', + [ + ('a', 'az'), + ('b', 'bz'), + ('c', 'cz'), + ('ba', 'baz'), # overlaps with previous ('ba' < 'c') + ('d', 'dz'), + ] + ) + + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') + + assert is_sorted is False + assert min_val is None + assert max_val is None From 2914f421383a73f9053418090abb24853a00f0ef Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 15:22:55 +0100 Subject: [PATCH 3/9] tests and fixes --- src/__init__.py | 0 src/script/__init__.py | 0 src/script/is_table_sorted.py | 3 +- src/test/__init__.py | 0 src/test/test_is_table_sorted.py | 232 ++++++------------------------- 5 files changed, 48 insertions(+), 187 deletions(-) create mode 100644 src/__init__.py create mode 100644 src/script/__init__.py create mode 100644 src/test/__init__.py diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/script/__init__.py b/src/script/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/script/is_table_sorted.py b/src/script/is_table_sorted.py index 0358d58..8722e2a 100644 --- a/src/script/is_table_sorted.py +++ b/src/script/is_table_sorted.py @@ -10,7 +10,7 @@ from tqdm.auto import tqdm -def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> bool: +def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> tuple[bool, str, str]: sort_column_index = next(i for i, name in enumerate(pf.schema.names) if name == column_name) @@ -27,6 +27,7 @@ def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> return False, None, None whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min) whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max) + prev_max = column.statistics.max return True, whole_min, whole_max diff --git a/src/test/__init__.py b/src/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/test/test_is_table_sorted.py b/src/test/test_is_table_sorted.py index ee58eeb..67da9a9 100644 --- a/src/test/test_is_table_sorted.py +++ b/src/test/test_is_table_sorted.py @@ -1,21 +1,15 @@ +import pytest +import random from unittest.mock import MagicMock import os import sys -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'script')) -from is_table_sorted import are_parquet_file_row_groups_sorted +from src.script.is_table_sorted import are_parquet_file_row_groups_sorted -def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple]): - """ - Helper to create a mock ParquetFile with specified row group statistics. - - Args: - column_name: Name of the column to sort by - row_groups_stats: List of (min, max) tuples for each row group - """ +def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]): mock_pf = MagicMock() - mock_pf.schema.names = [column_name, 'data'] + mock_pf.schema.names = [column_name] mock_pf.num_row_groups = len(row_groups_stats) mock_row_groups = [] @@ -31,180 +25,46 @@ def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple]): return mock_pf -# Tests for sorted row groups - def test_single_row_group_sorted(): - """Test with a single row group (trivially sorted)""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [('com,example)/page1', 'com,example)/page3')] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is True - assert min_val is not None - assert max_val is not None - - -def test_multiple_row_groups_strictly_increasing(): - """Test with multiple row groups in strictly increasing order""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [ - ('com,aaa)/', 'com,bbb)/'), - ('com,ccc)/', 'com,ddd)/'), - ('com,eee)/', 'com,fff)/') - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is True - assert min_val is not None - assert max_val is not None - - -def test_boundary_case_adjacent_values(): - """Test with row groups that have adjacent but non-overlapping values""" - mock_pf = _create_mock_parquet_file( - 'url', - [ - ('com,example)/a', 'com,example)/z'), - ('com,example,aaa)/', 'com,example,zzz)/') - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url') - - assert is_sorted is True - assert min_val is not None - assert max_val is not None - - -def test_two_row_groups_strictly_increasing_strings(): - """Test with two row groups with string values in strictly increasing order""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [ - ('com,apple)/', 'com,banana)/'), - ('com,cherry)/', 'com,date)/') - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is True - assert min_val is not None - assert max_val is not None - - -def test_many_row_groups_strictly_increasing(): - """Test with many row groups, all strictly increasing""" - row_groups = [ - ('com,aaa)/', 'com,aaa,zzz)/'), - ('com,bbb)/', 'com,bbb,zzz)/'), - ('com,ccc)/', 'com,ccc,zzz)/'), - ('com,ddd)/', 'com,ddd,zzz)/'), - ('com,eee)/', 'com,eee,zzz)/'), - ] + mock_pf = _create_mock_parquet_file('url_surtkey', [('a', 'b')]) + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + assert is_sorted + assert min_val == 'a' + assert max_val == 'b' + + +def test_row_groups_sorted(): + all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')] + for n in range(1, len(all_row_groups_stats)): + row_groups_stats = all_row_groups_stats[:n] + mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats) + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + assert is_sorted + assert is_sorted + assert min_val == row_groups_stats[0][0] + assert max_val == row_groups_stats[-1][1] + + +def test_row_groups_unsorted(): + all_row_groups_stats = [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h')] + count = 0 + while count < 100: + for n in range(2, len(all_row_groups_stats)): + row_groups_stats = all_row_groups_stats[:n].copy() + random.shuffle(row_groups_stats) + if row_groups_stats == all_row_groups_stats[:n]: + # shuffle resulted in same order, try again + continue + + mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats) + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + assert not is_sorted + + count += 1 + + +def test_row_groups_overlapping(): + row_groups = [('a', 'c'), ('b', 'd')] mock_pf = _create_mock_parquet_file('url_surtkey', row_groups) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is True - assert min_val is not None - assert max_val is not None - - -# Tests for non-sorted row groups - -def test_two_row_groups_overlapping(): - """Test with two row groups where second min is less than first max (overlapping)""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [ - ('a', 'd'), - ('b', 'e') - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is False - assert min_val is None - assert max_val is None - - -def test_row_groups_completely_out_of_order(): - """Test with row groups in descending order""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [ - ('z', 'zz'), - ('a', 'b') # completely before the first group - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is False - assert min_val is None - assert max_val is None - - -def test_multiple_row_groups_with_middle_unsorted(): - """Test with multiple row groups where the middle one breaks the sort order""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [ - ('a', 'b'), - ('z', 'zz'), # correctly sorted so far - ('c', 'd') # breaks ordering (min 'c' < previous max 'zz') - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is False - assert min_val is None - assert max_val is None - - -def test_row_groups_equal_boundary_allowed(): - """Test that row groups where second min equals first max are allowed (>= not >)""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [ - ('a', 'b'), - ('b', 'c') # min equals prev_max - this is allowed - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is True - assert min_val is not None - assert max_val is not None - - -def test_slight_overlap_in_middle(): - """Test detecting overlap in the middle of many row groups""" - mock_pf = _create_mock_parquet_file( - 'url_surtkey', - [ - ('a', 'az'), - ('b', 'bz'), - ('c', 'cz'), - ('ba', 'baz'), # overlaps with previous ('ba' < 'c') - ('d', 'dz'), - ] - ) - - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, 'url_surtkey') - - assert is_sorted is False - assert min_val is None - assert max_val is None - - + is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + assert not is_sorted From 9285ec62cd06ec58fcf0a51d37b38daa4cb13231 Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 15:33:18 +0100 Subject: [PATCH 4/9] reorganise --- src/test/__init__.py | 0 src/{ => util}/__init__.py | 0 src/{script => util}/is_table_sorted.py | 0 src/{script => util/test}/__init__.py | 0 src/{ => util}/test/test_is_table_sorted.py | 5 +---- 5 files changed, 1 insertion(+), 4 deletions(-) delete mode 100644 src/test/__init__.py rename src/{ => util}/__init__.py (100%) rename src/{script => util}/is_table_sorted.py (100%) rename src/{script => util/test}/__init__.py (100%) rename src/{ => util}/test/test_is_table_sorted.py (95%) diff --git a/src/test/__init__.py b/src/test/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/__init__.py b/src/util/__init__.py similarity index 100% rename from src/__init__.py rename to src/util/__init__.py diff --git a/src/script/is_table_sorted.py b/src/util/is_table_sorted.py similarity index 100% rename from src/script/is_table_sorted.py rename to src/util/is_table_sorted.py diff --git a/src/script/__init__.py b/src/util/test/__init__.py similarity index 100% rename from src/script/__init__.py rename to src/util/test/__init__.py diff --git a/src/test/test_is_table_sorted.py b/src/util/test/test_is_table_sorted.py similarity index 95% rename from src/test/test_is_table_sorted.py rename to src/util/test/test_is_table_sorted.py index 67da9a9..bacc2ab 100644 --- a/src/test/test_is_table_sorted.py +++ b/src/util/test/test_is_table_sorted.py @@ -1,10 +1,7 @@ -import pytest import random from unittest.mock import MagicMock -import os -import sys -from src.script.is_table_sorted import are_parquet_file_row_groups_sorted +from util.is_table_sorted import are_parquet_file_row_groups_sorted def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]): From 6d63937e804b38974405c24ad3924012e42bae7f Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 15:45:28 +0100 Subject: [PATCH 5/9] file-level unit tests --- src/util/is_table_sorted.py | 5 ++-- src/util/test/test_is_table_sorted.py | 35 +++++++++++++++++++++++++-- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/util/is_table_sorted.py b/src/util/is_table_sorted.py index 8722e2a..02339fa 100644 --- a/src/util/is_table_sorted.py +++ b/src/util/is_table_sorted.py @@ -10,7 +10,7 @@ from tqdm.auto import tqdm -def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> tuple[bool, str, str]: +def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> tuple[bool, str|None, str|None]: sort_column_index = next(i for i, name in enumerate(pf.schema.names) if name == column_name) @@ -50,6 +50,7 @@ def is_full_table_sorted(file_or_s3_url_list_ordered: list[str], sort_column_nam if prev_max is not None and prev_max > pf_min: print(f"{prev_file_or_url} is not sorted with respect to {file_or_url}: '{prev_max}' > '{pf_min}'") status['filewise_unsorted'] += 1 + is_sorted = False pbar.set_postfix(status) prev_max = pf_max prev_file_or_url = file_or_url @@ -83,7 +84,7 @@ def read_file_list(path_or_url: str, prefix: str) -> list[str]: if __name__ == "__main__": parser = argparse.ArgumentParser(description="Check if a collection of Parquet files, considered as a whole, is sorted. Exit code is 0 if sorted, 1 if not sorted.") - parser.add_argument("files_or_s3_urls_file", type=str, help="URI or path to a text file containing a list of paths or S3 URLs, one per line, in the expected sorted order.") + parser.add_argument("files_or_s3_urls_file", type=str, help="URI or path to a text file containing a list of paths, URLs, or S3 URLs, one per line, in the expected sorted order.") parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')") parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check sorting against (default: 'url_surtkey')") diff --git a/src/util/test/test_is_table_sorted.py b/src/util/test/test_is_table_sorted.py index bacc2ab..19de004 100644 --- a/src/util/test/test_is_table_sorted.py +++ b/src/util/test/test_is_table_sorted.py @@ -1,7 +1,7 @@ import random -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch -from util.is_table_sorted import are_parquet_file_row_groups_sorted +from util.is_table_sorted import are_parquet_file_row_groups_sorted, is_full_table_sorted def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]): @@ -65,3 +65,34 @@ def test_row_groups_overlapping(): mock_pf = _create_mock_parquet_file('url_surtkey', row_groups) is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') assert not is_sorted + + +def test_ordered_files_sorted(): + files_config = { + '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')], + '/data/b': [('ccd', 'ddd'), ('dde', 'eee')], + '/data/c': [('eef', 'fff'), ('ffg', 'ggg')], + } + + def mock_parquet_file(path): + return _create_mock_parquet_file('url_surtkey', files_config[path]) + + with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file): + result = is_full_table_sorted(['/data/a', '/data/b', '/data/c'], 'url_surtkey') + assert result + + +def test_ordered_files_unsorted(): + files_config = { + '/data/a': [('aaa', 'bbb'), ('bbc', 'ccc')], + '/data/b': [('ccd', 'ddd'), ('dde', 'eee')], + '/data/c': [('eef', 'fff'), ('ffg', 'ggg')], + } + + def mock_parquet_file(path): + return _create_mock_parquet_file('url_surtkey', files_config[path]) + + with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file): + result = is_full_table_sorted(['/data/a', '/data/c', '/data/b'], 'url_surtkey') + assert result # we don't care about the order of files + From b12057199d8762978f9edefeb92d109523c8a70f Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 15:52:19 +0100 Subject: [PATCH 6/9] don't fail if not filewise sorted --- .github/workflows/python_test.yml | 4 ++++ src/util/is_table_sorted.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/python_test.yml diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml new file mode 100644 index 0000000..ce8f5ac --- /dev/null +++ b/.github/workflows/python_test.yml @@ -0,0 +1,4 @@ +name: python_unit_tests.yml +on: + +jobs: diff --git a/src/util/is_table_sorted.py b/src/util/is_table_sorted.py index 02339fa..529de8a 100644 --- a/src/util/is_table_sorted.py +++ b/src/util/is_table_sorted.py @@ -50,7 +50,7 @@ def is_full_table_sorted(file_or_s3_url_list_ordered: list[str], sort_column_nam if prev_max is not None and prev_max > pf_min: print(f"{prev_file_or_url} is not sorted with respect to {file_or_url}: '{prev_max}' > '{pf_min}'") status['filewise_unsorted'] += 1 - is_sorted = False + #is_sorted = False # uncomment to fail on filewise unsortedness pbar.set_postfix(status) prev_max = pf_max prev_file_or_url = file_or_url From aeffeec00dc4b21488d9ce9cb18d45fd8da47405 Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 15:55:19 +0100 Subject: [PATCH 7/9] github workflow for python unit tests --- .github/workflows/python_test.yml | 36 ++++++++++++++++++++++++++++++- pyproject.toml | 2 +- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index ce8f5ac..92025d2 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -1,4 +1,38 @@ -name: python_unit_tests.yml +name: Python Unit Tests + on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + workflow_dispatch: jobs: + test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ['3.10', '3.11', '3.12', '3.13'] + fail-fast: false + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install uv + run: | + curl -LsSf https://astral.sh/uv/install.sh | sh + echo "$HOME/.cargo/bin" >> $GITHUB_PATH + + - name: Install dependencies + run: | + uv sync + + - name: Run tests + run: | + cd src/util/test + python -m pytest test_is_table_sorted.py -v diff --git a/pyproject.toml b/pyproject.toml index e9132f4..57b29b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "cc-index-table" version = "0.1.0" -description = "Add your description here" +description = "Tools for working with Common Crawl index tables." requires-python = ">=3.12" dependencies = [ "boto3>=1.40.61", From a3484ace145002015331bca9cb988b3685fc6d67 Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Wed, 29 Oct 2025 15:57:58 +0100 Subject: [PATCH 8/9] fix github action --- .github/workflows/python_test.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/python_test.yml b/.github/workflows/python_test.yml index 92025d2..36e2643 100644 --- a/.github/workflows/python_test.yml +++ b/.github/workflows/python_test.yml @@ -34,5 +34,4 @@ jobs: - name: Run tests run: | - cd src/util/test - python -m pytest test_is_table_sorted.py -v + uv run -m pytest src -v From 62f7a9a340e486087699863a2bb066d51bf90481 Mon Sep 17 00:00:00 2001 From: Damian Stewart Date: Fri, 31 Oct 2025 15:44:37 +0100 Subject: [PATCH 9/9] add README details; clarify min/max row group checking vs full file checking Signed-off-by: Damian Stewart --- README.md | 5 +++ ...rted.py => are_part_min_max_increasing.py} | 39 +++++++------------ ...py => test_are_part_min_max_increasing.py} | 19 ++++----- 3 files changed, 25 insertions(+), 38 deletions(-) rename src/util/{is_table_sorted.py => are_part_min_max_increasing.py} (58%) rename src/util/test/{test_is_table_sorted.py => test_are_part_min_max_increasing.py} (76%) diff --git a/README.md b/README.md index 4505872..6dc5dae 100644 --- a/README.md +++ b/README.md @@ -314,3 +314,8 @@ It's also possible to pass the result of SQL query as a CSV file, e.g., an Athen ... ``` +## Part Row Group Test + +This repository also includes a tool to check whether the row groups in a given table part parquet file have strictly increasing min/max values - see `util/are_part_min_max_increasing.py`. + +Note that this tool only checks that, within a single `.parquet` file, each row group's `.min` is greater than the previous row group's `.max`; further this is restricted to a single parquet file - the table as a whole is not checked. diff --git a/src/util/is_table_sorted.py b/src/util/are_part_min_max_increasing.py similarity index 58% rename from src/util/is_table_sorted.py rename to src/util/are_part_min_max_increasing.py index 529de8a..2741b89 100644 --- a/src/util/is_table_sorted.py +++ b/src/util/are_part_min_max_increasing.py @@ -4,13 +4,12 @@ import argparse from urllib.parse import urlparse -from urllib.request import urlopen import boto3 import gzip from tqdm.auto import tqdm -def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> tuple[bool, str|None, str|None]: +def are_parquet_file_row_groups_min_max_ordered(pf: pq.ParquetFile, column_name: str) -> bool: sort_column_index = next(i for i, name in enumerate(pf.schema.names) if name == column_name) @@ -22,38 +21,29 @@ def are_parquet_file_row_groups_sorted(pf: pq.ParquetFile, column_name: str) -> row_group = pf.metadata.row_group(row_group_index) column = row_group.column(sort_column_index) if prev_max is not None and prev_max > column.statistics.min: - # internally unsorted - print(f"row group {row_group_index} is not sorted on {column_name}: '{column.statistics.min}' <= '{prev_max}' ; stopping") - return False, None, None + print(f"row group {row_group_index} min is not strictly increasing w.r.t previous row group max on {column_name}: '{column.statistics.min}' <= '{prev_max}' ; stopping") + return False whole_min = column.statistics.min if whole_min is None else min(column.statistics.min, whole_min) whole_max = column.statistics.max if whole_max is None else max(column.statistics.max, whole_max) prev_max = column.statistics.max - return True, whole_min, whole_max + return True -def is_full_table_sorted(file_or_s3_url_list_ordered: list[str], sort_column_name: str) -> bool: +def are_all_parts_min_max_ordered(file_or_s3_url_list: list[str], sort_column_name: str) -> bool: is_sorted = True - prev_max = None - prev_file_or_url = None status = defaultdict(int) - with tqdm(file_or_s3_url_list_ordered) as pbar: + with tqdm(file_or_s3_url_list) as pbar: for file_or_url in pbar: pf = pq.ParquetFile(file_or_url) - this_is_sorted, pf_min, pf_max = are_parquet_file_row_groups_sorted(pf, column_name=sort_column_name) + this_is_sorted = are_parquet_file_row_groups_min_max_ordered(pf, column_name=sort_column_name) if not this_is_sorted: print( - f"Row groups are *internally* not sorted in file {file_or_url}" + f"Row groups are *internally* not ordered by min/max in file {file_or_url}" ) is_sorted = False status['internally_unsorted'] += 1 - if prev_max is not None and prev_max > pf_min: - print(f"{prev_file_or_url} is not sorted with respect to {file_or_url}: '{prev_max}' > '{pf_min}'") - status['filewise_unsorted'] += 1 - #is_sorted = False # uncomment to fail on filewise unsortedness pbar.set_postfix(status) - prev_max = pf_max - prev_file_or_url = file_or_url return is_sorted @@ -69,11 +59,8 @@ def read_file_list(path_or_url: str, prefix: str) -> list[str]: key = parsed.path.lstrip("/") obj = s3.get_object(Bucket=bucket, Key=key) content = obj["Body"].read() - elif parsed.scheme in ("http", "https"): - with urlopen(path_or_url) as f: - content = f.read() else: - with open(path_or_url, "r") as f: + with open(path_or_url, "rb") as f: content = f.read() if is_gzip(content): @@ -83,15 +70,15 @@ def read_file_list(path_or_url: str, prefix: str) -> list[str]: if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Check if a collection of Parquet files, considered as a whole, is sorted. Exit code is 0 if sorted, 1 if not sorted.") - parser.add_argument("files_or_s3_urls_file", type=str, help="URI or path to a text file containing a list of paths, URLs, or S3 URLs, one per line, in the expected sorted order.") + parser = argparse.ArgumentParser(description="Check if row groups within parquet files have strictly increasing non-overlapping min/max ranges. Exit code is 0 if sorted, 1 if not sorted.") + parser.add_argument("files_or_s3_urls_file", type=str, help="path or s3:// URI to a text file containing a list of paths, to check; used in combination with --prefix to recover individual file paths.") parser.add_argument("--prefix", type=str, default="s3://commoncrawl/", help="Prefix to prepend to entries read from the file (default: 's3://commoncrawl/')") - parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check sorting against (default: 'url_surtkey')") + parser.add_argument("--column", type=str, default="url_surtkey", help="Column name to check against (default: 'url_surtkey')") args = parser.parse_args() files = read_file_list(args.files_or_s3_urls_file, prefix=args.prefix) - is_sorted = is_full_table_sorted(files, sort_column_name=args.column) + is_sorted = are_all_parts_min_max_ordered(files, sort_column_name=args.column) if is_sorted: print("✅ Files are sorted") exit(0) diff --git a/src/util/test/test_is_table_sorted.py b/src/util/test/test_are_part_min_max_increasing.py similarity index 76% rename from src/util/test/test_is_table_sorted.py rename to src/util/test/test_are_part_min_max_increasing.py index 19de004..08a6ad3 100644 --- a/src/util/test/test_is_table_sorted.py +++ b/src/util/test/test_are_part_min_max_increasing.py @@ -1,7 +1,7 @@ import random from unittest.mock import MagicMock, patch -from util.is_table_sorted import are_parquet_file_row_groups_sorted, is_full_table_sorted +from util.are_part_min_max_increasing import are_parquet_file_row_groups_min_max_ordered, are_all_parts_min_max_ordered def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str, str]]): @@ -24,10 +24,8 @@ def _create_mock_parquet_file(column_name: str, row_groups_stats: list[tuple[str def test_single_row_group_sorted(): mock_pf = _create_mock_parquet_file('url_surtkey', [('a', 'b')]) - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey') assert is_sorted - assert min_val == 'a' - assert max_val == 'b' def test_row_groups_sorted(): @@ -35,11 +33,8 @@ def test_row_groups_sorted(): for n in range(1, len(all_row_groups_stats)): row_groups_stats = all_row_groups_stats[:n] mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats) - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey') assert is_sorted - assert is_sorted - assert min_val == row_groups_stats[0][0] - assert max_val == row_groups_stats[-1][1] def test_row_groups_unsorted(): @@ -54,7 +49,7 @@ def test_row_groups_unsorted(): continue mock_pf = _create_mock_parquet_file('url_surtkey', row_groups_stats) - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey') assert not is_sorted count += 1 @@ -63,7 +58,7 @@ def test_row_groups_unsorted(): def test_row_groups_overlapping(): row_groups = [('a', 'c'), ('b', 'd')] mock_pf = _create_mock_parquet_file('url_surtkey', row_groups) - is_sorted, min_val, max_val = are_parquet_file_row_groups_sorted(mock_pf, column_name='url_surtkey') + is_sorted = are_parquet_file_row_groups_min_max_ordered(mock_pf, column_name='url_surtkey') assert not is_sorted @@ -78,7 +73,7 @@ def mock_parquet_file(path): return _create_mock_parquet_file('url_surtkey', files_config[path]) with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file): - result = is_full_table_sorted(['/data/a', '/data/b', '/data/c'], 'url_surtkey') + result = are_all_parts_min_max_ordered(['/data/a', '/data/b', '/data/c'], 'url_surtkey') assert result @@ -93,6 +88,6 @@ def mock_parquet_file(path): return _create_mock_parquet_file('url_surtkey', files_config[path]) with patch('pyarrow.parquet.ParquetFile', side_effect=mock_parquet_file): - result = is_full_table_sorted(['/data/a', '/data/c', '/data/b'], 'url_surtkey') + result = are_all_parts_min_max_ordered(['/data/a', '/data/c', '/data/b'], 'url_surtkey') assert result # we don't care about the order of files