diff --git a/.github/workflows/python_publish.yaml b/.github/workflows/python_publish.yaml new file mode 100644 index 0000000..a6de921 --- /dev/null +++ b/.github/workflows/python_publish.yaml @@ -0,0 +1,75 @@ +# This workflow will upload a Python Package to PyPI when a release is created +# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python#publishing-to-package-registries + +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +name: Upload Python Package to PyPI + +on: + workflow_dispatch: + pull_request: + +#on: +# release: +# types: [published] + + +permissions: + contents: read + +jobs: + release-build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.x" + + - name: Build release distributions + run: | + # NOTE: put your own distribution build steps here. + python -m pip install build + python -m build + + - name: Upload distributions + uses: actions/upload-artifact@v4 + with: + name: release-dists + path: dist/ + + pypi-publish: + runs-on: ubuntu-latest + needs: + - release-build + permissions: + # IMPORTANT: this permission is mandatory for trusted publishing + id-token: write + + # Dedicated environments with protections for publishing are strongly recommended. + # For more information, see: https://docs.github.com/en/actions/deployment/targeting-different-environments/using-environments-for-deployment#deployment-protection-rules + environment: + name: pypi + # OPTIONAL: uncomment and update to include your PyPI project URL in the deployment status: + url: https://pypi.org/p/cc-pyspark + # + # ALTERNATIVE: if your GitHub Release name is the PyPI project version string + # ALTERNATIVE: exactly, uncomment the following line instead: + # url: https://pypi.org/project/YOURPROJECT/${{ github.event.release.name }} + + steps: + - name: Retrieve release distributions + uses: actions/download-artifact@v4 + with: + name: release-dists + path: dist/ + + - name: Publish release distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + packages-dir: dist/ diff --git a/README.md b/README.md index 297245e..073a6a2 100644 --- a/README.md +++ b/README.md @@ -4,31 +4,31 @@ This project provides examples how to process the Common Crawl dataset with [Apache Spark](https://spark.apache.org/) and Python: -+ [count HTML tags](./html_tag_count.py) in Common Crawl's raw response data (WARC files) ++ [count HTML tags](src/cc_pyspark/jobs/html_tag_count.py) in Common Crawl's raw response data (WARC files) -+ [count web server names](./server_count.py) in Common Crawl's metadata (HTTP headers in WAT or WARC files) ++ [count web server names](src/cc_pyspark/jobs/server_count.py) in Common Crawl's metadata (HTTP headers in WAT or WARC files) -+ list host names and corresponding [IP addresses](./server_ip_address.py) (WAT files or WARC files) ++ list host names and corresponding [IP addresses](src/cc_pyspark/jobs/server_ip_address.py) (WAT files or WARC files) -+ [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files) ++ [word count](src/cc_pyspark/jobs/word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files) -+ [md5sum](./md5sum.py) Run an external command (`md5sum`) on a list of files from a manifest – WARC, WET, WAT, 
or any other type of file. ++ [md5sum](src/cc_pyspark/jobs/md5sum.py) Run an external command (`md5sum`) on a list of files from a manifest – WARC, WET, WAT, or any other type of file. -+ [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph) ++ [extract links](src/cc_pyspark/jobs/wat_extract_links.py) from WAT files and [construct the (host-level) web graph](src/cc_pyspark/jobs/hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph) -+ [WET extractor](./wet_extractor.py), using FastWARC and Resiliparse. See also [Using FastWARC](#using-fastwarc-to-read-warc-files). ++ [WET extractor](src/cc_pyspark/jobs/wet_extractor.py), using FastWARC and Resiliparse. See also [Using FastWARC](#using-fastwarc-to-read-warc-files). + work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)): - - run a SQL query and [export the result as a table](./cc_index_export.py) + - run a SQL query and [export the result as a table](src/cc_pyspark/jobs/cc_index_export.py) - - select WARC records by a SQL query, parse the HTML, extract the text and [count words](./cc_index_word_count.py). Alternatively, the first step (query the columnar index) can be executed using Amazon Athena. The list of WARC record coordinates (CSV or a table created by a CTAS statement) is then passed via `--csv` or `--input_table_format`) to the Spark job. + - select WARC records by a SQL query, parse the HTML, extract the text and [count words](src/cc_pyspark/jobs/cc_index_word_count.py). Alternatively, the first step (query the columnar index) can be executed using Amazon Athena. The list of WARC record coordinates (CSV or a table created by a CTAS statement) is then passed via `--csv` or `--input_table_format` to the Spark job. Further information about the examples and available options is shown via the [command-line option](#command-line-options) `--help`. ## Implementing a Custom Extractor -Extending the [CCSparkJob](./sparkcc.py) isn't difficult and for many use cases it is sufficient to override a single method (`process_record`). Have a look at one of the examples, e.g. to [count HTML tags](./html_tag_count.py). +Extending the [CCSparkJob](src/cc_pyspark/sparkcc.py) isn't difficult and for many use cases it is sufficient to override a single method (`process_record`). Have a look at one of the examples, e.g. to [count HTML tags](src/cc_pyspark/jobs/html_tag_count.py). ## Setup @@ -69,7 +69,7 @@ This will install v3.5.7 of [the PySpark python package](https://spark.apache.or Install Spark if (see the [Spark documentation](https://spark.apache.org/docs/latest/) for guidance). Then, ensure that `spark-submit` and `pyspark` are on your `$PATH`, or prepend `$SPARK_HOME/bin` when running eg `$SPARK_HOME/bin/spark-submit`. -> Note: The PySpark package and "py4j" are required if you want to run the tests in `test/`. The packages are also included in Spark installations at `$SPARK_HOME/python` resp. `$SPARK_HOME/python/lib/py4j-*-src.zip`. +> Note: The PySpark package and "py4j" are required if you want to run the tests in `test/`.
The packages are also included in Spark installations: the PySpark package is at `$SPARK_HOME/python` and py4j is bundled at `$SPARK_HOME/python/lib/py4j-*-src.zip`. ## Compatibility and Requirements @@ -92,7 +92,7 @@ Note that the sample data is from an older crawl (`CC-MAIN-2017-13` run in March ## Process Common Crawl Data on Spark -CC-PySpark reads the list of input files from a manifest file. Typically, these are Common Crawl WARC, WAT or WET files, but it could be any other type of file, as long it is supported by the class implementing [CCSparkJob](./sparkcc.py). The files can be given as absolute URLs or as paths relative to a base URL (option `--input_base_url`). The URL cat point to a local file (`file://`), to a remote location (typically below `s3://commoncrawl/` resp. `https://data.commoncrawl.org/`). For development and testing, you'd start with local files. +CC-PySpark reads the list of input files from a manifest file. Typically, these are Common Crawl WARC, WAT or WET files, but it could be any other type of file, as long as it is supported by the class implementing [CCSparkJob](src/cc_pyspark/sparkcc.py). The files can be given as absolute URLs or as paths relative to a base URL (option `--input_base_url`). The URL can point to a local file (`file://`) or to a remote location (typically below `s3://commoncrawl/` or `https://data.commoncrawl.org/`). For development and testing, you'd start with local files. ### Running locally @@ -256,10 +256,10 @@ Alternatively, it's possible configure the table schema explicitly: Replacing [FastWARC](https://resiliparse.chatnoir.eu/en/latest/man/fastwarc.html) can speed up job execution by 25% if little custom computations are done and most of the time is spent for parsing WARC files. To use FastWARC -- the job class must inherit from [CCFastWarcSparkJob](./sparkcc_fastwarc.py) instead of [CCSparkJob](./sparkcc.py). See [ServerCountFastWarcJob](./server_count_fastwarc.py) for an example. +- the job class must inherit from [CCFastWarcSparkJob](src/cc_pyspark/sparkcc_fastwarc.py) instead of [CCSparkJob](src/cc_pyspark/sparkcc.py). See [ServerCountFastWarcJob](src/cc_pyspark/jobs/server_count_fastwarc.py) for an example. - when running the job in a Spark cluster, `sparkcc_fastwarc.py` must be passed via `--py-files` in addition to `sparkcc.py` and further job-specific Python files. See also [running in a Spark cluster](#running-in-spark-cluster-over-large-amounts-of-data). -Some differences between the warcio and FastWARC APIs are hidden from the user in methods implemented in [CCSparkJob](./sparkcc.py) and [CCFastWarcSparkJob](./sparkcc_fastwarc.py) respectively. These methods allow to access WARC or HTTP headers and the payload stream in a unique way, regardless of whether warcio or FastWARC are used. +Some differences between the warcio and FastWARC APIs are hidden from the user in methods implemented in [CCSparkJob](src/cc_pyspark/sparkcc.py) and [CCFastWarcSparkJob](src/cc_pyspark/sparkcc_fastwarc.py) respectively. These methods allow users to access WARC or HTTP headers and the payload stream in a uniform way, regardless of whether warcio or FastWARC is used. However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations).
In addition, FastWARC does not support for legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes). diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..de902c5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,24 @@ +[project] +name = "cc-pyspark" +version = "0.1.0-dev0" +description = "Common Crawl data processing examples for PySpark." +readme = "README.md" +license = {text = "MIT"} +requires-python = ">=3.9" +dependencies = [ + "beautifulsoup4>=4.14.2", + "boto3>=1.40.63", + "botocore>=1.40.63", + "idna>=3.11", + "lxml>=6.0.2", + "orjson>=3.11.4", + "pytest>=8.4.2", + "pytest-mock>=3.15.1", + "requests>=2.32.5", + "ujson>=5.11.0", + "warcio>=1.7.5", +] +[tool.pytest.ini_options] +pythonpath = [ + "src" +] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 9c2bbb9..0000000 --- a/requirements.txt +++ /dev/null @@ -1,33 +0,0 @@ -botocore -boto3 -requests -ujson -orjson -warcio - -# for link extraction and webgraph construction also: -idna - -# for parsing HTML (used in cc_index_word_count.py) -beautifulsoup4 -lxml - -# for HDFS support (requires environments variables JAVA_HOME and HADOOP_HOME): -#pydoop - -# to parse WARC/WAT/WET files using FastWARC (https://pypi.org/project/FastWARC/) -# cf. https://github.com/commoncrawl/cc-pyspark/issues/37 -#fastwarc -# (tested with) -#fastwarc==0.15.2 - -# to parse HTML (used in cc_index_word_count.py) using Resiliparse (https://pypi.org/project/Resiliparse/). -# Resiliparse requires compatible fastwarc version. -# cf. https://github.com/commoncrawl/cc-pyspark/issues/43 -#Resiliparse -# (tested with) -#Resiliparse==0.15.2 - -# testing -pytest -pytest-mock diff --git a/script/package_and_run_job.sh b/script/package_and_run_job.sh new file mode 100755 index 0000000..23eebfb --- /dev/null +++ b/script/package_and_run_job.sh @@ -0,0 +1,83 @@ +#!/bin/bash +DIR=$(dirname "$0") + +function usage() { + echo "Usage: $0 --job_module <job_module> [--master <master_url>] [job_args...]" + echo "" + echo "Options:" + echo " --job_module The job module to run (e.g., cc_pyspark.jobs.server_count)" + echo " --master The Spark master URL (optional)" + echo " -h, --help Show this help message" + echo "" + echo "Positional Arguments:" + echo " job_args... Additional arguments to pass to the job" +} + +MASTER_URL= +JOB_MODULE= +while [ "$#" -gt 0 ]; do + case $1 in + --job_module) + JOB_MODULE=$2 + shift 2 + ;; + --master) + MASTER_URL=$2 + shift 2 + ;; + -h|--help|-?) + usage + exit 0 + ;; + *) + # remaining args are job args + break + ;; + esac +done + +if [ -z "$JOB_MODULE" ]; then + echo "Error: --job_module is required" + usage + exit 1 +fi +if [ -z "$MASTER_URL" ]; then + MASTER_URL="local[*]" +fi + +VENV_DIR=$(mktemp -d) +VENV_PACK_FILE=$(mktemp -u).tar.gz + +cleanup() { + rm -rf "$VENV_DIR" + echo rm -f "$VENV_PACK_FILE" + echo NOT DELETING "$VENV_PACK_FILE" "for debugging purposes" +} +trap cleanup EXIT + +set -e +set -x + +python -m venv $VENV_DIR +source $VENV_DIR/bin/activate +#pip install cc-pyspark +pip install .
+pip install venv-pack +venv-pack -o $VENV_PACK_FILE +deactivate + +# if SPARK_HOME is not set, use `spark-submit` in path, otherwise use $SPARK_HOME/bin/spark-submit +if [ -z "$SPARK_HOME" ]; then + SPARK_SUBMIT="spark-submit" +else + SPARK_SUBMIT="$SPARK_HOME/bin/spark-submit" +fi + +export PYSPARK_PYTHON=./environment/bin/python +$SPARK_SUBMIT \ + --master $MASTER_URL \ + --archives "$VENV_PACK_FILE#environment" \ + "$DIR"/run_job.py \ + --job_module $JOB_MODULE \ + "$@" + diff --git a/script/run_job.py b/script/run_job.py new file mode 100644 index 0000000..ca73c2b --- /dev/null +++ b/script/run_job.py @@ -0,0 +1,43 @@ +import argparse +import importlib +import inspect +import sys + +from cc_pyspark.sparkcc import CCSparkJob + + +def load_and_run_job(module_name: str): + job_module = importlib.import_module(module_name) + + # Find the job class in the module + job_class = None + for name, obj in inspect.getmembers(job_module, inspect.isclass): + if obj.__module__ == job_module.__name__ and issubclass(obj, CCSparkJob): + print("found job class:", obj) + job_class = obj + break + + if job_class is None: + raise ValueError(f"No CCSparkJob subclass found in module {module_name}") + + job_instance = job_class() + print("running job:", job_instance) + job_instance.run() + + +def main(): + arg_parser = argparse.ArgumentParser() + arg_parser.add_argument('--job_module', type=str, required=True,) + + args, remaining = arg_parser.parse_known_args() + + # remove wrapper args from sys.argv so that job class can parse its own args cleanly + sys.argv = [sys.argv[0]] + remaining + + load_and_run_job(args.job_module) + + +if __name__ == '__main__': + main() + + diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/cc_pyspark/__init__.py b/src/cc_pyspark/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bs4_parser.py b/src/cc_pyspark/bs4_parser.py similarity index 100% rename from bs4_parser.py rename to src/cc_pyspark/bs4_parser.py diff --git a/iana_tld.py b/src/cc_pyspark/iana_tld.py similarity index 100% rename from iana_tld.py rename to src/cc_pyspark/iana_tld.py diff --git a/src/cc_pyspark/jobs/__init__.py b/src/cc_pyspark/jobs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/cc_index_export.py b/src/cc_pyspark/jobs/cc_index_export.py similarity index 81% rename from cc_index_export.py rename to src/cc_pyspark/jobs/cc_index_export.py index 2a70344..4519528 100644 --- a/cc_index_export.py +++ b/src/cc_pyspark/jobs/cc_index_export.py @@ -1,4 +1,4 @@ -from sparkcc import CCIndexSparkJob +from cc_pyspark.sparkcc import CCIndexSparkJob class CCIndexExportJob(CCIndexSparkJob): diff --git a/cc_index_word_count.py b/src/cc_pyspark/jobs/cc_index_word_count.py similarity index 98% rename from cc_index_word_count.py rename to src/cc_pyspark/jobs/cc_index_word_count.py index bca69a5..1dd7e04 100644 --- a/cc_index_word_count.py +++ b/src/cc_pyspark/jobs/cc_index_word_count.py @@ -1,6 +1,6 @@ from collections import Counter -from sparkcc import CCIndexWarcSparkJob +from cc_pyspark.sparkcc import CCIndexWarcSparkJob from word_count import WordCountJob diff --git a/hostlinks_extract_fastwarc.py b/src/cc_pyspark/jobs/hostlinks_extract_fastwarc.py similarity index 100% rename from hostlinks_extract_fastwarc.py rename to src/cc_pyspark/jobs/hostlinks_extract_fastwarc.py diff --git a/hostlinks_to_graph.py b/src/cc_pyspark/jobs/hostlinks_to_graph.py similarity index 99% rename from hostlinks_to_graph.py rename to 
src/cc_pyspark/jobs/hostlinks_to_graph.py index 32d3833..776d077 100644 --- a/hostlinks_to_graph.py +++ b/src/cc_pyspark/jobs/hostlinks_to_graph.py @@ -2,7 +2,7 @@ import logging import os -from sparkcc import CCSparkJob +from cc_pyspark.sparkcc import CCSparkJob from pyspark.sql import functions as sqlf from pyspark.sql.types import BooleanType, LongType, StringType, StructField, StructType diff --git a/html_tag_count.py b/src/cc_pyspark/jobs/html_tag_count.py similarity index 95% rename from html_tag_count.py rename to src/cc_pyspark/jobs/html_tag_count.py index ae5f579..baec297 100644 --- a/html_tag_count.py +++ b/src/cc_pyspark/jobs/html_tag_count.py @@ -2,7 +2,7 @@ from collections import Counter -from sparkcc import CCSparkJob +from cc_pyspark.sparkcc import CCSparkJob class TagCountJob(CCSparkJob): diff --git a/linkmap2parquet.py b/src/cc_pyspark/jobs/linkmap2parquet.py similarity index 96% rename from linkmap2parquet.py rename to src/cc_pyspark/jobs/linkmap2parquet.py index e57ca5c..e69122f 100644 --- a/linkmap2parquet.py +++ b/src/cc_pyspark/jobs/linkmap2parquet.py @@ -1,4 +1,4 @@ -from sparkcc import CCSparkJob +from cc_pyspark.sparkcc import CCSparkJob from pyspark.sql.types import StructType, StructField, StringType diff --git a/server_count.py b/src/cc_pyspark/jobs/server_count.py similarity index 97% rename from server_count.py rename to src/cc_pyspark/jobs/server_count.py index a10ee76..c01f7b4 100644 --- a/server_count.py +++ b/src/cc_pyspark/jobs/server_count.py @@ -1,5 +1,5 @@ -from sparkcc import CCSparkJob -from json_importer import json +from cc_pyspark.sparkcc import CCSparkJob +from cc_pyspark.json_importer import json class ServerCountJob(CCSparkJob): diff --git a/server_count_fastwarc.py b/src/cc_pyspark/jobs/server_count_fastwarc.py similarity index 100% rename from server_count_fastwarc.py rename to src/cc_pyspark/jobs/server_count_fastwarc.py diff --git a/server_ip_address.py b/src/cc_pyspark/jobs/server_ip_address.py similarity index 98% rename from server_ip_address.py rename to src/cc_pyspark/jobs/server_ip_address.py index bf3f30c..9ca0019 100644 --- a/server_ip_address.py +++ b/src/cc_pyspark/jobs/server_ip_address.py @@ -4,7 +4,7 @@ from pyspark.sql.types import StructType, StructField, StringType, LongType -from sparkcc import CCSparkJob +from cc_pyspark.sparkcc import CCSparkJob class ServerIPAddressJob(CCSparkJob): diff --git a/src/cc_pyspark/jobs/sitemaps_from_robotstxt.py b/src/cc_pyspark/jobs/sitemaps_from_robotstxt.py new file mode 100644 index 0000000..ea86f1a --- /dev/null +++ b/src/cc_pyspark/jobs/sitemaps_from_robotstxt.py @@ -0,0 +1,128 @@ +import re +from urllib.parse import urlparse, urljoin + +from pyspark.sql.types import StructType, StructField, StringType, ArrayType +from warcio.recordloader import ArcWarcRecord + +from cc_pyspark.sparkcc import CCSparkJob + +class SitemapExtractorJob(CCSparkJob): + """Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files.""" + + name = "SitemapExtractor" + + output_schema = StructType([ + StructField('sitemap_url', StringType(), True), + StructField('hosts', ArrayType(elementType=StringType()), True) + ]) + + # rb: match on raw bytes so we can defer utf-8 decoding to the `sitemap:` line + sitemap_pattern = re.compile(rb'^sitemap:\s*(\S+)', re.I) + + robots_txt_processed = None + sitemap_urls_found = None + sitemap_url_invalid_encoding = None + robots_txt_announcing_sitemap = None + robots_txt_with_more_than_50_sitemaps = None + + + def log_accumulators(self, session): + 
super(SitemapExtractorJob, self).log_accumulators(session) + + self.log_accumulator(session, self.robots_txt_processed, + 'robots.txt successfully parsed = {}') + self.log_accumulator(session, self.sitemap_urls_found, + 'sitemap urls found = {}') + self.log_accumulator(session, self.sitemap_url_invalid_encoding, + 'sitemap urls with invalid utf-8 encoding = {}') + self.log_accumulator(session, self.robots_txt_announcing_sitemap, + 'robots.txt announcing at least 1 sitemap = {}') + self.log_accumulator(session, self.robots_txt_with_more_than_50_sitemaps, + 'robots.txt with more than 50 sitemaps = {}') + + + def init_accumulators(self, session): + super(SitemapExtractorJob, self).init_accumulators(session) + + sc = session.sparkContext + self.robots_txt_processed = sc.accumulator(0) + self.sitemap_urls_found = sc.accumulator(0) + self.sitemap_url_invalid_encoding = sc.accumulator(0) + self.robots_txt_announcing_sitemap = sc.accumulator(0) + self.robots_txt_with_more_than_50_sitemaps = sc.accumulator(0) + + + def process_record(self, record: ArcWarcRecord): + """ emit: sitemap_url => [host] """ + if not self.is_response_record(record): + # we're only interested in the HTTP responses + return + + self.robots_txt_processed.add(1) + # robots_txt url/host are lazily computed when we encounter the first valid sitemap URL + robots_txt_url = None + robots_txt_host = None + n_sitemaps = 0 + + data = self.get_payload_stream(record).read() + for raw_line in data.splitlines(): + raw_line = raw_line.strip() + + match = SitemapExtractorJob.sitemap_pattern.match(raw_line) + if match: + sitemap_url = match.group(1).strip() + self.sitemap_urls_found.add(1) + try: + sitemap_url = sitemap_url.decode("utf-8", "strict") + except UnicodeDecodeError as e: + self.get_logger().warn(f'Invalid encoding of sitemap URL {sitemap_url}: {repr(e)}') + self.sitemap_url_invalid_encoding.add(1) + continue + + if robots_txt_url is None: + # first sitemap found: set base URL and get host from URL + robots_txt_url = self.get_warc_header(record, 'WARC-Target-URI') + try: + robots_txt_host = urlparse(robots_txt_url).netloc.lower().lstrip('.') + except Exception as e1: + try: + self.get_logger().warn(f'Invalid robots.txt URL: {robots_txt_url} - {repr(e1)}') + except Exception as e2: + self.get_logger().warn(f'Invalid robots.txt URL - {repr(e1)} (cannot display: {repr(e2)})') + # skip this entire robots.txt record + return + + if not (sitemap_url.startswith('http:') or sitemap_url.startswith('https:')): + # sitemap_url is relative; pass straight to urljoin which knows how to handle it correctly + try: + sitemap_url = urljoin(robots_txt_url, sitemap_url) + except Exception as e: + try: + self.get_logger().warn(f'Error joining sitemap URL {sitemap_url} with base {robots_txt_url}: {repr(e)}') + except Exception as log_e: + self.get_logger().warn(f'Error joining sitemap URL with base - {repr(e)} (cannot display: {repr(log_e)})') + continue + + yield sitemap_url, [robots_txt_host] + n_sitemaps += 1 + + if n_sitemaps > 0: + self.robots_txt_announcing_sitemap.add(1) + if n_sitemaps > 50: + self.robots_txt_with_more_than_50_sitemaps.add(1) + + + def _try_parse_host(self, url: str, label_for_log: str) -> "str | None":  # quoted annotation: the X | Y syntax requires Python >= 3.10, but requires-python allows 3.9 + try: + return urlparse(url).netloc.lower().lstrip('.') + except Exception as e: + try: + self.get_logger().warn(f'Invalid {label_for_log} URL: {url} - {repr(e)}') + except Exception as log_e: + self.get_logger().warn(f'Invalid {label_for_log} URL - {repr(e)} (cannot display: {repr(log_e)})') + return None + + +if __name__ ==
'__main__': + job = SitemapExtractorJob() + job.run() diff --git a/src/cc_pyspark/jobs/sitemaps_from_robotstxt_fastwarc.py b/src/cc_pyspark/jobs/sitemaps_from_robotstxt_fastwarc.py new file mode 100644 index 0000000..ce1d3a7 --- /dev/null +++ b/src/cc_pyspark/jobs/sitemaps_from_robotstxt_fastwarc.py @@ -0,0 +1,20 @@ +from fastwarc.warc import WarcRecordType + +from cc_pyspark.sparkcc_fastwarc import CCFastWarcSparkJob +from cc_pyspark.jobs.sitemaps_from_robotstxt import SitemapExtractorJob + + +class SitemapExtractorFastWarcJob(SitemapExtractorJob, CCFastWarcSparkJob): + """Extract sitemap URLs (http://www.sitemaps.org/) from robots.txt WARC files + using FastWARC to parse WARC files.""" + + name = "SitemapExtractorFastWarc" + + # process only WARC response records + fastwarc_record_filter = WarcRecordType.response + + # process_record is implemented by SitemapExtractorJob + +if __name__ == '__main__': + job = SitemapExtractorFastWarcJob() + job.run() diff --git a/wat_extract_links.py b/src/cc_pyspark/jobs/wat_extract_links.py similarity index 99% rename from wat_extract_links.py rename to src/cc_pyspark/jobs/wat_extract_links.py index c09e103..91cd393 100644 --- a/wat_extract_links.py +++ b/src/cc_pyspark/jobs/wat_extract_links.py @@ -6,8 +6,8 @@ from pyspark.sql.types import StructType, StructField, StringType -from sparkcc import CCSparkJob -from json_importer import json +from cc_pyspark.sparkcc import CCSparkJob +from cc_pyspark.json_importer import json class ExtractLinksJob(CCSparkJob): diff --git a/word_count.py b/src/cc_pyspark/jobs/word_count.py similarity index 96% rename from word_count.py rename to src/cc_pyspark/jobs/word_count.py index 0b5d5a0..4651c8b 100644 --- a/word_count.py +++ b/src/cc_pyspark/jobs/word_count.py @@ -4,7 +4,7 @@ from pyspark.sql.types import StructType, StructField, StringType, LongType -from sparkcc import CCSparkJob +from cc_pyspark.sparkcc import CCSparkJob class WordCountJob(CCSparkJob): diff --git a/json_importer.py b/src/cc_pyspark/json_importer.py similarity index 100% rename from json_importer.py rename to src/cc_pyspark/json_importer.py diff --git a/resiliparse_parser.py b/src/cc_pyspark/resiliparse_parser.py similarity index 100% rename from resiliparse_parser.py rename to src/cc_pyspark/resiliparse_parser.py diff --git a/sparkcc.py b/src/cc_pyspark/sparkcc.py similarity index 100% rename from sparkcc.py rename to src/cc_pyspark/sparkcc.py diff --git a/sparkcc_fastwarc.py b/src/cc_pyspark/sparkcc_fastwarc.py similarity index 98% rename from sparkcc_fastwarc.py rename to src/cc_pyspark/sparkcc_fastwarc.py index 71664f8..cbbb97f 100644 --- a/sparkcc_fastwarc.py +++ b/src/cc_pyspark/sparkcc_fastwarc.py @@ -2,7 +2,7 @@ from fastwarc.warc import WarcRecordType, WarcRecord from fastwarc.stream_io import FastWARCError -from sparkcc import CCSparkJob +from cc_pyspark.sparkcc import CCSparkJob class CCFastWarcSparkJob(CCSparkJob): diff --git a/test/test_sitemaps_from_robotstxt.py b/test/test_sitemaps_from_robotstxt.py index ba32527..02b5fdc 100644 --- a/test/test_sitemaps_from_robotstxt.py +++ b/test/test_sitemaps_from_robotstxt.py @@ -7,8 +7,8 @@ from pyspark.sql import SparkSession from warcio.recordloader import ArcWarcRecord -from sitemaps_from_robotstxt import SitemapExtractorJob -from sparkcc import CCSparkJob +from cc_pyspark.jobs.sitemaps_from_robotstxt import SitemapExtractorJob +from cc_pyspark.sparkcc import CCSparkJob from utils import _process_jobs
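The README hunks above note that a custom extractor usually only needs to override `process_record`, and `script/run_job.py` discovers the `CCSparkJob` subclass in the module passed via `--job_module`. The following sketch, which is not part of this patch and uses a hypothetical module and class name (`cc_pyspark/jobs/robots_line_count.py`, `RobotsLineCountJob`), shows what such a job could look like under the new `src/` layout; it only relies on the `CCSparkJob` hooks that appear elsewhere in this diff (`is_response_record`, `get_payload_stream`, `process_record`):

```python
# Hypothetical example job (not part of this patch),
# e.g. saved as src/cc_pyspark/jobs/robots_line_count.py.
from cc_pyspark.sparkcc import CCSparkJob


class RobotsLineCountJob(CCSparkJob):
    """Count non-empty lines in WARC response payloads (e.g. robots.txt fetches)."""

    name = "RobotsLineCount"

    def process_record(self, record):
        if not self.is_response_record(record):
            return  # skip request and metadata records
        payload = self.get_payload_stream(record).read()
        # emit (key, count) pairs; the base class reduces them by key
        yield 'non_empty_lines', sum(1 for line in payload.splitlines() if line.strip())


if __name__ == '__main__':
    RobotsLineCountJob().run()
```

Assuming such a module were added to the package, it could then be launched with the wrapper introduced in this patch, e.g. `script/package_and_run_job.sh --job_module cc_pyspark.jobs.robots_line_count <input> <output>`, where the trailing positional arguments are the job's usual input file listing and output table.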