Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/pip/_internal/cli/base_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def exc_logging_wrapper(*args: Any) -> int:
assert isinstance(status, int)
return status
except DiagnosticPipError as exc:
logger.error("[present-diagnostic] %s", exc)
logger.error("[present-rich] %s", exc)
logger.debug("Exception information:", exc_info=True)

return ERROR
Expand Down
233 changes: 142 additions & 91 deletions src/pip/_internal/self_outdated_check.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
import datetime
import functools
import hashlib
import json
import logging
import optparse
import os.path
import sys
from typing import Any, Dict
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

from pip._vendor.packaging.version import parse as parse_version
from pip._vendor.rich.console import Group
from pip._vendor.rich.markup import escape
from pip._vendor.rich.text import Text

from pip._internal.index.collector import LinkCollector
from pip._internal.index.package_finder import PackageFinder
from pip._internal.metadata import get_default_environment
from pip._internal.metadata.base import DistributionVersion
from pip._internal.models.selection_prefs import SelectionPreferences
from pip._internal.network.session import PipSession
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.entrypoints import (
get_best_invocation_for_this_pip,
get_best_invocation_for_this_python,
)
from pip._internal.utils.filesystem import adjacent_tmp_file, check_path_owner, replace
from pip._internal.utils.misc import ensure_dir

SELFCHECK_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"
_DATE_FMT = "%Y-%m-%dT%H:%M:%SZ"


logger = logging.getLogger(__name__)
Expand All @@ -31,17 +42,17 @@ def _get_statefile_name(key: str) -> str:

class SelfCheckState:
def __init__(self, cache_dir: str) -> None:
self.state: Dict[str, Any] = {}
self.statefile_path = None
self._state: Dict[str, Any] = {}
self._statefile_path = None

# Try to load the existing state
if cache_dir:
self.statefile_path = os.path.join(
self._statefile_path = os.path.join(
cache_dir, "selfcheck", _get_statefile_name(self.key)
)
try:
with open(self.statefile_path, encoding="utf-8") as statefile:
self.state = json.load(statefile)
with open(self._statefile_path, encoding="utf-8") as statefile:
self._state = json.load(statefile)
except (OSError, ValueError, KeyError):
# Explicitly suppressing exceptions, since we don't want to
# error out if the cache file is invalid.
Expand All @@ -51,41 +62,87 @@ def __init__(self, cache_dir: str) -> None:
def key(self) -> str:
return sys.prefix

def save(self, pypi_version: str, current_time: datetime.datetime) -> None:
def get(self, current_time: datetime.datetime) -> Optional[str]:
"""Check if we have a not-outdated version loaded already."""
if not self._state:
return None

if "last_check" not in self._state:
return None

if "pypi_version" not in self._state:
return None

seven_days_in_seconds = 7 * 24 * 60 * 60

# Determine if we need to refresh the state
last_check = datetime.datetime.strptime(self._state["last_check"], _DATE_FMT)
seconds_since_last_check = (current_time - last_check).total_seconds()
if seconds_since_last_check > seven_days_in_seconds:
return None

return self._state["pypi_version"]

def set(self, pypi_version: str, current_time: datetime.datetime) -> None:
# If we do not have a path to cache in, don't bother saving.
if not self.statefile_path:
if not self._statefile_path:
return

# Check to make sure that we own the directory
if not check_path_owner(os.path.dirname(self.statefile_path)):
if not check_path_owner(os.path.dirname(self._statefile_path)):
return

# Now that we've ensured the directory is owned by this user, we'll go
# ahead and make sure that all our directories are created.
ensure_dir(os.path.dirname(self.statefile_path))
ensure_dir(os.path.dirname(self._statefile_path))

state = {
# Include the key so it's easy to tell which pip wrote the
# file.
"key": self.key,
"last_check": current_time.strftime(SELFCHECK_DATE_FMT),
"last_check": current_time.strftime(_DATE_FMT),
"pypi_version": pypi_version,
}

text = json.dumps(state, sort_keys=True, separators=(",", ":"))

with adjacent_tmp_file(self.statefile_path) as f:
with adjacent_tmp_file(self._statefile_path) as f:
f.write(text.encode())

try:
# Since we have a prefix-specific state file, we can just
# overwrite whatever is there, no need to check.
replace(f.name, self.statefile_path)
replace(f.name, self._statefile_path)
except OSError:
# Best effort.
pass


@dataclass
class UpgradePrompt:
old: str
new: str

def __rich__(self) -> Group:
if WINDOWS:
pip_cmd = f"{get_best_invocation_for_this_python()} -m pip"
else:
pip_cmd = get_best_invocation_for_this_pip()

notice = "[bold][[reset][blue]notice[reset][bold]][reset]"
return Group(
Text(),
Text.from_markup(
f"{notice} A new release of pip available: "
f"[red]{self.old}[reset] -> [green]{self.new}[reset]"
),
Text.from_markup(
f"{notice} To update, run: "
f"[green]{escape(pip_cmd)} install --upgrade pip"
),
)


def was_installed_by_pip(pkg: str) -> bool:
"""Checks whether pkg was installed by pip

Expand All @@ -96,6 +153,66 @@ def was_installed_by_pip(pkg: str) -> bool:
return dist is not None and "pip" == dist.installer


def _get_current_remote_pip_version(
session: PipSession, options: optparse.Values
) -> str:
# Lets use PackageFinder to see what the latest pip version is
link_collector = LinkCollector.create(
session,
options=options,
suppress_no_index=True,
)

# Pass allow_yanked=False so we don't suggest upgrading to a
# yanked version.
selection_prefs = SelectionPreferences(
allow_yanked=False,
allow_all_prereleases=False, # Explicitly set to False
)

finder = PackageFinder.create(
link_collector=link_collector,
selection_prefs=selection_prefs,
use_deprecated_html5lib=("html5lib" in options.deprecated_features_enabled),
)
best_candidate = finder.find_best_candidate("pip").best_candidate
if best_candidate is None:
return

return str(best_candidate.version)


def _self_version_check_logic(
*,
state: SelfCheckState,
current_time: datetime.datetime,
local_version: DistributionVersion,
get_remote_version: Callable[[], str],
) -> Optional[UpgradePrompt]:
remote_version_str = state.get(current_time)
if remote_version_str is None:
remote_version_str = get_remote_version()
state.set(remote_version_str, current_time)

remote_version = parse_version(remote_version_str)
logger.debug("Remote version of pip: %s", remote_version)
logger.debug("Local version of pip: %s", local_version)

pip_installed_by_pip = was_installed_by_pip("pip")
logger.debug("Was pip installed by pip? %s", pip_installed_by_pip)
if not pip_installed_by_pip:
return None # Only suggest upgrade if pip is installed by pip.

local_version_is_older = (
local_version < remote_version
and local_version.base_version != remote_version.base_version
)
if local_version_is_older:
return UpgradePrompt(old=str(local_version), new=remote_version_str)

return None


def pip_self_version_check(session: PipSession, options: optparse.Values) -> None:
"""Check for an update for pip.

Expand All @@ -107,83 +224,17 @@ def pip_self_version_check(session: PipSession, options: optparse.Values) -> Non
if not installed_dist:
return

pip_version = installed_dist.version
pypi_version = None

try:
state = SelfCheckState(cache_dir=options.cache_dir)

current_time = datetime.datetime.utcnow()
# Determine if we need to refresh the state
if "last_check" in state.state and "pypi_version" in state.state:
last_check = datetime.datetime.strptime(
state.state["last_check"], SELFCHECK_DATE_FMT
)
if (current_time - last_check).total_seconds() < 7 * 24 * 60 * 60:
pypi_version = state.state["pypi_version"]

# Refresh the version if we need to or just see if we need to warn
if pypi_version is None:
# Lets use PackageFinder to see what the latest pip version is
link_collector = LinkCollector.create(
session,
options=options,
suppress_no_index=True,
)

# Pass allow_yanked=False so we don't suggest upgrading to a
# yanked version.
selection_prefs = SelectionPreferences(
allow_yanked=False,
allow_all_prereleases=False, # Explicitly set to False
)

finder = PackageFinder.create(
link_collector=link_collector,
selection_prefs=selection_prefs,
use_deprecated_html5lib=(
"html5lib" in options.deprecated_features_enabled
),
)
best_candidate = finder.find_best_candidate("pip").best_candidate
if best_candidate is None:
return
pypi_version = str(best_candidate.version)

# save that we've performed a check
state.save(pypi_version, current_time)

remote_version = parse_version(pypi_version)

local_version_is_older = (
pip_version < remote_version
and pip_version.base_version != remote_version.base_version
and was_installed_by_pip("pip")
)

# Determine if our pypi_version is older
if not local_version_is_older:
return

# We cannot tell how the current pip is available in the current
# command context, so be pragmatic here and suggest the command
# that's always available. This does not accommodate spaces in
# `sys.executable` on purpose as it is not possible to do it
# correctly without knowing the user's shell. Thus,
# it won't be done until possible through the standard library.
# Do not be tempted to use the undocumented subprocess.list2cmdline.
# It is considered an internal implementation detail for a reason.
pip_cmd = f"{sys.executable} -m pip"
logger.warning(
"You are using pip version %s; however, version %s is "
"available.\nYou should consider upgrading via the "
"'%s install --upgrade pip' command.",
pip_version,
pypi_version,
pip_cmd,
upgrade_prompt = _self_version_check_logic(
state=SelfCheckState(cache_dir=options.cache_dir),
current_time=datetime.datetime.utcnow(),
local_version=installed_dist.version,
get_remote_version=functools.partial(
_get_current_remote_pip_version, session, options
),
)
if upgrade_prompt is not None:
logger.info("[present-rich] %s", upgrade_prompt)
except Exception:
logger.debug(
"There was an error checking the latest version of pip",
exc_info=True,
)
logger.warning("There was an error checking the latest version of pip.")
logger.debug("See below for error", exc_info=True)
52 changes: 52 additions & 0 deletions src/pip/_internal/utils/entrypoints.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
import itertools
import os
import shutil
import sys
from typing import List, Optional

from pip._internal.cli.main import main
from pip._internal.utils.compat import WINDOWS

_EXECUTABLE_NAMES = [
"pip",
f"pip{sys.version_info.major}",
f"pip{sys.version_info.major}.{sys.version_info.minor}",
]
if WINDOWS:
_allowed_extensions = {"", ".exe"}
_EXECUTABLE_NAMES = [
"".join(parts)
for parts in itertools.product(_EXECUTABLE_NAMES, _allowed_extensions)
]


def _wrapper(args: Optional[List[str]] = None) -> int:
Expand All @@ -25,3 +41,39 @@ def _wrapper(args: Optional[List[str]] = None) -> int:
"running pip directly.\n"
)
return main(args)


def get_best_invocation_for_this_pip() -> str:
"""Try to figure out the best way to invoke pip in the current environment."""
binary_directory = "Scripts" if WINDOWS else "bin"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't remember for certain - do we not support environments on Windows that use "bin" rather than "Scripts" (for example, MSYS2)? I note that pip._internal.locations.get_bin_prefix() allows both on Windows...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we explicitly support them. None the less, if this fails, the thing that'd happen is a less-perfect prompt will be presented. :)

I've copied this over inspired by what venv does on 3.7 -- https://github.com/python/cpython/blob/3.7/Lib/venv/__init__.py#L114

binary_prefix = os.path.join(sys.prefix, binary_directory)

# Try to use pip[X[.Y]] names, if those executables for this environment are
# the first on PATH with that name.
path_parts = os.path.normcase(os.environ.get("PATH", "")).split(os.pathsep)
exe_are_in_PATH = os.path.normcase(binary_prefix) in path_parts
if exe_are_in_PATH:
for exe_name in _EXECUTABLE_NAMES:
found_executable = shutil.which(exe_name)
if found_executable and os.path.samefile(
found_executable,
os.path.join(binary_prefix, exe_name),
):
return exe_name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume here that we don't care about cases where the user might, for example, have a shell alias of pip for some particular copy of pip - which would completely wreck all of the above logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty much. There's no way to detect or account for these things.


# Use the `-m` invocation, if there's no "nice" invocation.
return f"{get_best_invocation_for_this_python()} -m pip"


def get_best_invocation_for_this_python() -> str:
"""Try to figure out the best way to invoke the current Python."""
exe = sys.executable
exe_name = os.path.basename(exe)

# Try to use the basename, if it's the first executable.
found_executable = shutil.which(exe_name)
if found_executable and os.path.samefile(found_executable, exe):
return exe_name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach seems much more robust than the pip one, but still could be messed up by the shell alias problem.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, there's nothing we can do for shell aliases anyway. If things break, I think it's fair to say "you have an alias" as a cause for the issue.


# Use the full executable name, because we couldn't find something simpler.
return exe
Loading