Change venv

This commit is contained in:
Ambulance Clerc
2023-05-31 08:31:22 +02:00
parent fb6f579089
commit fdbb52c96f
466 changed files with 25899 additions and 64721 deletions

View File

@@ -2,30 +2,32 @@
The main purpose of this module is to expose LinkCollector.collect_sources().
"""
import cgi
import collections
import email.message
import functools
import html
import itertools
import json
import logging
import os
import re
import urllib.parse
import urllib.request
import xml.etree.ElementTree
from html.parser import HTMLParser
from optparse import Values
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
MutableMapping,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
)
from pip._vendor import html5lib, requests
from pip._vendor import requests
from pip._vendor.requests import Response
from pip._vendor.requests.exceptions import RetryError, SSLError
@@ -35,14 +37,18 @@ from pip._internal.models.search_scope import SearchScope
from pip._internal.network.session import PipSession
from pip._internal.network.utils import raise_for_status
from pip._internal.utils.filetypes import is_archive_file
from pip._internal.utils.misc import pairwise, redact_auth_from_url
from pip._internal.utils.misc import redact_auth_from_url
from pip._internal.vcs import vcs
from .sources import CandidatesFromPage, LinkSource, build_source
if TYPE_CHECKING:
from typing import Protocol
else:
Protocol = object
logger = logging.getLogger(__name__)
HTMLElement = xml.etree.ElementTree.Element
ResponseHeaders = MutableMapping[str, str]
@@ -52,70 +58,90 @@ def _match_vcs_scheme(url: str) -> Optional[str]:
Returns the matched VCS scheme, or None if there's no match.
"""
for scheme in vcs.schemes:
if url.lower().startswith(scheme) and url[len(scheme)] in '+:':
if url.lower().startswith(scheme) and url[len(scheme)] in "+:":
return scheme
return None
class _NotHTML(Exception):
class _NotAPIContent(Exception):
def __init__(self, content_type: str, request_desc: str) -> None:
super().__init__(content_type, request_desc)
self.content_type = content_type
self.request_desc = request_desc
def _ensure_html_header(response: Response) -> None:
"""Check the Content-Type header to ensure the response contains HTML.
Raises `_NotHTML` if the content type is not text/html.
def _ensure_api_header(response: Response) -> None:
"""
content_type = response.headers.get("Content-Type", "")
if not content_type.lower().startswith("text/html"):
raise _NotHTML(content_type, response.request.method)
Check the Content-Type header to ensure the response contains a Simple
API Response.
Raises `_NotAPIContent` if the content type is not a valid content-type.
"""
content_type = response.headers.get("Content-Type", "Unknown")
content_type_l = content_type.lower()
if content_type_l.startswith(
(
"text/html",
"application/vnd.pypi.simple.v1+html",
"application/vnd.pypi.simple.v1+json",
)
):
return
raise _NotAPIContent(content_type, response.request.method)
class _NotHTTP(Exception):
pass
def _ensure_html_response(url: str, session: PipSession) -> None:
"""Send a HEAD request to the URL, and ensure the response contains HTML.
def _ensure_api_response(url: str, session: PipSession) -> None:
"""
Send a HEAD request to the URL, and ensure the response contains a simple
API Response.
Raises `_NotHTTP` if the URL is not available for a HEAD request, or
`_NotHTML` if the content type is not text/html.
`_NotAPIContent` if the content type is not a valid content type.
"""
scheme, netloc, path, query, fragment = urllib.parse.urlsplit(url)
if scheme not in {'http', 'https'}:
if scheme not in {"http", "https"}:
raise _NotHTTP()
resp = session.head(url, allow_redirects=True)
raise_for_status(resp)
_ensure_html_header(resp)
_ensure_api_header(resp)
def _get_html_response(url: str, session: PipSession) -> Response:
"""Access an HTML page with GET, and return the response.
def _get_simple_response(url: str, session: PipSession) -> Response:
"""Access an Simple API response with GET, and return the response.
This consists of three parts:
1. If the URL looks suspiciously like an archive, send a HEAD first to
check the Content-Type is HTML, to avoid downloading a large file.
Raise `_NotHTTP` if the content type cannot be determined, or
`_NotHTML` if it is not HTML.
check the Content-Type is HTML or Simple API, to avoid downloading a
large file. Raise `_NotHTTP` if the content type cannot be determined, or
`_NotAPIContent` if it is not HTML or a Simple API.
2. Actually perform the request. Raise HTTP exceptions on network failures.
3. Check the Content-Type header to make sure we got HTML, and raise
`_NotHTML` otherwise.
3. Check the Content-Type header to make sure we got a Simple API response,
and raise `_NotAPIContent` otherwise.
"""
if is_archive_file(Link(url).filename):
_ensure_html_response(url, session=session)
_ensure_api_response(url, session=session)
logger.debug('Getting page %s', redact_auth_from_url(url))
logger.debug("Getting page %s", redact_auth_from_url(url))
resp = session.get(
url,
headers={
"Accept": "text/html",
"Accept": ", ".join(
[
"application/vnd.pypi.simple.v1+json",
"application/vnd.pypi.simple.v1+html; q=0.1",
"text/html; q=0.01",
]
),
# We don't want to blindly returned cached data for
# /simple/, because authors generally expecting that
# twine upload && pip install will function, but if
@@ -137,153 +163,52 @@ def _get_html_response(url: str, session: PipSession) -> Response:
# The check for archives above only works if the url ends with
# something that looks like an archive. However that is not a
# requirement of an url. Unless we issue a HEAD request on every
# url we cannot know ahead of time for sure if something is HTML
# or not. However we can check after we've downloaded it.
_ensure_html_header(resp)
# url we cannot know ahead of time for sure if something is a
# Simple API response or not. However we can check after we've
# downloaded it.
_ensure_api_header(resp)
logger.debug(
"Fetched page %s as %s",
redact_auth_from_url(url),
resp.headers.get("Content-Type", "Unknown"),
)
return resp
def _get_encoding_from_headers(headers: ResponseHeaders) -> Optional[str]:
"""Determine if we have any encoding information in our headers.
"""
"""Determine if we have any encoding information in our headers."""
if headers and "Content-Type" in headers:
content_type, params = cgi.parse_header(headers["Content-Type"])
if "charset" in params:
return params['charset']
m = email.message.Message()
m["content-type"] = headers["Content-Type"]
charset = m.get_param("charset")
if charset:
return str(charset)
return None
def _determine_base_url(document: HTMLElement, page_url: str) -> str:
"""Determine the HTML document's base URL.
This looks for a ``<base>`` tag in the HTML document. If present, its href
attribute denotes the base URL of anchor tags in the document. If there is
no such tag (or if it does not have a valid href attribute), the HTML
file's URL is used as the base URL.
:param document: An HTML document representation. The current
implementation expects the result of ``html5lib.parse()``.
:param page_url: The URL of the HTML document.
"""
for base in document.findall(".//base"):
href = base.get("href")
if href is not None:
return href
return page_url
def _clean_url_path_part(part: str) -> str:
"""
Clean a "part" of a URL path (i.e. after splitting on "@" characters).
"""
# We unquote prior to quoting to make sure nothing is double quoted.
return urllib.parse.quote(urllib.parse.unquote(part))
def _clean_file_url_path(part: str) -> str:
"""
Clean the first part of a URL path that corresponds to a local
filesystem path (i.e. the first part after splitting on "@" characters).
"""
# We unquote prior to quoting to make sure nothing is double quoted.
# Also, on Windows the path part might contain a drive letter which
# should not be quoted. On Linux where drive letters do not
# exist, the colon should be quoted. We rely on urllib.request
# to do the right thing here.
return urllib.request.pathname2url(urllib.request.url2pathname(part))
# percent-encoded: /
_reserved_chars_re = re.compile('(@|%2F)', re.IGNORECASE)
def _clean_url_path(path: str, is_local_path: bool) -> str:
"""
Clean the path portion of a URL.
"""
if is_local_path:
clean_func = _clean_file_url_path
else:
clean_func = _clean_url_path_part
# Split on the reserved characters prior to cleaning so that
# revision strings in VCS URLs are properly preserved.
parts = _reserved_chars_re.split(path)
cleaned_parts = []
for to_clean, reserved in pairwise(itertools.chain(parts, [''])):
cleaned_parts.append(clean_func(to_clean))
# Normalize %xx escapes (e.g. %2f -> %2F)
cleaned_parts.append(reserved.upper())
return ''.join(cleaned_parts)
def _clean_link(url: str) -> str:
"""
Make sure a link is fully quoted.
For example, if ' ' occurs in the URL, it will be replaced with "%20",
and without double-quoting other characters.
"""
# Split the URL into parts according to the general structure
# `scheme://netloc/path;parameters?query#fragment`.
result = urllib.parse.urlparse(url)
# If the netloc is empty, then the URL refers to a local filesystem path.
is_local_path = not result.netloc
path = _clean_url_path(result.path, is_local_path=is_local_path)
return urllib.parse.urlunparse(result._replace(path=path))
def _create_link_from_element(
anchor: HTMLElement,
page_url: str,
base_url: str,
) -> Optional[Link]:
"""
Convert an anchor element in a simple repository page to a Link.
"""
href = anchor.get("href")
if not href:
return None
url = _clean_link(urllib.parse.urljoin(base_url, href))
pyrequire = anchor.get('data-requires-python')
pyrequire = html.unescape(pyrequire) if pyrequire else None
yanked_reason = anchor.get('data-yanked')
if yanked_reason:
yanked_reason = html.unescape(yanked_reason)
link = Link(
url,
comes_from=page_url,
requires_python=pyrequire,
yanked_reason=yanked_reason,
)
return link
class CacheablePageContent:
def __init__(self, page: "HTMLPage") -> None:
def __init__(self, page: "IndexContent") -> None:
assert page.cache_link_parsing
self.page = page
def __eq__(self, other: object) -> bool:
return (isinstance(other, type(self)) and
self.page.url == other.page.url)
return isinstance(other, type(self)) and self.page.url == other.page.url
def __hash__(self) -> int:
return hash(self.page.url)
def with_cached_html_pages(
fn: Callable[["HTMLPage"], Iterable[Link]],
) -> Callable[["HTMLPage"], List[Link]]:
class ParseLinks(Protocol):
def __call__(self, page: "IndexContent") -> Iterable[Link]:
...
def with_cached_index_content(fn: ParseLinks) -> ParseLinks:
"""
Given a function that parses an Iterable[Link] from an HTMLPage, cache the
function's result (keyed by CacheablePageContent), unless the HTMLPage
Given a function that parses an Iterable[Link] from an IndexContent, cache the
function's result (keyed by CacheablePageContent), unless the IndexContent
`page` has `page.cache_link_parsing == False`.
"""
@@ -292,7 +217,7 @@ def with_cached_html_pages(
return list(fn(cacheable_page.page))
@functools.wraps(fn)
def wrapper_wrapper(page: "HTMLPage") -> List[Link]:
def wrapper_wrapper(page: "IndexContent") -> List[Link]:
if page.cache_link_parsing:
return wrapper(CacheablePageContent(page))
return list(fn(page))
@@ -300,36 +225,42 @@ def with_cached_html_pages(
return wrapper_wrapper
@with_cached_html_pages
def parse_links(page: "HTMLPage") -> Iterable[Link]:
@with_cached_index_content
def parse_links(page: "IndexContent") -> Iterable[Link]:
"""
Parse an HTML document, and yield its anchor elements as Link objects.
Parse a Simple API's Index Content, and yield its anchor elements as Link objects.
"""
document = html5lib.parse(
page.content,
transport_encoding=page.encoding,
namespaceHTMLElements=False,
)
content_type_l = page.content_type.lower()
if content_type_l.startswith("application/vnd.pypi.simple.v1+json"):
data = json.loads(page.content)
for file in data.get("files", []):
link = Link.from_json(file, page.url)
if link is None:
continue
yield link
return
parser = HTMLLinkParser(page.url)
encoding = page.encoding or "utf-8"
parser.feed(page.content.decode(encoding))
url = page.url
base_url = _determine_base_url(document, url)
for anchor in document.findall(".//a"):
link = _create_link_from_element(
anchor,
page_url=url,
base_url=base_url,
)
base_url = parser.base_url or url
for anchor in parser.anchors:
link = Link.from_element(anchor, page_url=url, base_url=base_url)
if link is None:
continue
yield link
class HTMLPage:
"""Represents one page, along with its URL"""
class IndexContent:
"""Represents one response (or page), along with its URL"""
def __init__(
self,
content: bytes,
content_type: str,
encoding: Optional[str],
url: str,
cache_link_parsing: bool = True,
@@ -342,6 +273,7 @@ class HTMLPage:
have this set to False, for example.
"""
self.content = content
self.content_type = content_type
self.encoding = encoding
self.url = url
self.cache_link_parsing = cache_link_parsing
@@ -350,80 +282,115 @@ class HTMLPage:
return redact_auth_from_url(self.url)
def _handle_get_page_fail(
class HTMLLinkParser(HTMLParser):
"""
HTMLParser that keeps the first base HREF and a list of all anchor
elements' attributes.
"""
def __init__(self, url: str) -> None:
super().__init__(convert_charrefs=True)
self.url: str = url
self.base_url: Optional[str] = None
self.anchors: List[Dict[str, Optional[str]]] = []
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
if tag == "base" and self.base_url is None:
href = self.get_href(attrs)
if href is not None:
self.base_url = href
elif tag == "a":
self.anchors.append(dict(attrs))
def get_href(self, attrs: List[Tuple[str, Optional[str]]]) -> Optional[str]:
for name, value in attrs:
if name == "href":
return value
return None
def _handle_get_simple_fail(
link: Link,
reason: Union[str, Exception],
meth: Optional[Callable[..., None]] = None
meth: Optional[Callable[..., None]] = None,
) -> None:
if meth is None:
meth = logger.debug
meth("Could not fetch URL %s: %s - skipping", link, reason)
def _make_html_page(response: Response, cache_link_parsing: bool = True) -> HTMLPage:
def _make_index_content(
response: Response, cache_link_parsing: bool = True
) -> IndexContent:
encoding = _get_encoding_from_headers(response.headers)
return HTMLPage(
return IndexContent(
response.content,
response.headers["Content-Type"],
encoding=encoding,
url=response.url,
cache_link_parsing=cache_link_parsing)
cache_link_parsing=cache_link_parsing,
)
def _get_html_page(
link: Link, session: Optional[PipSession] = None
) -> Optional["HTMLPage"]:
if session is None:
raise TypeError(
"_get_html_page() missing 1 required keyword argument: 'session'"
)
url = link.url.split('#', 1)[0]
def _get_index_content(link: Link, *, session: PipSession) -> Optional["IndexContent"]:
url = link.url.split("#", 1)[0]
# Check for VCS schemes that do not support lookup as web pages.
vcs_scheme = _match_vcs_scheme(url)
if vcs_scheme:
logger.warning('Cannot look at %s URL %s because it does not support '
'lookup as web pages.', vcs_scheme, link)
logger.warning(
"Cannot look at %s URL %s because it does not support lookup as web pages.",
vcs_scheme,
link,
)
return None
# Tack index.html onto file:// URLs that point to directories
scheme, _, path, _, _, _ = urllib.parse.urlparse(url)
if (scheme == 'file' and os.path.isdir(urllib.request.url2pathname(path))):
if scheme == "file" and os.path.isdir(urllib.request.url2pathname(path)):
# add trailing slash if not present so urljoin doesn't trim
# final segment
if not url.endswith('/'):
url += '/'
url = urllib.parse.urljoin(url, 'index.html')
logger.debug(' file: URL is directory, getting %s', url)
if not url.endswith("/"):
url += "/"
# TODO: In the future, it would be nice if pip supported PEP 691
# style responses in the file:// URLs, however there's no
# standard file extension for application/vnd.pypi.simple.v1+json
# so we'll need to come up with something on our own.
url = urllib.parse.urljoin(url, "index.html")
logger.debug(" file: URL is directory, getting %s", url)
try:
resp = _get_html_response(url, session=session)
resp = _get_simple_response(url, session=session)
except _NotHTTP:
logger.warning(
'Skipping page %s because it looks like an archive, and cannot '
'be checked by a HTTP HEAD request.', link,
"Skipping page %s because it looks like an archive, and cannot "
"be checked by a HTTP HEAD request.",
link,
)
except _NotHTML as exc:
except _NotAPIContent as exc:
logger.warning(
'Skipping page %s because the %s request got Content-Type: %s.'
'The only supported Content-Type is text/html',
link, exc.request_desc, exc.content_type,
"Skipping page %s because the %s request got Content-Type: %s. "
"The only supported Content-Types are application/vnd.pypi.simple.v1+json, "
"application/vnd.pypi.simple.v1+html, and text/html",
link,
exc.request_desc,
exc.content_type,
)
except NetworkConnectionError as exc:
_handle_get_page_fail(link, exc)
_handle_get_simple_fail(link, exc)
except RetryError as exc:
_handle_get_page_fail(link, exc)
_handle_get_simple_fail(link, exc)
except SSLError as exc:
reason = "There was a problem confirming the ssl certificate: "
reason += str(exc)
_handle_get_page_fail(link, reason, meth=logger.info)
_handle_get_simple_fail(link, reason, meth=logger.info)
except requests.ConnectionError as exc:
_handle_get_page_fail(link, f"connection error: {exc}")
_handle_get_simple_fail(link, f"connection error: {exc}")
except requests.Timeout:
_handle_get_page_fail(link, "timed out")
_handle_get_simple_fail(link, "timed out")
else:
return _make_html_page(resp,
cache_link_parsing=link.cache_link_parsing)
return _make_index_content(resp, cache_link_parsing=link.cache_link_parsing)
return None
@@ -451,9 +418,10 @@ class LinkCollector:
@classmethod
def create(
cls, session: PipSession,
cls,
session: PipSession,
options: Values,
suppress_no_index: bool = False
suppress_no_index: bool = False,
) -> "LinkCollector":
"""
:param session: The Session to use to make requests.
@@ -463,8 +431,8 @@ class LinkCollector:
index_urls = [options.index_url] + options.extra_index_urls
if options.no_index and not suppress_no_index:
logger.debug(
'Ignoring indexes: %s',
','.join(redact_auth_from_url(url) for url in index_urls),
"Ignoring indexes: %s",
",".join(redact_auth_from_url(url) for url in index_urls),
)
index_urls = []
@@ -472,10 +440,13 @@ class LinkCollector:
find_links = options.find_links or []
search_scope = SearchScope.create(
find_links=find_links, index_urls=index_urls,
find_links=find_links,
index_urls=index_urls,
no_index=options.no_index,
)
link_collector = LinkCollector(
session=session, search_scope=search_scope,
session=session,
search_scope=search_scope,
)
return link_collector
@@ -483,11 +454,11 @@ class LinkCollector:
def find_links(self) -> List[str]:
return self.search_scope.find_links
def fetch_page(self, location: Link) -> Optional[HTMLPage]:
def fetch_response(self, location: Link) -> Optional[IndexContent]:
"""
Fetch an HTML page containing package links.
"""
return _get_html_page(location, session=self.session)
return _get_index_content(location, session=self.session)
def collect_sources(
self,

View File

@@ -1,13 +1,11 @@
"""Routines related to PyPI, indexes"""
# The following comment should be removed at some point in the future.
# mypy: strict-optional=False
import enum
import functools
import itertools
import logging
import re
from typing import FrozenSet, Iterable, List, Optional, Set, Tuple, Union
from typing import TYPE_CHECKING, FrozenSet, Iterable, List, Optional, Set, Tuple, Union
from pip._vendor.packaging import specifiers
from pip._vendor.packaging.tags import Tag
@@ -37,17 +35,17 @@ from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import build_netloc
from pip._internal.utils.packaging import check_requires_python
from pip._internal.utils.unpacking import SUPPORTED_EXTENSIONS
from pip._internal.utils.urls import url_to_path
__all__ = ['FormatControl', 'BestCandidateResult', 'PackageFinder']
if TYPE_CHECKING:
from pip._vendor.typing_extensions import TypeGuard
__all__ = ["FormatControl", "BestCandidateResult", "PackageFinder"]
logger = getLogger(__name__)
BuildTag = Union[Tuple[()], Tuple[int, str]]
CandidateSortingKey = (
Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
)
CandidateSortingKey = Tuple[int, int, int, _BaseVersion, Optional[int], BuildTag]
def _check_link_requires_python(
@@ -66,39 +64,54 @@ def _check_link_requires_python(
"""
try:
is_compatible = check_requires_python(
link.requires_python, version_info=version_info,
link.requires_python,
version_info=version_info,
)
except specifiers.InvalidSpecifier:
logger.debug(
"Ignoring invalid Requires-Python (%r) for link: %s",
link.requires_python, link,
link.requires_python,
link,
)
else:
if not is_compatible:
version = '.'.join(map(str, version_info))
version = ".".join(map(str, version_info))
if not ignore_requires_python:
logger.verbose(
'Link requires a different Python (%s not in: %r): %s',
version, link.requires_python, link,
"Link requires a different Python (%s not in: %r): %s",
version,
link.requires_python,
link,
)
return False
logger.debug(
'Ignoring failed Requires-Python check (%s not in: %r) '
'for link: %s',
version, link.requires_python, link,
"Ignoring failed Requires-Python check (%s not in: %r) for link: %s",
version,
link.requires_python,
link,
)
return True
class LinkType(enum.Enum):
candidate = enum.auto()
different_project = enum.auto()
yanked = enum.auto()
format_unsupported = enum.auto()
format_invalid = enum.auto()
platform_mismatch = enum.auto()
requires_python_mismatch = enum.auto()
class LinkEvaluator:
"""
Responsible for evaluating links for a particular project.
"""
_py_version_re = re.compile(r'-py([123]\.?[0-9]?)$')
_py_version_re = re.compile(r"-py([123]\.?[0-9]?)$")
# Don't include an allow_yanked default value to make sure each call
# site considers whether yanked releases are allowed. This also causes
@@ -141,19 +154,20 @@ class LinkEvaluator:
self.project_name = project_name
def evaluate_link(self, link: Link) -> Tuple[bool, Optional[str]]:
def evaluate_link(self, link: Link) -> Tuple[LinkType, str]:
"""
Determine whether a link is a candidate for installation.
:return: A tuple (is_candidate, result), where `result` is (1) a
version string if `is_candidate` is True, and (2) if
`is_candidate` is False, an optional string to log the reason
the link fails to qualify.
:return: A tuple (result, detail), where *result* is an enum
representing whether the evaluation found a candidate, or the reason
why one is not found. If a candidate is found, *detail* will be the
candidate's version string; if one is not found, it contains the
reason the link fails to qualify.
"""
version = None
if link.is_yanked and not self._allow_yanked:
reason = link.yanked_reason or '<none given>'
return (False, f'yanked for reason: {reason}')
reason = link.yanked_reason or "<none given>"
return (LinkType.yanked, f"yanked for reason: {reason}")
if link.egg_fragment:
egg_info = link.egg_fragment
@@ -161,77 +175,83 @@ class LinkEvaluator:
else:
egg_info, ext = link.splitext()
if not ext:
return (False, 'not a file')
return (LinkType.format_unsupported, "not a file")
if ext not in SUPPORTED_EXTENSIONS:
return (False, f'unsupported archive format: {ext}')
return (
LinkType.format_unsupported,
f"unsupported archive format: {ext}",
)
if "binary" not in self._formats and ext == WHEEL_EXTENSION:
reason = 'No binaries permitted for {}'.format(
self.project_name)
return (False, reason)
if "macosx10" in link.path and ext == '.zip':
return (False, 'macosx10 one')
reason = f"No binaries permitted for {self.project_name}"
return (LinkType.format_unsupported, reason)
if "macosx10" in link.path and ext == ".zip":
return (LinkType.format_unsupported, "macosx10 one")
if ext == WHEEL_EXTENSION:
try:
wheel = Wheel(link.filename)
except InvalidWheelFilename:
return (False, 'invalid wheel filename')
return (
LinkType.format_invalid,
"invalid wheel filename",
)
if canonicalize_name(wheel.name) != self._canonical_name:
reason = 'wrong project name (not {})'.format(
self.project_name)
return (False, reason)
reason = f"wrong project name (not {self.project_name})"
return (LinkType.different_project, reason)
supported_tags = self._target_python.get_tags()
if not wheel.supported(supported_tags):
# Include the wheel's tags in the reason string to
# simplify troubleshooting compatibility issues.
file_tags = wheel.get_formatted_file_tags()
file_tags = ", ".join(wheel.get_formatted_file_tags())
reason = (
"none of the wheel's tags ({}) are compatible "
"(run pip debug --verbose to show compatible tags)".format(
', '.join(file_tags)
)
f"none of the wheel's tags ({file_tags}) are compatible "
f"(run pip debug --verbose to show compatible tags)"
)
return (False, reason)
return (LinkType.platform_mismatch, reason)
version = wheel.version
# This should be up by the self.ok_binary check, but see issue 2700.
if "source" not in self._formats and ext != WHEEL_EXTENSION:
reason = f'No sources permitted for {self.project_name}'
return (False, reason)
reason = f"No sources permitted for {self.project_name}"
return (LinkType.format_unsupported, reason)
if not version:
version = _extract_version_from_fragment(
egg_info, self._canonical_name,
egg_info,
self._canonical_name,
)
if not version:
reason = f'Missing project version for {self.project_name}'
return (False, reason)
reason = f"Missing project version for {self.project_name}"
return (LinkType.format_invalid, reason)
match = self._py_version_re.search(version)
if match:
version = version[:match.start()]
version = version[: match.start()]
py_version = match.group(1)
if py_version != self._target_python.py_version:
return (False, 'Python version is incorrect')
return (
LinkType.platform_mismatch,
"Python version is incorrect",
)
supports_python = _check_link_requires_python(
link, version_info=self._target_python.py_version_info,
link,
version_info=self._target_python.py_version_info,
ignore_requires_python=self._ignore_requires_python,
)
if not supports_python:
# Return None for the reason text to suppress calling
# _log_skipped_link().
return (False, None)
reason = f"{version} Requires-Python {link.requires_python}"
return (LinkType.requires_python_mismatch, reason)
logger.debug('Found link %s, version: %s', link, version)
logger.debug("Found link %s, version: %s", link, version)
return (True, version)
return (LinkType.candidate, version)
def filter_unallowed_hashes(
candidates: List[InstallationCandidate],
hashes: Hashes,
hashes: Optional[Hashes],
project_name: str,
) -> List[InstallationCandidate]:
"""
@@ -251,8 +271,8 @@ def filter_unallowed_hashes(
"""
if not hashes:
logger.debug(
'Given no hashes to check %s links for project %r: '
'discarding no candidates',
"Given no hashes to check %s links for project %r: "
"discarding no candidates",
len(candidates),
project_name,
)
@@ -282,22 +302,22 @@ def filter_unallowed_hashes(
filtered = list(candidates)
if len(filtered) == len(candidates):
discard_message = 'discarding no candidates'
discard_message = "discarding no candidates"
else:
discard_message = 'discarding {} non-matches:\n {}'.format(
discard_message = "discarding {} non-matches:\n {}".format(
len(non_matches),
'\n '.join(str(candidate.link) for candidate in non_matches)
"\n ".join(str(candidate.link) for candidate in non_matches),
)
logger.debug(
'Checked %s links for project %r against %s hashes '
'(%s matches, %s no digest): %s',
"Checked %s links for project %r against %s hashes "
"(%s matches, %s no digest): %s",
len(candidates),
project_name,
hashes.digest_count,
match_count,
len(matches_or_no_digest) - match_count,
discard_message
discard_message,
)
return filtered
@@ -354,13 +374,11 @@ class BestCandidateResult:
self.best_candidate = best_candidate
def iter_all(self) -> Iterable[InstallationCandidate]:
"""Iterate through all candidates.
"""
"""Iterate through all candidates."""
return iter(self._candidates)
def iter_applicable(self) -> Iterable[InstallationCandidate]:
"""Iterate through the applicable candidates.
"""
"""Iterate through the applicable candidates."""
return iter(self._applicable_candidates)
@@ -444,7 +462,8 @@ class CandidateEvaluator:
allow_prereleases = self._allow_all_prereleases or None
specifier = self._specifier
versions = {
str(v) for v in specifier.filter(
str(v)
for v in specifier.filter(
# We turn the version object into a str here because otherwise
# when we're debundled but setuptools isn't, Python will see
# packaging.version.Version and
@@ -458,9 +477,7 @@ class CandidateEvaluator:
}
# Again, converting version to str to deal with debundling.
applicable_candidates = [
c for c in candidates if str(c.version) in versions
]
applicable_candidates = [c for c in candidates if str(c.version) in versions]
filtered_applicable_candidates = filter_unallowed_hashes(
candidates=applicable_candidates,
@@ -509,9 +526,11 @@ class CandidateEvaluator:
# can raise InvalidWheelFilename
wheel = Wheel(link.filename)
try:
pri = -(wheel.find_most_preferred_tag(
valid_tags, self._wheel_tag_preferences
))
pri = -(
wheel.find_most_preferred_tag(
valid_tags, self._wheel_tag_preferences
)
)
except ValueError:
raise UnsupportedWheel(
"{} is not a supported wheel for this platform. It "
@@ -520,7 +539,8 @@ class CandidateEvaluator:
if self._prefer_binary:
binary_preference = 1
if wheel.build_tag is not None:
match = re.match(r'^(\d+)(.*)$', wheel.build_tag)
match = re.match(r"^(\d+)(.*)$", wheel.build_tag)
assert match is not None, "guaranteed by filename validation"
build_tag_groups = match.groups()
build_tag = (int(build_tag_groups[0]), build_tag_groups[1])
else: # sdist
@@ -528,8 +548,12 @@ class CandidateEvaluator:
has_allowed_hash = int(link.is_hash_allowed(self._hashes))
yank_value = -1 * int(link.is_yanked) # -1 for yanked.
return (
has_allowed_hash, yank_value, binary_preference, candidate.version,
pri, build_tag,
has_allowed_hash,
yank_value,
binary_preference,
candidate.version,
pri,
build_tag,
)
def sort_best_candidate(
@@ -603,7 +627,7 @@ class PackageFinder:
self.format_control = format_control
# These are boring links that have already been logged somehow.
self._logged_links: Set[Link] = set()
self._logged_links: Set[Tuple[Link, LinkType, str]] = set()
# Don't include an allow_yanked default value to make sure each call
# site considers whether yanked releases are allowed. This also causes
@@ -680,6 +704,14 @@ class PackageFinder:
def set_prefer_binary(self) -> None:
self._candidate_prefs.prefer_binary = True
def requires_python_skipped_reasons(self) -> List[str]:
reasons = {
detail
for _, result, detail in self._logged_links
if result == LinkType.requires_python_mismatch
}
return sorted(reasons)
def make_link_evaluator(self, project_name: str) -> LinkEvaluator:
canonical_name = canonicalize_name(project_name)
formats = self.format_control.get_allowed_formats(canonical_name)
@@ -709,12 +741,13 @@ class PackageFinder:
no_eggs.append(link)
return no_eggs + eggs
def _log_skipped_link(self, link: Link, reason: str) -> None:
if link not in self._logged_links:
def _log_skipped_link(self, link: Link, result: LinkType, detail: str) -> None:
entry = (link, result, detail)
if entry not in self._logged_links:
# Put the link at the end so the reason is more visible and because
# the link string is usually very long.
logger.debug('Skipping link: %s: %s', reason, link)
self._logged_links.add(link)
logger.debug("Skipping link: %s: %s", detail, link)
self._logged_links.add(entry)
def get_install_candidate(
self, link_evaluator: LinkEvaluator, link: Link
@@ -723,16 +756,15 @@ class PackageFinder:
If the link is a candidate for install, convert it to an
InstallationCandidate and return it. Otherwise, return None.
"""
is_candidate, result = link_evaluator.evaluate_link(link)
if not is_candidate:
if result:
self._log_skipped_link(link, reason=result)
result, detail = link_evaluator.evaluate_link(link)
if result != LinkType.candidate:
self._log_skipped_link(link, result, detail)
return None
return InstallationCandidate(
name=link_evaluator.project_name,
link=link,
version=result,
version=detail,
)
def evaluate_links(
@@ -753,13 +785,14 @@ class PackageFinder:
self, project_url: Link, link_evaluator: LinkEvaluator
) -> List[InstallationCandidate]:
logger.debug(
'Fetching project page and analyzing links: %s', project_url,
"Fetching project page and analyzing links: %s",
project_url,
)
html_page = self._link_collector.fetch_page(project_url)
if html_page is None:
index_response = self._link_collector.fetch_response(project_url)
if index_response is None:
return []
page_links = list(parse_links(html_page))
page_links = list(parse_links(index_response))
with indent_log():
package_links = self.evaluate_links(
@@ -809,7 +842,14 @@ class PackageFinder:
)
if logger.isEnabledFor(logging.DEBUG) and file_candidates:
paths = [url_to_path(c.link.url) for c in file_candidates]
paths = []
for candidate in file_candidates:
assert candidate.link.url # we need to have a URL
try:
paths.append(candidate.link.file_path)
except Exception:
paths.append(candidate.link.url) # it's not a local file
logger.debug("Local files found: %s", ", ".join(paths))
# This is an intentional priority ordering
@@ -821,8 +861,7 @@ class PackageFinder:
specifier: Optional[specifiers.BaseSpecifier] = None,
hashes: Optional[Hashes] = None,
) -> CandidateEvaluator:
"""Create a CandidateEvaluator object to use.
"""
"""Create a CandidateEvaluator object to use."""
candidate_prefs = self._candidate_prefs
return CandidateEvaluator.create(
project_name=project_name,
@@ -867,75 +906,83 @@ class PackageFinder:
"""
hashes = req.hashes(trust_internet=False)
best_candidate_result = self.find_best_candidate(
req.name, specifier=req.specifier, hashes=hashes,
req.name,
specifier=req.specifier,
hashes=hashes,
)
best_candidate = best_candidate_result.best_candidate
installed_version: Optional[_BaseVersion] = None
if req.satisfied_by is not None:
installed_version = parse_version(req.satisfied_by.version)
installed_version = req.satisfied_by.version
def _format_versions(cand_iter: Iterable[InstallationCandidate]) -> str:
# This repeated parse_version and str() conversion is needed to
# handle different vendoring sources from pip and pkg_resources.
# If we stop using the pkg_resources provided specifier and start
# using our own, we can drop the cast to str().
return ", ".join(sorted(
{str(c.version) for c in cand_iter},
key=parse_version,
)) or "none"
return (
", ".join(
sorted(
{str(c.version) for c in cand_iter},
key=parse_version,
)
)
or "none"
)
if installed_version is None and best_candidate is None:
logger.critical(
'Could not find a version that satisfies the requirement %s '
'(from versions: %s)',
"Could not find a version that satisfies the requirement %s "
"(from versions: %s)",
req,
_format_versions(best_candidate_result.iter_all()),
)
raise DistributionNotFound(
'No matching distribution found for {}'.format(
req)
"No matching distribution found for {}".format(req)
)
best_installed = False
if installed_version and (
best_candidate is None or
best_candidate.version <= installed_version):
best_installed = True
def _should_install_candidate(
candidate: Optional[InstallationCandidate],
) -> "TypeGuard[InstallationCandidate]":
if installed_version is None:
return True
if best_candidate is None:
return False
return best_candidate.version > installed_version
if not upgrade and installed_version is not None:
if best_installed:
if _should_install_candidate(best_candidate):
logger.debug(
'Existing installed version (%s) is most up-to-date and '
'satisfies requirement',
installed_version,
)
else:
logger.debug(
'Existing installed version (%s) satisfies requirement '
'(most up-to-date version is %s)',
"Existing installed version (%s) satisfies requirement "
"(most up-to-date version is %s)",
installed_version,
best_candidate.version,
)
else:
logger.debug(
"Existing installed version (%s) is most up-to-date and "
"satisfies requirement",
installed_version,
)
return None
if best_installed:
# We have an existing version, and its the best version
if _should_install_candidate(best_candidate):
logger.debug(
'Installed version (%s) is most up-to-date (past versions: '
'%s)',
installed_version,
"Using version %s (newest of versions: %s)",
best_candidate.version,
_format_versions(best_candidate_result.iter_applicable()),
)
raise BestVersionAlreadyInstalled
return best_candidate
# We have an existing version, and its the best version
logger.debug(
'Using version %s (newest of versions: %s)',
best_candidate.version,
"Installed version (%s) is most up-to-date (past versions: %s)",
installed_version,
_format_versions(best_candidate_result.iter_applicable()),
)
return best_candidate
raise BestVersionAlreadyInstalled
def _find_name_version_sep(fragment: str, canonical_name: str) -> int:

View File

@@ -171,7 +171,6 @@ def build_source(
expand_dir: bool,
cache_link_parsing: bool,
) -> Tuple[Optional[str], Optional[LinkSource]]:
path: Optional[str] = None
url: Optional[str] = None
if os.path.exists(location): # Is a local path.