From c4470ba3bd1f53cecd3ddd52413cb0f4455d3234 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 11 Mar 2022 12:35:49 +0000 Subject: [PATCH 01/11] Rename `[present-diagnostic]` to `[present-rich]` This makes it more reasonable to use a rich renderable for regular logging messages, instead of only using it for the diagnostic error messages. --- src/pip/_internal/cli/base_command.py | 2 +- src/pip/_internal/utils/logging.py | 11 ++++++----- src/pip/_internal/utils/subprocess.py | 2 +- tests/unit/test_utils_subprocess.py | 2 +- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/pip/_internal/cli/base_command.py b/src/pip/_internal/cli/base_command.py index 78b96bb7070..0774f26081f 100644 --- a/src/pip/_internal/cli/base_command.py +++ b/src/pip/_internal/cli/base_command.py @@ -168,7 +168,7 @@ def exc_logging_wrapper(*args: Any) -> int: assert isinstance(status, int) return status except DiagnosticPipError as exc: - logger.error("[present-diagnostic] %s", exc) + logger.error("[present-rich] %s", exc) logger.debug("Exception information:", exc_info=True) return ERROR diff --git a/src/pip/_internal/utils/logging.py b/src/pip/_internal/utils/logging.py index e6b7d5dd5d7..d6e991b89ec 100644 --- a/src/pip/_internal/utils/logging.py +++ b/src/pip/_internal/utils/logging.py @@ -21,7 +21,6 @@ from pip._vendor.rich.segment import Segment from pip._vendor.rich.style import Style -from pip._internal.exceptions import DiagnosticPipError from pip._internal.utils._log import VERBOSE, getLogger from pip._internal.utils.compat import WINDOWS from pip._internal.utils.deprecation import DEPRECATION_MSG_PREFIX @@ -154,12 +153,14 @@ def emit(self, record: logging.LogRecord) -> None: # If we are given a diagnostic error to present, present it with indentation. assert isinstance(record.args, tuple) - if record.msg == "[present-diagnostic] %s" and len(record.args) == 1: - diagnostic_error = record.args[0] - assert isinstance(diagnostic_error, DiagnosticPipError) + if record.msg == "[present-rich] %s" and len(record.args) == 1: + rich_renderable = record.args[0] + assert isinstance( + rich_renderable, ConsoleRenderable + ), f"{rich_renderable} is not rich-console-renderable" renderable: ConsoleRenderable = IndentedRenderable( - diagnostic_error, indent=get_indentation() + rich_renderable, indent=get_indentation() ) else: message = self.format(record) diff --git a/src/pip/_internal/utils/subprocess.py b/src/pip/_internal/utils/subprocess.py index 69c01c7ddcd..cf5bf6be1f6 100644 --- a/src/pip/_internal/utils/subprocess.py +++ b/src/pip/_internal/utils/subprocess.py @@ -209,7 +209,7 @@ def call_subprocess( output_lines=all_output if not showing_subprocess else None, ) if log_failed_cmd: - subprocess_logger.error("[present-diagnostic] %s", error) + subprocess_logger.error("[present-rich] %s", error) subprocess_logger.verbose( "[bold magenta]full command[/]: [blue]%s[/]", escape(format_command_args(cmd)), diff --git a/tests/unit/test_utils_subprocess.py b/tests/unit/test_utils_subprocess.py index ea3a7b26175..a694b717fcb 100644 --- a/tests/unit/test_utils_subprocess.py +++ b/tests/unit/test_utils_subprocess.py @@ -262,7 +262,7 @@ def test_info_logging__subprocess_error( [ # pytest's caplog overrides th formatter, which means that we # won't see the message formatted through our formatters. - ("pip.subprocessor", ERROR, "[present-diagnostic]"), + ("pip.subprocessor", ERROR, "[present-rich]"), ], ) # The spinner should spin three times in this case since the From ea0d976d9f3680fa5f62d3849320df89fe67bb17 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 11 Mar 2022 12:36:42 +0000 Subject: [PATCH 02/11] Add a way to get a simpler invocation of pip This makes it possible to provide more terse suggestions that are still correct, depending on the user's environment. --- src/pip/_internal/utils/entrypoints.py | 47 ++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/pip/_internal/utils/entrypoints.py b/src/pip/_internal/utils/entrypoints.py index 1504a12916b..81c7c0ebe5b 100644 --- a/src/pip/_internal/utils/entrypoints.py +++ b/src/pip/_internal/utils/entrypoints.py @@ -1,7 +1,23 @@ +import itertools +import os +import shutil import sys from typing import List, Optional from pip._internal.cli.main import main +from pip._internal.utils.compat import WINDOWS + +_EXECUTABLE_NAMES = [ + "pip", + f"pip{sys.version_info.major}", + f"pip{sys.version_info.major}.{sys.version_info.minor}", +] +if WINDOWS: + _allowed_extensions = {"", ".exe", ".EXE"} + _EXECUTABLE_NAMES = [ + "".join(parts) + for parts in itertools.product(_EXECUTABLE_NAMES, _allowed_extensions) + ] def _wrapper(args: Optional[List[str]] = None) -> int: @@ -25,3 +41,34 @@ def _wrapper(args: Optional[List[str]] = None) -> int: "running pip directly.\n" ) return main(args) + + +def get_best_invocation_for_this_pip() -> str: + """Try to figure out the best way to invoke pip in the current environment.""" + binary_directory = "Scripts" if WINDOWS else "bin" + binary_prefix = os.path.join(sys.prefix, binary_directory) + + # Try to use pip[X[.Y]] names, if those executables for this environment are + # the first on PATH with that name. + exe_are_in_PATH = binary_prefix in os.environ.get("PATH", "").split(os.pathsep) + if exe_are_in_PATH: + for exe_name in _EXECUTABLE_NAMES: + if shutil.which(exe_name) == os.path.join(binary_prefix, exe_name): + return exe_name + + # Use the `-m` invocation, if there's no "nice" invocation. + return f"{get_best_invocation_for_this_python()} -m pip" + + +def get_best_invocation_for_this_python() -> str: + """Try to figure out the best way to invoke the current Python.""" + exe = sys.executable + exe_name = os.path.basename(exe) + + # Try to use the basename, if it's the first executable. + found_executable = shutil.which(exe_name) + if found_executable and os.path.samefile(found_executable, exe): + return exe_name + + # Use the full executable name, because we couldn't find something simpler. + return exe From 4f35572ae76c45368d2e8eff50cbc4fb5430d6be Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 11 Mar 2022 12:37:14 +0000 Subject: [PATCH 03/11] Refactor the logic in self_outdated_check - Move the lookup within `selfcheck/*.json` files into a method on `SelfCheckState`. - Factor out `PackageFinder` interaction into a separate function. - Rename variables to more clearly reflect what they're for. Co-Authored-By: Pradyun Gedam --- src/pip/_internal/self_outdated_check.py | 134 +++++++++++++---------- tests/unit/test_self_check_outdated.py | 28 ++--- 2 files changed, 88 insertions(+), 74 deletions(-) diff --git a/src/pip/_internal/self_outdated_check.py b/src/pip/_internal/self_outdated_check.py index 7300e0ea4c0..b5d720d1708 100644 --- a/src/pip/_internal/self_outdated_check.py +++ b/src/pip/_internal/self_outdated_check.py @@ -5,7 +5,7 @@ import optparse import os.path import sys -from typing import Any, Dict +from typing import Any, Dict, Optional from pip._vendor.packaging.version import parse as parse_version @@ -17,7 +17,7 @@ from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace from pip._internal.utils.misc import ensure_dir -SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ" +_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ" logger = logging.getLogger(__name__) @@ -31,17 +31,17 @@ def _get_statefile_name(key: str) -> str: class SelfCheckState: def __init__(self, cache_dir: str) -> None: - self.state: Dict[str, Any] = {} - self.statefile_path = None + self._state: Dict[str, Any] = {} + self._statefile_path = None # Try to load the existing state if cache_dir: - self.statefile_path = os.path.join( + self._statefile_path = os.path.join( cache_dir, "selfcheck", _get_statefile_name(self.key) ) try: - with open(self.statefile_path, encoding="utf-8") as statefile: - self.state = json.load(statefile) + with open(self._statefile_path, encoding="utf-8") as statefile: + self._state = json.load(statefile) except (OSError, ValueError, KeyError): # Explicitly suppressing exceptions, since we don't want to # error out if the cache file is invalid. @@ -51,36 +51,57 @@ def __init__(self, cache_dir: str) -> None: def key(self) -> str: return sys.prefix - def save(self, pypi_version: str, current_time: datetime.datetime) -> None: + def get(self, current_time: datetime.datetime) -> Optional[str]: + """Check if we have a not-outdated version loaded already.""" + if not self._state: + return None + + if "last_check" not in self._state: + return None + + if "pypi_version" not in self._state: + return None + + seven_days_in_seconds = 7 * 24 * 60 * 60 + + # Determine if we need to refresh the state + last_check = datetime.datetime.strptime(self._state["last_check"], _DATE_FMT) + seconds_since_last_check = (current_time - last_check).total_seconds() + if seconds_since_last_check > seven_days_in_seconds: + return None + + return self._state["pypi_version"] + + def set(self, pypi_version: str, current_time: datetime.datetime) -> None: # If we do not have a path to cache in, don't bother saving. - if not self.statefile_path: + if not self._statefile_path: return # Check to make sure that we own the directory - if not check_path_owner(os.path.dirname(self.statefile_path)): + if not check_path_owner(os.path.dirname(self._statefile_path)): return # Now that we've ensured the directory is owned by this user, we'll go # ahead and make sure that all our directories are created. - ensure_dir(os.path.dirname(self.statefile_path)) + ensure_dir(os.path.dirname(self._statefile_path)) state = { # Include the key so it's easy to tell which pip wrote the # file. "key": self.key, - "last_check": current_time.strftime(SELFCHECK_DATE_FMT), + "last_check": current_time.strftime(_DATE_FMT), "pypi_version": pypi_version, } text = json.dumps(state, sort_keys=True, separators=(",", ":")) - with adjacent_tmp_file(self.statefile_path) as f: + with adjacent_tmp_file(self._statefile_path) as f: f.write(text.encode()) try: # Since we have a prefix-specific state file, we can just # overwrite whatever is there, no need to check. - replace(f.name, self.statefile_path) + replace(f.name, self._statefile_path) except OSError: # Best effort. pass @@ -96,6 +117,35 @@ def was_installed_by_pip(pkg: str) -> bool: return dist is not None and "pip" == dist.installer +def _get_current_remote_pip_version( + session: PipSession, options: optparse.Values +) -> str: + # Lets use PackageFinder to see what the latest pip version is + link_collector = LinkCollector.create( + session, + options=options, + suppress_no_index=True, + ) + + # Pass allow_yanked=False so we don't suggest upgrading to a + # yanked version. + selection_prefs = SelectionPreferences( + allow_yanked=False, + allow_all_prereleases=False, # Explicitly set to False + ) + + finder = PackageFinder.create( + link_collector=link_collector, + selection_prefs=selection_prefs, + use_deprecated_html5lib=("html5lib" in options.deprecated_features_enabled), + ) + best_candidate = finder.find_best_candidate("pip").best_candidate + if best_candidate is None: + return + + return str(best_candidate.version) + + def pip_self_version_check(session: PipSession, options: optparse.Values) -> None: """Check for an update for pip. @@ -107,61 +157,25 @@ def pip_self_version_check(session: PipSession, options: optparse.Values) -> Non if not installed_dist: return - pip_version = installed_dist.version - pypi_version = None + local_version = installed_dist.version try: state = SelfCheckState(cache_dir=options.cache_dir) current_time = datetime.datetime.utcnow() - # Determine if we need to refresh the state - if "last_check" in state.state and "pypi_version" in state.state: - last_check = datetime.datetime.strptime( - state.state["last_check"], SELFCHECK_DATE_FMT - ) - if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60: - pypi_version = state.state["pypi_version"] - - # Refresh the version if we need to or just see if we need to warn - if pypi_version is None: - # Lets use PackageFinder to see what the latest pip version is - link_collector = LinkCollector.create( - session, - options=options, - suppress_no_index=True, - ) + remote_version_str = state.get(current_time) - # Pass allow_yanked=False so we don't suggest upgrading to a - # yanked version. - selection_prefs = SelectionPreferences( - allow_yanked=False, - allow_all_prereleases=False, # Explicitly set to False - ) + if remote_version_str is None: + remote_version_str = _get_current_remote_pip_version(session, options) + state.set(remote_version_str, current_time) - finder = PackageFinder.create( - link_collector=link_collector, - selection_prefs=selection_prefs, - use_deprecated_html5lib=( - "html5lib" in options.deprecated_features_enabled - ), - ) - best_candidate = finder.find_best_candidate("pip").best_candidate - if best_candidate is None: - return - pypi_version = str(best_candidate.version) - - # save that we've performed a check - state.save(pypi_version, current_time) - - remote_version = parse_version(pypi_version) + remote_version = parse_version(remote_version_str) local_version_is_older = ( - pip_version < remote_version - and pip_version.base_version != remote_version.base_version + local_version < remote_version + and local_version.base_version != remote_version.base_version and was_installed_by_pip("pip") ) - - # Determine if our pypi_version is older if not local_version_is_older: return @@ -178,8 +192,8 @@ def pip_self_version_check(session: PipSession, options: optparse.Values) -> Non "You are using pip version %s; however, version %s is " "available.\nYou should consider upgrading via the " "'%s install --upgrade pip' command.", - pip_version, - pypi_version, + local_version, + remote_version_str, pip_cmd, ) except Exception: diff --git a/tests/unit/test_self_check_outdated.py b/tests/unit/test_self_check_outdated.py index d41de249e19..5174bdaea47 100644 --- a/tests/unit/test_self_check_outdated.py +++ b/tests/unit/test_self_check_outdated.py @@ -131,8 +131,8 @@ def test_pip_self_version_check( monkeypatch.setattr(logger, "debug", mock.Mock()) fake_state = mock.Mock( - state={"last_check": stored_time, "pypi_version": installed_ver}, - save=mock.Mock(), + get=mock.Mock(return_value=None), + set=mock.Mock(), ) monkeypatch.setattr(self_outdated_check, "SelfCheckState", lambda **kw: fake_state) @@ -146,16 +146,16 @@ def test_pip_self_version_check( ): pip_self_version_check(PipSession(), _options()) - # See that we saved the correct version + # See that we set the correct version if check_if_upgrade_required: - assert fake_state.save.call_args_list == [ + assert fake_state.set.call_args_list == [ mock.call(new_ver, datetime.datetime(1970, 1, 9, 10, 00, 00)), ] elif installed_ver: # Make sure no Exceptions - assert not cast(mock.Mock, logger.debug).call_args_list - # See that save was not called - assert fake_state.save.call_args_list == [] + assert not cast(mock.Mock, logger.warning).call_args_list + # See that set was not called + assert fake_state.set.call_args_list == [] # Ensure we warn the user or not if check_warn_logs: @@ -188,8 +188,8 @@ def _get_statefile_path(cache_dir: str, key: str) -> str: def test_self_check_state_no_cache_dir() -> None: state = SelfCheckState(cache_dir="") - assert state.state == {} - assert state.statefile_path is None + assert state._state == {} + assert state._statefile_path is None def test_self_check_state_key_uses_sys_prefix(monkeypatch: pytest.MonkeyPatch) -> None: @@ -225,8 +225,8 @@ def test_self_check_state_reads_expected_statefile( monkeypatch.setattr(sys, "prefix", key) state = self_outdated_check.SelfCheckState(str(cache_dir)) - assert state.state["last_check"] == last_check - assert state.state["pypi_version"] == pypi_version + assert state._state["last_check"] == last_check + assert state._state["pypi_version"] == pypi_version def test_self_check_state_writes_expected_statefile( @@ -238,20 +238,20 @@ def test_self_check_state_writes_expected_statefile( statefile_path = _get_statefile_path(str(cache_dir), key) last_check = datetime.datetime.strptime( - "1970-01-02T11:00:00Z", self_outdated_check.SELFCHECK_DATE_FMT + "1970-01-02T11:00:00Z", self_outdated_check._DATE_FMT ) pypi_version = "1.0" monkeypatch.setattr(sys, "prefix", key) state = self_outdated_check.SelfCheckState(str(cache_dir)) - state.save(pypi_version, last_check) + state.set(pypi_version, last_check) with open(statefile_path) as f: saved = json.load(f) expected = { "key": key, - "last_check": last_check.strftime(self_outdated_check.SELFCHECK_DATE_FMT), + "last_check": last_check.strftime(self_outdated_check._DATE_FMT), "pypi_version": pypi_version, } assert expected == saved From 4a3c4c90ef127eef6c185fd9a3cb64b892e16a66 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 11 Mar 2022 12:37:30 +0000 Subject: [PATCH 04/11] Improve style of the pip upgrade prompt This style makes the upgrade prompt less of blurb of yellow text, making it easier to read while clarifying the presentation style to make it easier to figure out what the user is supposed to run. Co-Authored-By: Pradyun Gedam --- src/pip/_internal/self_outdated_check.py | 52 +++++++++++++++--------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/pip/_internal/self_outdated_check.py b/src/pip/_internal/self_outdated_check.py index b5d720d1708..4b2bbff3d6d 100644 --- a/src/pip/_internal/self_outdated_check.py +++ b/src/pip/_internal/self_outdated_check.py @@ -5,15 +5,20 @@ import optparse import os.path import sys +from dataclasses import dataclass from typing import Any, Dict, Optional from pip._vendor.packaging.version import parse as parse_version +from pip._vendor.rich.console import Group +from pip._vendor.rich.markup import escape +from pip._vendor.rich.text import Text from pip._internal.index.collector import LinkCollector from pip._internal.index.package_finder import PackageFinder from pip._internal.metadata import get_default_environment from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.network.session import PipSession +from pip._internal.utils.entrypoints import get_best_invocation_for_this_pip from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace from pip._internal.utils.misc import ensure_dir @@ -107,6 +112,28 @@ def set(self, pypi_version: str, current_time: datetime.datetime) -> None: pass +@dataclass +class UpgradePrompt: + old: str + new: str + + def __rich__(self) -> Group: + pip_cmd = get_best_invocation_for_this_pip() + + notice = "[bold][[reset][blue]notice[reset][bold]][reset]" + return Group( + Text(), + Text.from_markup( + f"{notice} A new release of pip available: " + f"[red]{self.old}[reset] -> [green]{self.new}[reset]" + ), + Text.from_markup( + f"{notice} To update, run: " + f"[green]{escape(pip_cmd)} install --upgrade pip" + ), + ) + + def was_installed_by_pip(pkg: str) -> bool: """Checks whether pkg was installed by pip @@ -179,25 +206,10 @@ def pip_self_version_check(session: PipSession, options: optparse.Values) -> Non if not local_version_is_older: return - # We cannot tell how the current pip is available in the current - # command context, so be pragmatic here and suggest the command - # that's always available. This does not accommodate spaces in - # `sys.executable` on purpose as it is not possible to do it - # correctly without knowing the user's shell. Thus, - # it won't be done until possible through the standard library. - # Do not be tempted to use the undocumented subprocess.list2cmdline. - # It is considered an internal implementation detail for a reason. - pip_cmd = f"{sys.executable} -m pip" - logger.warning( - "You are using pip version %s; however, version %s is " - "available.\nYou should consider upgrading via the " - "'%s install --upgrade pip' command.", - local_version, - remote_version_str, - pip_cmd, + logger.info( + "[present-rich] %s", + UpgradePrompt(old=str(local_version), new=remote_version_str), ) except Exception: - logger.debug( - "There was an error checking the latest version of pip", - exc_info=True, - ) + logger.warning("There was an error checking the latest version of pip.") + logger.debug("See below for error", exc_info=True) From 59db67c8b25a6d140f15be3d45beb194dd1d6be6 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 11 Mar 2022 12:39:17 +0000 Subject: [PATCH 05/11] Add some debugging logs to `self_outdated_check` This would make it easier to diagnose what an issue might be, related to this logic, since the things that affect whether the message is printed, are now available in the debugging logs. --- src/pip/_internal/self_outdated_check.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/pip/_internal/self_outdated_check.py b/src/pip/_internal/self_outdated_check.py index 4b2bbff3d6d..ad3ccc91459 100644 --- a/src/pip/_internal/self_outdated_check.py +++ b/src/pip/_internal/self_outdated_check.py @@ -197,11 +197,16 @@ def pip_self_version_check(session: PipSession, options: optparse.Values) -> Non state.set(remote_version_str, current_time) remote_version = parse_version(remote_version_str) + logger.debug("Remote version of pip: %s", remote_version) + logger.debug("Local version of pip: %s", local_version) + + pip_installed_by_pip = was_installed_by_pip("pip") + logger.debug("Was installed by pip: %s", pip_installed_by_pip) local_version_is_older = ( local_version < remote_version and local_version.base_version != remote_version.base_version - and was_installed_by_pip("pip") + and pip_installed_by_pip ) if not local_version_is_older: return From e247195e21cfd9da16bceffb4bfbe51e299497d9 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 8 Apr 2022 15:13:32 +0100 Subject: [PATCH 06/11] Move self-check logic into a dedicated function This makes it easier to test this code. --- src/pip/_internal/self_outdated_check.py | 72 ++++++++++++++---------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/src/pip/_internal/self_outdated_check.py b/src/pip/_internal/self_outdated_check.py index ad3ccc91459..26d3508d06d 100644 --- a/src/pip/_internal/self_outdated_check.py +++ b/src/pip/_internal/self_outdated_check.py @@ -1,4 +1,5 @@ import datetime +import functools import hashlib import json import logging @@ -6,7 +7,7 @@ import os.path import sys from dataclasses import dataclass -from typing import Any, Dict, Optional +from typing import Any, Callable, Dict, Optional from pip._vendor.packaging.version import parse as parse_version from pip._vendor.rich.console import Group @@ -16,6 +17,7 @@ from pip._internal.index.collector import LinkCollector from pip._internal.index.package_finder import PackageFinder from pip._internal.metadata import get_default_environment +from pip._internal.metadata.base import DistributionVersion from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.network.session import PipSession from pip._internal.utils.entrypoints import get_best_invocation_for_this_pip @@ -173,6 +175,36 @@ def _get_current_remote_pip_version( return str(best_candidate.version) +def _self_version_check_logic( + *, + state: SelfCheckState, + current_time: datetime.datetime, + local_version: DistributionVersion, + get_remote_version: Callable[[], str], +) -> Optional[UpgradePrompt]: + remote_version_str = state.get(current_time) + if remote_version_str is None: + remote_version_str = get_remote_version() + state.set(remote_version_str, current_time) + + remote_version = parse_version(remote_version_str) + logger.debug("Remote version of pip: %s", remote_version) + logger.debug("Local version of pip: %s", local_version) + + pip_installed_by_pip = was_installed_by_pip("pip") + logger.debug("Was pip installed by pip? %s", pip_installed_by_pip) + + local_version_is_older = ( + local_version < remote_version + and local_version.base_version != remote_version.base_version + and pip_installed_by_pip + ) + if local_version_is_older: + return UpgradePrompt(old=str(local_version), new=remote_version_str) + + return None + + def pip_self_version_check(session: PipSession, options: optparse.Values) -> None: """Check for an update for pip. @@ -184,37 +216,17 @@ def pip_self_version_check(session: PipSession, options: optparse.Values) -> Non if not installed_dist: return - local_version = installed_dist.version - try: - state = SelfCheckState(cache_dir=options.cache_dir) - - current_time = datetime.datetime.utcnow() - remote_version_str = state.get(current_time) - - if remote_version_str is None: - remote_version_str = _get_current_remote_pip_version(session, options) - state.set(remote_version_str, current_time) - - remote_version = parse_version(remote_version_str) - logger.debug("Remote version of pip: %s", remote_version) - logger.debug("Local version of pip: %s", local_version) - - pip_installed_by_pip = was_installed_by_pip("pip") - logger.debug("Was installed by pip: %s", pip_installed_by_pip) - - local_version_is_older = ( - local_version < remote_version - and local_version.base_version != remote_version.base_version - and pip_installed_by_pip - ) - if not local_version_is_older: - return - - logger.info( - "[present-rich] %s", - UpgradePrompt(old=str(local_version), new=remote_version_str), + upgrade_prompt = _self_version_check_logic( + state=SelfCheckState(cache_dir=options.cache_dir), + current_time=datetime.datetime.utcnow(), + local_version=installed_dist.version, + get_remote_version=functools.partial( + _get_current_remote_pip_version, session, options + ), ) + if upgrade_prompt is not None: + logger.info("[present-rich] %s", upgrade_prompt) except Exception: logger.warning("There was an error checking the latest version of pip.") logger.debug("See below for error", exc_info=True) From ca84a83633adeb25bec614fa63284195b14e44f8 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 8 Apr 2022 19:42:27 +0100 Subject: [PATCH 07/11] Rework the tests for `self_check_outdated` to do less mocking This makes the tests more reliable and easier to reason about. This also adds a few more cases under the test conditions. --- tests/unit/test_self_check_outdated.py | 377 ++++++++++--------------- 1 file changed, 149 insertions(+), 228 deletions(-) diff --git a/tests/unit/test_self_check_outdated.py b/tests/unit/test_self_check_outdated.py index 5174bdaea47..86f7d2c7949 100644 --- a/tests/unit/test_self_check_outdated.py +++ b/tests/unit/test_self_check_outdated.py @@ -1,257 +1,178 @@ import datetime -import functools import json -import os import sys -from typing import Any, Optional, cast -from unittest import mock +from optparse import Values +from typing import Optional +from unittest.mock import ANY, Mock, patch -import freezegun import pytest -from pip._vendor.packaging.version import parse as parse_version +from freezegun import freeze_time +from pip._vendor.packaging.version import Version from pip._internal import self_outdated_check -from pip._internal.models.candidate import InstallationCandidate -from pip._internal.models.link import Link -from pip._internal.network.session import PipSession -from pip._internal.self_outdated_check import ( - SelfCheckState, - logger, - pip_self_version_check, -) from tests.lib.path import Path -class MockBestCandidateResult: - def __init__(self, best: InstallationCandidate) -> None: - self.best_candidate = best - - -class MockPackageFinder: - - BASE_URL = "https://pypi.org/simple/pip-{0}.tar.gz" - PIP_PROJECT_NAME = "pip" - INSTALLATION_CANDIDATES = [ - InstallationCandidate( - PIP_PROJECT_NAME, - "6.9.0", - Link(BASE_URL.format("6.9.0")), - ), - InstallationCandidate( - PIP_PROJECT_NAME, - "3.3.1", - Link(BASE_URL.format("3.3.1")), +@pytest.mark.parametrize( + ["key", "expected"], + [ + ( + "/hello/world/venv", + "fcd2d5175dd33d5df759ee7b045264230205ef837bf9f582f7c3ada7", ), - InstallationCandidate( - PIP_PROJECT_NAME, - "1.0", - Link(BASE_URL.format("1.0")), + ( + "C:\\Users\\User\\Desktop\\venv", + "902cecc0745b8ecf2509ba473f3556f0ba222fedc6df433acda24aa5", ), - ] - - @classmethod - def create(cls, *args: Any, **kwargs: Any) -> "MockPackageFinder": - return cls() - - def find_best_candidate(self, project_name: str) -> MockBestCandidateResult: - return MockBestCandidateResult(self.INSTALLATION_CANDIDATES[0]) - - -class MockDistribution: - def __init__(self, installer: str, version: str) -> None: - self.installer = installer - self.version = parse_version(version) - - -class MockEnvironment: - def __init__(self, installer: str, installed_version: Optional[str]) -> None: - self.installer = installer - self.installed_version = installed_version - - def get_distribution(self, name: str) -> Optional[MockDistribution]: - if self.installed_version is None: - return None - return MockDistribution(self.installer, self.installed_version) + ], +) +def test_get_statefile_name_known_values(key: str, expected: str) -> None: + assert expected == self_outdated_check._get_statefile_name(key) -def _options() -> mock.Mock: - """Some default options that we pass to - self_outdated_check.pip_self_version_check""" - return mock.Mock( - find_links=[], - index_url="default_url", - extra_index_urls=[], - no_index=False, - pre=False, - cache_dir="", - deprecated_features_enabled=[], +@freeze_time("1970-01-02T11:00:00Z") +@patch("pip._internal.self_outdated_check._self_version_check_logic") +@patch("pip._internal.self_outdated_check.SelfCheckState") +def test_pip_self_version_check_calls_underlying_implementation( + mocked_state: Mock, mocked_function: Mock, tmpdir: Path +) -> None: + # GIVEN + mock_session = Mock() + fake_options = Values(dict(cache_dir=str(tmpdir))) + + # WHEN + self_outdated_check.pip_self_version_check(mock_session, fake_options) + + # THEN + mocked_state.assert_called_once_with(cache_dir=str(tmpdir)) + mocked_function.assert_called_once_with( + state=mocked_state(cache_dir=str(tmpdir)), + current_time=datetime.datetime(1970, 1, 2, 11, 0, 0), + local_version=ANY, + get_remote_version=ANY, ) @pytest.mark.parametrize( [ - "stored_time", - "installed_ver", - "new_ver", - "installer", - "check_if_upgrade_required", - "check_warn_logs", + "installed_version", + "remote_version", + "stored_version", + "installed_by_pip", + "should_show_prompt", ], [ - # Test we return None when installed version is None - ("1970-01-01T10:00:00Z", None, "1.0", "pip", False, False), - # Need an upgrade - upgrade warning should print - ("1970-01-01T10:00:00Z", "1.0", "6.9.0", "pip", True, True), - # Upgrade available, pip installed via rpm - warning should not print - ("1970-01-01T10:00:00Z", "1.0", "6.9.0", "rpm", True, False), - # No upgrade - upgrade warning should not print - ("1970-01-9T10:00:00Z", "6.9.0", "6.9.0", "pip", False, False), + # A newer version available! + ("1.0", "2.0", None, True, True), + # A newer version available, and cached value is new too! + ("1.0", "2.0", "2.0", True, True), + # A newer version available, but was not installed by pip. + ("1.0", "2.0", None, False, False), + # On the latest version already. + ("2.0", "2.0", None, True, False), + # On the latest version already, and cached value matches. + ("2.0", "2.0", "2.0", True, False), + # A newer version available, but cached value is older. + ("1.0", "2.0", "1.0", True, False), ], ) -def test_pip_self_version_check( +def test_core_logic( + installed_version: str, + remote_version: str, + stored_version: Optional[str], + installed_by_pip: bool, + should_show_prompt: bool, + caplog: pytest.LogCaptureFixture, monkeypatch: pytest.MonkeyPatch, - stored_time: str, - installed_ver: Optional[str], - new_ver: str, - installer: str, - check_if_upgrade_required: bool, - check_warn_logs: bool, ) -> None: + # GIVEN monkeypatch.setattr( - self_outdated_check, - "get_default_environment", - functools.partial(MockEnvironment, installer, installed_ver), - ) - monkeypatch.setattr( - self_outdated_check, - "PackageFinder", - MockPackageFinder, - ) - monkeypatch.setattr(logger, "warning", mock.Mock()) - monkeypatch.setattr(logger, "debug", mock.Mock()) - - fake_state = mock.Mock( - get=mock.Mock(return_value=None), - set=mock.Mock(), + self_outdated_check, "was_installed_by_pip", lambda _: installed_by_pip ) - monkeypatch.setattr(self_outdated_check, "SelfCheckState", lambda **kw: fake_state) - - with freezegun.freeze_time( - "1970-01-09 10:00:00", - ignore=[ - "six.moves", - "pip._vendor.six.moves", - "pip._vendor.requests.packages.urllib3.packages.six.moves", - ], - ): - pip_self_version_check(PipSession(), _options()) - - # See that we set the correct version - if check_if_upgrade_required: - assert fake_state.set.call_args_list == [ - mock.call(new_ver, datetime.datetime(1970, 1, 9, 10, 00, 00)), - ] - elif installed_ver: - # Make sure no Exceptions - assert not cast(mock.Mock, logger.warning).call_args_list - # See that set was not called - assert fake_state.set.call_args_list == [] + mock_state = Mock() + mock_state.get.return_value = stored_version + fake_time = datetime.datetime(2000, 1, 1, 0, 0, 0) + version_that_should_be_checked = stored_version or remote_version + + # WHEN + with caplog.at_level("DEBUG"): + return_value = self_outdated_check._self_version_check_logic( + state=mock_state, + current_time=fake_time, + local_version=Version(installed_version), + get_remote_version=lambda: remote_version, + ) + + # THEN + mock_state.get.assert_called_once_with(fake_time) + assert caplog.messages == [ + f"Remote version of pip: {version_that_should_be_checked}", + f"Local version of pip: {installed_version}", + f"Was pip installed by pip? {installed_by_pip}", + ] - # Ensure we warn the user or not - if check_warn_logs: - assert cast(mock.Mock, logger.warning).call_count == 1 + if stored_version: + mock_state.set.assert_not_called() else: - assert cast(mock.Mock, logger.warning).call_count == 0 - - -statefile_name_case_1 = "fcd2d5175dd33d5df759ee7b045264230205ef837bf9f582f7c3ada7" - -statefile_name_case_2 = "902cecc0745b8ecf2509ba473f3556f0ba222fedc6df433acda24aa5" - - -@pytest.mark.parametrize( - "key,expected", - [ - ("/hello/world/venv", statefile_name_case_1), - ("C:\\Users\\User\\Desktop\\venv", statefile_name_case_2), - ], -) -def test_get_statefile_name_known_values(key: str, expected: str) -> None: - assert expected == self_outdated_check._get_statefile_name(key) - - -def _get_statefile_path(cache_dir: str, key: str) -> str: - return os.path.join( - cache_dir, "selfcheck", self_outdated_check._get_statefile_name(key) - ) - - -def test_self_check_state_no_cache_dir() -> None: - state = SelfCheckState(cache_dir="") - assert state._state == {} - assert state._statefile_path is None - - -def test_self_check_state_key_uses_sys_prefix(monkeypatch: pytest.MonkeyPatch) -> None: - key = "helloworld" - - monkeypatch.setattr(sys, "prefix", key) - state = self_outdated_check.SelfCheckState("") - - assert state.key == key - - -def test_self_check_state_reads_expected_statefile( - monkeypatch: pytest.MonkeyPatch, tmpdir: Path -) -> None: - cache_dir = tmpdir / "cache_dir" - cache_dir.mkdir() - key = "helloworld" - statefile_path = _get_statefile_path(str(cache_dir), key) - - last_check = "1970-01-02T11:00:00Z" - pypi_version = "1.0" - content = { - "key": key, - "last_check": last_check, - "pypi_version": pypi_version, - } - - Path(statefile_path).parent.mkdir() - - with open(statefile_path, "w") as f: - json.dump(content, f) - - monkeypatch.setattr(sys, "prefix", key) - state = self_outdated_check.SelfCheckState(str(cache_dir)) - - assert state._state["last_check"] == last_check - assert state._state["pypi_version"] == pypi_version - - -def test_self_check_state_writes_expected_statefile( - monkeypatch: pytest.MonkeyPatch, tmpdir: Path -) -> None: - cache_dir = tmpdir / "cache_dir" - cache_dir.mkdir() - key = "helloworld" - statefile_path = _get_statefile_path(str(cache_dir), key) - - last_check = datetime.datetime.strptime( - "1970-01-02T11:00:00Z", self_outdated_check._DATE_FMT - ) - pypi_version = "1.0" - - monkeypatch.setattr(sys, "prefix", key) - state = self_outdated_check.SelfCheckState(str(cache_dir)) - - state.set(pypi_version, last_check) - with open(statefile_path) as f: - saved = json.load(f) - - expected = { - "key": key, - "last_check": last_check.strftime(self_outdated_check._DATE_FMT), - "pypi_version": pypi_version, - } - assert expected == saved + mock_state.set.assert_called_once_with( + version_that_should_be_checked, fake_time + ) + + if not should_show_prompt: + assert return_value is None + return # the remaining assertions are for the other case. + + assert return_value is not None + assert return_value.old == installed_version + assert return_value.new == remote_version + + +class TestSelfCheckState: + def test_no_cache(self) -> None: + # GIVEN / WHEN + state = self_outdated_check.SelfCheckState(cache_dir="") + assert state._statefile_path is None + + def test_reads_expected_statefile(self, tmpdir: Path) -> None: + # GIVEN + cache_dir = tmpdir / "cache_dir" + expected_path = ( + cache_dir + / "selfcheck" + / self_outdated_check._get_statefile_name(sys.prefix) + ) + + cache_dir.mkdir() + (cache_dir / "selfcheck").mkdir() + expected_path.write_text('{"foo": "bar"}') + + # WHEN + state = self_outdated_check.SelfCheckState(cache_dir=str(cache_dir)) + + # THEN + assert state._statefile_path == expected_path + assert state._state == {"foo": "bar"} + + def test_writes_expected_statefile(self, tmpdir: Path) -> None: + # GIVEN + cache_dir = tmpdir / "cache_dir" + cache_dir.mkdir() + expected_path = ( + cache_dir + / "selfcheck" + / self_outdated_check._get_statefile_name(sys.prefix) + ) + + # WHEN + state = self_outdated_check.SelfCheckState(cache_dir=str(cache_dir)) + state.set("1.0.0", datetime.datetime(2000, 1, 1, 0, 0, 0)) + + # THEN + assert state._statefile_path == expected_path + + contents = expected_path.read_text() + assert json.loads(contents) == { + "key": sys.prefix, + "last_check": "2000-01-01T00:00:00Z", + "pypi_version": "1.0.0", + } From ee72f55de62e0e2c189a2cd7475d800600deae8a Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 22 Apr 2022 15:56:57 +0100 Subject: [PATCH 08/11] Add an early return with comment if pip wasn't installed by pip This addresses a review comment around the code style, and makes it possible to provide additonal context in the comment. --- src/pip/_internal/self_outdated_check.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pip/_internal/self_outdated_check.py b/src/pip/_internal/self_outdated_check.py index 26d3508d06d..d2f1213ed78 100644 --- a/src/pip/_internal/self_outdated_check.py +++ b/src/pip/_internal/self_outdated_check.py @@ -193,11 +193,12 @@ def _self_version_check_logic( pip_installed_by_pip = was_installed_by_pip("pip") logger.debug("Was pip installed by pip? %s", pip_installed_by_pip) + if not pip_installed_by_pip: + return None # Only suggest upgrade if pip is installed by pip. local_version_is_older = ( local_version < remote_version and local_version.base_version != remote_version.base_version - and pip_installed_by_pip ) if local_version_is_older: return UpgradePrompt(old=str(local_version), new=remote_version_str) From 26d8441100ae1e091559c9e4a6349bc06d4503cf Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Fri, 22 Apr 2022 16:21:59 +0100 Subject: [PATCH 09/11] Pacify linters by passing `logging.DEBUG` instead of `"DEBUG"` --- tests/unit/test_self_check_outdated.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_self_check_outdated.py b/tests/unit/test_self_check_outdated.py index 86f7d2c7949..edfc4e82f19 100644 --- a/tests/unit/test_self_check_outdated.py +++ b/tests/unit/test_self_check_outdated.py @@ -1,5 +1,6 @@ import datetime import json +import logging import sys from optparse import Values from typing import Optional @@ -95,7 +96,7 @@ def test_core_logic( version_that_should_be_checked = stored_version or remote_version # WHEN - with caplog.at_level("DEBUG"): + with caplog.at_level(logging.DEBUG): return_value = self_outdated_check._self_version_check_logic( state=mock_state, current_time=fake_time, From 4e53eaf3e7cbacf521d550262935ce61e85e565b Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Sat, 23 Apr 2022 12:02:45 +0100 Subject: [PATCH 10/11] Use `python -m pip` for the upgrade prompt on Windows --- src/pip/_internal/self_outdated_check.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/pip/_internal/self_outdated_check.py b/src/pip/_internal/self_outdated_check.py index d2f1213ed78..ad62dd27b00 100644 --- a/src/pip/_internal/self_outdated_check.py +++ b/src/pip/_internal/self_outdated_check.py @@ -20,7 +20,11 @@ from pip._internal.metadata.base import DistributionVersion from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.network.session import PipSession -from pip._internal.utils.entrypoints import get_best_invocation_for_this_pip +from pip._internal.utils.compat import WINDOWS +from pip._internal.utils.entrypoints import ( + get_best_invocation_for_this_pip, + get_best_invocation_for_this_python, +) from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace from pip._internal.utils.misc import ensure_dir @@ -120,7 +124,10 @@ class UpgradePrompt: new: str def __rich__(self) -> Group: - pip_cmd = get_best_invocation_for_this_pip() + if WINDOWS: + pip_cmd = f"{get_best_invocation_for_this_python()} -m pip" + else: + pip_cmd = get_best_invocation_for_this_pip() notice = "[bold][[reset][blue]notice[reset][bold]][reset]" return Group( From 85ccf51b6483e8cef1bfa314c90bd62110eac082 Mon Sep 17 00:00:00 2001 From: Pradyun Gedam Date: Sat, 23 Apr 2022 12:04:09 +0100 Subject: [PATCH 11/11] Better accomodate for case-insensitive file systems --- src/pip/_internal/utils/entrypoints.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/pip/_internal/utils/entrypoints.py b/src/pip/_internal/utils/entrypoints.py index 81c7c0ebe5b..f292c64045b 100644 --- a/src/pip/_internal/utils/entrypoints.py +++ b/src/pip/_internal/utils/entrypoints.py @@ -13,7 +13,7 @@ f"pip{sys.version_info.major}.{sys.version_info.minor}", ] if WINDOWS: - _allowed_extensions = {"", ".exe", ".EXE"} + _allowed_extensions = {"", ".exe"} _EXECUTABLE_NAMES = [ "".join(parts) for parts in itertools.product(_EXECUTABLE_NAMES, _allowed_extensions) @@ -50,10 +50,15 @@ def get_best_invocation_for_this_pip() -> str: # Try to use pip[X[.Y]] names, if those executables for this environment are # the first on PATH with that name. - exe_are_in_PATH = binary_prefix in os.environ.get("PATH", "").split(os.pathsep) + path_parts = os.path.normcase(os.environ.get("PATH", "")).split(os.pathsep) + exe_are_in_PATH = os.path.normcase(binary_prefix) in path_parts if exe_are_in_PATH: for exe_name in _EXECUTABLE_NAMES: - if shutil.which(exe_name) == os.path.join(binary_prefix, exe_name): + found_executable = shutil.which(exe_name) + if found_executable and os.path.samefile( + found_executable, + os.path.join(binary_prefix, exe_name), + ): return exe_name # Use the `-m` invocation, if there's no "nice" invocation.