Draft
Changes from all commits (34 commits)
70bdd93  well-formed example passes (Oct 16, 2025)
92f30c6  wip (back to basics on pyspark) (Oct 17, 2025)
fed0b2c  identical output with different format and sort order (Oct 17, 2025)
9bb3f41  typo (Oct 17, 2025)
458cb2e  fix failing tests by spinning up a proper spark session and RDD (Oct 17, 2025)
ada1471  exactly match cc-mrjob output (Oct 20, 2025)
1f82d33  wip content encoding and invalid url tests (Oct 20, 2025)
35de5ca  Add test for >50 sitemaps; add accumulator checks to tests (Oct 20, 2025)
0bd0b4a  add fastwarc implementation (Oct 20, 2025)
d17683f  fix logging (Oct 20, 2025)
41a3d0b  simplify & address review comments (Oct 30, 2025)
d50a39a  add exception handling for urljoin (Oct 30, 2025)
c163959  add python unit test github workflow (Oct 30, 2025)
1ba160f  downgrade from uv (Oct 30, 2025)
e2e9646  remove same-host optimization (Oct 30, 2025)
94a98d7  split PySpark into its own requirements.txt; update README (Oct 30, 2025)
6a23d23  typo (Oct 30, 2025)
8339e6c  typos (Oct 30, 2025)
be9a046  polish (Oct 30, 2025)
0210cd8  cleanup remaining spark-submit invocations (Oct 30, 2025)
cb9c4ca  update docs with expanded PySpark info (Oct 31, 2025)
2dd22dc  typo; clarify debugging limitations (Oct 31, 2025)
126b31b  address review comments (Oct 31, 2025)
3a7ba3b  wip (Oct 31, 2025)
69d66a0  add pyproject.toml (Oct 31, 2025)
8fa2c04  wip packaging (Nov 3, 2025)
2daa4b7  wip package_and_run_job (Nov 7, 2025)
5c4b1c6  update version (Nov 7, 2025)
f6edfb4  add pypi publish workflow (Nov 7, 2025)
ca345e1  Merge branch 'main' into damian/feat/pypi-package (Nov 7, 2025)
1ffe57d  try and make the workflow run (Nov 13, 2025)
f29d614  try and make the workflow run 2 (Nov 13, 2025)
3618e7b  try and make the workflow run 3 (Nov 13, 2025)
9c08fc3  fix pypi project url (Nov 13, 2025)
75 changes: 75 additions & 0 deletions .github/workflows/python_publish.yaml
@@ -0,0 +1,75 @@
# This workflow will upload a Python Package to PyPI when a release is created
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries

# This workflow uses actions that are not certified by GitHub.
# They are provided by a third-party and are governed by
# separate terms of service, privacy policy, and support
# documentation.

name: Upload Python Package to PyPI

on:
workflow_dispatch:
pull_request:

#on:
# release:
# types: [published]


permissions:
contents: read

jobs:
release-build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4

- uses: actions/setup-python@v5
with:
python-version: "3.x"

- name: Build release distributions
run: |
# NOTE: put your own distribution build steps here.
python -m pip install build
python -m build

- name: Upload distributions
uses: actions/upload-artifact@v4
with:
name: release-dists
path: dist/

pypi-publish:
runs-on: ubuntu-latest
needs:
- release-build
permissions:
# IMPORTANT: this permission is mandatory for trusted publishing
id-token: write

# Dedicated environments with protections for publishing are strongly recommended.
# For more information, see: https://docs.github.com/en/actions/deployment/targeting-different-environments/using-environments-for-deployment#deployment-protection-rules
environment:
name: pypi
# OPTIONAL: uncomment and update to include your PyPI project URL in the deployment status:
url: https://pypi.org/p/cc-pyspark
#
# ALTERNATIVE: if your GitHub Release name is the PyPI project version string
# ALTERNATIVE: exactly, uncomment the following line instead:
# url: https://pypi.org/project/YOURPROJECT/${{ github.event.release.name }}

steps:
- name: Retrieve release distributions
uses: actions/download-artifact@v4
with:
name: release-dists
path: dist/

- name: Publish release distributions to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
packages-dir: dist/
28 changes: 14 additions & 14 deletions README.md
@@ -4,31 +4,31 @@

This project provides examples of how to process the Common Crawl dataset with [Apache Spark](https://spark.apache.org/) and Python:

+ [count HTML tags](./html_tag_count.py) in Common Crawl's raw response data (WARC files)
+ [count HTML tags](src/cc_pyspark/jobs/html_tag_count.py) in Common Crawl's raw response data (WARC files)

+ [count web server names](./server_count.py) in Common Crawl's metadata (HTTP headers in WAT or WARC files)
+ [count web server names](src/cc_pyspark/jobs/server_count.py) in Common Crawl's metadata (HTTP headers in WAT or WARC files)

+ list host names and corresponding [IP addresses](./server_ip_address.py) (WAT files or WARC files)
+ list host names and corresponding [IP addresses](src/cc_pyspark/jobs/server_ip_address.py) (WAT files or WARC files)

+ [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files)
+ [word count](src/cc_pyspark/jobs/word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files)

+ [md5sum](./md5sum.py) Run an external command (`md5sum`) on a list of files from a manifest – WARC, WET, WAT, or any other type of file.
+ [md5sum](src/cc_pyspark/jobs/md5sum.py) Run an external command (`md5sum`) on a list of files from a manifest – WARC, WET, WAT, or any other type of file.

+ [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph)
+ [extract links](src/cc_pyspark/jobs/wat_extract_links.py) from WAT files and [construct the (host-level) web graph](src/cc_pyspark/jobs/hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph)

+ [WET extractor](./wet_extractor.py), using FastWARC and Resiliparse. See also [Using FastWARC](#using-fastwarc-to-read-warc-files).
+ [WET extractor](src/cc_pyspark/jobs/wet_extractor.py), using FastWARC and Resiliparse. See also [Using FastWARC](#using-fastwarc-to-read-warc-files).

+ work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)):

- run a SQL query and [export the result as a table](./cc_index_export.py)
- run a SQL query and [export the result as a table](src/cc_pyspark/jobs/cc_index_export.py)

- select WARC records by a SQL query, parse the HTML, extract the text and [count words](./cc_index_word_count.py). Alternatively, the first step (query the columnar index) can be executed using Amazon Athena. The list of WARC record coordinates (CSV or a table created by a CTAS statement) is then passed via `--csv` or `--input_table_format`) to the Spark job.
- select WARC records by a SQL query, parse the HTML, extract the text and [count words](src/cc_pyspark/jobs/cc_index_word_count.py). Alternatively, the first step (query the columnar index) can be executed using Amazon Athena. The list of WARC record coordinates (CSV or a table created by a CTAS statement) is then passed via `--csv` or `--input_table_format` to the Spark job.

Further information about the examples and available options is shown via the [command-line option](#command-line-options) `--help`.

## Implementing a Custom Extractor

Extending the [CCSparkJob](./sparkcc.py) isn't difficult and for many use cases it is sufficient to override a single method (`process_record`). Have a look at one of the examples, e.g. to [count HTML tags](./html_tag_count.py).
Extending the [CCSparkJob](src/cc_pyspark/sparkcc.py) isn't difficult: for many use cases it is sufficient to override a single method (`process_record`). Have a look at one of the examples, e.g. the [HTML tag count](src/cc_pyspark/jobs/html_tag_count.py) job.
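
For orientation, a minimal custom job might look like the sketch below. The class name and the size-bucket key are invented for illustration, and the helpers (`is_response_record`, `get_payload_stream`, `run`) follow the pattern of the bundled examples; treat the exact names as assumptions if your version differs.

```python
from cc_pyspark.sparkcc import CCSparkJob


class PayloadSizeCountJob(CCSparkJob):
    """Toy example: count WARC response records by payload size bucket."""

    name = "PayloadSizeCount"

    def process_record(self, record):
        # Skip request/metadata records; only response records carry a payload.
        if not self.is_response_record(record):
            return
        payload = self.get_payload_stream(record).read()
        # Yield (key, count) pairs; the base class aggregates counts per key.
        yield f"{len(payload) // 1024}-KiB", 1


if __name__ == "__main__":
    PayloadSizeCountJob().run()
```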

## Setup

@@ -69,7 +69,7 @@ This will install v3.5.7 of [the PySpark python package](https://spark.apache.or

Install Spark (see the [Spark documentation](https://spark.apache.org/docs/latest/) for guidance). Then ensure that `spark-submit` and `pyspark` are on your `$PATH`, or call them via their full paths, e.g. `$SPARK_HOME/bin/spark-submit`.

> Note: The PySpark package and "py4j" are required if you want to run the tests in `test/`. The packages are also included in Spark installations at `$SPARK_HOME/python` resp. `$SPARK_HOME/python/lib/py4j-*-src.zip`.
> Note: The PySpark package and "py4j" are required if you want to run the tests in `test/`. Both are also bundled with Spark installations: the PySpark package under `$SPARK_HOME/python` and the py4j sources under `$SPARK_HOME/python/lib/py4j-*-src.zip`.

## Compatibility and Requirements

@@ -92,7 +92,7 @@ Note that the sample data is from an older crawl (`CC-MAIN-2017-13` run in March

## Process Common Crawl Data on Spark

CC-PySpark reads the list of input files from a manifest file. Typically, these are Common Crawl WARC, WAT or WET files, but it could be any other type of file, as long it is supported by the class implementing [CCSparkJob](./sparkcc.py). The files can be given as absolute URLs or as paths relative to a base URL (option `--input_base_url`). The URL cat point to a local file (`file://`), to a remote location (typically below `s3://commoncrawl/` resp. `https://data.commoncrawl.org/`). For development and testing, you'd start with local files.
CC-PySpark reads the list of input files from a manifest file. Typically, these are Common Crawl WARC, WAT or WET files, but it could be any other type of file, as long as it is supported by the class implementing [CCSparkJob](src/cc_pyspark/sparkcc.py). The files can be given as absolute URLs or as paths relative to a base URL (option `--input_base_url`). The URL can point to a local file (`file://`) or to a remote location (typically below `s3://commoncrawl/` or `https://data.commoncrawl.org/`). For development and testing, you'd start with local files.

### Running locally

@@ -256,10 +256,10 @@ Alternatively, it's possible to configure the table schema explicitly:
Using [FastWARC](https://resiliparse.chatnoir.eu/en/latest/man/fastwarc.html) instead of warcio can speed up job execution by 25% if little custom computation is done and most of the time is spent parsing WARC files.

To use FastWARC:
- the job class must inherit from [CCFastWarcSparkJob](./sparkcc_fastwarc.py) instead of [CCSparkJob](./sparkcc.py). See [ServerCountFastWarcJob](./server_count_fastwarc.py) for an example.
- the job class must inherit from [CCFastWarcSparkJob](src/cc_pyspark/sparkcc_fastwarc.py) instead of [CCSparkJob](src/cc_pyspark/sparkcc.py). See [ServerCountFastWarcJob](src/cc_pyspark/jobs/server_count_fastwarc.py) for an example, and the sketch at the end of this section.
- when running the job in a Spark cluster, `sparkcc_fastwarc.py` must be passed via `--py-files` in addition to `sparkcc.py` and further job-specific Python files. See also [running in a Spark cluster](#running-in-spark-cluster-over-large-amounts-of-data).

Some differences between the warcio and FastWARC APIs are hidden from the user in methods implemented in [CCSparkJob](./sparkcc.py) and [CCFastWarcSparkJob](./sparkcc_fastwarc.py) respectively. These methods allow to access WARC or HTTP headers and the payload stream in a unique way, regardless of whether warcio or FastWARC are used.
Some differences between the warcio and FastWARC APIs are hidden from the user in methods implemented in [CCSparkJob](src/cc_pyspark/sparkcc.py) and [CCFastWarcSparkJob](src/cc_pyspark/sparkcc_fastwarc.py), respectively. These methods provide access to WARC or HTTP headers and the payload stream in a uniform way, regardless of whether warcio or FastWARC is used.

However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes).
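
To illustrate the first bullet above, a FastWARC-based variant can reuse an existing job body and only swap the base class. A minimal sketch, mirroring `server_count_fastwarc.py` (the class and module names are taken from this repository's layout; adjust if yours differs):

```python
from cc_pyspark.sparkcc_fastwarc import CCFastWarcSparkJob
from cc_pyspark.jobs.server_count import ServerCountJob


class ServerCountFastWarcJob(CCFastWarcSparkJob, ServerCountJob):
    """Same counting logic as ServerCountJob, but WARC parsing is done by FastWARC."""


if __name__ == "__main__":
    ServerCountFastWarcJob().run()
```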

24 changes: 24 additions & 0 deletions pyproject.toml
@@ -0,0 +1,24 @@
[project]
name = "cc-pyspark"
version = "0.1.0-dev0"
description = "Common Crawl data processing examples for PySpark."
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.9"
dependencies = [
"beautifulsoup4>=4.14.2",
"boto3>=1.40.63",
"botocore>=1.40.63",
"idna>=3.11",
"lxml>=6.0.2",
"orjson>=3.11.4",
"pytest>=8.4.2",
"pytest-mock>=3.15.1",
"requests>=2.32.5",
"ujson>=5.11.0",
"warcio>=1.7.5",
]
[tool.pytest.ini_options]
pythonpath = [
"src"
]
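
With `pythonpath = ["src"]`, pytest imports the package straight from the source tree without an install. A minimal smoke test illustrating this (the test file is hypothetical and not part of this diff):

```python
# test/test_package_smoke.py (hypothetical) -- relies on pythonpath = ["src"] above
from cc_pyspark.sparkcc import CCSparkJob


def test_process_record_hook_exists():
    # Jobs are implemented by overriding CCSparkJob.process_record (see the README).
    assert callable(getattr(CCSparkJob, "process_record", None))
```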
33 changes: 0 additions & 33 deletions requirements.txt

This file was deleted.

83 changes: 83 additions & 0 deletions script/package_and_run_job.sh
@@ -0,0 +1,83 @@
#!/bin/bash
# Package this project into a relocatable venv (via venv-pack) and submit a job with spark-submit.

DIR=$(dirname "$0")

function usage() {
echo "Usage: $0 --job_module <job_module> [--master <master_url>] [job_args...]"
echo ""
echo "Options:"
echo " --job_module <job_module> The job module to run (e.g., cc_pyspark.jobs.server_count)"
echo " --master <master_url> The Spark master URL (optional)"
echo " -h, --help Show this help message"
echo ""
echo "Positional Arguments:"
echo " job_args... Additional arguments to pass to the job"
}

MASTER_URL=
JOB_MODULE=
while [ "$#" -gt 0 ]; do
case $1 in
--job_module)
JOB_MODULE=$2
shift 2
;;
--master)
MASTER_URL=$2
shift 2
;;
-h|--help|-\?)
usage
exit 0
;;
*)
# remaining args are job args
break
;;
esac
done

if [ -z "$JOB_MODULE" ]; then
echo "Error: --job_module is required"
usage
exit 1
fi
if [ -z "$MASTER_URL" ]; then
MASTER_URL="local[*]"
fi

VENV_DIR=$(mktemp -d)
VENV_PACK_FILE=$(mktemp -u).tar.gz

cleanup() {
    # Remove the temporary venv and the packed environment archive.
    rm -rf "$VENV_DIR"
    rm -f "$VENV_PACK_FILE"
}
trap cleanup EXIT

set -e
set -x

python -m venv $VENV_DIR
source $VENV_DIR/bin/activate
#pip install cc-pyspark
pip install .
pip install venv-pack
venv-pack -o $VENV_PACK_FILE
deactivate

# if SPARK_HOME is not set, use `spark-submit` in path, otherwise use $SPARK_HOME/bin/spark-submit
if [ -z "$SPARK_HOME" ]; then
SPARK_SUBMIT="spark-submit"
else
SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit"
fi

export PYSPARK_PYTHON=./environment/bin/python
$SPARK_SUBMIT \
--master $MASTER_URL \
--archives "$VENV_PACK_FILE#environment" \
"$DIR"/run_job.py \
--job_module $JOB_MODULE \
"$@"

43 changes: 43 additions & 0 deletions script/run_job.py
@@ -0,0 +1,43 @@
import argparse
import importlib
import inspect
import sys

from cc_pyspark.sparkcc import CCSparkJob


def load_and_run_job(module_name: str):
job_module = importlib.import_module(module_name)

# Find the job class in the module
job_class = None
for name, obj in inspect.getmembers(job_module, inspect.isclass):
if obj.__module__ == job_module.__name__ and issubclass(obj, CCSparkJob):
print("found job class:", obj)
job_class = obj
break

if job_class is None:
raise ValueError(f"No CCSparkJob subclass found in module {module_name}")

job_instance = job_class()
print("running job:", job_instance)
job_instance.run()


def main():
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument('--job_module', type=str, required=True)

args, remaining = arg_parser.parse_known_args()

# remove wrapper args from sys.argv so that job class can parse its own args cleanly
sys.argv = [sys.argv[0]] + remaining

load_and_run_job(args.job_module)


if __name__ == '__main__':
main()
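
For quick debugging without `spark-submit`, the same wrapper logic can also be driven directly. This is only a sketch: the manifest path and output table are example arguments, and `script/` must be on `sys.path` for the import to resolve.

```python
import sys

from run_job import load_and_run_job  # assumes script/ is importable

# Hand the job its own CLI arguments, exactly as main() does after
# stripping --job_module from sys.argv.
sys.argv = ["run_job.py", "input/test_warc.txt", "servernames"]
load_and_run_job("cc_pyspark.jobs.server_count")
```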


Empty file added src/__init__.py
Empty file.
Empty file added src/cc_pyspark/__init__.py
Empty file.
File renamed without changes.
File renamed without changes.
Empty file added src/cc_pyspark/jobs/__init__.py
Empty file.
@@ -1,4 +1,4 @@
from sparkcc import CCIndexSparkJob
from cc_pyspark.sparkcc import CCIndexSparkJob


class CCIndexExportJob(CCIndexSparkJob):
@@ -1,6 +1,6 @@
from collections import Counter

from sparkcc import CCIndexWarcSparkJob
from cc_pyspark.sparkcc import CCIndexWarcSparkJob
from word_count import WordCountJob


@@ -2,7 +2,7 @@
import logging
import os

from sparkcc import CCSparkJob
from cc_pyspark.sparkcc import CCSparkJob
from pyspark.sql import functions as sqlf
from pyspark.sql.types import BooleanType, LongType, StringType, StructField, StructType

@@ -2,7 +2,7 @@

from collections import Counter

from sparkcc import CCSparkJob
from cc_pyspark.sparkcc import CCSparkJob


class TagCountJob(CCSparkJob):
@@ -1,4 +1,4 @@
from sparkcc import CCSparkJob
from cc_pyspark.sparkcc import CCSparkJob
from pyspark.sql.types import StructType, StructField, StringType


4 changes: 2 additions & 2 deletions server_count.py → src/cc_pyspark/jobs/server_count.py
@@ -1,5 +1,5 @@
from sparkcc import CCSparkJob
from json_importer import json
from cc_pyspark.sparkcc import CCSparkJob
from cc_pyspark.json_importer import json


class ServerCountJob(CCSparkJob):
File renamed without changes.
@@ -4,7 +4,7 @@

from pyspark.sql.types import StructType, StructField, StringType, LongType

from sparkcc import CCSparkJob
from cc_pyspark.sparkcc import CCSparkJob


class ServerIPAddressJob(CCSparkJob):