From a689ffc819bce91336ddec365042ea7ccd1a1bf3 Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Mon, 19 Aug 2024 11:10:39 -0400 Subject: [PATCH 1/6] improve pooled progress output for BatchDownloader - use more specific types for BatchDownloader#__call__ - calculate byte lengths with a HEAD request - quiet all progress output from -q - don't write colored output with --no-color - write a lot more documentation for the new progress bar logic - use ProgressBarType enum for --progress-bar CLI flag --- news/12925.feature.rst | 1 + src/pip/_internal/cli/cmdoptions.py | 11 +- src/pip/_internal/cli/progress_bars.py | 543 ++++++++++++++++++++++-- src/pip/_internal/cli/req_command.py | 5 +- src/pip/_internal/network/download.py | 183 ++++++-- src/pip/_internal/network/utils.py | 1 + src/pip/_internal/operations/prepare.py | 40 +- src/pip/_internal/req/__init__.py | 7 +- tests/unit/test_network_download.py | 11 +- tests/unit/test_operations_prepare.py | 5 +- tests/unit/test_req.py | 5 +- 11 files changed, 711 insertions(+), 101 deletions(-) create mode 100644 news/12925.feature.rst diff --git a/news/12925.feature.rst b/news/12925.feature.rst new file mode 100644 index 00000000000..8255f66d01b --- /dev/null +++ b/news/12925.feature.rst @@ -0,0 +1 @@ +Use very rich progress output for batch downloading. Use ``ProgressBarType`` enum class for ``--progress-bar`` choices. 
diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index 3519dadf13d..f6f5c82661e 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -24,6 +24,7 @@ from pip._vendor.packaging.utils import canonicalize_name from pip._internal.cli.parser import ConfigOptionParser +from pip._internal.cli.progress_bars import ProgressBarType from pip._internal.exceptions import CommandError from pip._internal.locations import USER_CACHE_DIR, get_src_prefix from pip._internal.models.format_control import FormatControl @@ -228,15 +229,15 @@ class PipOption(Option): "--progress-bar", dest="progress_bar", type="choice", - choices=["auto", "on", "off", "raw"], - default="auto", + choices=ProgressBarType.choices(), + default=ProgressBarType.ON.value, help=( - "Specify whether the progress bar should be used. In 'auto'" - " mode, --quiet will suppress all progress bars." - " [auto, on, off, raw] (default: auto)" + "Specify whether the progress bar should be used" + f" {ProgressBarType.help_choices()} (default: %default)" ), ) + log: Callable[..., Option] = partial( PipOption, "--log", diff --git a/src/pip/_internal/cli/progress_bars.py b/src/pip/_internal/cli/progress_bars.py index af1bb6a5e05..d8359638f28 100644 --- a/src/pip/_internal/cli/progress_bars.py +++ b/src/pip/_internal/cli/progress_bars.py @@ -1,10 +1,20 @@ from __future__ import annotations +import abc import functools import sys -from collections.abc import Generator, Iterable, Iterator -from typing import Callable, Literal, TypeVar +from collections.abc import Iterable, Iterator +from enum import Enum +from typing import ( + TYPE_CHECKING, + Any, + Callable, + TypeVar, +) +from pip._vendor.rich.console import Console +from pip._vendor.rich.live import Live +from pip._vendor.rich.panel import Panel from pip._vendor.rich.progress import ( BarColumn, DownloadColumn, @@ -13,56 +23,100 @@ Progress, ProgressColumn, SpinnerColumn, + TaskID, TextColumn, 
TimeElapsedColumn, TimeRemainingColumn, TransferSpeedColumn, ) +from pip._vendor.rich.table import Table from pip._internal.cli.spinners import RateLimiter -from pip._internal.req.req_install import InstallRequirement from pip._internal.utils.logging import get_console, get_indentation +if TYPE_CHECKING: + from pip._internal.req.req_install import InstallRequirement + T = TypeVar("T") ProgressRenderer = Callable[[Iterable[T]], Iterator[T]] -BarType = Literal["on", "off", "raw"] -def _rich_download_progress_bar( +def _unknown_size_columns() -> tuple[ProgressColumn, ...]: + """Rich progress with a spinner for completion of a download of unknown size. + + This is employed for downloads where the server does not return a 'Content-Length' + header, which currently cannot be inferred from e.g. wheel metadata.""" + return ( + TextColumn("[progress.description]{task.description}"), + SpinnerColumn("line", speed=1.5), + FileSizeColumn(), + TransferSpeedColumn(), + TimeElapsedColumn(), + ) + + +def _known_size_columns() -> tuple[ProgressColumn, ...]: + """Rich progress for %completion of a download task in terms of bytes, with ETA.""" + return ( + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + DownloadColumn(), + TransferSpeedColumn(), + TextColumn("{task.fields[time_description]}"), + TimeRemainingColumn(elapsed_when_finished=True), + ) + + +def _task_columns() -> tuple[ProgressColumn, ...]: + """Rich progress for %complete out of a fixed positive number of known tasks.""" + return ( + TextColumn("[progress.description]{task.description}"), + SpinnerColumn("line", speed=1.5), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + MofNCompleteColumn(), + ) + + +def _progress_task_prefix() -> str: + """For output that doesn't take up the whole terminal, make it align with current + logger indentation.""" + return " " * (get_indentation() + 2) + + +def 
_rich_progress_bar( iterable: Iterable[bytes], *, - bar_type: BarType, size: int | None, initial_progress: int | None = None, -) -> Generator[bytes, None, None]: - assert bar_type == "on", "This should only be used in the default mode." + quiet: bool, + color: bool, +) -> Iterator[bytes]: + """Deploy a single rich progress bar to wrap a single download task. - if not size: + This provides a single line of updating output, prefixed with the appropriate + indentation. ETA and %completion are provided if ``size`` is known; otherwise, + a spinner with size, transfer speed, and time elapsed are provided.""" + if size is None: total = float("inf") - columns: tuple[ProgressColumn, ...] = ( - TextColumn("[progress.description]{task.description}"), - SpinnerColumn("line", speed=1.5), - FileSizeColumn(), - TransferSpeedColumn(), - TimeElapsedColumn(), - ) + columns = _unknown_size_columns() else: total = size - columns = ( - TextColumn("[progress.description]{task.description}"), - BarColumn(), - DownloadColumn(), - TransferSpeedColumn(), - TextColumn("{task.fields[time_description]}"), - TimeRemainingColumn(elapsed_when_finished=True), - ) + columns = _known_size_columns() - progress = Progress(*columns, refresh_per_second=5) + progress = Progress( + *columns, + # TODO: consider writing to stderr over stdout? + console=Console(stderr=False, quiet=quiet, no_color=not color), + refresh_per_second=5, + ) + # This adds a task with no name, just enough indentation to align with log + # output. We rely upon the name of the download being printed beforehand on the + # previous line for context. 
task_id = progress.add_task( - " " * (get_indentation() + 2), total=total, time_description="eta" + _progress_task_prefix(), total=total, time_description="eta" ) - if initial_progress is not None: - progress.update(task_id, advance=initial_progress) with progress: for chunk in iterable: yield chunk @@ -96,56 +150,453 @@ def _raw_progress_bar( iterable: Iterable[bytes], *, size: int | None, + quiet: bool, initial_progress: int | None = None, -) -> Generator[bytes, None, None]: - def write_progress(current: int, total: int) -> None: - sys.stdout.write(f"Progress {current} of {total}\n") - sys.stdout.flush() +) -> Iterator[bytes]: + """Hand-write progress to stdout. + + Use subsequent lines for each chunk, with manual rate limiting. + """ + prefix = _progress_task_prefix() + total_fmt = "?" if size is None else str(size) + stream = sys.stdout + + def write_progress(current: int) -> None: + if quiet: + return + stream.write(f"{prefix}Progress {current} of {total_fmt} bytes\n") + stream.flush() current = initial_progress or 0 - total = size or 0 rate_limiter = RateLimiter(0.25) - write_progress(current, total) + write_progress(current) for chunk in iterable: current += len(chunk) - if rate_limiter.ready() or current == total: - write_progress(current, total) + if rate_limiter.ready() or current == size: + write_progress(current) rate_limiter.reset() yield chunk +class ProgressBarType(Enum): + """Types of progress output to show, for single or batched downloads. 
+ + The values of this enum are used as the choices for the --progress-bar CLI flag.""" + + ON = "on" + OFF = "off" + RAW = "raw" + + @classmethod + def choices(cls) -> list[str]: + return [x.value for x in cls] + + @classmethod + def help_choices(cls) -> str: + inner = ", ".join(cls.choices()) + return f"[{inner}]" + + def get_download_progress_renderer( - *, bar_type: BarType, size: int | None = None, initial_progress: int | None = None + *, + bar_type: ProgressBarType, + size: int | None = None, + initial_progress: int | None = None, + quiet: bool = False, + color: bool = True, ) -> ProgressRenderer[bytes]: """Get an object that can be used to render the download progress. Returns a callable, that takes an iterable to "wrap". """ - if bar_type == "on": + if size is not None: + assert size >= 0 + + # TODO: use 3.10+ match statement! + if bar_type == ProgressBarType.ON: return functools.partial( - _rich_download_progress_bar, - bar_type=bar_type, + _rich_progress_bar, size=size, initial_progress=initial_progress, + quiet=quiet, + color=color, ) - elif bar_type == "raw": + elif bar_type == ProgressBarType.RAW: return functools.partial( - _raw_progress_bar, - size=size, - initial_progress=initial_progress, + _raw_progress_bar, size=size, initial_progress=initial_progress, quiet=quiet ) else: + assert bar_type == ProgressBarType.OFF return iter # no-op, when passed an iterator def get_install_progress_renderer( - *, bar_type: BarType, total: int + *, bar_type: ProgressBarType, total: int ) -> ProgressRenderer[InstallRequirement]: """Get an object that can be used to render the install progress. Returns a callable, that takes an iterable to "wrap". 
""" - if bar_type == "on": + if bar_type == ProgressBarType.ON: return functools.partial(_rich_install_progress_bar, total=total) else: return iter + + +_ProgressClass = TypeVar("_ProgressClass", bound="BatchedProgress") + + +class BatchedProgress(abc.ABC): + """Interface for reporting progress output on batched download tasks. + + For batched downloads, we want to be able to express progress on several parallel + tasks at once. This means that instead of transforming an ``Iterator[bytes]`` like + ``DownloadProgressRenderer``, we instead want to receive asynchronous notifications + about progress over several separate tasks. These tasks may not start all at once, + and will end at different times. We assume progress over all of these tasks can be + uniformly summed up to get a measure of total progress. + """ + + @abc.abstractmethod + def add_subtask(self, description: str, total: int | None) -> TaskID: + """Given a specific subtask description and known total length, add it to the + set of tracked tasks. + + This method is generally expected to be called before __enter__, but this is not + required.""" + ... + + @abc.abstractmethod + def start_subtask(self, task_id: TaskID) -> None: + """Given a subtask id returned by .add_subtask(), signal that the task + has begun. + + This information is used in progress reporting to calculate ETA. This method is + generally expected to be called after __enter__, but this is not required.""" + ... + + @abc.abstractmethod + def advance_subtask(self, task_id: TaskID, steps: int) -> None: + """Given a subtask id returned by .add_subtask(), progress the given number of + steps. + + Since tasks correspond to downloaded files, ``steps`` refers to the number of + bytes received. This is expected not to overflow the ``total`` number provided + to .add_subtask(), since the total is expected to be exact, but no error will + occur if it does.""" + ... 
+ + @abc.abstractmethod + def finish_subtask(self, task_id: TaskID) -> None: + """Given a subtask id returned by .add_subtask(), indicate the task is complete. + + This is generally used to remove the task progress from the set of tracked + tasks, or to log that the task has completed. It does not need to be called in + the case of an exception.""" + ... + + @abc.abstractmethod + def __enter__(self) -> BatchedProgress: + """Begin writing output to the terminal to track task progress. + + This may do nothing for no-op progress recorders, or it may write log messages, + or it may produce a rich output taking up the entire terminal.""" + ... + + @abc.abstractmethod + def __exit__(self, ty: Any, val: Any, tb: Any) -> None: + """Clean up any output written to the terminal. + + This is generally a no-op except for the rich progress recorder, which will give + back the terminal to the rest of pip.""" + ... + + @classmethod + @abc.abstractmethod + def create( + cls: type[_ProgressClass], + num_tasks: int, + known_total_length: int | None, + quiet: bool, + color: bool, + ) -> _ProgressClass: + """Generate a progress recorder for a static number of known tasks. + + These tasks are intended to correspond to file downloads, so their "length" + corresponds to byte length. These tasks may not have their individual byte + lengths known, depending upon whether the server provides a 'Content-Length' + header. + + Progress recorders are expected to produce no output when ``quiet=True``, and + should not write colored output to the terminal when ``color=False``.""" + ... + + @classmethod + def select_progress_bar(cls, bar_type: ProgressBarType) -> type[BatchedProgress]: + """Factory method to produce a progress recorder according to CLI flag.""" + # TODO: use 3.10+ match statement! 
+ if bar_type == ProgressBarType.ON: + return BatchedRichProgressBar + if bar_type == ProgressBarType.RAW: + return BatchedRawProgressBar + assert bar_type == ProgressBarType.OFF + return BatchedNoOpProgressBar + + +class BatchedNoOpProgressBar(BatchedProgress): + """Do absolutely nothing with the info.""" + + def add_subtask(self, description: str, total: int | None) -> TaskID: + return TaskID(0) + + def start_subtask(self, task_id: TaskID) -> None: + pass + + def advance_subtask(self, task_id: TaskID, steps: int) -> None: + pass + + def finish_subtask(self, task_id: TaskID) -> None: + pass + + def __enter__(self) -> BatchedNoOpProgressBar: + return self + + def __exit__(self, ty: Any, val: Any, tb: Any) -> None: + pass + + @classmethod + def create( + cls, + num_tasks: int, + known_total_length: int | None, + quiet: bool, + color: bool, + ) -> BatchedNoOpProgressBar: + return cls() + + +class BatchedRawProgressBar(BatchedProgress): + """Manually write progress output to stdout. + + This will notify when subtasks have started, when they've completed, and how much + progress was made in the overall byte download (the sum of all bytes downloaded as + a fraction of the known total bytes, if provided).""" + + def __init__( + self, + total_bytes: int | None, + prefix: str, + quiet: bool, + ) -> None: + self._total_bytes = total_bytes + self._prefix = prefix + self._total_progress = 0 + self._subtasks: list[tuple[str, int | None]] = [] + self._rate_limiter = RateLimiter(0.25) + self._stream = sys.stdout + self._quiet = quiet + + def add_subtask(self, description: str, total: int | None) -> TaskID: + task_id = len(self._subtasks) + self._subtasks.append((description, total)) + return TaskID(task_id) + + def _write_immediate(self, line: str) -> None: + if self._quiet: + return + self._stream.write(f"{self._prefix}{line}\n") + self._stream.flush() + + @staticmethod + def _format_total(total: int | None) -> str: + if total is None: + return "?" 
+ return str(total) + + def _total_tasks(self) -> int: + return len(self._subtasks) + + def start_subtask(self, task_id: TaskID) -> None: + description, total = self._subtasks[task_id] + total_fmt = self._format_total(total) + task_index = task_id + 1 + n = self._total_tasks() + self._write_immediate( + f"Starting download [{task_index}/{n}] {description} ({total_fmt} bytes)" + ) + + def _write_progress(self) -> None: + total_fmt = self._format_total(self._total_bytes) + if self._total_bytes is not None: + raw_pcnt = float(self._total_progress) / float(self._total_bytes) * 100 + pcnt = str(round(raw_pcnt, 1)) + else: + pcnt = "?" + self._write_immediate( + f"Progress {pcnt}% {self._total_progress} of {total_fmt} bytes" + ) + + def advance_subtask(self, task_id: TaskID, steps: int) -> None: + self._total_progress += steps + if self._rate_limiter.ready() or self._total_progress == self._total_bytes: + self._write_progress() + self._rate_limiter.reset() + + def finish_subtask(self, task_id: TaskID) -> None: + description, _total = self._subtasks[task_id] + task_index = task_id + 1 + n = self._total_tasks() + self._write_immediate(f"Completed download [{task_index}/{n}] {description}") + + def __enter__(self) -> BatchedRawProgressBar: + self._write_progress() + return self + + def __exit__(self, ty: Any, val: Any, tb: Any) -> None: + pass + + @classmethod + def create( + cls, + num_tasks: int, + known_total_length: int | None, + quiet: bool, + color: bool, + ) -> BatchedRawProgressBar: + prefix = _progress_task_prefix() + return cls(known_total_length, prefix, quiet=quiet) + + +class BatchedRichProgressBar(BatchedProgress): + """Extremely rich progress output for download tasks. + + Provides overall byte progress as well as a separate progress for # of tasks + completed, with individual lines for each subtask. Subtasks are removed from the + table upon completion. 
ETA and %completion is generated for all subtasks as well as + the overall byte download task.""" + + def __init__( + self, + task_progress: Progress, + total_task_id: TaskID, + progress: Progress, + total_bytes_task_id: TaskID, + quiet: bool, + color: bool, + ) -> None: + self._task_progress = task_progress + self._total_task_id = total_task_id + self._progress = progress + self._total_bytes_task_id = total_bytes_task_id + self._quiet = quiet + self._color = color + self._live: Live | None = None + + _TRIM_LEN = 20 + + def add_subtask(self, description: str, total: int | None) -> TaskID: + if len(description) > self._TRIM_LEN: + description_trimmed = description[: self._TRIM_LEN] + "..." + else: + description_trimmed = description + return self._progress.add_task( + description=f"[green]{description_trimmed}", + start=False, + total=total, + time_description="eta", + ) + + def start_subtask(self, task_id: TaskID) -> None: + self._progress.start_task(task_id) + + def advance_subtask(self, task_id: TaskID, steps: int) -> None: + self._progress.advance(self._total_bytes_task_id, steps) + self._progress.advance(task_id, steps) + + def finish_subtask(self, task_id: TaskID) -> None: + self._task_progress.advance(self._total_task_id) + self._progress.remove_task(task_id) + + def __enter__(self) -> BatchedRichProgressBar: + """Generate a table with two rows so different columns can be used. + + Overall progress in terms of # tasks completed is shown at top, while a box of + all individual tasks is provided below. 
Tasks are removed from the table (making + it shorter) when completed, and are shown with indeterminate ETA before they are + started.""" + table = Table.grid() + table.add_row( + Panel( + self._task_progress, + title="Download Progress", + border_style="cyan", + padding=(0, 1), + ) + ) + table.add_row( + Panel( + self._progress, + title="[b]Individual Request Progress", + border_style="green", + padding=(0, 0), + ) + ) + self._live = Live( + table, + # TODO: consider writing to stderr over stdout? + console=Console(stderr=False, quiet=self._quiet, no_color=not self._color), + refresh_per_second=5, + ) + self._task_progress.start_task(self._total_task_id) + self._progress.start_task(self._total_bytes_task_id) + self._live.__enter__() + return self + + def __exit__(self, ty: Any, val: Any, tb: Any) -> None: + assert self._live is not None + self._live.__exit__(ty, val, tb) + + @classmethod + def create( + cls, + num_tasks: int, + known_total_length: int | None, + quiet: bool, + color: bool, + ) -> BatchedRichProgressBar: + # This progress indicator is for completion of download subtasks, separate from + # counting overall progress by summing chunk byte lengths. + task_columns = _task_columns() + task_progress = Progress(*task_columns) + # Create the single task in this progress indicator, tracking # of + # completed tasks. + total_task_id = task_progress.add_task( + description="[yellow]total downloads", + start=False, + total=num_tasks, + ) + + # This progress indicator is for individual byte downloads. + if known_total_length is None: + total = float("inf") + columns = _unknown_size_columns() + else: + total = known_total_length + columns = _known_size_columns() + progress = Progress(*columns) + # Create a task for total progress in byte downloads. 
+ total_bytes_task_id = progress.add_task( + description="[cyan]total bytes", + start=False, + total=total, + time_description="eta", + ) + + return cls( + task_progress, + total_task_id, + progress, + total_bytes_task_id, + quiet=quiet, + color=color, + ) diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index dc1328ff019..5047fc5d500 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -17,6 +17,7 @@ from pip._internal.cli import cmdoptions from pip._internal.cli.index_command import IndexGroupCommand from pip._internal.cli.index_command import SessionCommandMixin as SessionCommandMixin +from pip._internal.cli.progress_bars import ProgressBarType from pip._internal.exceptions import CommandError, PreviousBuildDirError from pip._internal.index.collector import LinkCollector from pip._internal.index.package_finder import PackageFinder @@ -141,12 +142,14 @@ def make_requirement_preparer( check_build_deps=options.check_build_deps, build_tracker=build_tracker, session=session, - progress_bar=options.progress_bar, + progress_bar=ProgressBarType(options.progress_bar), finder=finder, require_hashes=options.require_hashes, use_user_site=use_user_site, lazy_wheel=lazy_wheel, verbosity=verbosity, + quietness=options.quiet, + color=not options.no_color, legacy_resolver=legacy_resolver, resume_retries=options.resume_retries, ) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index 9881cc285fa..a7eae6216d0 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -9,15 +9,21 @@ from collections.abc import Iterable, Mapping from dataclasses import dataclass from http import HTTPStatus +from pathlib import Path from typing import BinaryIO from pip._vendor.requests import PreparedRequest from pip._vendor.requests.models import Response +from pip._vendor.rich.progress import TaskID from pip._vendor.urllib3 import 
HTTPResponse as URLlib3Response from pip._vendor.urllib3._collections import HTTPHeaderDict from pip._vendor.urllib3.exceptions import ReadTimeoutError -from pip._internal.cli.progress_bars import BarType, get_download_progress_renderer +from pip._internal.cli.progress_bars import ( + BatchedProgress, + ProgressBarType, + get_download_progress_renderer, +) from pip._internal.exceptions import IncompleteDownloadError, NetworkConnectionError from pip._internal.models.index import PyPI from pip._internal.models.link import Link @@ -44,19 +50,22 @@ def _get_http_response_etag_or_last_modified(resp: Response) -> str | None: return resp.headers.get("etag", resp.headers.get("last-modified")) -def _log_download( - resp: Response, - link: Link, - progress_bar: BarType, - total_length: int | None, - range_start: int | None = 0, -) -> Iterable[bytes]: +def _format_download_log_url(link: Link) -> str: if link.netloc == PyPI.file_storage_domain: url = link.show_url else: url = link.url_without_fragment - logged_url = redact_auth_from_url(url) + return redact_auth_from_url(url) + + +def _log_download_link( + link: Link, + total_length: int | None, + range_start: int | None = 0, + link_is_from_cache: bool = False, +) -> None: + logged_url = _format_download_log_url(link) if total_length: if range_start: @@ -66,13 +75,27 @@ def _log_download( else: logged_url = f"{logged_url} ({format_size(total_length)})" - if is_from_cache(resp): + if link_is_from_cache: logger.info("Using cached %s", logged_url) elif range_start: logger.info("Resuming download %s", logged_url) else: logger.info("Downloading %s", logged_url) + +def _prepare_download( + resp: Response, + link: Link, + progress_bar: ProgressBarType, + total_length: int | None, + range_start: int | None = 0, + quiet: bool = False, + color: bool = True, +) -> Iterable[bytes]: + total_length = _get_http_response_size(resp) + + _log_download_link(link, total_length, range_start, link_is_from_cache=is_from_cache(resp)) + if logger.getEffectiveLevel() > logging.INFO: 
show_progress = False elif is_from_cache(resp): @@ -90,7 +113,11 @@ def _log_download( return chunks renderer = get_download_progress_renderer( - bar_type=progress_bar, size=total_length, initial_progress=range_start + bar_type=progress_bar, + size=total_length, + initial_progress=range_start, + quiet=quiet, + color=color, ) return renderer(chunks) @@ -117,22 +144,24 @@ def parse_content_disposition(content_disposition: str, default_filename: str) - return filename or default_filename -def _get_http_response_filename(resp: Response, link: Link) -> str: +def _get_http_response_filename( + headers: Mapping[str, str], resp_url: str, link: Link +) -> str: """Get an ideal filename from the given HTTP response, falling back to the link filename if not provided. """ filename = link.filename # fallback # Have a look at the Content-Disposition header for a better guess - content_disposition = resp.headers.get("content-disposition") + content_disposition = headers.get("content-disposition", None) if content_disposition: filename = parse_content_disposition(content_disposition, filename) ext: str | None = splitext(filename)[1] if not ext: - ext = mimetypes.guess_extension(resp.headers.get("content-type", "")) + ext = mimetypes.guess_extension(headers.get("content-type", "")) if ext: filename += ext - if not ext and link.url != resp.url: - ext = os.path.splitext(resp.url)[1] + if not ext and link.url != resp_url: + ext = os.path.splitext(resp_url)[1] if ext: filename += ext return filename @@ -162,12 +191,31 @@ def reset_file(self) -> None: self.bytes_received = 0 +def _http_head_content_info( + session: PipSession, + link: Link, +) -> tuple[int | None, str]: + target_url = link.url.split("#", 1)[0] + resp = session.head(target_url) + raise_for_status(resp) + + if length := resp.headers.get("content-length", None): + content_length = int(length) + else: + content_length = None + + filename = _get_http_response_filename(resp.headers, resp.url, link) + return content_length, 
filename + + class Downloader: def __init__( self, session: PipSession, - progress_bar: BarType, + progress_bar: ProgressBarType, resume_retries: int, + quiet: bool = False, + color: bool = True, ) -> None: assert ( resume_retries >= 0 @@ -175,21 +223,17 @@ def __init__( self._session = session self._progress_bar = progress_bar self._resume_retries = resume_retries - - def batch( - self, links: Iterable[Link], location: str - ) -> Iterable[tuple[Link, tuple[str, str]]]: - """Convenience method to download multiple links.""" - for link in links: - filepath, content_type = self(link, location) - yield link, (filepath, content_type) + self._quiet = quiet + self._color = color def __call__(self, link: Link, location: str) -> tuple[str, str]: """Download a link and save it under location.""" resp = self._http_get(link) download_size = _get_http_response_size(resp) - filepath = os.path.join(location, _get_http_response_filename(resp, link)) + filepath = os.path.join( + location, _get_http_response_filename(resp.headers, resp.url, link) + ) with open(filepath, "wb") as content_file: download = _FileDownload(link, content_file, download_size) self._process_response(download, resp) @@ -201,12 +245,14 @@ def __call__(self, link: Link, location: str) -> tuple[str, str]: def _process_response(self, download: _FileDownload, resp: Response) -> None: """Download and save chunks from a response.""" - chunks = _log_download( + chunks = _prepare_download( resp, download.link, self._progress_bar, download.size, range_start=download.bytes_received, + quiet=self._quiet, + color=self._color, ) try: for chunk in chunks: @@ -340,3 +386,86 @@ def _http_get(self, link: Link, headers: Mapping[str, str] = HEADERS) -> Respons ) raise return resp + + +class BatchDownloader: + def __init__( + self, + session: PipSession, + progress_bar: ProgressBarType, + resume_retries: int, + quiet: bool = False, + color: bool = True, + ) -> None: + self._session = session + self._progress_bar = progress_bar + 
self._inner = Downloader( + session, progress_bar, resume_retries, quiet=quiet, color=color + ) + self._quiet = quiet + self._color = color + + def __call__( + self, links: Iterable[Link], location: Path + ) -> Iterable[tuple[Link, tuple[Path, str | None]]]: + """Download the files given by links into location.""" + # Calculate the byte length for each file, if available. + links_with_lengths: list[tuple[Link, tuple[int | None, str]]] = [ + (link, _http_head_content_info(self._session, link)) for link in links + ] + # Sum up the total length we'll be downloading. + # TODO: filter out responses from cache from total download size? + total_length: int | None = 0 + for _link, (maybe_len, _filename) in links_with_lengths: + if maybe_len is None: + total_length = None + break + assert total_length is not None + total_length += maybe_len + + batched_progress = BatchedProgress.select_progress_bar( + self._progress_bar + ).create( + num_tasks=len(links_with_lengths), + known_total_length=total_length, + quiet=self._quiet, + color=self._color, + ) + + link_tasks: list[tuple[Link, TaskID, str]] = [] + for link, (maybe_len, filename) in links_with_lengths: + _log_download_link(link, maybe_len) + task_id = batched_progress.add_subtask(filename, maybe_len) + link_tasks.append((link, task_id, filename)) + + with batched_progress: + for link, task_id, filename in link_tasks: + try: + # FIXME: resume_retries!!! + resp = self._inner._http_get(link) + except NetworkConnectionError as e: + assert e.response is not None + logger.critical( + "HTTP error %s while getting %s", + e.response.status_code, + link, + ) + raise + + filepath = location / filename + content_type = resp.headers.get("Content-Type") + # TODO: different chunk size for batched downloads? + chunks = response_chunks(resp) + with open(filepath, "wb") as content_file: + # Notify that the current task has begun. 
+ batched_progress.start_subtask(task_id) + for chunk in chunks: + # Copy chunk directly to output file, without any + # additional buffering. + content_file.write(chunk) + # Update progress. + batched_progress.advance_subtask(task_id, len(chunk)) + # Notify of completion. + batched_progress.finish_subtask(task_id) + # Yield completed link and download path. + yield link, (filepath, content_type) diff --git a/src/pip/_internal/network/utils.py b/src/pip/_internal/network/utils.py index 74d3111cff0..98658e6a0cc 100644 --- a/src/pip/_internal/network/utils.py +++ b/src/pip/_internal/network/utils.py @@ -56,6 +56,7 @@ def raise_for_status(resp: Response) -> None: raise NetworkConnectionError(http_error_msg, response=resp) +# TODO: consider reading into a bytearray? def response_chunks( response: Response, chunk_size: int = DOWNLOAD_CHUNK_SIZE ) -> Generator[bytes, None, None]: diff --git a/src/pip/_internal/operations/prepare.py b/src/pip/_internal/operations/prepare.py index 00b1a33a030..4ab8d53d786 100644 --- a/src/pip/_internal/operations/prepare.py +++ b/src/pip/_internal/operations/prepare.py @@ -15,6 +15,7 @@ from pip._vendor.packaging.utils import canonicalize_name from pip._internal.build_env import BuildEnvironmentInstaller +from pip._internal.cli.progress_bars import ProgressBarType from pip._internal.distributions import make_distribution_for_install_requirement from pip._internal.distributions.installed import InstalledDistribution from pip._internal.exceptions import ( @@ -31,7 +32,7 @@ from pip._internal.models.direct_url import ArchiveInfo from pip._internal.models.link import Link from pip._internal.models.wheel import Wheel -from pip._internal.network.download import Downloader +from pip._internal.network.download import BatchDownloader, Downloader from pip._internal.network.lazy_wheel import ( HTTPRangeRequestUnsupported, dist_from_wheel_url, @@ -57,7 +58,7 @@ from pip._internal.vcs import vcs if TYPE_CHECKING: - from pip._internal.cli.progress_bars 
import BarType + from pip._internal.cli.progress_bars import ProgressBarType logger = getLogger(__name__) @@ -236,12 +237,14 @@ def __init__( # noqa: PLR0913 (too many parameters) check_build_deps: bool, build_tracker: BuildTracker, session: PipSession, - progress_bar: BarType, + progress_bar: ProgressBarType, finder: PackageFinder, require_hashes: bool, use_user_site: bool, lazy_wheel: bool, verbosity: int, + quietness: int, + color: bool, legacy_resolver: bool, resume_retries: int, ) -> None: @@ -251,7 +254,16 @@ def __init__( # noqa: PLR0913 (too many parameters) self.build_dir = build_dir self.build_tracker = build_tracker self._session = session - self._download = Downloader(session, progress_bar, resume_retries) + self._download = Downloader( + session, progress_bar, resume_retries, quiet=quietness > 0, color=color + ) + self._batch_download = BatchDownloader( + session, + progress_bar, + resume_retries, + quiet=quietness > 0, + color=color, + ) self.finder = finder # Where still-packed archives should be written to. If None, they are @@ -475,27 +487,31 @@ def _complete_partial_requirements( assert req.link links_to_fully_download[req.link] = req - batch_download = self._download.batch(links_to_fully_download.keys(), temp_dir) + batch_download = self._batch_download( + links_to_fully_download.keys(), + Path(temp_dir), + ) + # Process completely-downloaded files in parallel with the worker threads + # spawned by the BatchDownloader. for link, (filepath, _) in batch_download: - logger.debug("Downloading link %s to %s", link, filepath) + logger.debug("Completed download for link %s into %s", link, filepath) req = links_to_fully_download[link] # Record the downloaded file path so wheel reqs can extract a Distribution # in .get_dist(). - req.local_file_path = filepath + req.local_file_path = str(filepath) # Record that the file is downloaded so we don't do it again in # _prepare_linked_requirement(). 
- self._downloaded[req.link.url] = filepath + self._downloaded[req.link.url] = str(filepath) # If this is an sdist, we need to unpack it after downloading, but the # .source_dir won't be set up until we are in _prepare_linked_requirement(). # Add the downloaded archive to the install requirement to unpack after # preparing the source dir. if not req.is_wheel: - req.needs_unpacked_archive(Path(filepath)) + req.needs_unpacked_archive(filepath) - # This step is necessary to ensure all lazy wheels are processed - # successfully by the 'download', 'wheel', and 'install' commands. - for req in partially_downloaded_reqs: + # This step is necessary to ensure all lazy wheels are processed + # successfully by the 'download', 'wheel', and 'install' commands. self._prepare_linked_requirement(req, parallel_builds) def prepare_linked_requirement( diff --git a/src/pip/_internal/req/__init__.py b/src/pip/_internal/req/__init__.py index e5050ee588b..21f03e322ac 100644 --- a/src/pip/_internal/req/__init__.py +++ b/src/pip/_internal/req/__init__.py @@ -5,7 +5,10 @@ from collections.abc import Generator, Sequence from dataclasses import dataclass -from pip._internal.cli.progress_bars import BarType, get_install_progress_renderer +from pip._internal.cli.progress_bars import ( + ProgressBarType, + get_install_progress_renderer, +) from pip._internal.utils.logging import indent_log from .req_file import parse_requirements @@ -44,7 +47,7 @@ def install_given_reqs( warn_script_location: bool, use_user_site: bool, pycompile: bool, - progress_bar: BarType, + progress_bar: ProgressBarType, ) -> list[InstallationResult]: """ Install everything in the given list. 
diff --git a/tests/unit/test_network_download.py b/tests/unit/test_network_download.py index 7fb6b7fe64b..90e999cd416 100644 --- a/tests/unit/test_network_download.py +++ b/tests/unit/test_network_download.py @@ -7,12 +7,13 @@ import pytest +from pip._internal.cli.progress_bars import ProgressBarType from pip._internal.exceptions import IncompleteDownloadError from pip._internal.models.link import Link from pip._internal.network.download import ( Downloader, _get_http_response_size, - _log_download, + _prepare_download, parse_content_disposition, sanitize_content_filename, ) @@ -92,10 +93,10 @@ def test_log_download( resp.from_cache = from_cache link = Link(url) total_length = _get_http_response_size(resp) - _log_download( + _prepare_download( resp, link, - progress_bar="on", + progress_bar=ProgressBarType.ON, total_length=total_length, range_start=range_start, ) @@ -317,7 +318,7 @@ def test_downloader( ) -> None: session = PipSession() link = Link("http://example.com/foo.tgz") - downloader = Downloader(session, "on", resume_retries) + downloader = Downloader(session, ProgressBarType.ON, resume_retries) responses = [] for headers, status_code, body in mock_responses: @@ -357,7 +358,7 @@ def test_resumed_download_caching(tmpdir: Path) -> None: cache_dir = tmpdir / "cache" session = PipSession(cache=str(cache_dir)) link = Link("https://example.com/foo.tgz") - downloader = Downloader(session, "on", resume_retries=5) + downloader = Downloader(session, ProgressBarType.ON, resume_retries=5) # Mock an incomplete download followed by a successful resume incomplete_resp = MockResponse(b"0cfa7e9d-1868-4dd7-9fb3-") diff --git a/tests/unit/test_operations_prepare.py b/tests/unit/test_operations_prepare.py index 7d58ec61648..73de13a0ed5 100644 --- a/tests/unit/test_operations_prepare.py +++ b/tests/unit/test_operations_prepare.py @@ -8,6 +8,7 @@ import pytest +from pip._internal.cli.progress_bars import ProgressBarType from pip._internal.exceptions import HashMismatch from 
pip._internal.models.link import Link from pip._internal.network.download import Downloader @@ -32,7 +33,7 @@ def _fake_session_get(*args: Any, **kwargs: Any) -> dict[str, str]: session = Mock() session.get = _fake_session_get - download = Downloader(session, progress_bar="on", resume_retries=0) + download = Downloader(session, progress_bar=ProgressBarType.ON, resume_retries=0) uri = data.packages.joinpath("simple-1.0.tar.gz").as_uri() link = Link(uri) @@ -78,7 +79,7 @@ def test_download_http_url__no_directory_traversal( "content-disposition": 'attachment;filename="../out_dir_file"', } session.get.return_value = resp - download = Downloader(session, progress_bar="on", resume_retries=0) + download = Downloader(session, progress_bar=ProgressBarType.ON, resume_retries=0) download_dir = os.fspath(tmpdir.joinpath("download")) os.mkdir(download_dir) diff --git a/tests/unit/test_req.py b/tests/unit/test_req.py index a2c4cf243ca..91635ab9b70 100644 --- a/tests/unit/test_req.py +++ b/tests/unit/test_req.py @@ -19,6 +19,7 @@ from pip._internal.build_env import SubprocessBuildEnvironmentInstaller from pip._internal.cache import WheelCache +from pip._internal.cli.progress_bars import ProgressBarType from pip._internal.commands import create_command from pip._internal.commands.install import InstallCommand from pip._internal.exceptions import ( @@ -109,12 +110,14 @@ def _basic_resolver( check_build_deps=False, build_tracker=tracker, session=session, - progress_bar="on", + progress_bar=ProgressBarType.ON, finder=finder, require_hashes=require_hashes, use_user_site=False, lazy_wheel=False, verbosity=0, + quietness=0, + color=True, legacy_resolver=True, resume_retries=0, ) From 77de66309135a17a409b4ab9c6540646fff588de Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Tue, 20 Aug 2024 13:05:36 -0400 Subject: [PATCH 2/6] download larger files first --- src/pip/_internal/network/download.py | 6 +++++- 1 file changed, 5 
insertions(+), 1 deletion(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index a7eae6216d0..64ac8fa7751 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -10,7 +10,7 @@ from dataclasses import dataclass from http import HTTPStatus from pathlib import Path -from typing import BinaryIO +from typing import BinaryIO, cast from pip._vendor.requests import PreparedRequest from pip._vendor.requests.models import Response @@ -422,6 +422,10 @@ def __call__( break assert total_length is not None total_length += maybe_len + # Sort downloads to perform larger downloads first. + if total_length is not None: + # Extract the length from each tuple entry. + links_with_lengths.sort(key=lambda t: cast(int, t[1][0]), reverse=True) batched_progress = BatchedProgress.select_progress_bar( self._progress_bar From 0af29a61389d4f76733d328bb3f752db48c8d9e5 Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Wed, 20 Aug 2025 20:53:04 -0400 Subject: [PATCH 3/6] reuse the same restarting logic for batch downloading --- src/pip/_internal/network/download.py | 414 ++++++++++++++++---------- tests/unit/test_network_download.py | 10 +- 2 files changed, 265 insertions(+), 159 deletions(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index 64ac8fa7751..b0106aee797 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -2,15 +2,16 @@ from __future__ import annotations +import abc import email.message import logging import mimetypes import os -from collections.abc import Iterable, Mapping +from collections.abc import Iterable, Iterator, Mapping from dataclasses import dataclass from http import HTTPStatus from pathlib import Path -from typing import BinaryIO, cast +from typing import Any, BinaryIO, cast from pip._vendor.requests import PreparedRequest from 
pip._vendor.requests.models import Response @@ -94,7 +95,12 @@ def _prepare_download( ) -> Iterable[bytes]: total_length = _get_http_response_size(resp) - _log_download_link(link, total_length, is_from_cache(resp)) + _log_download_link( + link, + total_length=total_length, + range_start=range_start, + link_is_from_cache=is_from_cache(resp), + ) if logger.getEffectiveLevel() > logging.INFO: show_progress = False @@ -191,120 +197,24 @@ def reset_file(self) -> None: self.bytes_received = 0 -def _http_head_content_info( - session: PipSession, - link: Link, -) -> tuple[int | None, str]: - target_url = link.url.split("#", 1)[0] - resp = session.head(target_url) - raise_for_status(resp) - - if length := resp.headers.get("content-length", None): - content_length = int(length) - else: - content_length = None - - filename = _get_http_response_filename(resp.headers, resp.url, link) - return content_length, filename - - -class Downloader: - def __init__( - self, - session: PipSession, - progress_bar: ProgressBarType, - resume_retries: int, - quiet: bool = False, - color: bool = True, - ) -> None: - assert ( - resume_retries >= 0 - ), "Number of max resume retries must be bigger or equal to zero" +class _CacheSemantics: + def __init__(self, session: PipSession) -> None: self._session = session - self._progress_bar = progress_bar - self._resume_retries = resume_retries - self._quiet = quiet - self._color = color - def __call__(self, link: Link, location: str) -> tuple[str, str]: - """Download a link and save it under location.""" - resp = self._http_get(link) - download_size = _get_http_response_size(resp) + def http_head_content_info(self, link: Link) -> tuple[int | None, str]: + target_url = link.url.split("#", 1)[0] + resp = self._session.head(target_url) + raise_for_status(resp) - filepath = os.path.join( - location, _get_http_response_filename(resp.headers, resp, link) - ) - with open(filepath, "wb") as content_file: - download = _FileDownload(link, content_file, 
download_size) - self._process_response(download, resp) - if download.is_incomplete(): - self._attempt_resumes_or_redownloads(download, resp) - - content_type = resp.headers.get("Content-Type", "") - return filepath, content_type - - def _process_response(self, download: _FileDownload, resp: Response) -> None: - """Download and save chunks from a response.""" - chunks = _prepare_download( - resp, - download.link, - self._progress_bar, - download.size, - range_start=download.bytes_received, - quiet=self._quiet, - color=self._color, - ) - try: - for chunk in chunks: - download.write_chunk(chunk) - except ReadTimeoutError as e: - # If the download size is not known, then give up downloading the file. - if download.size is None: - raise e - - logger.warning("Connection timed out while downloading.") - - def _attempt_resumes_or_redownloads( - self, download: _FileDownload, first_resp: Response - ) -> None: - """Attempt to resume/restart the download if connection was dropped.""" - - while download.reattempts < self._resume_retries and download.is_incomplete(): - assert download.size is not None - download.reattempts += 1 - logger.warning( - "Attempting to resume incomplete download (%s/%s, attempt %d)", - format_size(download.bytes_received), - format_size(download.size), - download.reattempts, - ) - - try: - resume_resp = self._http_get_resume(download, should_match=first_resp) - # Fallback: if the server responded with 200 (i.e., the file has - # since been modified or range requests are unsupported) or any - # other unexpected status, restart the download from the beginning. - must_restart = resume_resp.status_code != HTTPStatus.PARTIAL_CONTENT - if must_restart: - download.reset_file() - download.size = _get_http_response_size(resume_resp) - first_resp = resume_resp - - self._process_response(download, resume_resp) - except (ConnectionError, ReadTimeoutError, OSError): - continue - - # No more resume attempts. Raise an error if the download is still incomplete. 
- if download.is_incomplete(): - os.remove(download.output_file.name) - raise IncompleteDownloadError(download) + if length := resp.headers.get("content-length", None): + content_length = int(length) + else: + content_length = None - # If we successfully completed the download via resume, manually cache it - # as a complete response to enable future caching - if download.reattempts > 0: - self._cache_resumed_download(download, first_resp) + filename = _get_http_response_filename(resp.headers, resp.url, link) + return content_length, filename - def _cache_resumed_download( + def cache_resumed_download( self, download: _FileDownload, original_response: Response ) -> None: """ @@ -360,7 +270,7 @@ def _cache_resumed_download( "Cached resumed download as complete response for future use: %s", url ) - def _http_get_resume( + def http_get_resume( self, download: _FileDownload, should_match: Response ) -> Response: """Issue a HTTP range request to resume the download.""" @@ -372,9 +282,9 @@ def _http_get_resume( # downloads caused by the remote file changing in-between. if identifier := _get_http_response_etag_or_last_modified(should_match): headers["If-Range"] = identifier - return self._http_get(download.link, headers) + return self.http_get(download.link, headers) - def _http_get(self, link: Link, headers: Mapping[str, str] = HEADERS) -> Response: + def http_get(self, link: Link, headers: Mapping[str, str] = HEADERS) -> Response: target_url = link.url_without_fragment try: resp = self._session.get(target_url, headers=headers, stream=True) @@ -388,6 +298,153 @@ def _http_get(self, link: Link, headers: Mapping[str, str] = HEADERS) -> Respons return resp +class _DownloadSemantics(abc.ABC): + @abc.abstractmethod + def prepare_response_chunks( + self, download: _FileDownload, resp: Response + ) -> Iterable[bytes]: ... + + @abc.abstractmethod + def process_chunk(self, download: _FileDownload, chunk: bytes) -> None: ... 
+ + +class _DownloadLifecycle: + def __init__( + self, + cache_semantics: _CacheSemantics, + download_semantics: _DownloadSemantics, + resume_retries: int, + ) -> None: + self._cache_semantics = cache_semantics + self._download_semantics = download_semantics + assert ( + resume_retries >= 0 + ), "Number of max resume retries must be bigger or equal to zero" + self._resume_retries = resume_retries + + def _process_response(self, download: _FileDownload, resp: Response) -> None: + """Download and save chunks from a response.""" + chunks = self._download_semantics.prepare_response_chunks(download, resp) + try: + for chunk in chunks: + self._download_semantics.process_chunk(download, chunk) + except ReadTimeoutError: + # If the download size is not known, then give up downloading the file. + if download.size is None: + raise + logger.warning("Connection timed out while downloading.") + + def _attempt_resumes_or_redownloads( + self, download: _FileDownload, first_resp: Response + ) -> None: + """Attempt to resume/restart the download if connection was dropped.""" + + while download.reattempts < self._resume_retries and download.is_incomplete(): + assert download.size is not None + download.reattempts += 1 + logger.warning( + "Attempting to resume incomplete download (%s/%s, attempt %d)", + format_size(download.bytes_received), + format_size(download.size), + download.reattempts, + ) + + try: + resume_resp = self._cache_semantics.http_get_resume( + download, should_match=first_resp + ) + # Fallback: if the server responded with 200 (i.e., the file has + # since been modified or range requests are unsupported) or any + # other unexpected status, restart the download from the beginning. 
+ must_restart = resume_resp.status_code != HTTPStatus.PARTIAL_CONTENT + if must_restart: + download.reset_file() + download.size = _get_http_response_size(resume_resp) + first_resp = resume_resp + + self._process_response(download, resume_resp) + except (ConnectionError, ReadTimeoutError, OSError): + continue + + # No more resume attempts. Raise an error if the download is still incomplete. + if download.is_incomplete(): + os.remove(download.output_file.name) + raise IncompleteDownloadError(download) + + # If we successfully completed the download via resume, manually cache it + # as a complete response to enable future caching + if download.reattempts > 0: + self._cache_semantics.cache_resumed_download(download, first_resp) + + def execute(self, download: _FileDownload, resp: Response) -> Response: + assert download.bytes_received == 0 + # Try the typical case first. + self._process_response(download, resp) + # Retry upon timeouts. + if download.is_incomplete(): + self._attempt_resumes_or_redownloads(download, resp) + return resp + + +class Downloader: + def __init__( + self, + session: PipSession, + progress_bar: ProgressBarType, + resume_retries: int, + quiet: bool = False, + color: bool = True, + ) -> None: + self._cache_semantics = _CacheSemantics(session) + self._resume_retries = resume_retries + + self._download_semantics = self._SingleDownloadSemantics( + progress_bar, quiet=quiet, color=color + ) + + @dataclass(frozen=True) + class _SingleDownloadSemantics(_DownloadSemantics): + progress_bar: ProgressBarType + quiet: bool + color: bool + + def prepare_response_chunks( + self, download: _FileDownload, resp: Response + ) -> Iterable[bytes]: + return _prepare_download( + resp, + download.link, + self.progress_bar, + download.size, + range_start=download.bytes_received, + quiet=self.quiet, + color=self.color, + ) + + def process_chunk(self, download: _FileDownload, chunk: bytes) -> None: + download.write_chunk(chunk) + + def __call__(self, link: Link, 
location: str) -> tuple[str, str]: + """Download a link and save it under location.""" + resp = self._cache_semantics.http_get(link) + download_size = _get_http_response_size(resp) + + filepath = os.path.join( + location, _get_http_response_filename(resp.headers, resp, link) + ) + with open(filepath, "wb") as content_file: + download = _FileDownload(link, content_file, download_size) + lifecycle = _DownloadLifecycle( + self._cache_semantics, + self._download_semantics, + self._resume_retries, + ) + resp = lifecycle.execute(download, resp) + + content_type = resp.headers.get("Content-Type", "") + return filepath, content_type + + class BatchDownloader: def __init__( self, @@ -397,21 +454,18 @@ def __init__( quiet: bool = False, color: bool = True, ) -> None: - self._session = session + self._cache_semantics = _CacheSemantics(session) self._progress_bar = progress_bar - self._inner = Downloader( - session, progress_bar, resume_retries, quiet=quiet, color=color - ) + self._resume_retries = resume_retries self._quiet = quiet self._color = color - def __call__( - self, links: Iterable[Link], location: Path - ) -> Iterable[tuple[Link, tuple[Path, str | None]]]: - """Download the files given by links into location.""" + def _retrieve_lengths( + self, links: Iterable[Link] + ) -> tuple[int | None, list[tuple[Link, tuple[int | None, str]]]]: # Calculate the byte length for each file, if available. links_with_lengths: list[tuple[Link, tuple[int | None, str]]] = [ - (link, _http_head_content_info(self._session, link)) for link in links + (link, self._cache_semantics.http_head_content_info(link)) for link in links ] # Sum up the total length we'll be downloading. # TODO: filter out responses from cache from total download size? @@ -426,6 +480,39 @@ def __call__( if total_length is not None: # Extract the length from each tuple entry. 
links_with_lengths.sort(key=lambda t: cast(int, t[1][0]), reverse=True) + return total_length, links_with_lengths + + def _prepare_tasks( + self, + location: Path, + link_tasks: Iterable[tuple[Link, TaskID, str, int | None]], + ) -> Iterator[tuple[Link, TaskID, Path, BinaryIO, Response, _FileDownload]]: + for link, task_id, filename, maybe_len in link_tasks: + resp = self._cache_semantics.http_get(link) + download_size = _get_http_response_size(resp) + + assert filename == _get_http_response_filename(resp.headers, resp, link) + filepath = location / filename + content_file = filepath.open("wb") + download = _FileDownload(link, content_file, download_size) + + _log_download_link( + link, + total_length=maybe_len, + range_start=download.bytes_received, + link_is_from_cache=is_from_cache(resp), + ) + yield link, task_id, filepath, content_file, resp, download + + def _construct_tasks_with_progression( + self, + links: Iterable[Link], + location: Path, + ) -> tuple[ + BatchedProgress, + list[tuple[Link, TaskID, Path, BinaryIO, Response, _FileDownload]], + ]: + total_length, links_with_lengths = self._retrieve_lengths(links) batched_progress = BatchedProgress.select_progress_bar( self._progress_bar @@ -436,40 +523,59 @@ def __call__( color=self._color, ) - link_tasks: list[tuple[Link, TaskID, str]] = [] + link_tasks: list[tuple[Link, TaskID, str, int | None]] = [] for link, (maybe_len, filename) in links_with_lengths: - _log_download_link(link, maybe_len) task_id = batched_progress.add_subtask(filename, maybe_len) - link_tasks.append((link, task_id, filename)) - - with batched_progress: - for link, task_id, filename in link_tasks: - try: - # FIXME: resume_retries!!! 
- resp = self._inner._http_get(link) - except NetworkConnectionError as e: - assert e.response is not None - logger.critical( - "HTTP error %s while getting %s", - e.response.status_code, - link, + link_tasks.append((link, task_id, filename, maybe_len)) + + return batched_progress, list(self._prepare_tasks(location, link_tasks)) + + class _BatchCurrentDownloadSemantics(_DownloadSemantics): + def __init__(self, batched_progress: BatchedProgress) -> None: + self._batched_progress = batched_progress + self.task_id: TaskID | None = None + + def __enter__(self) -> None: + assert self.task_id is not None + # Notify that the current task has begun. + self._batched_progress.start_subtask(self.task_id) + + def __exit__(self, *exc: Any) -> None: + assert self.task_id is not None + # Notify of completion. + self._batched_progress.finish_subtask(self.task_id) + self.task_id = None + + def prepare_response_chunks( + self, download: _FileDownload, resp: Response + ) -> Iterable[bytes]: + # TODO: different chunk size for batched downloads? 
+ return response_chunks(resp) + + def process_chunk(self, download: _FileDownload, chunk: bytes) -> None: + assert self.task_id is not None + download.write_chunk(chunk) + self._batched_progress.advance_subtask(self.task_id, len(chunk)) + + def __call__( + self, links: Iterable[Link], location: Path + ) -> Iterable[tuple[Link, tuple[Path, str | None]]]: + """Download the files given by links into location.""" + progress, tasks = self._construct_tasks_with_progression(links, location) + download_semantics = self._BatchCurrentDownloadSemantics(progress) + + with progress: + for link, task_id, filepath, content_file, resp, download in tasks: + download_semantics.task_id = task_id + + with content_file, download_semantics: + lifecycle = _DownloadLifecycle( + self._cache_semantics, + download_semantics, + self._resume_retries, ) - raise + resp = lifecycle.execute(download, resp) - filepath = location / filename content_type = resp.headers.get("Content-Type") - # TODO: different chunk size for batched downloads? - chunks = response_chunks(resp) - with open(filepath, "wb") as content_file: - # Notify that the current task has begun. - batched_progress.start_subtask(task_id) - for chunk in chunks: - # Copy chunk directly to output file, without any - # additional buffering. - content_file.write(chunk) - # Update progress. - batched_progress.advance_subtask(task_id, len(chunk)) - # Notify of completion. - batched_progress.finish_subtask(task_id) # Yield completed link and download path. 
yield link, (filepath, content_type) diff --git a/tests/unit/test_network_download.py b/tests/unit/test_network_download.py index 90e999cd416..68afeb2198c 100644 --- a/tests/unit/test_network_download.py +++ b/tests/unit/test_network_download.py @@ -326,9 +326,9 @@ def test_downloader( resp.headers = headers resp.status_code = status_code responses.append(resp) - _http_get_mock = MagicMock(side_effect=responses) + http_get_mock = MagicMock(side_effect=responses) - with patch.object(Downloader, "_http_get", _http_get_mock): + with patch.object(downloader._cache_semantics, "http_get", http_get_mock): if expected_bytes is None: remove = MagicMock(return_value=None) with patch("os.remove", remove): @@ -350,7 +350,7 @@ def test_downloader( calls.append(call(link, headers)) # Make sure that the downloader makes additional requests for resumption - _http_get_mock.assert_has_calls(calls) + http_get_mock.assert_has_calls(calls) def test_resumed_download_caching(tmpdir: Path) -> None: @@ -370,9 +370,9 @@ def test_resumed_download_caching(tmpdir: Path) -> None: resume_resp.status_code = 206 responses = [incomplete_resp, resume_resp] - _http_get_mock = MagicMock(side_effect=responses) + http_get_mock = MagicMock(side_effect=responses) - with patch.object(Downloader, "_http_get", _http_get_mock): + with patch.object(downloader._cache_semantics, "http_get", http_get_mock): # Perform the download (incomplete then resumed) filepath, _ = downloader(link, str(tmpdir)) From 9d62bf8c8841ac2f57bdec60a8fbc127743d2d5d Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Wed, 20 Aug 2025 22:46:03 -0400 Subject: [PATCH 4/6] fix very strange mypy failure--is an implicit conversion to blame? 
--- src/pip/_internal/network/download.py | 4 ++-- tests/unit/test_network_download.py | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index b0106aee797..a1d12293edb 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -430,7 +430,7 @@ def __call__(self, link: Link, location: str) -> tuple[str, str]: download_size = _get_http_response_size(resp) filepath = os.path.join( - location, _get_http_response_filename(resp.headers, resp, link) + location, _get_http_response_filename(resp.headers, resp.url, link) ) with open(filepath, "wb") as content_file: download = _FileDownload(link, content_file, download_size) @@ -491,7 +491,7 @@ def _prepare_tasks( resp = self._cache_semantics.http_get(link) download_size = _get_http_response_size(resp) - assert filename == _get_http_response_filename(resp.headers, resp, link) + assert filename == _get_http_response_filename(resp.headers, resp.url, link) filepath = location / filename content_file = filepath.open("wb") download = _FileDownload(link, content_file, download_size) diff --git a/tests/unit/test_network_download.py b/tests/unit/test_network_download.py index 68afeb2198c..e5353515239 100644 --- a/tests/unit/test_network_download.py +++ b/tests/unit/test_network_download.py @@ -323,6 +323,7 @@ def test_downloader( responses = [] for headers, status_code, body in mock_responses: resp = MockResponse(body) + resp.url = link.url resp.headers = headers resp.status_code = status_code responses.append(resp) @@ -362,10 +363,12 @@ def test_resumed_download_caching(tmpdir: Path) -> None: # Mock an incomplete download followed by a successful resume incomplete_resp = MockResponse(b"0cfa7e9d-1868-4dd7-9fb3-") + incomplete_resp.url = link.url incomplete_resp.headers = {"content-length": "36"} incomplete_resp.status_code = 200 resume_resp = MockResponse(b"f2561d5dfd89") + resume_resp.url = 
link.url resume_resp.headers = {"content-length": "12"} resume_resp.status_code = 206 From 202f193efe6ffa272b98c008e8c6fab38aca9741 Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Wed, 20 Aug 2025 23:49:41 -0400 Subject: [PATCH 5/6] fix test failure --- src/pip/_internal/cli/base_command.py | 8 ++++++-- src/pip/_internal/cli/cmdoptions.py | 6 +++--- src/pip/_internal/cli/progress_bars.py | 7 +++++++ tests/functional/test_install_config.py | 2 ++ 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/pip/_internal/cli/base_command.py b/src/pip/_internal/cli/base_command.py index 7acc29cb349..7bfc2d737d5 100644 --- a/src/pip/_internal/cli/base_command.py +++ b/src/pip/_internal/cli/base_command.py @@ -17,6 +17,7 @@ from pip._internal.cli import cmdoptions from pip._internal.cli.command_context import CommandContextMixIn from pip._internal.cli.parser import ConfigOptionParser, UpdatingDefaultsHelpFormatter +from pip._internal.cli.progress_bars import ProgressBarType from pip._internal.cli.status_codes import ( ERROR, PREVIOUS_BUILD_DIR_ERROR, @@ -176,8 +177,11 @@ def _main(self, args: list[str]) -> int: if options.debug_mode: self.verbosity = 2 - if hasattr(options, "progress_bar") and options.progress_bar == "auto": - options.progress_bar = "on" if self.verbosity >= 0 else "off" + if getattr(options, "progress_bar", None) == ProgressBarType.AUTO.value: + if self.verbosity >= 0: + options.progress_bar = ProgressBarType.ON.value + else: + options.progress_bar = ProgressBarType.OFF.value reconfigure(no_color=options.no_color) level_number = setup_logging( diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index f6f5c82661e..a7f3b3fb761 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -230,14 +230,14 @@ class PipOption(Option): dest="progress_bar", type="choice", choices=ProgressBarType.choices(), - 
default=ProgressBarType.ON.value, + default=ProgressBarType.AUTO.value, help=( - "Specify whether the progress bar should be used" + "Specify whether the progress bar should be used. In 'auto'" + " mode, --quiet will suppress all progress bars." f" {ProgressBarType.help_choices()} (default: %default)" ), ) - log: Callable[..., Option] = partial( PipOption, "--log", diff --git a/src/pip/_internal/cli/progress_bars.py b/src/pip/_internal/cli/progress_bars.py index d8359638f28..d9e9d9241e1 100644 --- a/src/pip/_internal/cli/progress_bars.py +++ b/src/pip/_internal/cli/progress_bars.py @@ -184,6 +184,7 @@ class ProgressBarType(Enum): The values of this enum are used as the choices for the --progress-var CLI flag.""" + AUTO = "auto" ON = "on" OFF = "off" RAW = "raw" @@ -213,6 +214,12 @@ def get_download_progress_renderer( if size is not None: assert size >= 0 + if bar_type == ProgressBarType.AUTO: + if quiet: + bar_type = ProgressBarType.OFF + else: + bar_type = ProgressBarType.ON + # TODO: use 3.10+ match statement! 
if bar_type == ProgressBarType.ON: return functools.partial( diff --git a/tests/functional/test_install_config.py b/tests/functional/test_install_config.py index baee7e80d55..25a3f330d29 100644 --- a/tests/functional/test_install_config.py +++ b/tests/functional/test_install_config.py @@ -443,6 +443,7 @@ def test_prompt_for_keyring_if_needed( keyring_script.pip( "install", "keyring", + allow_stderr_warning=True, ) environ["PATH"] = str(keyring_script.bin_path) + os.pathsep + environ["PATH"] @@ -457,6 +458,7 @@ def test_prompt_for_keyring_if_needed( script.pip( "install", "keyring", + allow_stderr_warning=True, ) if keyring_provider_implementation != "subprocess": From 090af7f08d3c4b86beb29cf5a06883c0e149af34 Mon Sep 17 00:00:00 2001 From: Danny McClanahan <1305167+cosmicexplorer@users.noreply.github.com> Date: Tue, 20 Aug 2024 10:51:27 -0400 Subject: [PATCH 6/6] execute batch downloads in parallel worker threads - limit downloads to 10 at a time instead of starting all at once - add cli arg to limit download parallelism - factor out receiving thread exceptions into a contextmanager - default batch parallelism to 10 - make batch download parallelism 1 in html index test - explicitly yield threads to help ensure correct download ordering --- news/12923.feature.rst | 1 + src/pip/_internal/cli/cmdoptions.py | 17 ++ src/pip/_internal/cli/progress_bars.py | 36 +++ src/pip/_internal/cli/req_command.py | 2 + src/pip/_internal/commands/download.py | 2 + src/pip/_internal/commands/install.py | 2 + src/pip/_internal/commands/wheel.py | 2 + src/pip/_internal/network/download.py | 302 ++++++++++++++++++------ src/pip/_internal/operations/prepare.py | 2 + tests/functional/test_download.py | 2 + tests/unit/test_req.py | 1 + 11 files changed, 296 insertions(+), 73 deletions(-) create mode 100644 news/12923.feature.rst diff --git a/news/12923.feature.rst b/news/12923.feature.rst new file mode 100644 index 00000000000..e152a8d5f1a --- /dev/null +++ b/news/12923.feature.rst @@ -0,0 
+1 @@ +Download concrete dists for metadata-only resolves in parallel using worker threads. Add ``--batch-download-parallelism`` CLI flag to limit parallelism. diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index a7f3b3fb761..7826502b842 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -238,6 +238,23 @@ class PipOption(Option): ), ) +batch_download_parallelism: Callable[..., Option] = partial( + Option, + "--batch-download-parallelism", + dest="batch_download_parallelism", + type="int", + default=10, + help=( + "Maximum parallelism employed for batch downloading of metadata-only dists" + " (default %default parallel requests)." + " Note that more than 10 downloads may overflow the requests connection pool," + " which may affect performance." + " Note also that commands such as 'install --dry-run' should avoid downloads" + " entirely, and so will not be affected by this option." + ), +) + + log: Callable[..., Option] = partial( PipOption, "--log", diff --git a/src/pip/_internal/cli/progress_bars.py b/src/pip/_internal/cli/progress_bars.py index d9e9d9241e1..49475712b44 100644 --- a/src/pip/_internal/cli/progress_bars.py +++ b/src/pip/_internal/cli/progress_bars.py @@ -282,6 +282,16 @@ def start_subtask(self, task_id: TaskID) -> None: generally expected to be called after __enter__, but this is not required.""" ... + @abc.abstractmethod + def reset_subtask(self, task_id: TaskID, to_steps: int = 0) -> None: + """Given a subtask id returned by .add_subtask(), reset progress to exactly the + given number of steps. + + This is similar to .advance_subtask(), but intended to use when restarting or + resuming processes, such as when a download is interrupted. + """ + ... 
+ @abc.abstractmethod def advance_subtask(self, task_id: TaskID, steps: int) -> None: """Given a subtask id returned by .add_subtask(), progress the given number of @@ -359,6 +369,9 @@ def add_subtask(self, description: str, total: int | None) -> TaskID: def start_subtask(self, task_id: TaskID) -> None: pass + def reset_subtask(self, task_id: TaskID, to_steps: int = 0) -> None: + pass + def advance_subtask(self, task_id: TaskID, steps: int) -> None: pass @@ -399,6 +412,7 @@ def __init__( self._prefix = prefix self._total_progress = 0 self._subtasks: list[tuple[str, int | None]] = [] + self._subtask_progress: list[int] = [] self._rate_limiter = RateLimiter(0.25) self._stream = sys.stdout self._quiet = quiet @@ -406,6 +420,7 @@ def __init__( def add_subtask(self, description: str, total: int | None) -> TaskID: task_id = len(self._subtasks) self._subtasks.append((description, total)) + self._subtask_progress.append(0) return TaskID(task_id) def _write_immediate(self, line: str) -> None: @@ -424,6 +439,7 @@ def _total_tasks(self) -> int: return len(self._subtasks) def start_subtask(self, task_id: TaskID) -> None: + assert self._subtask_progress[task_id] == 0 description, total = self._subtasks[task_id] total_fmt = self._format_total(total) task_index = task_id + 1 @@ -443,7 +459,16 @@ def _write_progress(self) -> None: f"Progress {pcnt}% {self._total_progress} of {total_fmt} bytes" ) + def reset_subtask(self, task_id: TaskID, to_steps: int = 0) -> None: + self._total_progress -= self._subtask_progress[task_id] + self._subtask_progress[task_id] = 0 + self.advance_subtask(task_id, to_steps) + def advance_subtask(self, task_id: TaskID, steps: int) -> None: + self._subtask_progress[task_id] += steps + _description, total = self._subtasks[task_id] + if total is not None: + assert self._subtask_progress[task_id] <= total self._total_progress += steps if self._rate_limiter.ready() or self._total_progress == self._total_bytes: self._write_progress() @@ -495,6 +520,7 @@ def 
__init__( self._total_task_id = total_task_id self._progress = progress self._total_bytes_task_id = total_bytes_task_id + self._subtask_progress: dict[TaskID, int] = {} self._quiet = quiet self._color = color self._live: Live | None = None @@ -514,15 +540,25 @@ def add_subtask(self, description: str, total: int | None) -> TaskID: ) def start_subtask(self, task_id: TaskID) -> None: + assert task_id not in self._subtask_progress self._progress.start_task(task_id) + self._subtask_progress[task_id] = 0 + + def reset_subtask(self, task_id: TaskID, to_steps: int = 0) -> None: + cur_progress = self._subtask_progress[task_id] + self._subtask_progress[task_id] = to_steps + self._progress.advance(self._total_bytes_task_id, -cur_progress) + self._progress.reset(task_id, completed=to_steps) def advance_subtask(self, task_id: TaskID, steps: int) -> None: + self._subtask_progress[task_id] += steps self._progress.advance(self._total_bytes_task_id, steps) self._progress.advance(task_id, steps) def finish_subtask(self, task_id: TaskID) -> None: self._task_progress.advance(self._total_task_id) self._progress.remove_task(task_id) + del self._subtask_progress[task_id] def __enter__(self) -> BatchedRichProgressBar: """Generate a table with two rows so different columns can be used. diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index 5047fc5d500..407b5312008 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -106,6 +106,7 @@ def make_requirement_preparer( use_user_site: bool, download_dir: str | None = None, verbosity: int = 0, + batch_download_parallelism: int | None = None, ) -> RequirementPreparer: """ Create a RequirementPreparer instance for the given parameters. 
@@ -150,6 +151,7 @@ def make_requirement_preparer( verbosity=verbosity, quietness=options.quiet, color=not options.no_color, + batch_download_parallelism=batch_download_parallelism, legacy_resolver=legacy_resolver, resume_retries=options.resume_retries, ) diff --git a/src/pip/_internal/commands/download.py b/src/pip/_internal/commands/download.py index 900fb403d6f..c6b109bf2b5 100644 --- a/src/pip/_internal/commands/download.py +++ b/src/pip/_internal/commands/download.py @@ -46,6 +46,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.pre()) self.cmd_opts.add_option(cmdoptions.require_hashes()) self.cmd_opts.add_option(cmdoptions.progress_bar()) + self.cmd_opts.add_option(cmdoptions.batch_download_parallelism()) self.cmd_opts.add_option(cmdoptions.no_build_isolation()) self.cmd_opts.add_option(cmdoptions.use_pep517()) self.cmd_opts.add_option(cmdoptions.no_use_pep517()) @@ -115,6 +116,7 @@ def run(self, options: Values, args: list[str]) -> int: download_dir=options.download_dir, use_user_site=False, verbosity=self.verbosity, + batch_download_parallelism=options.batch_download_parallelism, ) resolver = self.make_resolver( diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py index 1ef7a0f4410..071c9974da5 100644 --- a/src/pip/_internal/commands/install.py +++ b/src/pip/_internal/commands/install.py @@ -251,6 +251,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.prefer_binary()) self.cmd_opts.add_option(cmdoptions.require_hashes()) self.cmd_opts.add_option(cmdoptions.progress_bar()) + self.cmd_opts.add_option(cmdoptions.batch_download_parallelism()) self.cmd_opts.add_option(cmdoptions.root_user_action()) index_opts = cmdoptions.make_option_group( @@ -373,6 +374,7 @@ def run(self, options: Values, args: list[str]) -> int: finder=finder, use_user_site=options.use_user_site, verbosity=self.verbosity, + batch_download_parallelism=options.batch_download_parallelism, ) resolver = 
self.make_resolver( preparer=preparer, diff --git a/src/pip/_internal/commands/wheel.py b/src/pip/_internal/commands/wheel.py index 61be254912f..5202abbf5df 100644 --- a/src/pip/_internal/commands/wheel.py +++ b/src/pip/_internal/commands/wheel.py @@ -66,6 +66,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.ignore_requires_python()) self.cmd_opts.add_option(cmdoptions.no_deps()) self.cmd_opts.add_option(cmdoptions.progress_bar()) + self.cmd_opts.add_option(cmdoptions.batch_download_parallelism()) self.cmd_opts.add_option( "--no-verify", @@ -130,6 +131,7 @@ def run(self, options: Values, args: list[str]) -> int: download_dir=options.wheel_dir, use_user_site=False, verbosity=self.verbosity, + batch_download_parallelism=options.batch_download_parallelism, ) resolver = self.make_resolver( diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index a1d12293edb..da99170d753 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -7,10 +7,14 @@ import logging import mimetypes import os +import time from collections.abc import Iterable, Iterator, Mapping +from contextlib import contextmanager from dataclasses import dataclass from http import HTTPStatus from pathlib import Path +from queue import Queue +from threading import Event, Semaphore, Thread from typing import Any, BinaryIO, cast from pip._vendor.requests import PreparedRequest @@ -25,7 +29,11 @@ ProgressBarType, get_download_progress_renderer, ) -from pip._internal.exceptions import IncompleteDownloadError, NetworkConnectionError +from pip._internal.exceptions import ( + CommandError, + IncompleteDownloadError, + NetworkConnectionError, +) from pip._internal.models.index import PyPI from pip._internal.models.link import Link from pip._internal.network.cache import SafeFileCache, is_from_cache @@ -65,6 +73,7 @@ def _log_download_link( total_length: int | None, range_start: int | None = 0, link_is_from_cache: bool 
= False, + level: int = logging.INFO, ) -> None: logged_url = _format_download_log_url(link) @@ -77,11 +86,11 @@ def _log_download_link( logged_url = f"{logged_url} ({format_size(total_length)})" if link_is_from_cache: - logger.info("Using cached %s", logged_url) + logger.log(level, "Using cached %s", logged_url) elif range_start: - logger.info("Resuming download %s", logged_url) + logger.log(level, "Resuming download %s", logged_url) else: - logger.info("Downloading %s", logged_url) + logger.log(level, "Downloading %s", logged_url) def _prepare_download( @@ -445,6 +454,156 @@ def __call__(self, link: Link, location: str) -> tuple[str, str]: return filepath, content_type +class _ErrorReceiver: + def __init__(self, error_flag: Event) -> None: + self._error_flag = error_flag + self._thread_exception: BaseException | None = None + + def receive_error(self, exc: BaseException) -> None: + self._error_flag.set() + self._thread_exception = exc + + def stored_error(self) -> BaseException | None: + return self._thread_exception + + +@contextmanager +def _spawn_workers( + workers: list[Thread], error_flag: Event +) -> Iterator[_ErrorReceiver]: + err_recv = _ErrorReceiver(error_flag) + try: + for w in workers: + w.start() + # We've sorted the list of worker threads so they correspond to the largest + # downloads first. Each thread immediately waits upon a semaphore to limit + # maximum parallel downloads, and we would like the semaphore's internal + # wait queue to retain the same order we established earlier (otherwise, we + # would end up nondeterministically downloading files out of our desired + # order). Yielding to the scheduler here is intended to give the thread we + # just started time to jump into the semaphore, either to execute further + # (until the semaphore is full) or to jump into the queue at the desired + # position. 
We seem to get the ordering reliably even without this explicit + # yield, and ideally we would like to somehow ensure this deterministically, + # but this is relatively idiomatic and lets us lean on much fewer + # synchronization constructs. We can revisit this if users find the ordering + # is unreliable. It's easy to see if we've messed up, as the rich progress + # table prominently shows each download size and which ones are executing. + time.sleep(0) + yield err_recv + except BaseException as e: + err_recv.receive_error(e) + finally: + thread_exception = err_recv.stored_error() + if thread_exception is not None: + logger.critical("Received exception, shutting down downloader threads...") + + # Ensure each thread is complete by the time the queue has exited, either by + # writing the full request contents, or by checking the Event from an exception. + for w in workers: + # If the user (reasonably) wants to hit ^C again to try to make it close + # faster, we want to avoid spewing out a ton of error text, but at least + # let's let them know we hear them and we're trying to shut down! 
+ while w.is_alive(): + try: + w.join() + except BaseException: + logger.critical("Shutting down worker threads, please wait...") + + if thread_exception is not None: + raise thread_exception + + +def _copy_chunks( + output_queue: Queue[tuple[Link, Path, str | None] | BaseException], + error_flag: Event, + semaphore: Semaphore, + cache_semantics: _CacheSemantics, + resume_retries: int, + location: Path, + progress: BatchedProgress, + download_info: tuple[Link, TaskID, str, int | None], +) -> None: + link, task_id, filename, maybe_len = download_info + # The filepath, response, and download objects are constructed below, per worker. + download_semantics = _ParallelTaskDownloadSemantics(progress, task_id, error_flag) + + with semaphore: + try: + with download_semantics: + resp = cache_semantics.http_get(link) + download_size = _get_http_response_size(resp) + + assert filename == _get_http_response_filename( + resp.headers, resp.url, link + ) + filepath = location / filename + with filepath.open("wb") as content_file: + download = _FileDownload(link, content_file, download_size) + lifecycle = _DownloadLifecycle( + cache_semantics, + download_semantics, + resume_retries, + ) + resp = lifecycle.execute(download, resp) + + content_type = resp.headers.get("Content-Type") + output_queue.put((link, filepath, content_type)) + except _EarlyReturn: + return + except BaseException as e: + output_queue.put(e) + + +class _EarlyReturn(Exception): ... + + +class _ParallelTaskDownloadSemantics(_DownloadSemantics): + def __init__( + self, batched_progress: BatchedProgress, task_id: TaskID, error_flag: Event + ) -> None: + self._batched_progress = batched_progress + self._task_id = task_id + self._error_flag = error_flag + + def __enter__(self) -> None: + # Check if another thread exited with an exception before we started. + if self._error_flag.is_set(): + raise _EarlyReturn + + # Notify that the current task has begun.
+ self._batched_progress.start_subtask(self._task_id) + + def __exit__(self, *exc: Any) -> None: + # Notify of completion. + self._batched_progress.finish_subtask(self._task_id) + + def prepare_response_chunks( + self, download: _FileDownload, resp: Response + ) -> Iterable[bytes]: + self._batched_progress.reset_subtask(self._task_id, download.bytes_received) + + _log_download_link( + download.link, + total_length=download.size, + range_start=download.bytes_received, + link_is_from_cache=is_from_cache(resp), + level=logging.DEBUG, + ) + + # TODO: different chunk size for batched downloads? + return response_chunks(resp) + + def process_chunk(self, download: _FileDownload, chunk: bytes) -> None: + # Check if another thread exited with an exception between chunks. + if self._error_flag.is_set(): + raise _EarlyReturn + # Copy chunk to output file. + download.write_chunk(chunk) + # Update progress. + self._batched_progress.advance_subtask(self._task_id, len(chunk)) + + class BatchDownloader: def __init__( self, @@ -453,6 +612,7 @@ def __init__( resume_retries: int, quiet: bool = False, color: bool = True, + max_parallelism: int | None = None, ) -> None: self._cache_semantics = _CacheSemantics(session) self._progress_bar = progress_bar @@ -460,6 +620,14 @@ def __init__( self._quiet = quiet self._color = color + if max_parallelism is None: + max_parallelism = 1 + if max_parallelism < 1: + raise CommandError( + f"invalid batch download parallelism {max_parallelism}: must be >=1" + ) + self._max_parallelism: int = max_parallelism + def _retrieve_lengths( self, links: Iterable[Link] ) -> tuple[int | None, list[tuple[Link, tuple[int | None, str]]]]: @@ -476,41 +644,18 @@ def _retrieve_lengths( break assert total_length is not None total_length += maybe_len - # Sort downloads to perform larger downloads first. + # If lengths are available, sort downloads to perform larger downloads first. if total_length is not None: # Extract the length from each tuple entry. 
links_with_lengths.sort(key=lambda t: cast(int, t[1][0]), reverse=True) return total_length, links_with_lengths - def _prepare_tasks( - self, - location: Path, - link_tasks: Iterable[tuple[Link, TaskID, str, int | None]], - ) -> Iterator[tuple[Link, TaskID, Path, BinaryIO, Response, _FileDownload]]: - for link, task_id, filename, maybe_len in link_tasks: - resp = self._cache_semantics.http_get(link) - download_size = _get_http_response_size(resp) - - assert filename == _get_http_response_filename(resp.headers, resp.url, link) - filepath = location / filename - content_file = filepath.open("wb") - download = _FileDownload(link, content_file, download_size) - - _log_download_link( - link, - total_length=maybe_len, - range_start=download.bytes_received, - link_is_from_cache=is_from_cache(resp), - ) - yield link, task_id, filepath, content_file, resp, download - def _construct_tasks_with_progression( self, links: Iterable[Link], - location: Path, ) -> tuple[ BatchedProgress, - list[tuple[Link, TaskID, Path, BinaryIO, Response, _FileDownload]], + list[tuple[Link, TaskID, str, int | None]], ]: total_length, links_with_lengths = self._retrieve_lengths(links) @@ -524,58 +669,69 @@ def _construct_tasks_with_progression( ) link_tasks: list[tuple[Link, TaskID, str, int | None]] = [] + # Pair each link with a progress subtask id, preserving the sorted order above. for link, (maybe_len, filename) in links_with_lengths: task_id = batched_progress.add_subtask(filename, maybe_len) link_tasks.append((link, task_id, filename, maybe_len))
- self._batched_progress.start_subtask(self.task_id) - - def __exit__(self, *exc: Any) -> None: - assert self.task_id is not None - # Notify of completion. - self._batched_progress.finish_subtask(self.task_id) - self.task_id = None - - def prepare_response_chunks( - self, download: _FileDownload, resp: Response - ) -> Iterable[bytes]: - # TODO: different chunk size for batched downloads? - return response_chunks(resp) - - def process_chunk(self, download: _FileDownload, chunk: bytes) -> None: - assert self.task_id is not None - download.write_chunk(chunk) - self._batched_progress.advance_subtask(self.task_id, len(chunk)) + return batched_progress, link_tasks def __call__( self, links: Iterable[Link], location: Path ) -> Iterable[tuple[Link, tuple[Path, str | None]]]: """Download the files given by links into location.""" - progress, tasks = self._construct_tasks_with_progression(links, location) - download_semantics = self._BatchCurrentDownloadSemantics(progress) + progress, tasks = self._construct_tasks_with_progression(links) + + # Set up state to track thread progress, including inner exceptions. + total_downloads: int = len(tasks) + completed_downloads: int = 0 + q: Queue[tuple[Link, Path, str | None] | BaseException] = Queue() + error_flag = Event() + # Limit concurrent downloads to the configured parallelism (default 10) so we + # can reuse our connection pool. + semaphore = Semaphore(value=self._max_parallelism) + + # Distribute request i/o across equivalent threads. + # NB: event-based/async is likely a better model than thread-per-request, but + # (1) pip doesn't use async anywhere else yet, + # (2) this is at most one thread per dependency in the graph (less if any + # are cached) + # (3) pip is fundamentally run in a synchronous context with a clear start + # and end, instead of e.g. as a server which needs to process + # arbitrary further requests at the same time. + # For these reasons, thread-per-request should be sufficient for our needs.
+ workers = [ + Thread( + target=_copy_chunks, + args=( + q, + error_flag, + semaphore, + self._cache_semantics, + self._resume_retries, + location, + progress, + download_info, + ), + ) + for download_info in tasks + ] with progress: - for link, task_id, filepath, content_file, resp, download in tasks: - download_semantics.task_id = task_id - - with content_file, download_semantics: - lifecycle = _DownloadLifecycle( - self._cache_semantics, - download_semantics, - self._resume_retries, - ) - resp = lifecycle.execute(download, resp) - - content_type = resp.headers.get("Content-Type") - # Yield completed link and download path. - yield link, (filepath, content_type) + with _spawn_workers(workers, error_flag) as err_recv: + # Read completed downloads from queue, or extract the exception. + while completed_downloads < total_downloads: + # Get item from queue, but also check for ^C from user! + try: + item = q.get() + except BaseException as e: + err_recv.receive_error(e) + break + # Now see if the worker thread failed with an exception (unlikely). + if isinstance(item, BaseException): + err_recv.receive_error(item) + break + # Otherwise, the thread succeeded, and we can pass it to + # the preparer! 
+ link, filepath, content_type = item + completed_downloads += 1 + yield link, (filepath, content_type) diff --git a/src/pip/_internal/operations/prepare.py b/src/pip/_internal/operations/prepare.py index 4ab8d53d786..23c7fa6b271 100644 --- a/src/pip/_internal/operations/prepare.py +++ b/src/pip/_internal/operations/prepare.py @@ -245,6 +245,7 @@ def __init__( # noqa: PLR0913 (too many parameters) verbosity: int, quietness: int, color: bool, + batch_download_parallelism: int | None, legacy_resolver: bool, resume_retries: int, ) -> None: @@ -263,6 +264,7 @@ def __init__( # noqa: PLR0913 (too many parameters) resume_retries, quiet=quietness > 0, color=color, + max_parallelism=batch_download_parallelism, ) self.finder = finder diff --git a/tests/functional/test_download.py b/tests/functional/test_download.py index c5887aa1bf0..560da68c1f2 100644 --- a/tests/functional/test_download.py +++ b/tests/functional/test_download.py @@ -1291,6 +1291,8 @@ def run_for_generated_index( str(download_dir), "-i", "http://localhost:8000", + "--batch-download-parallelism", + "1", *args, ] result = script.pip(*pip_args, allow_error=allow_error) diff --git a/tests/unit/test_req.py b/tests/unit/test_req.py index 91635ab9b70..f9706838918 100644 --- a/tests/unit/test_req.py +++ b/tests/unit/test_req.py @@ -118,6 +118,7 @@ def _basic_resolver( verbosity=0, quietness=0, color=True, + batch_download_parallelism=None, legacy_resolver=True, resume_retries=0, )