Add python venv
This commit is contained in:
@ -0,0 +1,4 @@
|
||||
"""Subpackage containing all of pip's command line interface related code
|
||||
"""
|
||||
|
||||
# This file intentionally does not import submodules
|
@ -0,0 +1,163 @@
|
||||
"""Logic that powers autocompletion installed by ``pip completion``.
|
||||
"""
|
||||
|
||||
import optparse
|
||||
import os
|
||||
import sys
|
||||
from itertools import chain
|
||||
from typing import Any, Iterable, List, Optional
|
||||
|
||||
from pip._internal.cli.main_parser import create_main_parser
|
||||
from pip._internal.commands import commands_dict, create_command
|
||||
from pip._internal.metadata import get_default_environment
|
||||
|
||||
|
||||
def autocomplete() -> None:
|
||||
"""Entry Point for completion of main and subcommand options."""
|
||||
# Don't complete if user hasn't sourced bash_completion file.
|
||||
if "PIP_AUTO_COMPLETE" not in os.environ:
|
||||
return
|
||||
cwords = os.environ["COMP_WORDS"].split()[1:]
|
||||
cword = int(os.environ["COMP_CWORD"])
|
||||
try:
|
||||
current = cwords[cword - 1]
|
||||
except IndexError:
|
||||
current = ""
|
||||
|
||||
parser = create_main_parser()
|
||||
subcommands = list(commands_dict)
|
||||
options = []
|
||||
|
||||
# subcommand
|
||||
subcommand_name: Optional[str] = None
|
||||
for word in cwords:
|
||||
if word in subcommands:
|
||||
subcommand_name = word
|
||||
break
|
||||
# subcommand options
|
||||
if subcommand_name is not None:
|
||||
# special case: 'help' subcommand has no options
|
||||
if subcommand_name == "help":
|
||||
sys.exit(1)
|
||||
# special case: list locally installed dists for show and uninstall
|
||||
should_list_installed = not current.startswith("-") and subcommand_name in [
|
||||
"show",
|
||||
"uninstall",
|
||||
]
|
||||
if should_list_installed:
|
||||
env = get_default_environment()
|
||||
lc = current.lower()
|
||||
installed = [
|
||||
dist.canonical_name
|
||||
for dist in env.iter_installed_distributions(local_only=True)
|
||||
if dist.canonical_name.startswith(lc)
|
||||
and dist.canonical_name not in cwords[1:]
|
||||
]
|
||||
# if there are no dists installed, fall back to option completion
|
||||
if installed:
|
||||
for dist in installed:
|
||||
print(dist)
|
||||
sys.exit(1)
|
||||
|
||||
subcommand = create_command(subcommand_name)
|
||||
|
||||
for opt in subcommand.parser.option_list_all:
|
||||
if opt.help != optparse.SUPPRESS_HELP:
|
||||
for opt_str in opt._long_opts + opt._short_opts:
|
||||
options.append((opt_str, opt.nargs))
|
||||
|
||||
# filter out previously specified options from available options
|
||||
prev_opts = [x.split("=")[0] for x in cwords[1 : cword - 1]]
|
||||
options = [(x, v) for (x, v) in options if x not in prev_opts]
|
||||
# filter options by current input
|
||||
options = [(k, v) for k, v in options if k.startswith(current)]
|
||||
# get completion type given cwords and available subcommand options
|
||||
completion_type = get_path_completion_type(
|
||||
cwords,
|
||||
cword,
|
||||
subcommand.parser.option_list_all,
|
||||
)
|
||||
# get completion files and directories if ``completion_type`` is
|
||||
# ``<file>``, ``<dir>`` or ``<path>``
|
||||
if completion_type:
|
||||
paths = auto_complete_paths(current, completion_type)
|
||||
options = [(path, 0) for path in paths]
|
||||
for option in options:
|
||||
opt_label = option[0]
|
||||
# append '=' to options which require args
|
||||
if option[1] and option[0][:2] == "--":
|
||||
opt_label += "="
|
||||
print(opt_label)
|
||||
else:
|
||||
# show main parser options only when necessary
|
||||
|
||||
opts = [i.option_list for i in parser.option_groups]
|
||||
opts.append(parser.option_list)
|
||||
flattened_opts = chain.from_iterable(opts)
|
||||
if current.startswith("-"):
|
||||
for opt in flattened_opts:
|
||||
if opt.help != optparse.SUPPRESS_HELP:
|
||||
subcommands += opt._long_opts + opt._short_opts
|
||||
else:
|
||||
# get completion type given cwords and all available options
|
||||
completion_type = get_path_completion_type(cwords, cword, flattened_opts)
|
||||
if completion_type:
|
||||
subcommands = list(auto_complete_paths(current, completion_type))
|
||||
|
||||
print(" ".join([x for x in subcommands if x.startswith(current)]))
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def get_path_completion_type(
|
||||
cwords: List[str], cword: int, opts: Iterable[Any]
|
||||
) -> Optional[str]:
|
||||
"""Get the type of path completion (``file``, ``dir``, ``path`` or None)
|
||||
|
||||
:param cwords: same as the environmental variable ``COMP_WORDS``
|
||||
:param cword: same as the environmental variable ``COMP_CWORD``
|
||||
:param opts: The available options to check
|
||||
:return: path completion type (``file``, ``dir``, ``path`` or None)
|
||||
"""
|
||||
if cword < 2 or not cwords[cword - 2].startswith("-"):
|
||||
return None
|
||||
for opt in opts:
|
||||
if opt.help == optparse.SUPPRESS_HELP:
|
||||
continue
|
||||
for o in str(opt).split("/"):
|
||||
if cwords[cword - 2].split("=")[0] == o:
|
||||
if not opt.metavar or any(
|
||||
x in ("path", "file", "dir") for x in opt.metavar.split("/")
|
||||
):
|
||||
return opt.metavar
|
||||
return None
|
||||
|
||||
|
||||
def auto_complete_paths(current: str, completion_type: str) -> Iterable[str]:
|
||||
"""If ``completion_type`` is ``file`` or ``path``, list all regular files
|
||||
and directories starting with ``current``; otherwise only list directories
|
||||
starting with ``current``.
|
||||
|
||||
:param current: The word to be completed
|
||||
:param completion_type: path completion type(`file`, `path` or `dir`)i
|
||||
:return: A generator of regular files and/or directories
|
||||
"""
|
||||
directory, filename = os.path.split(current)
|
||||
current_path = os.path.abspath(directory)
|
||||
# Don't complete paths if they can't be accessed
|
||||
if not os.access(current_path, os.R_OK):
|
||||
return
|
||||
filename = os.path.normcase(filename)
|
||||
# list all files that start with ``filename``
|
||||
file_list = (
|
||||
x for x in os.listdir(current_path) if os.path.normcase(x).startswith(filename)
|
||||
)
|
||||
for f in file_list:
|
||||
opt = os.path.join(current_path, f)
|
||||
comp_file = os.path.normcase(os.path.join(directory, f))
|
||||
# complete regular files when there is not ``<dir>`` after option
|
||||
# complete directories when there is ``<file>``, ``<path>`` or
|
||||
# ``<dir>``after option
|
||||
if completion_type != "dir" and os.path.isfile(opt):
|
||||
yield comp_file
|
||||
elif os.path.isdir(opt):
|
||||
yield os.path.join(comp_file, "")
|
@ -0,0 +1,214 @@
|
||||
"""Base Command class, and related routines"""
|
||||
|
||||
import functools
|
||||
import logging
|
||||
import logging.config
|
||||
import optparse
|
||||
import os
|
||||
import sys
|
||||
import traceback
|
||||
from optparse import Values
|
||||
from typing import Any, Callable, List, Optional, Tuple
|
||||
|
||||
from pip._internal.cli import cmdoptions
|
||||
from pip._internal.cli.command_context import CommandContextMixIn
|
||||
from pip._internal.cli.parser import ConfigOptionParser, UpdatingDefaultsHelpFormatter
|
||||
from pip._internal.cli.status_codes import (
|
||||
ERROR,
|
||||
PREVIOUS_BUILD_DIR_ERROR,
|
||||
UNKNOWN_ERROR,
|
||||
VIRTUALENV_NOT_FOUND,
|
||||
)
|
||||
from pip._internal.exceptions import (
|
||||
BadCommand,
|
||||
CommandError,
|
||||
InstallationError,
|
||||
NetworkConnectionError,
|
||||
PreviousBuildDirError,
|
||||
UninstallationError,
|
||||
)
|
||||
from pip._internal.utils.filesystem import check_path_owner
|
||||
from pip._internal.utils.logging import BrokenStdoutLoggingError, setup_logging
|
||||
from pip._internal.utils.misc import get_prog, normalize_path
|
||||
from pip._internal.utils.temp_dir import TempDirectoryTypeRegistry as TempDirRegistry
|
||||
from pip._internal.utils.temp_dir import global_tempdir_manager, tempdir_registry
|
||||
from pip._internal.utils.virtualenv import running_under_virtualenv
|
||||
|
||||
__all__ = ["Command"]
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Command(CommandContextMixIn):
|
||||
usage: str = ""
|
||||
ignore_require_venv: bool = False
|
||||
|
||||
def __init__(self, name: str, summary: str, isolated: bool = False) -> None:
|
||||
super().__init__()
|
||||
|
||||
self.name = name
|
||||
self.summary = summary
|
||||
self.parser = ConfigOptionParser(
|
||||
usage=self.usage,
|
||||
prog=f"{get_prog()} {name}",
|
||||
formatter=UpdatingDefaultsHelpFormatter(),
|
||||
add_help_option=False,
|
||||
name=name,
|
||||
description=self.__doc__,
|
||||
isolated=isolated,
|
||||
)
|
||||
|
||||
self.tempdir_registry: Optional[TempDirRegistry] = None
|
||||
|
||||
# Commands should add options to this option group
|
||||
optgroup_name = f"{self.name.capitalize()} Options"
|
||||
self.cmd_opts = optparse.OptionGroup(self.parser, optgroup_name)
|
||||
|
||||
# Add the general options
|
||||
gen_opts = cmdoptions.make_option_group(
|
||||
cmdoptions.general_group,
|
||||
self.parser,
|
||||
)
|
||||
self.parser.add_option_group(gen_opts)
|
||||
|
||||
self.add_options()
|
||||
|
||||
def add_options(self) -> None:
|
||||
pass
|
||||
|
||||
def handle_pip_version_check(self, options: Values) -> None:
|
||||
"""
|
||||
This is a no-op so that commands by default do not do the pip version
|
||||
check.
|
||||
"""
|
||||
# Make sure we do the pip version check if the index_group options
|
||||
# are present.
|
||||
assert not hasattr(options, "no_index")
|
||||
|
||||
def run(self, options: Values, args: List[str]) -> int:
|
||||
raise NotImplementedError
|
||||
|
||||
def parse_args(self, args: List[str]) -> Tuple[Values, List[str]]:
|
||||
# factored out for testability
|
||||
return self.parser.parse_args(args)
|
||||
|
||||
def main(self, args: List[str]) -> int:
|
||||
try:
|
||||
with self.main_context():
|
||||
return self._main(args)
|
||||
finally:
|
||||
logging.shutdown()
|
||||
|
||||
def _main(self, args: List[str]) -> int:
|
||||
# We must initialize this before the tempdir manager, otherwise the
|
||||
# configuration would not be accessible by the time we clean up the
|
||||
# tempdir manager.
|
||||
self.tempdir_registry = self.enter_context(tempdir_registry())
|
||||
# Intentionally set as early as possible so globally-managed temporary
|
||||
# directories are available to the rest of the code.
|
||||
self.enter_context(global_tempdir_manager())
|
||||
|
||||
options, args = self.parse_args(args)
|
||||
|
||||
# Set verbosity so that it can be used elsewhere.
|
||||
self.verbosity = options.verbose - options.quiet
|
||||
|
||||
level_number = setup_logging(
|
||||
verbosity=self.verbosity,
|
||||
no_color=options.no_color,
|
||||
user_log_file=options.log,
|
||||
)
|
||||
|
||||
# TODO: Try to get these passing down from the command?
|
||||
# without resorting to os.environ to hold these.
|
||||
# This also affects isolated builds and it should.
|
||||
|
||||
if options.no_input:
|
||||
os.environ["PIP_NO_INPUT"] = "1"
|
||||
|
||||
if options.exists_action:
|
||||
os.environ["PIP_EXISTS_ACTION"] = " ".join(options.exists_action)
|
||||
|
||||
if options.require_venv and not self.ignore_require_venv:
|
||||
# If a venv is required check if it can really be found
|
||||
if not running_under_virtualenv():
|
||||
logger.critical("Could not find an activated virtualenv (required).")
|
||||
sys.exit(VIRTUALENV_NOT_FOUND)
|
||||
|
||||
if options.cache_dir:
|
||||
options.cache_dir = normalize_path(options.cache_dir)
|
||||
if not check_path_owner(options.cache_dir):
|
||||
logger.warning(
|
||||
"The directory '%s' or its parent directory is not owned "
|
||||
"or is not writable by the current user. The cache "
|
||||
"has been disabled. Check the permissions and owner of "
|
||||
"that directory. If executing pip with sudo, you should "
|
||||
"use sudo's -H flag.",
|
||||
options.cache_dir,
|
||||
)
|
||||
options.cache_dir = None
|
||||
|
||||
if "2020-resolver" in options.features_enabled:
|
||||
logger.warning(
|
||||
"--use-feature=2020-resolver no longer has any effect, "
|
||||
"since it is now the default dependency resolver in pip. "
|
||||
"This will become an error in pip 21.0."
|
||||
)
|
||||
|
||||
def intercepts_unhandled_exc(
|
||||
run_func: Callable[..., int]
|
||||
) -> Callable[..., int]:
|
||||
@functools.wraps(run_func)
|
||||
def exc_logging_wrapper(*args: Any) -> int:
|
||||
try:
|
||||
status = run_func(*args)
|
||||
assert isinstance(status, int)
|
||||
return status
|
||||
except PreviousBuildDirError as exc:
|
||||
logger.critical(str(exc))
|
||||
logger.debug("Exception information:", exc_info=True)
|
||||
|
||||
return PREVIOUS_BUILD_DIR_ERROR
|
||||
except (
|
||||
InstallationError,
|
||||
UninstallationError,
|
||||
BadCommand,
|
||||
NetworkConnectionError,
|
||||
) as exc:
|
||||
logger.critical(str(exc))
|
||||
logger.debug("Exception information:", exc_info=True)
|
||||
|
||||
return ERROR
|
||||
except CommandError as exc:
|
||||
logger.critical("%s", exc)
|
||||
logger.debug("Exception information:", exc_info=True)
|
||||
|
||||
return ERROR
|
||||
except BrokenStdoutLoggingError:
|
||||
# Bypass our logger and write any remaining messages to
|
||||
# stderr because stdout no longer works.
|
||||
print("ERROR: Pipe to stdout was broken", file=sys.stderr)
|
||||
if level_number <= logging.DEBUG:
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
|
||||
return ERROR
|
||||
except KeyboardInterrupt:
|
||||
logger.critical("Operation cancelled by user")
|
||||
logger.debug("Exception information:", exc_info=True)
|
||||
|
||||
return ERROR
|
||||
except BaseException:
|
||||
logger.critical("Exception:", exc_info=True)
|
||||
|
||||
return UNKNOWN_ERROR
|
||||
|
||||
return exc_logging_wrapper
|
||||
|
||||
try:
|
||||
if not options.debug_mode:
|
||||
run = intercepts_unhandled_exc(self.run)
|
||||
else:
|
||||
run = self.run
|
||||
return run(options, args)
|
||||
finally:
|
||||
self.handle_pip_version_check(options)
|
1010
utils/python-venv/Lib/site-packages/pip/_internal/cli/cmdoptions.py
Normal file
1010
utils/python-venv/Lib/site-packages/pip/_internal/cli/cmdoptions.py
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,27 @@
|
||||
from contextlib import ExitStack, contextmanager
|
||||
from typing import ContextManager, Iterator, TypeVar
|
||||
|
||||
_T = TypeVar("_T", covariant=True)
|
||||
|
||||
|
||||
class CommandContextMixIn:
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._in_main_context = False
|
||||
self._main_context = ExitStack()
|
||||
|
||||
@contextmanager
|
||||
def main_context(self) -> Iterator[None]:
|
||||
assert not self._in_main_context
|
||||
|
||||
self._in_main_context = True
|
||||
try:
|
||||
with self._main_context:
|
||||
yield
|
||||
finally:
|
||||
self._in_main_context = False
|
||||
|
||||
def enter_context(self, context_provider: ContextManager[_T]) -> _T:
|
||||
assert self._in_main_context
|
||||
|
||||
return self._main_context.enter_context(context_provider)
|
@ -0,0 +1,70 @@
|
||||
"""Primary application entrypoint.
|
||||
"""
|
||||
import locale
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from typing import List, Optional
|
||||
|
||||
from pip._internal.cli.autocompletion import autocomplete
|
||||
from pip._internal.cli.main_parser import parse_command
|
||||
from pip._internal.commands import create_command
|
||||
from pip._internal.exceptions import PipError
|
||||
from pip._internal.utils import deprecation
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# Do not import and use main() directly! Using it directly is actively
|
||||
# discouraged by pip's maintainers. The name, location and behavior of
|
||||
# this function is subject to change, so calling it directly is not
|
||||
# portable across different pip versions.
|
||||
|
||||
# In addition, running pip in-process is unsupported and unsafe. This is
|
||||
# elaborated in detail at
|
||||
# https://pip.pypa.io/en/stable/user_guide/#using-pip-from-your-program.
|
||||
# That document also provides suggestions that should work for nearly
|
||||
# all users that are considering importing and using main() directly.
|
||||
|
||||
# However, we know that certain users will still want to invoke pip
|
||||
# in-process. If you understand and accept the implications of using pip
|
||||
# in an unsupported manner, the best approach is to use runpy to avoid
|
||||
# depending on the exact location of this entry point.
|
||||
|
||||
# The following example shows how to use runpy to invoke pip in that
|
||||
# case:
|
||||
#
|
||||
# sys.argv = ["pip", your, args, here]
|
||||
# runpy.run_module("pip", run_name="__main__")
|
||||
#
|
||||
# Note that this will exit the process after running, unlike a direct
|
||||
# call to main. As it is not safe to do any processing after calling
|
||||
# main, this should not be an issue in practice.
|
||||
|
||||
|
||||
def main(args: Optional[List[str]] = None) -> int:
|
||||
if args is None:
|
||||
args = sys.argv[1:]
|
||||
|
||||
# Configure our deprecation warnings to be sent through loggers
|
||||
deprecation.install_warning_logger()
|
||||
|
||||
autocomplete()
|
||||
|
||||
try:
|
||||
cmd_name, cmd_args = parse_command(args)
|
||||
except PipError as exc:
|
||||
sys.stderr.write(f"ERROR: {exc}")
|
||||
sys.stderr.write(os.linesep)
|
||||
sys.exit(1)
|
||||
|
||||
# Needed for locale.getpreferredencoding(False) to work
|
||||
# in pip._internal.utils.encoding.auto_decode
|
||||
try:
|
||||
locale.setlocale(locale.LC_ALL, "")
|
||||
except locale.Error as e:
|
||||
# setlocale can apparently crash if locale are uninitialized
|
||||
logger.debug("Ignoring error %s when setting locale", e)
|
||||
command = create_command(cmd_name, isolated=("--isolated" in cmd_args))
|
||||
|
||||
return command.main(cmd_args)
|
@ -0,0 +1,87 @@
|
||||
"""A single place for constructing and exposing the main parser
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from typing import List, Tuple
|
||||
|
||||
from pip._internal.cli import cmdoptions
|
||||
from pip._internal.cli.parser import ConfigOptionParser, UpdatingDefaultsHelpFormatter
|
||||
from pip._internal.commands import commands_dict, get_similar_commands
|
||||
from pip._internal.exceptions import CommandError
|
||||
from pip._internal.utils.misc import get_pip_version, get_prog
|
||||
|
||||
__all__ = ["create_main_parser", "parse_command"]
|
||||
|
||||
|
||||
def create_main_parser() -> ConfigOptionParser:
|
||||
"""Creates and returns the main parser for pip's CLI"""
|
||||
|
||||
parser = ConfigOptionParser(
|
||||
usage="\n%prog <command> [options]",
|
||||
add_help_option=False,
|
||||
formatter=UpdatingDefaultsHelpFormatter(),
|
||||
name="global",
|
||||
prog=get_prog(),
|
||||
)
|
||||
parser.disable_interspersed_args()
|
||||
|
||||
parser.version = get_pip_version()
|
||||
|
||||
# add the general options
|
||||
gen_opts = cmdoptions.make_option_group(cmdoptions.general_group, parser)
|
||||
parser.add_option_group(gen_opts)
|
||||
|
||||
# so the help formatter knows
|
||||
parser.main = True # type: ignore
|
||||
|
||||
# create command listing for description
|
||||
description = [""] + [
|
||||
f"{name:27} {command_info.summary}"
|
||||
for name, command_info in commands_dict.items()
|
||||
]
|
||||
parser.description = "\n".join(description)
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
def parse_command(args: List[str]) -> Tuple[str, List[str]]:
|
||||
parser = create_main_parser()
|
||||
|
||||
# Note: parser calls disable_interspersed_args(), so the result of this
|
||||
# call is to split the initial args into the general options before the
|
||||
# subcommand and everything else.
|
||||
# For example:
|
||||
# args: ['--timeout=5', 'install', '--user', 'INITools']
|
||||
# general_options: ['--timeout==5']
|
||||
# args_else: ['install', '--user', 'INITools']
|
||||
general_options, args_else = parser.parse_args(args)
|
||||
|
||||
# --version
|
||||
if general_options.version:
|
||||
sys.stdout.write(parser.version)
|
||||
sys.stdout.write(os.linesep)
|
||||
sys.exit()
|
||||
|
||||
# pip || pip help -> print_help()
|
||||
if not args_else or (args_else[0] == "help" and len(args_else) == 1):
|
||||
parser.print_help()
|
||||
sys.exit()
|
||||
|
||||
# the subcommand name
|
||||
cmd_name = args_else[0]
|
||||
|
||||
if cmd_name not in commands_dict:
|
||||
guess = get_similar_commands(cmd_name)
|
||||
|
||||
msg = [f'unknown command "{cmd_name}"']
|
||||
if guess:
|
||||
msg.append(f'maybe you meant "{guess}"')
|
||||
|
||||
raise CommandError(" - ".join(msg))
|
||||
|
||||
# all the args without the subcommand
|
||||
cmd_args = args[:]
|
||||
cmd_args.remove(cmd_name)
|
||||
|
||||
return cmd_name, cmd_args
|
292
utils/python-venv/Lib/site-packages/pip/_internal/cli/parser.py
Normal file
292
utils/python-venv/Lib/site-packages/pip/_internal/cli/parser.py
Normal file
@ -0,0 +1,292 @@
|
||||
"""Base option parser setup"""
|
||||
|
||||
import logging
|
||||
import optparse
|
||||
import shutil
|
||||
import sys
|
||||
import textwrap
|
||||
from contextlib import suppress
|
||||
from typing import Any, Dict, Iterator, List, Tuple
|
||||
|
||||
from pip._internal.cli.status_codes import UNKNOWN_ERROR
|
||||
from pip._internal.configuration import Configuration, ConfigurationError
|
||||
from pip._internal.utils.misc import redact_auth_from_url, strtobool
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PrettyHelpFormatter(optparse.IndentedHelpFormatter):
|
||||
"""A prettier/less verbose help formatter for optparse."""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
# help position must be aligned with __init__.parseopts.description
|
||||
kwargs["max_help_position"] = 30
|
||||
kwargs["indent_increment"] = 1
|
||||
kwargs["width"] = shutil.get_terminal_size()[0] - 2
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def format_option_strings(self, option: optparse.Option) -> str:
|
||||
return self._format_option_strings(option)
|
||||
|
||||
def _format_option_strings(
|
||||
self, option: optparse.Option, mvarfmt: str = " <{}>", optsep: str = ", "
|
||||
) -> str:
|
||||
"""
|
||||
Return a comma-separated list of option strings and metavars.
|
||||
|
||||
:param option: tuple of (short opt, long opt), e.g: ('-f', '--format')
|
||||
:param mvarfmt: metavar format string
|
||||
:param optsep: separator
|
||||
"""
|
||||
opts = []
|
||||
|
||||
if option._short_opts:
|
||||
opts.append(option._short_opts[0])
|
||||
if option._long_opts:
|
||||
opts.append(option._long_opts[0])
|
||||
if len(opts) > 1:
|
||||
opts.insert(1, optsep)
|
||||
|
||||
if option.takes_value():
|
||||
assert option.dest is not None
|
||||
metavar = option.metavar or option.dest.lower()
|
||||
opts.append(mvarfmt.format(metavar.lower()))
|
||||
|
||||
return "".join(opts)
|
||||
|
||||
def format_heading(self, heading: str) -> str:
|
||||
if heading == "Options":
|
||||
return ""
|
||||
return heading + ":\n"
|
||||
|
||||
def format_usage(self, usage: str) -> str:
|
||||
"""
|
||||
Ensure there is only one newline between usage and the first heading
|
||||
if there is no description.
|
||||
"""
|
||||
msg = "\nUsage: {}\n".format(self.indent_lines(textwrap.dedent(usage), " "))
|
||||
return msg
|
||||
|
||||
def format_description(self, description: str) -> str:
|
||||
# leave full control over description to us
|
||||
if description:
|
||||
if hasattr(self.parser, "main"):
|
||||
label = "Commands"
|
||||
else:
|
||||
label = "Description"
|
||||
# some doc strings have initial newlines, some don't
|
||||
description = description.lstrip("\n")
|
||||
# some doc strings have final newlines and spaces, some don't
|
||||
description = description.rstrip()
|
||||
# dedent, then reindent
|
||||
description = self.indent_lines(textwrap.dedent(description), " ")
|
||||
description = f"{label}:\n{description}\n"
|
||||
return description
|
||||
else:
|
||||
return ""
|
||||
|
||||
def format_epilog(self, epilog: str) -> str:
|
||||
# leave full control over epilog to us
|
||||
if epilog:
|
||||
return epilog
|
||||
else:
|
||||
return ""
|
||||
|
||||
def indent_lines(self, text: str, indent: str) -> str:
|
||||
new_lines = [indent + line for line in text.split("\n")]
|
||||
return "\n".join(new_lines)
|
||||
|
||||
|
||||
class UpdatingDefaultsHelpFormatter(PrettyHelpFormatter):
|
||||
"""Custom help formatter for use in ConfigOptionParser.
|
||||
|
||||
This is updates the defaults before expanding them, allowing
|
||||
them to show up correctly in the help listing.
|
||||
|
||||
Also redact auth from url type options
|
||||
"""
|
||||
|
||||
def expand_default(self, option: optparse.Option) -> str:
|
||||
default_values = None
|
||||
if self.parser is not None:
|
||||
assert isinstance(self.parser, ConfigOptionParser)
|
||||
self.parser._update_defaults(self.parser.defaults)
|
||||
assert option.dest is not None
|
||||
default_values = self.parser.defaults.get(option.dest)
|
||||
help_text = super().expand_default(option)
|
||||
|
||||
if default_values and option.metavar == "URL":
|
||||
if isinstance(default_values, str):
|
||||
default_values = [default_values]
|
||||
|
||||
# If its not a list, we should abort and just return the help text
|
||||
if not isinstance(default_values, list):
|
||||
default_values = []
|
||||
|
||||
for val in default_values:
|
||||
help_text = help_text.replace(val, redact_auth_from_url(val))
|
||||
|
||||
return help_text
|
||||
|
||||
|
||||
class CustomOptionParser(optparse.OptionParser):
|
||||
def insert_option_group(
|
||||
self, idx: int, *args: Any, **kwargs: Any
|
||||
) -> optparse.OptionGroup:
|
||||
"""Insert an OptionGroup at a given position."""
|
||||
group = self.add_option_group(*args, **kwargs)
|
||||
|
||||
self.option_groups.pop()
|
||||
self.option_groups.insert(idx, group)
|
||||
|
||||
return group
|
||||
|
||||
@property
|
||||
def option_list_all(self) -> List[optparse.Option]:
|
||||
"""Get a list of all options, including those in option groups."""
|
||||
res = self.option_list[:]
|
||||
for i in self.option_groups:
|
||||
res.extend(i.option_list)
|
||||
|
||||
return res
|
||||
|
||||
|
||||
class ConfigOptionParser(CustomOptionParser):
|
||||
"""Custom option parser which updates its defaults by checking the
|
||||
configuration files and environmental variables"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*args: Any,
|
||||
name: str,
|
||||
isolated: bool = False,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
self.name = name
|
||||
self.config = Configuration(isolated)
|
||||
|
||||
assert self.name
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def check_default(self, option: optparse.Option, key: str, val: Any) -> Any:
|
||||
try:
|
||||
return option.check_value(key, val)
|
||||
except optparse.OptionValueError as exc:
|
||||
print(f"An error occurred during configuration: {exc}")
|
||||
sys.exit(3)
|
||||
|
||||
def _get_ordered_configuration_items(self) -> Iterator[Tuple[str, Any]]:
|
||||
# Configuration gives keys in an unordered manner. Order them.
|
||||
override_order = ["global", self.name, ":env:"]
|
||||
|
||||
# Pool the options into different groups
|
||||
section_items: Dict[str, List[Tuple[str, Any]]] = {
|
||||
name: [] for name in override_order
|
||||
}
|
||||
for section_key, val in self.config.items():
|
||||
# ignore empty values
|
||||
if not val:
|
||||
logger.debug(
|
||||
"Ignoring configuration key '%s' as it's value is empty.",
|
||||
section_key,
|
||||
)
|
||||
continue
|
||||
|
||||
section, key = section_key.split(".", 1)
|
||||
if section in override_order:
|
||||
section_items[section].append((key, val))
|
||||
|
||||
# Yield each group in their override order
|
||||
for section in override_order:
|
||||
for key, val in section_items[section]:
|
||||
yield key, val
|
||||
|
||||
def _update_defaults(self, defaults: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Updates the given defaults with values from the config files and
|
||||
the environ. Does a little special handling for certain types of
|
||||
options (lists)."""
|
||||
|
||||
# Accumulate complex default state.
|
||||
self.values = optparse.Values(self.defaults)
|
||||
late_eval = set()
|
||||
# Then set the options with those values
|
||||
for key, val in self._get_ordered_configuration_items():
|
||||
# '--' because configuration supports only long names
|
||||
option = self.get_option("--" + key)
|
||||
|
||||
# Ignore options not present in this parser. E.g. non-globals put
|
||||
# in [global] by users that want them to apply to all applicable
|
||||
# commands.
|
||||
if option is None:
|
||||
continue
|
||||
|
||||
assert option.dest is not None
|
||||
|
||||
if option.action in ("store_true", "store_false"):
|
||||
try:
|
||||
val = strtobool(val)
|
||||
except ValueError:
|
||||
self.error(
|
||||
"{} is not a valid value for {} option, " # noqa
|
||||
"please specify a boolean value like yes/no, "
|
||||
"true/false or 1/0 instead.".format(val, key)
|
||||
)
|
||||
elif option.action == "count":
|
||||
with suppress(ValueError):
|
||||
val = strtobool(val)
|
||||
with suppress(ValueError):
|
||||
val = int(val)
|
||||
if not isinstance(val, int) or val < 0:
|
||||
self.error(
|
||||
"{} is not a valid value for {} option, " # noqa
|
||||
"please instead specify either a non-negative integer "
|
||||
"or a boolean value like yes/no or false/true "
|
||||
"which is equivalent to 1/0.".format(val, key)
|
||||
)
|
||||
elif option.action == "append":
|
||||
val = val.split()
|
||||
val = [self.check_default(option, key, v) for v in val]
|
||||
elif option.action == "callback":
|
||||
assert option.callback is not None
|
||||
late_eval.add(option.dest)
|
||||
opt_str = option.get_opt_string()
|
||||
val = option.convert_value(opt_str, val)
|
||||
# From take_action
|
||||
args = option.callback_args or ()
|
||||
kwargs = option.callback_kwargs or {}
|
||||
option.callback(option, opt_str, val, self, *args, **kwargs)
|
||||
else:
|
||||
val = self.check_default(option, key, val)
|
||||
|
||||
defaults[option.dest] = val
|
||||
|
||||
for key in late_eval:
|
||||
defaults[key] = getattr(self.values, key)
|
||||
self.values = None
|
||||
return defaults
|
||||
|
||||
def get_default_values(self) -> optparse.Values:
|
||||
"""Overriding to make updating the defaults after instantiation of
|
||||
the option parser possible, _update_defaults() does the dirty work."""
|
||||
if not self.process_default_values:
|
||||
# Old, pre-Optik 1.5 behaviour.
|
||||
return optparse.Values(self.defaults)
|
||||
|
||||
# Load the configuration, or error out in case of an error
|
||||
try:
|
||||
self.config.load()
|
||||
except ConfigurationError as err:
|
||||
self.exit(UNKNOWN_ERROR, str(err))
|
||||
|
||||
defaults = self._update_defaults(self.defaults.copy()) # ours
|
||||
for option in self._get_all_options():
|
||||
assert option.dest is not None
|
||||
default = defaults.get(option.dest)
|
||||
if isinstance(default, str):
|
||||
opt_str = option.get_opt_string()
|
||||
defaults[option.dest] = option.check_value(opt_str, default)
|
||||
return optparse.Values(defaults)
|
||||
|
||||
def error(self, msg: str) -> None:
|
||||
self.print_usage(sys.stderr)
|
||||
self.exit(UNKNOWN_ERROR, f"{msg}\n")
|
@ -0,0 +1,250 @@
|
||||
import itertools
|
||||
import sys
|
||||
from signal import SIGINT, default_int_handler, signal
|
||||
from typing import Any
|
||||
|
||||
from pip._vendor.progress.bar import Bar, FillingCirclesBar, IncrementalBar
|
||||
from pip._vendor.progress.spinner import Spinner
|
||||
|
||||
from pip._internal.utils.compat import WINDOWS
|
||||
from pip._internal.utils.logging import get_indentation
|
||||
from pip._internal.utils.misc import format_size
|
||||
|
||||
try:
|
||||
from pip._vendor import colorama
|
||||
# Lots of different errors can come from this, including SystemError and
|
||||
# ImportError.
|
||||
except Exception:
|
||||
colorama = None
|
||||
|
||||
|
||||
def _select_progress_class(preferred: Bar, fallback: Bar) -> Bar:
|
||||
encoding = getattr(preferred.file, "encoding", None)
|
||||
|
||||
# If we don't know what encoding this file is in, then we'll just assume
|
||||
# that it doesn't support unicode and use the ASCII bar.
|
||||
if not encoding:
|
||||
return fallback
|
||||
|
||||
# Collect all of the possible characters we want to use with the preferred
|
||||
# bar.
|
||||
characters = [
|
||||
getattr(preferred, "empty_fill", ""),
|
||||
getattr(preferred, "fill", ""),
|
||||
]
|
||||
characters += list(getattr(preferred, "phases", []))
|
||||
|
||||
# Try to decode the characters we're using for the bar using the encoding
|
||||
# of the given file, if this works then we'll assume that we can use the
|
||||
# fancier bar and if not we'll fall back to the plaintext bar.
|
||||
try:
|
||||
"".join(characters).encode(encoding)
|
||||
except UnicodeEncodeError:
|
||||
return fallback
|
||||
else:
|
||||
return preferred
|
||||
|
||||
|
||||
_BaseBar: Any = _select_progress_class(IncrementalBar, Bar)
|
||||
|
||||
|
||||
class InterruptibleMixin:
|
||||
"""
|
||||
Helper to ensure that self.finish() gets called on keyboard interrupt.
|
||||
|
||||
This allows downloads to be interrupted without leaving temporary state
|
||||
(like hidden cursors) behind.
|
||||
|
||||
This class is similar to the progress library's existing SigIntMixin
|
||||
helper, but as of version 1.2, that helper has the following problems:
|
||||
|
||||
1. It calls sys.exit().
|
||||
2. It discards the existing SIGINT handler completely.
|
||||
3. It leaves its own handler in place even after an uninterrupted finish,
|
||||
which will have unexpected delayed effects if the user triggers an
|
||||
unrelated keyboard interrupt some time after a progress-displaying
|
||||
download has already completed, for example.
|
||||
"""
|
||||
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
"""
|
||||
Save the original SIGINT handler for later.
|
||||
"""
|
||||
# https://github.com/python/mypy/issues/5887
|
||||
super().__init__(*args, **kwargs) # type: ignore
|
||||
|
||||
self.original_handler = signal(SIGINT, self.handle_sigint)
|
||||
|
||||
# If signal() returns None, the previous handler was not installed from
|
||||
# Python, and we cannot restore it. This probably should not happen,
|
||||
# but if it does, we must restore something sensible instead, at least.
|
||||
# The least bad option should be Python's default SIGINT handler, which
|
||||
# just raises KeyboardInterrupt.
|
||||
if self.original_handler is None:
|
||||
self.original_handler = default_int_handler
|
||||
|
||||
def finish(self) -> None:
|
||||
"""
|
||||
Restore the original SIGINT handler after finishing.
|
||||
|
||||
This should happen regardless of whether the progress display finishes
|
||||
normally, or gets interrupted.
|
||||
"""
|
||||
super().finish() # type: ignore
|
||||
signal(SIGINT, self.original_handler)
|
||||
|
||||
def handle_sigint(self, signum, frame): # type: ignore
|
||||
"""
|
||||
Call self.finish() before delegating to the original SIGINT handler.
|
||||
|
||||
This handler should only be in place while the progress display is
|
||||
active.
|
||||
"""
|
||||
self.finish()
|
||||
self.original_handler(signum, frame)
|
||||
|
||||
|
||||
class SilentBar(Bar):
|
||||
def update(self) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class BlueEmojiBar(IncrementalBar):
|
||||
|
||||
suffix = "%(percent)d%%"
|
||||
bar_prefix = " "
|
||||
bar_suffix = " "
|
||||
phases = ("\U0001F539", "\U0001F537", "\U0001F535")
|
||||
|
||||
|
||||
class DownloadProgressMixin:
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
# https://github.com/python/mypy/issues/5887
|
||||
super().__init__(*args, **kwargs) # type: ignore
|
||||
self.message: str = (" " * (get_indentation() + 2)) + self.message
|
||||
|
||||
@property
|
||||
def downloaded(self) -> str:
|
||||
return format_size(self.index) # type: ignore
|
||||
|
||||
@property
|
||||
def download_speed(self) -> str:
|
||||
# Avoid zero division errors...
|
||||
if self.avg == 0.0: # type: ignore
|
||||
return "..."
|
||||
return format_size(1 / self.avg) + "/s" # type: ignore
|
||||
|
||||
@property
|
||||
def pretty_eta(self) -> str:
|
||||
if self.eta: # type: ignore
|
||||
return f"eta {self.eta_td}" # type: ignore
|
||||
return ""
|
||||
|
||||
def iter(self, it): # type: ignore
|
||||
for x in it:
|
||||
yield x
|
||||
# B305 is incorrectly raised here
|
||||
# https://github.com/PyCQA/flake8-bugbear/issues/59
|
||||
self.next(len(x)) # noqa: B305
|
||||
self.finish()
|
||||
|
||||
|
||||
class WindowsMixin:
|
||||
def __init__(self, *args: Any, **kwargs: Any) -> None:
|
||||
# The Windows terminal does not support the hide/show cursor ANSI codes
|
||||
# even with colorama. So we'll ensure that hide_cursor is False on
|
||||
# Windows.
|
||||
# This call needs to go before the super() call, so that hide_cursor
|
||||
# is set in time. The base progress bar class writes the "hide cursor"
|
||||
# code to the terminal in its init, so if we don't set this soon
|
||||
# enough, we get a "hide" with no corresponding "show"...
|
||||
if WINDOWS and self.hide_cursor: # type: ignore
|
||||
self.hide_cursor = False
|
||||
|
||||
# https://github.com/python/mypy/issues/5887
|
||||
super().__init__(*args, **kwargs) # type: ignore
|
||||
|
||||
# Check if we are running on Windows and we have the colorama module,
|
||||
# if we do then wrap our file with it.
|
||||
if WINDOWS and colorama:
|
||||
self.file = colorama.AnsiToWin32(self.file) # type: ignore
|
||||
# The progress code expects to be able to call self.file.isatty()
|
||||
# but the colorama.AnsiToWin32() object doesn't have that, so we'll
|
||||
# add it.
|
||||
self.file.isatty = lambda: self.file.wrapped.isatty()
|
||||
# The progress code expects to be able to call self.file.flush()
|
||||
# but the colorama.AnsiToWin32() object doesn't have that, so we'll
|
||||
# add it.
|
||||
self.file.flush = lambda: self.file.wrapped.flush()
|
||||
|
||||
|
||||
class BaseDownloadProgressBar(WindowsMixin, InterruptibleMixin, DownloadProgressMixin):
|
||||
|
||||
file = sys.stdout
|
||||
message = "%(percent)d%%"
|
||||
suffix = "%(downloaded)s %(download_speed)s %(pretty_eta)s"
|
||||
|
||||
|
||||
class DefaultDownloadProgressBar(BaseDownloadProgressBar, _BaseBar):
|
||||
pass
|
||||
|
||||
|
||||
class DownloadSilentBar(BaseDownloadProgressBar, SilentBar):
|
||||
pass
|
||||
|
||||
|
||||
class DownloadBar(BaseDownloadProgressBar, Bar):
|
||||
pass
|
||||
|
||||
|
||||
class DownloadFillingCirclesBar(BaseDownloadProgressBar, FillingCirclesBar):
|
||||
pass
|
||||
|
||||
|
||||
class DownloadBlueEmojiProgressBar(BaseDownloadProgressBar, BlueEmojiBar):
|
||||
pass
|
||||
|
||||
|
||||
class DownloadProgressSpinner(
|
||||
WindowsMixin, InterruptibleMixin, DownloadProgressMixin, Spinner
|
||||
):
|
||||
|
||||
file = sys.stdout
|
||||
suffix = "%(downloaded)s %(download_speed)s"
|
||||
|
||||
def next_phase(self) -> str:
|
||||
if not hasattr(self, "_phaser"):
|
||||
self._phaser = itertools.cycle(self.phases)
|
||||
return next(self._phaser)
|
||||
|
||||
def update(self) -> None:
|
||||
message = self.message % self
|
||||
phase = self.next_phase()
|
||||
suffix = self.suffix % self
|
||||
line = "".join(
|
||||
[
|
||||
message,
|
||||
" " if message else "",
|
||||
phase,
|
||||
" " if suffix else "",
|
||||
suffix,
|
||||
]
|
||||
)
|
||||
|
||||
self.writeln(line)
|
||||
|
||||
|
||||
BAR_TYPES = {
|
||||
"off": (DownloadSilentBar, DownloadSilentBar),
|
||||
"on": (DefaultDownloadProgressBar, DownloadProgressSpinner),
|
||||
"ascii": (DownloadBar, DownloadProgressSpinner),
|
||||
"pretty": (DownloadFillingCirclesBar, DownloadProgressSpinner),
|
||||
"emoji": (DownloadBlueEmojiProgressBar, DownloadProgressSpinner),
|
||||
}
|
||||
|
||||
|
||||
def DownloadProgressProvider(progress_bar, max=None): # type: ignore
|
||||
if max is None or max == 0:
|
||||
return BAR_TYPES[progress_bar][1]().iter
|
||||
else:
|
||||
return BAR_TYPES[progress_bar][0](max=max).iter
|
@ -0,0 +1,469 @@
|
||||
"""Contains the Command base classes that depend on PipSession.
|
||||
|
||||
The classes in this module are in a separate module so the commands not
|
||||
needing download / PackageFinder capability don't unnecessarily import the
|
||||
PackageFinder machinery and all its vendored dependencies, etc.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from functools import partial
|
||||
from optparse import Values
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
||||
from pip._internal.cache import WheelCache
|
||||
from pip._internal.cli import cmdoptions
|
||||
from pip._internal.cli.base_command import Command
|
||||
from pip._internal.cli.command_context import CommandContextMixIn
|
||||
from pip._internal.exceptions import CommandError, PreviousBuildDirError
|
||||
from pip._internal.index.collector import LinkCollector
|
||||
from pip._internal.index.package_finder import PackageFinder
|
||||
from pip._internal.models.selection_prefs import SelectionPreferences
|
||||
from pip._internal.models.target_python import TargetPython
|
||||
from pip._internal.network.session import PipSession
|
||||
from pip._internal.operations.prepare import RequirementPreparer
|
||||
from pip._internal.req.constructors import (
|
||||
install_req_from_editable,
|
||||
install_req_from_line,
|
||||
install_req_from_parsed_requirement,
|
||||
install_req_from_req_string,
|
||||
)
|
||||
from pip._internal.req.req_file import parse_requirements
|
||||
from pip._internal.req.req_install import InstallRequirement
|
||||
from pip._internal.req.req_tracker import RequirementTracker
|
||||
from pip._internal.resolution.base import BaseResolver
|
||||
from pip._internal.self_outdated_check import pip_self_version_check
|
||||
from pip._internal.utils.deprecation import deprecated
|
||||
from pip._internal.utils.temp_dir import (
|
||||
TempDirectory,
|
||||
TempDirectoryTypeRegistry,
|
||||
tempdir_kinds,
|
||||
)
|
||||
from pip._internal.utils.virtualenv import running_under_virtualenv
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SessionCommandMixin(CommandContextMixIn):
|
||||
|
||||
"""
|
||||
A class mixin for command classes needing _build_session().
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._session: Optional[PipSession] = None
|
||||
|
||||
@classmethod
|
||||
def _get_index_urls(cls, options: Values) -> Optional[List[str]]:
|
||||
"""Return a list of index urls from user-provided options."""
|
||||
index_urls = []
|
||||
if not getattr(options, "no_index", False):
|
||||
url = getattr(options, "index_url", None)
|
||||
if url:
|
||||
index_urls.append(url)
|
||||
urls = getattr(options, "extra_index_urls", None)
|
||||
if urls:
|
||||
index_urls.extend(urls)
|
||||
# Return None rather than an empty list
|
||||
return index_urls or None
|
||||
|
||||
def get_default_session(self, options: Values) -> PipSession:
|
||||
"""Get a default-managed session."""
|
||||
if self._session is None:
|
||||
self._session = self.enter_context(self._build_session(options))
|
||||
# there's no type annotation on requests.Session, so it's
|
||||
# automatically ContextManager[Any] and self._session becomes Any,
|
||||
# then https://github.com/python/mypy/issues/7696 kicks in
|
||||
assert self._session is not None
|
||||
return self._session
|
||||
|
||||
def _build_session(
|
||||
self,
|
||||
options: Values,
|
||||
retries: Optional[int] = None,
|
||||
timeout: Optional[int] = None,
|
||||
) -> PipSession:
|
||||
assert not options.cache_dir or os.path.isabs(options.cache_dir)
|
||||
session = PipSession(
|
||||
cache=(
|
||||
os.path.join(options.cache_dir, "http") if options.cache_dir else None
|
||||
),
|
||||
retries=retries if retries is not None else options.retries,
|
||||
trusted_hosts=options.trusted_hosts,
|
||||
index_urls=self._get_index_urls(options),
|
||||
)
|
||||
|
||||
# Handle custom ca-bundles from the user
|
||||
if options.cert:
|
||||
session.verify = options.cert
|
||||
|
||||
# Handle SSL client certificate
|
||||
if options.client_cert:
|
||||
session.cert = options.client_cert
|
||||
|
||||
# Handle timeouts
|
||||
if options.timeout or timeout:
|
||||
session.timeout = timeout if timeout is not None else options.timeout
|
||||
|
||||
# Handle configured proxies
|
||||
if options.proxy:
|
||||
session.proxies = {
|
||||
"http": options.proxy,
|
||||
"https": options.proxy,
|
||||
}
|
||||
|
||||
# Determine if we can prompt the user for authentication or not
|
||||
session.auth.prompting = not options.no_input
|
||||
|
||||
return session
|
||||
|
||||
|
||||
class IndexGroupCommand(Command, SessionCommandMixin):
|
||||
|
||||
"""
|
||||
Abstract base class for commands with the index_group options.
|
||||
|
||||
This also corresponds to the commands that permit the pip version check.
|
||||
"""
|
||||
|
||||
def handle_pip_version_check(self, options: Values) -> None:
|
||||
"""
|
||||
Do the pip version check if not disabled.
|
||||
|
||||
This overrides the default behavior of not doing the check.
|
||||
"""
|
||||
# Make sure the index_group options are present.
|
||||
assert hasattr(options, "no_index")
|
||||
|
||||
if options.disable_pip_version_check or options.no_index:
|
||||
return
|
||||
|
||||
# Otherwise, check if we're using the latest version of pip available.
|
||||
session = self._build_session(
|
||||
options, retries=0, timeout=min(5, options.timeout)
|
||||
)
|
||||
with session:
|
||||
pip_self_version_check(session, options)
|
||||
|
||||
|
||||
KEEPABLE_TEMPDIR_TYPES = [
|
||||
tempdir_kinds.BUILD_ENV,
|
||||
tempdir_kinds.EPHEM_WHEEL_CACHE,
|
||||
tempdir_kinds.REQ_BUILD,
|
||||
]
|
||||
|
||||
|
||||
def warn_if_run_as_root() -> None:
|
||||
"""Output a warning for sudo users on Unix.
|
||||
|
||||
In a virtual environment, sudo pip still writes to virtualenv.
|
||||
On Windows, users may run pip as Administrator without issues.
|
||||
This warning only applies to Unix root users outside of virtualenv.
|
||||
"""
|
||||
if running_under_virtualenv():
|
||||
return
|
||||
if not hasattr(os, "getuid"):
|
||||
return
|
||||
# On Windows, there are no "system managed" Python packages. Installing as
|
||||
# Administrator via pip is the correct way of updating system environments.
|
||||
#
|
||||
# We choose sys.platform over utils.compat.WINDOWS here to enable Mypy platform
|
||||
# checks: https://mypy.readthedocs.io/en/stable/common_issues.html
|
||||
if sys.platform == "win32" or sys.platform == "cygwin":
|
||||
return
|
||||
|
||||
if os.getuid() != 0:
|
||||
return
|
||||
|
||||
logger.warning(
|
||||
"Running pip as the 'root' user can result in broken permissions and "
|
||||
"conflicting behaviour with the system package manager. "
|
||||
"It is recommended to use a virtual environment instead: "
|
||||
"https://pip.pypa.io/warnings/venv"
|
||||
)
|
||||
|
||||
|
||||
def with_cleanup(func: Any) -> Any:
|
||||
"""Decorator for common logic related to managing temporary
|
||||
directories.
|
||||
"""
|
||||
|
||||
def configure_tempdir_registry(registry: TempDirectoryTypeRegistry) -> None:
|
||||
for t in KEEPABLE_TEMPDIR_TYPES:
|
||||
registry.set_delete(t, False)
|
||||
|
||||
def wrapper(
|
||||
self: RequirementCommand, options: Values, args: List[Any]
|
||||
) -> Optional[int]:
|
||||
assert self.tempdir_registry is not None
|
||||
if options.no_clean:
|
||||
configure_tempdir_registry(self.tempdir_registry)
|
||||
|
||||
try:
|
||||
return func(self, options, args)
|
||||
except PreviousBuildDirError:
|
||||
# This kind of conflict can occur when the user passes an explicit
|
||||
# build directory with a pre-existing folder. In that case we do
|
||||
# not want to accidentally remove it.
|
||||
configure_tempdir_registry(self.tempdir_registry)
|
||||
raise
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class RequirementCommand(IndexGroupCommand):
|
||||
def __init__(self, *args: Any, **kw: Any) -> None:
|
||||
super().__init__(*args, **kw)
|
||||
|
||||
self.cmd_opts.add_option(cmdoptions.no_clean())
|
||||
|
||||
@staticmethod
|
||||
def determine_resolver_variant(options: Values) -> str:
|
||||
"""Determines which resolver should be used, based on the given options."""
|
||||
if "legacy-resolver" in options.deprecated_features_enabled:
|
||||
return "legacy"
|
||||
|
||||
return "2020-resolver"
|
||||
|
||||
@classmethod
|
||||
def make_requirement_preparer(
|
||||
cls,
|
||||
temp_build_dir: TempDirectory,
|
||||
options: Values,
|
||||
req_tracker: RequirementTracker,
|
||||
session: PipSession,
|
||||
finder: PackageFinder,
|
||||
use_user_site: bool,
|
||||
download_dir: Optional[str] = None,
|
||||
) -> RequirementPreparer:
|
||||
"""
|
||||
Create a RequirementPreparer instance for the given parameters.
|
||||
"""
|
||||
temp_build_dir_path = temp_build_dir.path
|
||||
assert temp_build_dir_path is not None
|
||||
|
||||
resolver_variant = cls.determine_resolver_variant(options)
|
||||
if resolver_variant == "2020-resolver":
|
||||
lazy_wheel = "fast-deps" in options.features_enabled
|
||||
if lazy_wheel:
|
||||
logger.warning(
|
||||
"pip is using lazily downloaded wheels using HTTP "
|
||||
"range requests to obtain dependency information. "
|
||||
"This experimental feature is enabled through "
|
||||
"--use-feature=fast-deps and it is not ready for "
|
||||
"production."
|
||||
)
|
||||
else:
|
||||
lazy_wheel = False
|
||||
if "fast-deps" in options.features_enabled:
|
||||
logger.warning(
|
||||
"fast-deps has no effect when used with the legacy resolver."
|
||||
)
|
||||
|
||||
in_tree_build = "out-of-tree-build" not in options.deprecated_features_enabled
|
||||
if "in-tree-build" in options.features_enabled:
|
||||
deprecated(
|
||||
reason="In-tree builds are now the default.",
|
||||
replacement="to remove the --use-feature=in-tree-build flag",
|
||||
gone_in="22.1",
|
||||
)
|
||||
if "out-of-tree-build" in options.deprecated_features_enabled:
|
||||
deprecated(
|
||||
reason="Out-of-tree builds are deprecated.",
|
||||
replacement=None,
|
||||
gone_in="22.1",
|
||||
)
|
||||
|
||||
return RequirementPreparer(
|
||||
build_dir=temp_build_dir_path,
|
||||
src_dir=options.src_dir,
|
||||
download_dir=download_dir,
|
||||
build_isolation=options.build_isolation,
|
||||
req_tracker=req_tracker,
|
||||
session=session,
|
||||
progress_bar=options.progress_bar,
|
||||
finder=finder,
|
||||
require_hashes=options.require_hashes,
|
||||
use_user_site=use_user_site,
|
||||
lazy_wheel=lazy_wheel,
|
||||
in_tree_build=in_tree_build,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def make_resolver(
|
||||
cls,
|
||||
preparer: RequirementPreparer,
|
||||
finder: PackageFinder,
|
||||
options: Values,
|
||||
wheel_cache: Optional[WheelCache] = None,
|
||||
use_user_site: bool = False,
|
||||
ignore_installed: bool = True,
|
||||
ignore_requires_python: bool = False,
|
||||
force_reinstall: bool = False,
|
||||
upgrade_strategy: str = "to-satisfy-only",
|
||||
use_pep517: Optional[bool] = None,
|
||||
py_version_info: Optional[Tuple[int, ...]] = None,
|
||||
) -> BaseResolver:
|
||||
"""
|
||||
Create a Resolver instance for the given parameters.
|
||||
"""
|
||||
make_install_req = partial(
|
||||
install_req_from_req_string,
|
||||
isolated=options.isolated_mode,
|
||||
use_pep517=use_pep517,
|
||||
)
|
||||
resolver_variant = cls.determine_resolver_variant(options)
|
||||
# The long import name and duplicated invocation is needed to convince
|
||||
# Mypy into correctly typechecking. Otherwise it would complain the
|
||||
# "Resolver" class being redefined.
|
||||
if resolver_variant == "2020-resolver":
|
||||
import pip._internal.resolution.resolvelib.resolver
|
||||
|
||||
return pip._internal.resolution.resolvelib.resolver.Resolver(
|
||||
preparer=preparer,
|
||||
finder=finder,
|
||||
wheel_cache=wheel_cache,
|
||||
make_install_req=make_install_req,
|
||||
use_user_site=use_user_site,
|
||||
ignore_dependencies=options.ignore_dependencies,
|
||||
ignore_installed=ignore_installed,
|
||||
ignore_requires_python=ignore_requires_python,
|
||||
force_reinstall=force_reinstall,
|
||||
upgrade_strategy=upgrade_strategy,
|
||||
py_version_info=py_version_info,
|
||||
)
|
||||
import pip._internal.resolution.legacy.resolver
|
||||
|
||||
return pip._internal.resolution.legacy.resolver.Resolver(
|
||||
preparer=preparer,
|
||||
finder=finder,
|
||||
wheel_cache=wheel_cache,
|
||||
make_install_req=make_install_req,
|
||||
use_user_site=use_user_site,
|
||||
ignore_dependencies=options.ignore_dependencies,
|
||||
ignore_installed=ignore_installed,
|
||||
ignore_requires_python=ignore_requires_python,
|
||||
force_reinstall=force_reinstall,
|
||||
upgrade_strategy=upgrade_strategy,
|
||||
py_version_info=py_version_info,
|
||||
)
|
||||
|
||||
def get_requirements(
|
||||
self,
|
||||
args: List[str],
|
||||
options: Values,
|
||||
finder: PackageFinder,
|
||||
session: PipSession,
|
||||
) -> List[InstallRequirement]:
|
||||
"""
|
||||
Parse command-line arguments into the corresponding requirements.
|
||||
"""
|
||||
requirements: List[InstallRequirement] = []
|
||||
for filename in options.constraints:
|
||||
for parsed_req in parse_requirements(
|
||||
filename,
|
||||
constraint=True,
|
||||
finder=finder,
|
||||
options=options,
|
||||
session=session,
|
||||
):
|
||||
req_to_add = install_req_from_parsed_requirement(
|
||||
parsed_req,
|
||||
isolated=options.isolated_mode,
|
||||
user_supplied=False,
|
||||
)
|
||||
requirements.append(req_to_add)
|
||||
|
||||
for req in args:
|
||||
req_to_add = install_req_from_line(
|
||||
req,
|
||||
None,
|
||||
isolated=options.isolated_mode,
|
||||
use_pep517=options.use_pep517,
|
||||
user_supplied=True,
|
||||
)
|
||||
requirements.append(req_to_add)
|
||||
|
||||
for req in options.editables:
|
||||
req_to_add = install_req_from_editable(
|
||||
req,
|
||||
user_supplied=True,
|
||||
isolated=options.isolated_mode,
|
||||
use_pep517=options.use_pep517,
|
||||
)
|
||||
requirements.append(req_to_add)
|
||||
|
||||
# NOTE: options.require_hashes may be set if --require-hashes is True
|
||||
for filename in options.requirements:
|
||||
for parsed_req in parse_requirements(
|
||||
filename, finder=finder, options=options, session=session
|
||||
):
|
||||
req_to_add = install_req_from_parsed_requirement(
|
||||
parsed_req,
|
||||
isolated=options.isolated_mode,
|
||||
use_pep517=options.use_pep517,
|
||||
user_supplied=True,
|
||||
)
|
||||
requirements.append(req_to_add)
|
||||
|
||||
# If any requirement has hash options, enable hash checking.
|
||||
if any(req.has_hash_options for req in requirements):
|
||||
options.require_hashes = True
|
||||
|
||||
if not (args or options.editables or options.requirements):
|
||||
opts = {"name": self.name}
|
||||
if options.find_links:
|
||||
raise CommandError(
|
||||
"You must give at least one requirement to {name} "
|
||||
'(maybe you meant "pip {name} {links}"?)'.format(
|
||||
**dict(opts, links=" ".join(options.find_links))
|
||||
)
|
||||
)
|
||||
else:
|
||||
raise CommandError(
|
||||
"You must give at least one requirement to {name} "
|
||||
'(see "pip help {name}")'.format(**opts)
|
||||
)
|
||||
|
||||
return requirements
|
||||
|
||||
@staticmethod
|
||||
def trace_basic_info(finder: PackageFinder) -> None:
|
||||
"""
|
||||
Trace basic information about the provided objects.
|
||||
"""
|
||||
# Display where finder is looking for packages
|
||||
search_scope = finder.search_scope
|
||||
locations = search_scope.get_formatted_locations()
|
||||
if locations:
|
||||
logger.info(locations)
|
||||
|
||||
def _build_package_finder(
|
||||
self,
|
||||
options: Values,
|
||||
session: PipSession,
|
||||
target_python: Optional[TargetPython] = None,
|
||||
ignore_requires_python: Optional[bool] = None,
|
||||
) -> PackageFinder:
|
||||
"""
|
||||
Create a package finder appropriate to this requirement command.
|
||||
|
||||
:param ignore_requires_python: Whether to ignore incompatible
|
||||
"Requires-Python" values in links. Defaults to False.
|
||||
"""
|
||||
link_collector = LinkCollector.create(session, options=options)
|
||||
selection_prefs = SelectionPreferences(
|
||||
allow_yanked=True,
|
||||
format_control=options.format_control,
|
||||
allow_all_prereleases=options.pre,
|
||||
prefer_binary=options.prefer_binary,
|
||||
ignore_requires_python=ignore_requires_python,
|
||||
)
|
||||
|
||||
return PackageFinder.create(
|
||||
link_collector=link_collector,
|
||||
selection_prefs=selection_prefs,
|
||||
target_python=target_python,
|
||||
)
|
@ -0,0 +1,157 @@
|
||||
import contextlib
|
||||
import itertools
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from typing import IO, Iterator
|
||||
|
||||
from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR
|
||||
|
||||
from pip._internal.utils.compat import WINDOWS
|
||||
from pip._internal.utils.logging import get_indentation
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SpinnerInterface:
|
||||
def spin(self) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
def finish(self, final_status: str) -> None:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class InteractiveSpinner(SpinnerInterface):
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
file: IO[str] = None,
|
||||
spin_chars: str = "-\\|/",
|
||||
# Empirically, 8 updates/second looks nice
|
||||
min_update_interval_seconds: float = 0.125,
|
||||
):
|
||||
self._message = message
|
||||
if file is None:
|
||||
file = sys.stdout
|
||||
self._file = file
|
||||
self._rate_limiter = RateLimiter(min_update_interval_seconds)
|
||||
self._finished = False
|
||||
|
||||
self._spin_cycle = itertools.cycle(spin_chars)
|
||||
|
||||
self._file.write(" " * get_indentation() + self._message + " ... ")
|
||||
self._width = 0
|
||||
|
||||
def _write(self, status: str) -> None:
|
||||
assert not self._finished
|
||||
# Erase what we wrote before by backspacing to the beginning, writing
|
||||
# spaces to overwrite the old text, and then backspacing again
|
||||
backup = "\b" * self._width
|
||||
self._file.write(backup + " " * self._width + backup)
|
||||
# Now we have a blank slate to add our status
|
||||
self._file.write(status)
|
||||
self._width = len(status)
|
||||
self._file.flush()
|
||||
self._rate_limiter.reset()
|
||||
|
||||
def spin(self) -> None:
|
||||
if self._finished:
|
||||
return
|
||||
if not self._rate_limiter.ready():
|
||||
return
|
||||
self._write(next(self._spin_cycle))
|
||||
|
||||
def finish(self, final_status: str) -> None:
|
||||
if self._finished:
|
||||
return
|
||||
self._write(final_status)
|
||||
self._file.write("\n")
|
||||
self._file.flush()
|
||||
self._finished = True
|
||||
|
||||
|
||||
# Used for dumb terminals, non-interactive installs (no tty), etc.
|
||||
# We still print updates occasionally (once every 60 seconds by default) to
|
||||
# act as a keep-alive for systems like Travis-CI that take lack-of-output as
|
||||
# an indication that a task has frozen.
|
||||
class NonInteractiveSpinner(SpinnerInterface):
|
||||
def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None:
|
||||
self._message = message
|
||||
self._finished = False
|
||||
self._rate_limiter = RateLimiter(min_update_interval_seconds)
|
||||
self._update("started")
|
||||
|
||||
def _update(self, status: str) -> None:
|
||||
assert not self._finished
|
||||
self._rate_limiter.reset()
|
||||
logger.info("%s: %s", self._message, status)
|
||||
|
||||
def spin(self) -> None:
|
||||
if self._finished:
|
||||
return
|
||||
if not self._rate_limiter.ready():
|
||||
return
|
||||
self._update("still running...")
|
||||
|
||||
def finish(self, final_status: str) -> None:
|
||||
if self._finished:
|
||||
return
|
||||
self._update(f"finished with status '{final_status}'")
|
||||
self._finished = True
|
||||
|
||||
|
||||
class RateLimiter:
|
||||
def __init__(self, min_update_interval_seconds: float) -> None:
|
||||
self._min_update_interval_seconds = min_update_interval_seconds
|
||||
self._last_update: float = 0
|
||||
|
||||
def ready(self) -> bool:
|
||||
now = time.time()
|
||||
delta = now - self._last_update
|
||||
return delta >= self._min_update_interval_seconds
|
||||
|
||||
def reset(self) -> None:
|
||||
self._last_update = time.time()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def open_spinner(message: str) -> Iterator[SpinnerInterface]:
|
||||
# Interactive spinner goes directly to sys.stdout rather than being routed
|
||||
# through the logging system, but it acts like it has level INFO,
|
||||
# i.e. it's only displayed if we're at level INFO or better.
|
||||
# Non-interactive spinner goes through the logging system, so it is always
|
||||
# in sync with logging configuration.
|
||||
if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO:
|
||||
spinner: SpinnerInterface = InteractiveSpinner(message)
|
||||
else:
|
||||
spinner = NonInteractiveSpinner(message)
|
||||
try:
|
||||
with hidden_cursor(sys.stdout):
|
||||
yield spinner
|
||||
except KeyboardInterrupt:
|
||||
spinner.finish("canceled")
|
||||
raise
|
||||
except Exception:
|
||||
spinner.finish("error")
|
||||
raise
|
||||
else:
|
||||
spinner.finish("done")
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def hidden_cursor(file: IO[str]) -> Iterator[None]:
|
||||
# The Windows terminal does not support the hide/show cursor ANSI codes,
|
||||
# even via colorama. So don't even try.
|
||||
if WINDOWS:
|
||||
yield
|
||||
# We don't want to clutter the output with control characters if we're
|
||||
# writing to a file, or if the user is running with --quiet.
|
||||
# See https://github.com/pypa/pip/issues/3418
|
||||
elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO:
|
||||
yield
|
||||
else:
|
||||
file.write(HIDE_CURSOR)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
file.write(SHOW_CURSOR)
|
@ -0,0 +1,6 @@
|
||||
SUCCESS = 0
|
||||
ERROR = 1
|
||||
UNKNOWN_ERROR = 2
|
||||
VIRTUALENV_NOT_FOUND = 3
|
||||
PREVIOUS_BUILD_DIR_ERROR = 4
|
||||
NO_MATCHES_FOUND = 23
|
Reference in New Issue
Block a user