From 286f529cca19f3ae3f71997da73ccc5b6acda288 Mon Sep 17 00:00:00 2001
From: Dustin Ingram
Date: Wed, 14 Feb 2024 22:50:52 +0000
Subject: [PATCH 1/4] Fan out backfill tasks

---
 tests/unit/packaging/test_tasks.py |  47 ++++++++---
 warehouse/packaging/tasks.py       | 120 +++++++++++++++--------------
 2 files changed, 99 insertions(+), 68 deletions(-)

diff --git a/tests/unit/packaging/test_tasks.py b/tests/unit/packaging/test_tasks.py
index 0e1170f87923..2ca48e4fbba5 100644
--- a/tests/unit/packaging/test_tasks.py
+++ b/tests/unit/packaging/test_tasks.py
@@ -32,6 +32,7 @@
 from warehouse.packaging.models import Description
 from warehouse.packaging.tasks import (
     backfill_metadata,
+    backfill_metadata_individual,
     check_file_cache_tasks_outstanding,
     compute_2fa_metrics,
     compute_packaging_metrics,
@@ -911,6 +912,37 @@ def test_backfill_metadata(db_request, monkeypatch, metrics):
         release=release2, packagetype="bdist_wheel", metadata_file_sha256_digest=None
     )
 
+    delay = pretend.call_recorder(lambda x: None)
+    db_request.task = pretend.call_recorder(lambda x: pretend.stub(delay=delay))
+
+    backfill_metadata(db_request)
+
+    assert db_request.task.calls == [pretend.call(backfill_metadata_individual)]
+    assert delay.calls == [pretend.call(backfillable_file.id)]
+
+    assert metrics.increment.calls == [
+        pretend.call("warehouse.packaging.metadata_backfill.tasks"),
+    ]
+    assert metrics.gauge.calls == [
+        pretend.call("warehouse.packaging.metadata_backfill.remaining", 1)
+    ]
+
+
+def test_backfill_metadata_individual(db_request, monkeypatch, metrics):
+    project = ProjectFactory()
+    release1 = ReleaseFactory(project=project)
+    release2 = ReleaseFactory(project=project)
+    FileFactory(release=release1, packagetype="sdist")
+    FileFactory(
+        release=release1,
+        packagetype="bdist_wheel",
+        metadata_file_sha256_digest="d34db33f",
+    )
+    FileFactory(release=release2, packagetype="sdist")
+    backfillable_file = FileFactory(
+        release=release2, packagetype="bdist_wheel", metadata_file_sha256_digest=None
+    )
+
     metadata_contents = b"some\nmetadata\ncontents"
     stub_dist = pretend.stub(
         _dist=pretend.stub(_files={Path("METADATA"): metadata_contents})
@@ -957,7 +989,7 @@ def mock_open(filename, perms):
         "files.url"
     ] = "https://files.example.com/packages/{path}"
 
-    backfill_metadata(db_request)
+    backfill_metadata_individual(pretend.stub(), db_request, backfillable_file.id)
 
     assert dist_from_wheel_url.calls == [
         pretend.call(
@@ -992,10 +1024,6 @@ def mock_open(filename, perms):
 
     assert metrics.increment.calls == [
         pretend.call("warehouse.packaging.metadata_backfill.files"),
-        pretend.call("warehouse.packaging.metadata_backfill.tasks"),
-    ]
-    assert metrics.gauge.calls == [
-        pretend.call("warehouse.packaging.metadata_backfill.remaining", 0)
     ]
 
 
@@ -1027,12 +1055,7 @@ def test_backfill_metadata_file_unbackfillable(db_request, monkeypatch, metrics)
 
     assert backfillable_file.metadata_file_unbackfillable is False
 
-    backfill_metadata(db_request)
+    backfill_metadata_individual(pretend.stub(), db_request, backfillable_file.id)
 
     assert backfillable_file.metadata_file_unbackfillable is True
-    assert metrics.increment.calls == [
-        pretend.call("warehouse.packaging.metadata_backfill.tasks"),
-    ]
-    assert metrics.gauge.calls == [
-        pretend.call("warehouse.packaging.metadata_backfill.remaining", 0)
-    ]
+    assert metrics.increment.calls == []
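
The tests above lean on the pretend stubbing library: pretend.call_recorder wraps a
callable and records every invocation, and the recorded calls compare equal to
pretend.call(...) values. A minimal standalone illustration (the file id string is
just a placeholder):

    import pretend

    # call_recorder wraps a callable and appends a pretend.call(...) record
    # for each invocation it sees.
    delay = pretend.call_recorder(lambda x: None)
    task = pretend.stub(delay=delay)

    task.delay("some-file-id")

    assert delay.calls == [pretend.call("some-file-id")]
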
diff --git a/warehouse/packaging/tasks.py b/warehouse/packaging/tasks.py
index c60271cc66f2..25db6a8291b2 100644
--- a/warehouse/packaging/tasks.py
+++ b/warehouse/packaging/tasks.py
@@ -64,11 +64,6 @@ def sync_file_to_cache(request, file_id):
 @tasks.task(ignore_result=True, acks_late=True)
 def backfill_metadata(request):
     metrics = request.find_service(IMetricsService, context=None)
-    base_url = request.registry.settings.get("files.url")
-
-    archive_storage = request.find_service(IFileStorage, name="archive")
-    cache_storage = request.find_service(IFileStorage, name="cache")
-    session = PipSession()
 
     # Get all wheel files without metadata in reverse chronological order
     files_without_metadata = (
@@ -79,64 +74,77 @@ def backfill_metadata(request):
         .order_by(desc(File.upload_time))
     )
 
-    with tempfile.TemporaryDirectory() as tmpdir:
-        for file_ in files_without_metadata.yield_per(100).limit(500):
-            # Use pip to download just the metadata of the wheel
-            file_url = base_url.format(path=file_.path)
-            try:
-                lazy_dist = dist_from_wheel_url(
-                    file_.release.project.normalized_name, file_url, session
-                )
-                wheel_metadata_contents = lazy_dist._dist._files[Path("METADATA")]
-            except UnsupportedWheel:
-                file_.metadata_file_unbackfillable = True
-                continue
-
-            # Write the metadata to a temporary file
-            temporary_filename = os.path.join(tmpdir, file_.filename) + ".metadata"
-            with open(temporary_filename, "wb") as fp:
-                fp.write(wheel_metadata_contents)
+    for file_ in files_without_metadata.yield_per(100).limit(500):
+        request.task(backfill_metadata_individual).delay(file_.id)
 
-            # Hash the metadata and add it to the File instance
-            file_.metadata_file_sha256_digest = (
-                hashlib.sha256(wheel_metadata_contents).hexdigest().lower()
-            )
-            file_.metadata_file_blake2_256_digest = (
-                hashlib.blake2b(wheel_metadata_contents, digest_size=256 // 8)
-                .hexdigest()
-                .lower()
-            )
-
-            # Store the metadata file via our object storage backend
-            archive_storage.store(
-                file_.metadata_path,
-                temporary_filename,
-                meta={
-                    "project": file_.release.project.normalized_name,
-                    "version": file_.release.version,
-                    "package-type": file_.packagetype,
-                    "python-version": file_.python_version,
-                },
-            )
-            # Write it to our storage cache as well
-            cache_storage.store(
-                file_.metadata_path,
-                temporary_filename,
-                meta={
-                    "project": file_.release.project.normalized_name,
-                    "version": file_.release.version,
-                    "package-type": file_.packagetype,
-                    "python-version": file_.python_version,
-                },
-            )
-            metrics.increment("warehouse.packaging.metadata_backfill.files")
-    metrics.increment("warehouse.packaging.metadata_backfill.tasks")
+    metrics.increment("warehouse.packaging.metadata_backfill.tasks")
 
     metrics.gauge(
         "warehouse.packaging.metadata_backfill.remaining",
         files_without_metadata.count(),
     )
 
 
+@tasks.task(ignore_result=True, acks_late=True)
+def backfill_metadata_individual(_task, request, file_id):
+    file_ = request.db.get(File, file_id)
+    base_url = request.registry.settings.get("files.url")
+    file_url = base_url.format(path=file_.path)
+    metrics = request.find_service(IMetricsService, context=None)
+    archive_storage = request.find_service(IFileStorage, name="archive")
+    cache_storage = request.find_service(IFileStorage, name="cache")
+    session = PipSession()
+
+    # Use pip to download just the metadata of the wheel
+    try:
+        lazy_dist = dist_from_wheel_url(
+            file_.release.project.normalized_name, file_url, session
+        )
+        wheel_metadata_contents = lazy_dist._dist._files[Path("METADATA")]
+    except UnsupportedWheel:
+        file_.metadata_file_unbackfillable = True
+        return
+
+    # Hash the metadata and add it to the File instance
+    file_.metadata_file_sha256_digest = (
+        hashlib.sha256(wheel_metadata_contents).hexdigest().lower()
+    )
+    file_.metadata_file_blake2_256_digest = (
+        hashlib.blake2b(wheel_metadata_contents, digest_size=256 // 8)
+        .hexdigest()
+        .lower()
+    )
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        # Write the metadata to a temporary file
+        temporary_filename = os.path.join(tmpdir, file_.filename) + ".metadata"
+        with open(temporary_filename, "wb") as fp:
+            fp.write(wheel_metadata_contents)
+
+        # Store the metadata file via our object storage backend
+        archive_storage.store(
+            file_.metadata_path,
+            temporary_filename,
+            meta={
+                "project": file_.release.project.normalized_name,
+                "version": file_.release.version,
+                "package-type": file_.packagetype,
+                "python-version": file_.python_version,
+            },
+        )
+        # Write it to our storage cache as well
+        cache_storage.store(
+            file_.metadata_path,
+            temporary_filename,
+            meta={
+                "project": file_.release.project.normalized_name,
+                "version": file_.release.version,
+                "package-type": file_.packagetype,
+                "python-version": file_.python_version,
+            },
+        )
+        metrics.increment("warehouse.packaging.metadata_backfill.files")
+
+
 @tasks.task(ignore_result=True, acks_late=True)
 def compute_packaging_metrics(request):
     counts = dict(
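
Stepping back from the diff: this patch replaces one long-running task that
processed up to 500 files inline with a cheap coordinator that enqueues one task
per file, so a failure or retry is isolated to a single file. A minimal sketch of
the same fan-out pattern in plain Celery (the app name, broker, and task names are
illustrative, not Warehouse's actual task plumbing):

    from celery import Celery

    app = Celery("example", broker="memory://")

    @app.task(ignore_result=True, acks_late=True)
    def backfill_one(file_id):
        # Per-item work (download, hash, store) runs in its own task.
        print(f"backfilling {file_id}")

    @app.task(ignore_result=True)
    def backfill_all(file_ids, batch_size=500):
        # The coordinator only enumerates work and enqueues it.
        for file_id in file_ids[:batch_size]:
            backfill_one.delay(file_id)
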
From 74448038a51517f652e6f7cce4f1ace53aac6b50 Mon Sep 17 00:00:00 2001
From: Dustin Ingram
Date: Thu, 15 Feb 2024 15:09:13 +0000
Subject: [PATCH 2/4] Set the batch size limit via env var

---
 tests/unit/packaging/test_tasks.py | 1 +
 warehouse/config.py                | 7 +++++++
 warehouse/packaging/tasks.py       | 4 ++--
 3 files changed, 10 insertions(+), 2 deletions(-)

diff --git a/tests/unit/packaging/test_tasks.py b/tests/unit/packaging/test_tasks.py
index 2ca48e4fbba5..c03b3756d46b 100644
--- a/tests/unit/packaging/test_tasks.py
+++ b/tests/unit/packaging/test_tasks.py
@@ -914,6 +914,7 @@ def test_backfill_metadata(db_request, monkeypatch, metrics):
 
     delay = pretend.call_recorder(lambda x: None)
     db_request.task = pretend.call_recorder(lambda x: pretend.stub(delay=delay))
+    db_request.registry.settings["metadata_backfill.batch_size"] = 500
 
     backfill_metadata(db_request)
 
diff --git a/warehouse/config.py b/warehouse/config.py
index 22912f76c1a3..cc666a82fab3 100644
--- a/warehouse/config.py
+++ b/warehouse/config.py
@@ -326,6 +326,13 @@ def configure(settings=None):
         coercer=int,
         default=100,
     )
+    maybe_set(
+        settings,
+        "metadata_backfill.batch_size",
+        "METADATA_BACKFILL_BATCH_SIZE",
+        coercer=int,
+        default=500,
+    )
     maybe_set_compound(settings, "billing", "backend", "BILLING_BACKEND")
     maybe_set_compound(settings, "files", "backend", "FILES_BACKEND")
     maybe_set_compound(settings, "archive_files", "backend", "ARCHIVE_FILES_BACKEND")
diff --git a/warehouse/packaging/tasks.py b/warehouse/packaging/tasks.py
index 25db6a8291b2..65784543bda2 100644
--- a/warehouse/packaging/tasks.py
+++ b/warehouse/packaging/tasks.py
@@ -64,6 +64,7 @@ def sync_file_to_cache(request, file_id):
 @tasks.task(ignore_result=True, acks_late=True)
 def backfill_metadata(request):
     metrics = request.find_service(IMetricsService, context=None)
+    batch_size = request.registry.settings["metadata_backfill.batch_size"]
 
     # Get all wheel files without metadata in reverse chronological order
     files_without_metadata = (
@@ -73,8 +74,7 @@ def backfill_metadata(request):
         .filter(File.metadata_file_unbackfillable.isnot(True))
         .order_by(desc(File.upload_time))
     )
-
-    for file_ in files_without_metadata.yield_per(100).limit(500):
+    for file_ in files_without_metadata.yield_per(100).limit(batch_size):
         request.task(backfill_metadata_individual).delay(file_.id)
 
     metrics.increment("warehouse.packaging.metadata_backfill.tasks")
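
For reference, the maybe_set helper used in config.py behaves roughly like the
sketch below (a hypothetical reimplementation for illustration only; the real
helper lives in warehouse/config.py): read the environment variable if present,
coerce it, and otherwise fall back to the default.

    import os

    def maybe_set(settings, name, envvar, coercer=None, default=None):
        # Hypothetical stand-in for warehouse.config.maybe_set.
        if envvar in os.environ:
            value = os.environ[envvar]
            settings[name] = coercer(value) if coercer is not None else value
        elif default is not None:
            settings.setdefault(name, default)

    settings = {}
    os.environ["METADATA_BACKFILL_BATCH_SIZE"] = "250"
    maybe_set(
        settings,
        "metadata_backfill.batch_size",
        "METADATA_BACKFILL_BATCH_SIZE",
        coercer=int,
        default=500,
    )
    assert settings["metadata_backfill.batch_size"] == 250
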
metrics.increment("warehouse.packaging.metadata_backfill.tasks") From 37f93543bbae9580551936dbdcdc064f09e0b391 Mon Sep 17 00:00:00 2001 From: Dustin Ingram Date: Thu, 15 Feb 2024 15:21:18 +0000 Subject: [PATCH 3/4] Resolve confusion between cache/archive storage --- warehouse/packaging/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/warehouse/packaging/tasks.py b/warehouse/packaging/tasks.py index 65784543bda2..7f65f8f9a4e3 100644 --- a/warehouse/packaging/tasks.py +++ b/warehouse/packaging/tasks.py @@ -90,8 +90,8 @@ def backfill_metadata_individual(_task, request, file_id): base_url = request.registry.settings.get("files.url") file_url = base_url.format(path=file_.path) metrics = request.find_service(IMetricsService, context=None) - archive_storage = request.find_service(IFileStorage, name="archive") cache_storage = request.find_service(IFileStorage, name="cache") + archive_storage = request.find_service(IFileStorage, name="archive") session = PipSession() # Use pip to download just the metadata of the wheel @@ -121,7 +121,7 @@ def backfill_metadata_individual(_task, request, file_id): fp.write(wheel_metadata_contents) # Store the metadata file via our object storage backend - archive_storage.store( + cache_storage.store( file_.metadata_path, temporary_filename, meta={ @@ -131,8 +131,8 @@ def backfill_metadata_individual(_task, request, file_id): "python-version": file_.python_version, }, ) - # Write it to our storage cache as well - cache_storage.store( + # Write it to our archive storage as well + archive_storage.store( file_.metadata_path, temporary_filename, meta={ From 3e68c67eab0c65eb47b1eda4763fd14ddfc03729 Mon Sep 17 00:00:00 2001 From: Dustin Ingram Date: Thu, 15 Feb 2024 15:40:32 +0000 Subject: [PATCH 4/4] Update test --- tests/unit/test_config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index b46788315570..7a9fe2b5bf43 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -252,6 +252,7 @@ def __init__(self): "oidc.backend": "warehouse.oidc.services.OIDCPublisherService", "warehouse.organizations.max_undecided_organization_applications": 3, "reconcile_file_storages.batch_size": 100, + "metadata_backfill.batch_size": 500, "gcloud.service_account_info": {}, } if environment == config.Environment.development: