diff --git a/news/10111.trivial.rst b/news/10111.trivial.rst
new file mode 100644
index 00000000000..e646d70f418
--- /dev/null
+++ b/news/10111.trivial.rst
@@ -0,0 +1 @@
+Converted type commentaries into annotations in ``pip/_internal/index``.
diff --git a/src/pip/_internal/index/collector.py b/src/pip/_internal/index/collector.py
index 0721e3683f9..14d745eefbb 100644
--- a/src/pip/_internal/index/collector.py
+++ b/src/pip/_internal/index/collector.py
@@ -46,8 +46,7 @@
ResponseHeaders = MutableMapping[str, str]
-def _match_vcs_scheme(url):
- # type: (str) -> Optional[str]
+def _match_vcs_scheme(url: str) -> Optional[str]:
"""Look for VCS schemes in the URL.
Returns the matched VCS scheme, or None if there's no match.
@@ -59,15 +58,13 @@ def _match_vcs_scheme(url):
class _NotHTML(Exception):
- def __init__(self, content_type, request_desc):
- # type: (str, str) -> None
+ def __init__(self, content_type: str, request_desc: str) -> None:
super().__init__(content_type, request_desc)
self.content_type = content_type
self.request_desc = request_desc
-def _ensure_html_header(response):
- # type: (Response) -> None
+def _ensure_html_header(response: Response) -> None:
"""Check the Content-Type header to ensure the response contains HTML.
Raises `_NotHTML` if the content type is not text/html.
@@ -81,8 +78,7 @@ class _NotHTTP(Exception):
pass
-def _ensure_html_response(url, session):
- # type: (str, PipSession) -> None
+def _ensure_html_response(url: str, session: PipSession) -> None:
"""Send a HEAD request to the URL, and ensure the response contains HTML.
Raises `_NotHTTP` if the URL is not available for a HEAD request, or
@@ -98,8 +94,7 @@ def _ensure_html_response(url, session):
_ensure_html_header(resp)
-def _get_html_response(url, session):
- # type: (str, PipSession) -> Response
+def _get_html_response(url: str, session: PipSession) -> Response:
"""Access an HTML page with GET, and return the response.
This consists of three parts:
@@ -149,8 +144,7 @@ def _get_html_response(url, session):
return resp
-def _get_encoding_from_headers(headers):
- # type: (ResponseHeaders) -> Optional[str]
+def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
"""Determine if we have any encoding information in our headers.
"""
if headers and "Content-Type" in headers:
@@ -160,8 +154,7 @@ def _get_encoding_from_headers(headers):
return None
-def _determine_base_url(document, page_url):
- # type: (HTMLElement, str) -> str
+def _determine_base_url(document: HTMLElement, page_url: str) -> str:
"""Determine the HTML document's base URL.
This looks for a ```` tag in the HTML document. If present, its href
@@ -180,8 +173,7 @@ def _determine_base_url(document, page_url):
return page_url
-def _clean_url_path_part(part):
- # type: (str) -> str
+def _clean_url_path_part(part: str) -> str:
"""
Clean a "part" of a URL path (i.e. after splitting on "@" characters).
"""
@@ -189,8 +181,7 @@ def _clean_url_path_part(part):
return urllib.parse.quote(urllib.parse.unquote(part))
-def _clean_file_url_path(part):
- # type: (str) -> str
+def _clean_file_url_path(part: str) -> str:
"""
Clean the first part of a URL path that corresponds to a local
filesystem path (i.e. the first part after splitting on "@" characters).
@@ -207,8 +198,7 @@ def _clean_file_url_path(part):
_reserved_chars_re = re.compile('(@|%2F)', re.IGNORECASE)
-def _clean_url_path(path, is_local_path):
- # type: (str, bool) -> str
+def _clean_url_path(path: str, is_local_path: bool) -> str:
"""
Clean the path portion of a URL.
"""
@@ -230,8 +220,7 @@ def _clean_url_path(path, is_local_path):
return ''.join(cleaned_parts)
-def _clean_link(url):
- # type: (str) -> str
+def _clean_link(url: str) -> str:
"""
Make sure a link is fully quoted.
For example, if ' ' occurs in the URL, it will be replaced with "%20",
@@ -247,11 +236,10 @@ def _clean_link(url):
def _create_link_from_element(
- anchor, # type: HTMLElement
- page_url, # type: str
- base_url, # type: str
-):
- # type: (...) -> Optional[Link]
+ anchor: HTMLElement,
+ page_url: str,
+ base_url: str,
+) -> Optional[Link]:
"""
Convert an anchor element in a simple repository page to a Link.
"""
@@ -278,25 +266,21 @@ def _create_link_from_element(
class CacheablePageContent:
- def __init__(self, page):
- # type: (HTMLPage) -> None
+ def __init__(self, page: "HTMLPage") -> None:
assert page.cache_link_parsing
self.page = page
- def __eq__(self, other):
- # type: (object) -> bool
+ def __eq__(self, other: object) -> bool:
return (isinstance(other, type(self)) and
self.page.url == other.page.url)
- def __hash__(self):
- # type: () -> int
+ def __hash__(self) -> int:
return hash(self.page.url)
def with_cached_html_pages(
- fn, # type: Callable[[HTMLPage], Iterable[Link]]
-):
- # type: (...) -> Callable[[HTMLPage], List[Link]]
+ fn: Callable[["HTMLPage"], Iterable[Link]],
+) -> Callable[["HTMLPage"], List[Link]]:
"""
Given a function that parses an Iterable[Link] from an HTMLPage, cache the
function's result (keyed by CacheablePageContent), unless the HTMLPage
@@ -304,13 +288,11 @@ def with_cached_html_pages(
"""
@functools.lru_cache(maxsize=None)
- def wrapper(cacheable_page):
- # type: (CacheablePageContent) -> List[Link]
+ def wrapper(cacheable_page: CacheablePageContent) -> List[Link]:
return list(fn(cacheable_page.page))
@functools.wraps(fn)
- def wrapper_wrapper(page):
- # type: (HTMLPage) -> List[Link]
+ def wrapper_wrapper(page: "HTMLPage") -> List[Link]:
if page.cache_link_parsing:
return wrapper(CacheablePageContent(page))
return list(fn(page))
@@ -319,8 +301,7 @@ def wrapper_wrapper(page):
@with_cached_html_pages
-def parse_links(page):
- # type: (HTMLPage) -> Iterable[Link]
+def parse_links(page: "HTMLPage") -> Iterable[Link]:
"""
Parse an HTML document, and yield its anchor elements as Link objects.
"""
@@ -348,12 +329,11 @@ class HTMLPage:
def __init__(
self,
- content, # type: bytes
- encoding, # type: Optional[str]
- url, # type: str
- cache_link_parsing=True, # type: bool
- ):
- # type: (...) -> None
+ content: bytes,
+ encoding: Optional[str],
+ url: str,
+ cache_link_parsing: bool = True,
+ ) -> None:
"""
:param encoding: the encoding to decode the given content.
:param url: the URL from which the HTML was downloaded.
@@ -366,24 +346,21 @@ def __init__(
self.url = url
self.cache_link_parsing = cache_link_parsing
- def __str__(self):
- # type: () -> str
+ def __str__(self) -> str:
return redact_auth_from_url(self.url)
def _handle_get_page_fail(
- link, # type: Link
- reason, # type: Union[str, Exception]
- meth=None # type: Optional[Callable[..., None]]
-):
- # type: (...) -> None
+ link: Link,
+ reason: Union[str, Exception],
+ meth: Optional[Callable[..., None]] = None
+) -> None:
if meth is None:
meth = logger.debug
meth("Could not fetch URL %s: %s - skipping", link, reason)
-def _make_html_page(response, cache_link_parsing=True):
- # type: (Response, bool) -> HTMLPage
+def _make_html_page(response: Response, cache_link_parsing: bool = True) -> HTMLPage:
encoding = _get_encoding_from_headers(response.headers)
return HTMLPage(
response.content,
@@ -392,8 +369,9 @@ def _make_html_page(response, cache_link_parsing=True):
cache_link_parsing=cache_link_parsing)
-def _get_html_page(link, session=None):
- # type: (Link, Optional[PipSession]) -> Optional[HTMLPage]
+def _get_html_page(
+ link: Link, session: Optional[PipSession] = None
+) -> Optional["HTMLPage"]:
if session is None:
raise TypeError(
"_get_html_page() missing 1 required keyword argument: 'session'"
@@ -465,16 +443,18 @@ class LinkCollector:
def __init__(
self,
- session, # type: PipSession
- search_scope, # type: SearchScope
- ):
- # type: (...) -> None
+ session: PipSession,
+ search_scope: SearchScope,
+ ) -> None:
self.search_scope = search_scope
self.session = session
@classmethod
- def create(cls, session, options, suppress_no_index=False):
- # type: (PipSession, Values, bool) -> LinkCollector
+ def create(
+ cls, session: PipSession,
+ options: Values,
+ suppress_no_index: bool = False
+ ) -> "LinkCollector":
"""
:param session: The Session to use to make requests.
:param suppress_no_index: Whether to ignore the --no-index option
@@ -500,12 +480,10 @@ def create(cls, session, options, suppress_no_index=False):
return link_collector
@property
- def find_links(self):
- # type: () -> List[str]
+ def find_links(self) -> List[str]:
return self.search_scope.find_links
- def fetch_page(self, location):
- # type: (Link) -> Optional[HTMLPage]
+ def fetch_page(self, location: Link) -> Optional[HTMLPage]:
"""
Fetch an HTML page containing package links.
"""
diff --git a/src/pip/_internal/index/package_finder.py b/src/pip/_internal/index/package_finder.py
index 8fa310ee30b..a6451b62e4b 100644
--- a/src/pip/_internal/index/package_finder.py
+++ b/src/pip/_internal/index/package_finder.py
@@ -51,11 +51,10 @@
def _check_link_requires_python(
- link, # type: Link
- version_info, # type: Tuple[int, int, int]
- ignore_requires_python=False, # type: bool
-):
- # type: (...) -> bool
+ link: Link,
+ version_info: Tuple[int, int, int],
+ ignore_requires_python: bool = False,
+) -> bool:
"""
Return whether the given Python version is compatible with a link's
"Requires-Python" value.
@@ -107,14 +106,13 @@ class LinkEvaluator:
# people when reading the code.
def __init__(
self,
- project_name, # type: str
- canonical_name, # type: str
- formats, # type: FrozenSet[str]
- target_python, # type: TargetPython
- allow_yanked, # type: bool
- ignore_requires_python=None, # type: Optional[bool]
- ):
- # type: (...) -> None
+ project_name: str,
+ canonical_name: str,
+ formats: FrozenSet[str],
+ target_python: TargetPython,
+ allow_yanked: bool,
+ ignore_requires_python: Optional[bool] = None,
+ ) -> None:
"""
:param project_name: The user supplied package name.
:param canonical_name: The canonical package name.
@@ -143,8 +141,7 @@ def __init__(
self.project_name = project_name
- def evaluate_link(self, link):
- # type: (Link) -> Tuple[bool, Optional[str]]
+ def evaluate_link(self, link: Link) -> Tuple[bool, Optional[str]]:
"""
Determine whether a link is a candidate for installation.
@@ -233,11 +230,10 @@ def evaluate_link(self, link):
def filter_unallowed_hashes(
- candidates, # type: List[InstallationCandidate]
- hashes, # type: Hashes
- project_name, # type: str
-):
- # type: (...) -> List[InstallationCandidate]
+ candidates: List[InstallationCandidate],
+ hashes: Hashes,
+ project_name: str,
+) -> List[InstallationCandidate]:
"""
Filter out candidates whose hashes aren't allowed, and return a new
list of candidates.
@@ -316,10 +312,9 @@ class CandidatePreferences:
def __init__(
self,
- prefer_binary=False, # type: bool
- allow_all_prereleases=False, # type: bool
- ):
- # type: (...) -> None
+ prefer_binary: bool = False,
+ allow_all_prereleases: bool = False,
+ ) -> None:
"""
:param allow_all_prereleases: Whether to allow all pre-releases.
"""
@@ -336,11 +331,10 @@ class BestCandidateResult:
def __init__(
self,
- candidates, # type: List[InstallationCandidate]
- applicable_candidates, # type: List[InstallationCandidate]
- best_candidate, # type: Optional[InstallationCandidate]
- ):
- # type: (...) -> None
+ candidates: List[InstallationCandidate],
+ applicable_candidates: List[InstallationCandidate],
+ best_candidate: Optional[InstallationCandidate],
+ ) -> None:
"""
:param candidates: A sequence of all available candidates found.
:param applicable_candidates: The applicable candidates.
@@ -359,14 +353,12 @@ def __init__(
self.best_candidate = best_candidate
- def iter_all(self):
- # type: () -> Iterable[InstallationCandidate]
+ def iter_all(self) -> Iterable[InstallationCandidate]:
"""Iterate through all candidates.
"""
return iter(self._candidates)
- def iter_applicable(self):
- # type: () -> Iterable[InstallationCandidate]
+ def iter_applicable(self) -> Iterable[InstallationCandidate]:
"""Iterate through the applicable candidates.
"""
return iter(self._applicable_candidates)
@@ -382,14 +374,13 @@ class CandidateEvaluator:
@classmethod
def create(
cls,
- project_name, # type: str
- target_python=None, # type: Optional[TargetPython]
- prefer_binary=False, # type: bool
- allow_all_prereleases=False, # type: bool
- specifier=None, # type: Optional[specifiers.BaseSpecifier]
- hashes=None, # type: Optional[Hashes]
- ):
- # type: (...) -> CandidateEvaluator
+ project_name: str,
+ target_python: Optional[TargetPython] = None,
+ prefer_binary: bool = False,
+ allow_all_prereleases: bool = False,
+ specifier: Optional[specifiers.BaseSpecifier] = None,
+ hashes: Optional[Hashes] = None,
+ ) -> "CandidateEvaluator":
"""Create a CandidateEvaluator object.
:param target_python: The target Python interpreter to use when
@@ -418,14 +409,13 @@ def create(
def __init__(
self,
- project_name, # type: str
- supported_tags, # type: List[Tag]
- specifier, # type: specifiers.BaseSpecifier
- prefer_binary=False, # type: bool
- allow_all_prereleases=False, # type: bool
- hashes=None, # type: Optional[Hashes]
- ):
- # type: (...) -> None
+ project_name: str,
+ supported_tags: List[Tag],
+ specifier: specifiers.BaseSpecifier,
+ prefer_binary: bool = False,
+ allow_all_prereleases: bool = False,
+ hashes: Optional[Hashes] = None,
+ ) -> None:
"""
:param supported_tags: The PEP 425 tags supported by the target
Python in order of preference (most preferred first).
@@ -445,9 +435,8 @@ def __init__(
def get_applicable_candidates(
self,
- candidates, # type: List[InstallationCandidate]
- ):
- # type: (...) -> List[InstallationCandidate]
+ candidates: List[InstallationCandidate],
+ ) -> List[InstallationCandidate]:
"""
Return the applicable candidates from a list of candidates.
"""
@@ -481,8 +470,7 @@ def get_applicable_candidates(
return sorted(filtered_applicable_candidates, key=self._sort_key)
- def _sort_key(self, candidate):
- # type: (InstallationCandidate) -> CandidateSortingKey
+ def _sort_key(self, candidate: InstallationCandidate) -> CandidateSortingKey:
"""
Function to pass as the `key` argument to a call to sorted() to sort
InstallationCandidates by preference.
@@ -546,9 +534,8 @@ def _sort_key(self, candidate):
def sort_best_candidate(
self,
- candidates, # type: List[InstallationCandidate]
- ):
- # type: (...) -> Optional[InstallationCandidate]
+ candidates: List[InstallationCandidate],
+ ) -> Optional[InstallationCandidate]:
"""
Return the best candidate per the instance's sort order, or None if
no candidate is acceptable.
@@ -560,9 +547,8 @@ def sort_best_candidate(
def compute_best_candidate(
self,
- candidates, # type: List[InstallationCandidate]
- ):
- # type: (...) -> BestCandidateResult
+ candidates: List[InstallationCandidate],
+ ) -> BestCandidateResult:
"""
Compute and return a `BestCandidateResult` instance.
"""
@@ -586,14 +572,13 @@ class PackageFinder:
def __init__(
self,
- link_collector, # type: LinkCollector
- target_python, # type: TargetPython
- allow_yanked, # type: bool
- format_control=None, # type: Optional[FormatControl]
- candidate_prefs=None, # type: CandidatePreferences
- ignore_requires_python=None, # type: Optional[bool]
- ):
- # type: (...) -> None
+ link_collector: LinkCollector,
+ target_python: TargetPython,
+ allow_yanked: bool,
+ format_control: Optional[FormatControl] = None,
+ candidate_prefs: Optional[CandidatePreferences] = None,
+ ignore_requires_python: Optional[bool] = None,
+ ) -> None:
"""
This constructor is primarily meant to be used by the create() class
method and from tests.
@@ -627,11 +612,10 @@ def __init__(
@classmethod
def create(
cls,
- link_collector, # type: LinkCollector
- selection_prefs, # type: SelectionPreferences
- target_python=None, # type: Optional[TargetPython]
- ):
- # type: (...) -> PackageFinder
+ link_collector: LinkCollector,
+ selection_prefs: SelectionPreferences,
+ target_python: Optional[TargetPython] = None,
+ ) -> "PackageFinder":
"""Create a PackageFinder.
:param selection_prefs: The candidate selection preferences, as a
@@ -658,56 +642,45 @@ def create(
)
@property
- def target_python(self):
- # type: () -> TargetPython
+ def target_python(self) -> TargetPython:
return self._target_python
@property
- def search_scope(self):
- # type: () -> SearchScope
+ def search_scope(self) -> SearchScope:
return self._link_collector.search_scope
@search_scope.setter
- def search_scope(self, search_scope):
- # type: (SearchScope) -> None
+ def search_scope(self, search_scope: SearchScope) -> None:
self._link_collector.search_scope = search_scope
@property
- def find_links(self):
- # type: () -> List[str]
+ def find_links(self) -> List[str]:
return self._link_collector.find_links
@property
- def index_urls(self):
- # type: () -> List[str]
+ def index_urls(self) -> List[str]:
return self.search_scope.index_urls
@property
- def trusted_hosts(self):
- # type: () -> Iterable[str]
+ def trusted_hosts(self) -> Iterable[str]:
for host_port in self._link_collector.session.pip_trusted_origins:
yield build_netloc(*host_port)
@property
- def allow_all_prereleases(self):
- # type: () -> bool
+ def allow_all_prereleases(self) -> bool:
return self._candidate_prefs.allow_all_prereleases
- def set_allow_all_prereleases(self):
- # type: () -> None
+ def set_allow_all_prereleases(self) -> None:
self._candidate_prefs.allow_all_prereleases = True
@property
- def prefer_binary(self):
- # type: () -> bool
+ def prefer_binary(self) -> bool:
return self._candidate_prefs.prefer_binary
- def set_prefer_binary(self):
- # type: () -> None
+ def set_prefer_binary(self) -> None:
self._candidate_prefs.prefer_binary = True
- def make_link_evaluator(self, project_name):
- # type: (str) -> LinkEvaluator
+ def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
canonical_name = canonicalize_name(project_name)
formats = self.format_control.get_allowed_formats(canonical_name)
@@ -720,8 +693,7 @@ def make_link_evaluator(self, project_name):
ignore_requires_python=self._ignore_requires_python,
)
- def _sort_links(self, links):
- # type: (Iterable[Link]) -> List[Link]
+ def _sort_links(self, links: Iterable[Link]) -> List[Link]:
"""
Returns elements of links in order, non-egg links first, egg links
second, while eliminating duplicates
@@ -737,16 +709,16 @@ def _sort_links(self, links):
no_eggs.append(link)
return no_eggs + eggs
- def _log_skipped_link(self, link, reason):
- # type: (Link, str) -> None
+ def _log_skipped_link(self, link: Link, reason: str) -> None:
if link not in self._logged_links:
# Put the link at the end so the reason is more visible and because
# the link string is usually very long.
logger.debug('Skipping link: %s: %s', reason, link)
self._logged_links.add(link)
- def get_install_candidate(self, link_evaluator, link):
- # type: (LinkEvaluator, Link) -> Optional[InstallationCandidate]
+ def get_install_candidate(
+ self, link_evaluator: LinkEvaluator, link: Link
+ ) -> Optional[InstallationCandidate]:
"""
If the link is a candidate for install, convert it to an
InstallationCandidate and return it. Otherwise, return None.
@@ -763,8 +735,9 @@ def get_install_candidate(self, link_evaluator, link):
version=result,
)
- def evaluate_links(self, link_evaluator, links):
- # type: (LinkEvaluator, Iterable[Link]) -> List[InstallationCandidate]
+ def evaluate_links(
+ self, link_evaluator: LinkEvaluator, links: Iterable[Link]
+ ) -> List[InstallationCandidate]:
"""
Convert links that are candidates to InstallationCandidate objects.
"""
@@ -776,8 +749,9 @@ def evaluate_links(self, link_evaluator, links):
return candidates
- def process_project_url(self, project_url, link_evaluator):
- # type: (Link, LinkEvaluator) -> List[InstallationCandidate]
+ def process_project_url(
+ self, project_url: Link, link_evaluator: LinkEvaluator
+ ) -> List[InstallationCandidate]:
logger.debug(
'Fetching project page and analyzing links: %s', project_url,
)
@@ -796,8 +770,7 @@ def process_project_url(self, project_url, link_evaluator):
return package_links
@functools.lru_cache(maxsize=None)
- def find_all_candidates(self, project_name):
- # type: (str) -> List[InstallationCandidate]
+ def find_all_candidates(self, project_name: str) -> List[InstallationCandidate]:
"""Find all available InstallationCandidate for project_name
This checks index_urls and find_links.
@@ -844,11 +817,10 @@ def find_all_candidates(self, project_name):
def make_candidate_evaluator(
self,
- project_name, # type: str
- specifier=None, # type: Optional[specifiers.BaseSpecifier]
- hashes=None, # type: Optional[Hashes]
- ):
- # type: (...) -> CandidateEvaluator
+ project_name: str,
+ specifier: Optional[specifiers.BaseSpecifier] = None,
+ hashes: Optional[Hashes] = None,
+ ) -> CandidateEvaluator:
"""Create a CandidateEvaluator object to use.
"""
candidate_prefs = self._candidate_prefs
@@ -864,11 +836,10 @@ def make_candidate_evaluator(
@functools.lru_cache(maxsize=None)
def find_best_candidate(
self,
- project_name, # type: str
- specifier=None, # type: Optional[specifiers.BaseSpecifier]
- hashes=None, # type: Optional[Hashes]
- ):
- # type: (...) -> BestCandidateResult
+ project_name: str,
+ specifier: Optional[specifiers.BaseSpecifier] = None,
+ hashes: Optional[Hashes] = None,
+ ) -> BestCandidateResult:
"""Find matches for the given project and specifier.
:param specifier: An optional object implementing `filter`
@@ -885,8 +856,9 @@ def find_best_candidate(
)
return candidate_evaluator.compute_best_candidate(candidates)
- def find_requirement(self, req, upgrade):
- # type: (InstallRequirement, bool) -> Optional[InstallationCandidate]
+ def find_requirement(
+ self, req: InstallRequirement, upgrade: bool
+ ) -> Optional[InstallationCandidate]:
"""Try to find a Link matching req
Expects req, an InstallRequirement and upgrade, a boolean
@@ -903,8 +875,7 @@ def find_requirement(self, req, upgrade):
if req.satisfied_by is not None:
installed_version = parse_version(req.satisfied_by.version)
- def _format_versions(cand_iter):
- # type: (Iterable[InstallationCandidate]) -> str
+ def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
# This repeated parse_version and str() conversion is needed to
# handle different vendoring sources from pip and pkg_resources.
# If we stop using the pkg_resources provided specifier and start
@@ -967,8 +938,7 @@ def _format_versions(cand_iter):
return best_candidate
-def _find_name_version_sep(fragment, canonical_name):
- # type: (str, str) -> int
+def _find_name_version_sep(fragment: str, canonical_name: str) -> int:
"""Find the separator's index based on the package's canonical name.
:param fragment: A + filename "fragment" (stem) or
@@ -994,8 +964,7 @@ def _find_name_version_sep(fragment, canonical_name):
raise ValueError(f"{fragment} does not match {canonical_name}")
-def _extract_version_from_fragment(fragment, canonical_name):
- # type: (str, str) -> Optional[str]
+def _extract_version_from_fragment(fragment: str, canonical_name: str) -> Optional[str]:
"""Parse the version string from a + filename
"fragment" (stem) or egg fragment.