tests/unit/packaging/test_tasks.py (108 additions, 0 deletions)
@@ -10,10 +10,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import builtins
import tempfile

from contextlib import contextmanager
from itertools import product
from pathlib import Path

import pretend
import pytest
@@ -24,8 +26,11 @@
import warehouse.packaging.tasks

from warehouse.accounts.models import WebAuthn
from warehouse.metrics.interfaces import IMetricsService
from warehouse.packaging.interfaces import IFileStorage
from warehouse.packaging.models import Description
from warehouse.packaging.tasks import (
backfill_metadata,
check_file_cache_tasks_outstanding,
compute_2fa_metrics,
compute_packaging_metrics,
@@ -888,3 +893,106 @@ def test_compute_2fa_metrics(db_request, monkeypatch):
pretend.call("warehouse.2fa.total_users_with_webauthn_enabled", 1),
pretend.call("warehouse.2fa.total_users_with_two_factor_enabled", 2),
]


def test_backfill_metadata(db_request, monkeypatch, metrics):
project = ProjectFactory()
release1 = ReleaseFactory(project=project)
release2 = ReleaseFactory(project=project)
FileFactory(release=release1, packagetype="sdist")
FileFactory(
release=release1,
packagetype="bdist_wheel",
metadata_file_sha256_digest="d34db33f",
)
FileFactory(release=release2, packagetype="sdist")
backfillable_file = FileFactory(
release=release2, packagetype="bdist_wheel", metadata_file_sha256_digest=None
)

metadata_contents = b"some\nmetadata\ncontents"
stub_dist = pretend.stub(
_dist=pretend.stub(_files={Path("METADATA"): metadata_contents})
)
stub_session = pretend.stub()
dist_from_wheel_url = pretend.call_recorder(
lambda project_name, file_url, session: stub_dist
)
monkeypatch.setattr(
warehouse.packaging.tasks, "dist_from_wheel_url", dist_from_wheel_url
)
monkeypatch.setattr(warehouse.packaging.tasks, "PipSession", lambda: stub_session)
archive_storage = pretend.stub(
store=pretend.call_recorder(lambda path_out, path_in, meta: None),
)
cache_storage = pretend.stub(
store=pretend.call_recorder(lambda path_out, path_in, meta: None),
)
db_request.find_service = pretend.call_recorder(
lambda iface, name=None, context=None: {
IFileStorage: {
"archive": archive_storage,
"cache": cache_storage,
},
IMetricsService: {None: metrics},
}[iface][name]
)

@contextmanager
def mock_temporary_directory():
yield "/tmp/wutang"

monkeypatch.setattr(tempfile, "TemporaryDirectory", mock_temporary_directory)

mock_write = pretend.call_recorder(lambda value: None)

@contextmanager
def mock_open(filename, perms):
yield pretend.stub(write=mock_write)

monkeypatch.setattr(builtins, "open", mock_open)

db_request.registry.settings[
"files.url"
] = "https://files.example.com/packages/{path}"

backfill_metadata(db_request)

assert dist_from_wheel_url.calls == [
pretend.call(
project.normalized_name,
f"https://files.example.com/packages/{backfillable_file.path}",
stub_session,
)
]

assert backfillable_file.metadata_file_sha256_digest == (
"e85ce4c9e2d2eddba19c396ed04470efaa2a9c2a6b3c6463e6876a41e55d828d"
)
assert backfillable_file.metadata_file_blake2_256_digest == (
"39cc629504be4087d48889e8666392bd379b91e1826e269cd8467bb29298da82"
)
assert (
archive_storage.store.calls
== cache_storage.store.calls
== [
pretend.call(
backfillable_file.metadata_path,
f"/tmp/wutang/{backfillable_file.filename}.metadata",
meta={
"project": project.normalized_name,
"version": release2.version,
"package-type": backfillable_file.packagetype,
"python-version": backfillable_file.python_version,
},
),
]
)

assert metrics.increment.calls == [
pretend.call("warehouse.packaging.metadata_backfill.files"),
pretend.call("warehouse.packaging.metadata_backfill.tasks"),
]
assert metrics.gauge.calls == [
pretend.call("warehouse.packaging.metadata_backfill.remaining", 0)
]
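
The digest literals asserted above are not magic values; they can be regenerated with the same hashing calls the task performs. A quick standalone check (not part of the test suite):

import hashlib

metadata_contents = b"some\nmetadata\ncontents"

# File.metadata_file_sha256_digest
print(hashlib.sha256(metadata_contents).hexdigest().lower())
# File.metadata_file_blake2_256_digest
print(hashlib.blake2b(metadata_contents, digest_size=256 // 8).hexdigest().lower())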
warehouse/packaging/__init__.py (4 additions, 0 deletions)
@@ -27,6 +27,7 @@
from warehouse.packaging.models import File, Project, Release, Role
from warehouse.packaging.services import project_service_factory
from warehouse.packaging.tasks import (
backfill_metadata,
check_file_cache_tasks_outstanding,
compute_2fa_metrics,
compute_packaging_metrics,
@@ -193,3 +194,6 @@ def includeme(config):
# TODO: restore this
# if config.get_settings().get("warehouse.release_files_table"):
# config.add_periodic_task(crontab(minute=0), sync_bigquery_release_files)

# Backfill wheel metadata
config.add_periodic_task(crontab(minute="*/5"), backfill_metadata)
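
For readers not familiar with Warehouse's config.add_periodic_task helper: the schedule above is Celery's crontab, and the same cadence expressed as a plain Celery beat entry would look roughly like the sketch below (the app name and task path are assumptions for illustration, not Warehouse's actual wiring).

from celery import Celery
from celery.schedules import crontab

app = Celery("warehouse_demo")  # hypothetical app; Warehouse registers tasks via its config machinery

app.conf.beat_schedule = {
    "backfill-metadata": {
        "task": "warehouse.packaging.tasks.backfill_metadata",  # assumed task name
        "schedule": crontab(minute="*/5"),  # every five minutes, as registered above
    },
}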
warehouse/packaging/tasks.py (77 additions, 0 deletions)
@@ -11,13 +11,19 @@
# limitations under the License.

import datetime
import hashlib
import logging
import os
import tempfile

from collections import namedtuple
from itertools import product
from pathlib import Path

from google.cloud.bigquery import LoadJobConfig
from pip._internal.network.lazy_wheel import dist_from_wheel_url
from pip._internal.network.session import PipSession
from sqlalchemy import desc
from sqlalchemy.orm import joinedload

from warehouse import tasks
@@ -54,6 +60,77 @@ def sync_file_to_cache(request, file_id):
file.cached = True


@tasks.task(ignore_result=True, acks_late=True)
def backfill_metadata(request):
metrics = request.find_service(IMetricsService, context=None)
base_url = request.registry.settings.get("files.url")

archive_storage = request.find_service(IFileStorage, name="archive")
cache_storage = request.find_service(IFileStorage, name="cache")
session = PipSession()

# Get all wheel files without metadata in reverse chronological order
files_without_metadata = (
request.db.query(File)
.filter(File.packagetype == "bdist_wheel")
.filter(File.metadata_file_sha256_digest.is_(None))
.order_by(desc(File.upload_time))
)

with tempfile.TemporaryDirectory() as tmpdir:
for file_ in files_without_metadata.yield_per(100):
# Use pip to download just the metadata of the wheel
file_url = base_url.format(path=file_.path)
lazy_dist = dist_from_wheel_url(
file_.release.project.normalized_name, file_url, session
)
wheel_metadata_contents = lazy_dist._dist._files[Path("METADATA")]

# Write the metadata to a temporary file
temporary_filename = os.path.join(tmpdir, file_.filename) + ".metadata"
with open(temporary_filename, "wb") as fp:
fp.write(wheel_metadata_contents)

# Hash the metadata and add it to the File instance
file_.metadata_file_sha256_digest = (
hashlib.sha256(wheel_metadata_contents).hexdigest().lower()
)
file_.metadata_file_blake2_256_digest = (
hashlib.blake2b(wheel_metadata_contents, digest_size=256 // 8)
.hexdigest()
.lower()
)
Comment on lines +94 to +102

@edmorley commented on Feb 14, 2024:

I presume the digests here are written to the DB immediately? (Rather than when file_'s destructor is called.)

If so, it seems like there is a failure mode here that could leave wheels without their metadata file.

i.e. if the backfill task's worker process gets killed or restarted between writing the digests and storing the metadata file in the storage backend (or if the write to the storage backend fails). In such a case, the next time the backfill task runs, the wheel in question wouldn't be returned by the DB query, since the digests would already be set even though the metadata file was never uploaded.

If setting the digests were moved to after the write to the storage backend, then in the case of an interrupted backfill task the inconsistency between the storage backend (which would have the uploaded file) and the DB digest state (which wouldn't yet have the digest) would instead self-heal on the next run.

A project member replied:

This is a non-issue: the changes to the database aren't persisted until the entire request has completed and a transaction is committed.

@edmorley replied:

Ah, good to know. (The fact that everything is configured to use transactions isn't obvious from code inspection of this single task.)

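To make the transaction point concrete, here is a minimal standalone sketch using plain SQLAlchemy (not Warehouse code; DemoFile is a hypothetical stand-in for packaging.models.File). As the reply above describes, an ORM attribute assignment stays pending inside the session's transaction and only reaches the database at commit(), so a worker killed between setting the digests and uploading the metadata file would leave the row unchanged and eligible for the next run.

import hashlib
import os
import tempfile
from typing import Optional

import sqlalchemy as sa
from sqlalchemy import orm


class Base(orm.DeclarativeBase):
    pass


class DemoFile(Base):  # hypothetical stand-in for warehouse.packaging.models.File
    __tablename__ = "files"
    id: orm.Mapped[int] = orm.mapped_column(primary_key=True)
    metadata_file_sha256_digest: orm.Mapped[Optional[str]] = orm.mapped_column(sa.String)


db_path = os.path.join(tempfile.mkdtemp(), "demo.db")
engine = sa.create_engine(f"sqlite:///{db_path}")
Base.metadata.create_all(engine)

with orm.Session(engine) as session:
    session.add(DemoFile(id=1))
    session.commit()

with orm.Session(engine) as session:
    file_ = session.get(DemoFile, 1)
    file_.metadata_file_sha256_digest = hashlib.sha256(b"metadata").hexdigest()
    # If the worker died here, before commit(), nothing has been persisted:
    # a separate connection still sees NULL, so the row would be picked up
    # again by the backfill query on the next run.
    with engine.connect() as other:
        still_null = other.execute(
            sa.text("SELECT metadata_file_sha256_digest FROM files WHERE id = 1")
        ).scalar_one()
        assert still_null is None
    session.commit()  # only now does the digest reach the database
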
# Store the metadata file via our object storage backend
archive_storage.store(
file_.metadata_path,
temporary_filename,
meta={
"project": file_.release.project.normalized_name,
"version": file_.release.version,
"package-type": file_.packagetype,
"python-version": file_.python_version,
},
)
# Write it to our storage cache as well
cache_storage.store(
file_.metadata_path,
temporary_filename,
meta={
"project": file_.release.project.normalized_name,
"version": file_.release.version,
"package-type": file_.packagetype,
"python-version": file_.python_version,
},
)
metrics.increment("warehouse.packaging.metadata_backfill.files")
metrics.increment("warehouse.packaging.metadata_backfill.tasks")
metrics.gauge(
"warehouse.packaging.metadata_backfill.remaining",
files_without_metadata.count(),
)


@tasks.task(ignore_result=True, acks_late=True)
def compute_packaging_metrics(request):
counts = dict(
Expand Down