D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
proc
/
self
/
root
/
home
/
vblioqus
/
karachi777.vip
/
in
/
106014
/
900508
/
Filename :
lib.zip
back
Copy
PK )�c\%���B �B 9 python3.11/site-packages/pip/_internal/cli/req_command.pynu �[��� """Contains the Command base classes that depend on PipSession. The classes in this module are in a separate module so the commands not needing download / PackageFinder capability don't unnecessarily import the PackageFinder machinery and all its vendored dependencies, etc. """ import logging import os import sys from functools import partial from optparse import Values from typing import Any, List, Optional, Tuple from pip._internal.cache import WheelCache from pip._internal.cli import cmdoptions from pip._internal.cli.base_command import Command from pip._internal.cli.command_context import CommandContextMixIn from pip._internal.exceptions import CommandError, PreviousBuildDirError from pip._internal.index.collector import LinkCollector from pip._internal.index.package_finder import PackageFinder from pip._internal.models.selection_prefs import SelectionPreferences from pip._internal.models.target_python import TargetPython from pip._internal.network.session import PipSession from pip._internal.operations.prepare import RequirementPreparer from pip._internal.req.constructors import ( install_req_from_editable, install_req_from_line, install_req_from_parsed_requirement, install_req_from_req_string, ) from pip._internal.req.req_file import parse_requirements from pip._internal.req.req_install import InstallRequirement from pip._internal.req.req_tracker import RequirementTracker from pip._internal.resolution.base import BaseResolver from pip._internal.self_outdated_check import pip_self_version_check from pip._internal.utils.deprecation import deprecated from pip._internal.utils.temp_dir import ( TempDirectory, TempDirectoryTypeRegistry, tempdir_kinds, ) from pip._internal.utils.virtualenv import running_under_virtualenv logger = logging.getLogger(__name__) class SessionCommandMixin(CommandContextMixIn): """ A class mixin for command classes needing _build_session(). """ def __init__(self) -> None: super().__init__() self._session: Optional[PipSession] = None @classmethod def _get_index_urls(cls, options: Values) -> Optional[List[str]]: """Return a list of index urls from user-provided options.""" index_urls = [] if not getattr(options, "no_index", False): url = getattr(options, "index_url", None) if url: index_urls.append(url) urls = getattr(options, "extra_index_urls", None) if urls: index_urls.extend(urls) # Return None rather than an empty list return index_urls or None def get_default_session(self, options: Values) -> PipSession: """Get a default-managed session.""" if self._session is None: self._session = self.enter_context(self._build_session(options)) # there's no type annotation on requests.Session, so it's # automatically ContextManager[Any] and self._session becomes Any, # then https://github.com/python/mypy/issues/7696 kicks in assert self._session is not None return self._session def _build_session( self, options: Values, retries: Optional[int] = None, timeout: Optional[int] = None, ) -> PipSession: assert not options.cache_dir or os.path.isabs(options.cache_dir) session = PipSession( cache=( os.path.join(options.cache_dir, "http") if options.cache_dir else None ), retries=retries if retries is not None else options.retries, trusted_hosts=options.trusted_hosts, index_urls=self._get_index_urls(options), ) # Handle custom ca-bundles from the user if options.cert: session.verify = options.cert # Handle SSL client certificate if options.client_cert: session.cert = options.client_cert # Handle timeouts if options.timeout or timeout: session.timeout = timeout if timeout is not None else options.timeout # Handle configured proxies if options.proxy: session.proxies = { "http": options.proxy, "https": options.proxy, } # Determine if we can prompt the user for authentication or not session.auth.prompting = not options.no_input return session class IndexGroupCommand(Command, SessionCommandMixin): """ Abstract base class for commands with the index_group options. This also corresponds to the commands that permit the pip version check. """ def handle_pip_version_check(self, options: Values) -> None: """ Do the pip version check if not disabled. This overrides the default behavior of not doing the check. """ # Make sure the index_group options are present. assert hasattr(options, "no_index") if options.disable_pip_version_check or options.no_index: return # Otherwise, check if we're using the latest version of pip available. session = self._build_session( options, retries=0, timeout=min(5, options.timeout) ) with session: pip_self_version_check(session, options) KEEPABLE_TEMPDIR_TYPES = [ tempdir_kinds.BUILD_ENV, tempdir_kinds.EPHEM_WHEEL_CACHE, tempdir_kinds.REQ_BUILD, ] def warn_if_run_as_root() -> None: """Output a warning for sudo users on Unix. In a virtual environment, sudo pip still writes to virtualenv. On Windows, users may run pip as Administrator without issues. This warning only applies to Unix root users outside of virtualenv. """ if running_under_virtualenv(): return if not hasattr(os, "getuid"): return # On Windows, there are no "system managed" Python packages. Installing as # Administrator via pip is the correct way of updating system environments. # # We choose sys.platform over utils.compat.WINDOWS here to enable Mypy platform # checks: https://mypy.readthedocs.io/en/stable/common_issues.html if sys.platform == "win32" or sys.platform == "cygwin": return if os.getuid() != 0: return logger.warning( "Running pip as the 'root' user can result in broken permissions and " "conflicting behaviour with the system package manager. " "It is recommended to use a virtual environment instead: " "https://pip.pypa.io/warnings/venv" ) def with_cleanup(func: Any) -> Any: """Decorator for common logic related to managing temporary directories. """ def configure_tempdir_registry(registry: TempDirectoryTypeRegistry) -> None: for t in KEEPABLE_TEMPDIR_TYPES: registry.set_delete(t, False) def wrapper( self: RequirementCommand, options: Values, args: List[Any] ) -> Optional[int]: assert self.tempdir_registry is not None if options.no_clean: configure_tempdir_registry(self.tempdir_registry) try: return func(self, options, args) except PreviousBuildDirError: # This kind of conflict can occur when the user passes an explicit # build directory with a pre-existing folder. In that case we do # not want to accidentally remove it. configure_tempdir_registry(self.tempdir_registry) raise return wrapper class RequirementCommand(IndexGroupCommand): def __init__(self, *args: Any, **kw: Any) -> None: super().__init__(*args, **kw) self.cmd_opts.add_option(cmdoptions.no_clean()) @staticmethod def determine_resolver_variant(options: Values) -> str: """Determines which resolver should be used, based on the given options.""" if "legacy-resolver" in options.deprecated_features_enabled: return "legacy" return "2020-resolver" @classmethod def make_requirement_preparer( cls, temp_build_dir: TempDirectory, options: Values, req_tracker: RequirementTracker, session: PipSession, finder: PackageFinder, use_user_site: bool, download_dir: Optional[str] = None, ) -> RequirementPreparer: """ Create a RequirementPreparer instance for the given parameters. """ temp_build_dir_path = temp_build_dir.path assert temp_build_dir_path is not None resolver_variant = cls.determine_resolver_variant(options) if resolver_variant == "2020-resolver": lazy_wheel = "fast-deps" in options.features_enabled if lazy_wheel: logger.warning( "pip is using lazily downloaded wheels using HTTP " "range requests to obtain dependency information. " "This experimental feature is enabled through " "--use-feature=fast-deps and it is not ready for " "production." ) else: lazy_wheel = False if "fast-deps" in options.features_enabled: logger.warning( "fast-deps has no effect when used with the legacy resolver." ) in_tree_build = "out-of-tree-build" not in options.deprecated_features_enabled if "in-tree-build" in options.features_enabled: deprecated( reason="In-tree builds are now the default.", replacement="to remove the --use-feature=in-tree-build flag", gone_in="22.1", ) if "out-of-tree-build" in options.deprecated_features_enabled: deprecated( reason="Out-of-tree builds are deprecated.", replacement=None, gone_in="22.1", ) return RequirementPreparer( build_dir=temp_build_dir_path, src_dir=options.src_dir, download_dir=download_dir, build_isolation=options.build_isolation, req_tracker=req_tracker, session=session, progress_bar=options.progress_bar, finder=finder, require_hashes=options.require_hashes, use_user_site=use_user_site, lazy_wheel=lazy_wheel, in_tree_build=in_tree_build, ) @classmethod def make_resolver( cls, preparer: RequirementPreparer, finder: PackageFinder, options: Values, wheel_cache: Optional[WheelCache] = None, use_user_site: bool = False, ignore_installed: bool = True, ignore_requires_python: bool = False, force_reinstall: bool = False, upgrade_strategy: str = "to-satisfy-only", use_pep517: Optional[bool] = None, py_version_info: Optional[Tuple[int, ...]] = None, ) -> BaseResolver: """ Create a Resolver instance for the given parameters. """ make_install_req = partial( install_req_from_req_string, isolated=options.isolated_mode, use_pep517=use_pep517, ) resolver_variant = cls.determine_resolver_variant(options) # The long import name and duplicated invocation is needed to convince # Mypy into correctly typechecking. Otherwise it would complain the # "Resolver" class being redefined. if resolver_variant == "2020-resolver": import pip._internal.resolution.resolvelib.resolver return pip._internal.resolution.resolvelib.resolver.Resolver( preparer=preparer, finder=finder, wheel_cache=wheel_cache, make_install_req=make_install_req, use_user_site=use_user_site, ignore_dependencies=options.ignore_dependencies, ignore_installed=ignore_installed, ignore_requires_python=ignore_requires_python, force_reinstall=force_reinstall, upgrade_strategy=upgrade_strategy, py_version_info=py_version_info, ) import pip._internal.resolution.legacy.resolver return pip._internal.resolution.legacy.resolver.Resolver( preparer=preparer, finder=finder, wheel_cache=wheel_cache, make_install_req=make_install_req, use_user_site=use_user_site, ignore_dependencies=options.ignore_dependencies, ignore_installed=ignore_installed, ignore_requires_python=ignore_requires_python, force_reinstall=force_reinstall, upgrade_strategy=upgrade_strategy, py_version_info=py_version_info, ) def get_requirements( self, args: List[str], options: Values, finder: PackageFinder, session: PipSession, ) -> List[InstallRequirement]: """ Parse command-line arguments into the corresponding requirements. """ requirements: List[InstallRequirement] = [] for filename in options.constraints: for parsed_req in parse_requirements( filename, constraint=True, finder=finder, options=options, session=session, ): req_to_add = install_req_from_parsed_requirement( parsed_req, isolated=options.isolated_mode, user_supplied=False, ) requirements.append(req_to_add) for req in args: req_to_add = install_req_from_line( req, None, isolated=options.isolated_mode, use_pep517=options.use_pep517, user_supplied=True, ) requirements.append(req_to_add) for req in options.editables: req_to_add = install_req_from_editable( req, user_supplied=True, isolated=options.isolated_mode, use_pep517=options.use_pep517, ) requirements.append(req_to_add) # NOTE: options.require_hashes may be set if --require-hashes is True for filename in options.requirements: for parsed_req in parse_requirements( filename, finder=finder, options=options, session=session ): req_to_add = install_req_from_parsed_requirement( parsed_req, isolated=options.isolated_mode, use_pep517=options.use_pep517, user_supplied=True, ) requirements.append(req_to_add) # If any requirement has hash options, enable hash checking. if any(req.has_hash_options for req in requirements): options.require_hashes = True if not (args or options.editables or options.requirements): opts = {"name": self.name} if options.find_links: raise CommandError( "You must give at least one requirement to {name} " '(maybe you meant "pip {name} {links}"?)'.format( **dict(opts, links=" ".join(options.find_links)) ) ) else: raise CommandError( "You must give at least one requirement to {name} " '(see "pip help {name}")'.format(**opts) ) return requirements @staticmethod def trace_basic_info(finder: PackageFinder) -> None: """ Trace basic information about the provided objects. """ # Display where finder is looking for packages search_scope = finder.search_scope locations = search_scope.get_formatted_locations() if locations: logger.info(locations) def _build_package_finder( self, options: Values, session: PipSession, target_python: Optional[TargetPython] = None, ignore_requires_python: Optional[bool] = None, ) -> PackageFinder: """ Create a package finder appropriate to this requirement command. :param ignore_requires_python: Whether to ignore incompatible "Requires-Python" values in links. Defaults to False. """ link_collector = LinkCollector.create(session, options=options) selection_prefs = SelectionPreferences( allow_yanked=True, format_control=options.format_control, allow_all_prereleases=options.pre, prefer_binary=options.prefer_binary, ignore_requires_python=ignore_requires_python, ) return PackageFinder.create( link_collector=link_collector, selection_prefs=selection_prefs, target_python=target_python, ) PK )�c\ñ�$* $* 4 python3.11/site-packages/pip/_internal/cli/parser.pynu �[��� """Base option parser setup""" import logging import optparse import shutil import sys import textwrap from contextlib import suppress from typing import Any, Dict, Iterator, List, Tuple from pip._internal.cli.status_codes import UNKNOWN_ERROR from pip._internal.configuration import Configuration, ConfigurationError from pip._internal.utils.misc import redact_auth_from_url, strtobool logger = logging.getLogger(__name__) class PrettyHelpFormatter(optparse.IndentedHelpFormatter): """A prettier/less verbose help formatter for optparse.""" def __init__(self, *args: Any, **kwargs: Any) -> None: # help position must be aligned with __init__.parseopts.description kwargs["max_help_position"] = 30 kwargs["indent_increment"] = 1 kwargs["width"] = shutil.get_terminal_size()[0] - 2 super().__init__(*args, **kwargs) def format_option_strings(self, option: optparse.Option) -> str: return self._format_option_strings(option) def _format_option_strings( self, option: optparse.Option, mvarfmt: str = " <{}>", optsep: str = ", " ) -> str: """ Return a comma-separated list of option strings and metavars. :param option: tuple of (short opt, long opt), e.g: ('-f', '--format') :param mvarfmt: metavar format string :param optsep: separator """ opts = [] if option._short_opts: opts.append(option._short_opts[0]) if option._long_opts: opts.append(option._long_opts[0]) if len(opts) > 1: opts.insert(1, optsep) if option.takes_value(): assert option.dest is not None metavar = option.metavar or option.dest.lower() opts.append(mvarfmt.format(metavar.lower())) return "".join(opts) def format_heading(self, heading: str) -> str: if heading == "Options": return "" return heading + ":\n" def format_usage(self, usage: str) -> str: """ Ensure there is only one newline between usage and the first heading if there is no description. """ msg = "\nUsage: {}\n".format(self.indent_lines(textwrap.dedent(usage), " ")) return msg def format_description(self, description: str) -> str: # leave full control over description to us if description: if hasattr(self.parser, "main"): label = "Commands" else: label = "Description" # some doc strings have initial newlines, some don't description = description.lstrip("\n") # some doc strings have final newlines and spaces, some don't description = description.rstrip() # dedent, then reindent description = self.indent_lines(textwrap.dedent(description), " ") description = f"{label}:\n{description}\n" return description else: return "" def format_epilog(self, epilog: str) -> str: # leave full control over epilog to us if epilog: return epilog else: return "" def indent_lines(self, text: str, indent: str) -> str: new_lines = [indent + line for line in text.split("\n")] return "\n".join(new_lines) class UpdatingDefaultsHelpFormatter(PrettyHelpFormatter): """Custom help formatter for use in ConfigOptionParser. This is updates the defaults before expanding them, allowing them to show up correctly in the help listing. Also redact auth from url type options """ def expand_default(self, option: optparse.Option) -> str: default_values = None if self.parser is not None: assert isinstance(self.parser, ConfigOptionParser) self.parser._update_defaults(self.parser.defaults) assert option.dest is not None default_values = self.parser.defaults.get(option.dest) help_text = super().expand_default(option) if default_values and option.metavar == "URL": if isinstance(default_values, str): default_values = [default_values] # If its not a list, we should abort and just return the help text if not isinstance(default_values, list): default_values = [] for val in default_values: help_text = help_text.replace(val, redact_auth_from_url(val)) return help_text class CustomOptionParser(optparse.OptionParser): def insert_option_group( self, idx: int, *args: Any, **kwargs: Any ) -> optparse.OptionGroup: """Insert an OptionGroup at a given position.""" group = self.add_option_group(*args, **kwargs) self.option_groups.pop() self.option_groups.insert(idx, group) return group @property def option_list_all(self) -> List[optparse.Option]: """Get a list of all options, including those in option groups.""" res = self.option_list[:] for i in self.option_groups: res.extend(i.option_list) return res class ConfigOptionParser(CustomOptionParser): """Custom option parser which updates its defaults by checking the configuration files and environmental variables""" def __init__( self, *args: Any, name: str, isolated: bool = False, **kwargs: Any, ) -> None: self.name = name self.config = Configuration(isolated) assert self.name super().__init__(*args, **kwargs) def check_default(self, option: optparse.Option, key: str, val: Any) -> Any: try: return option.check_value(key, val) except optparse.OptionValueError as exc: print(f"An error occurred during configuration: {exc}") sys.exit(3) def _get_ordered_configuration_items(self) -> Iterator[Tuple[str, Any]]: # Configuration gives keys in an unordered manner. Order them. override_order = ["global", self.name, ":env:"] # Pool the options into different groups section_items: Dict[str, List[Tuple[str, Any]]] = { name: [] for name in override_order } for section_key, val in self.config.items(): # ignore empty values if not val: logger.debug( "Ignoring configuration key '%s' as it's value is empty.", section_key, ) continue section, key = section_key.split(".", 1) if section in override_order: section_items[section].append((key, val)) # Yield each group in their override order for section in override_order: for key, val in section_items[section]: yield key, val def _update_defaults(self, defaults: Dict[str, Any]) -> Dict[str, Any]: """Updates the given defaults with values from the config files and the environ. Does a little special handling for certain types of options (lists).""" # Accumulate complex default state. self.values = optparse.Values(self.defaults) late_eval = set() # Then set the options with those values for key, val in self._get_ordered_configuration_items(): # '--' because configuration supports only long names option = self.get_option("--" + key) # Ignore options not present in this parser. E.g. non-globals put # in [global] by users that want them to apply to all applicable # commands. if option is None: continue assert option.dest is not None if option.action in ("store_true", "store_false"): try: val = strtobool(val) except ValueError: self.error( "{} is not a valid value for {} option, " # noqa "please specify a boolean value like yes/no, " "true/false or 1/0 instead.".format(val, key) ) elif option.action == "count": with suppress(ValueError): val = strtobool(val) with suppress(ValueError): val = int(val) if not isinstance(val, int) or val < 0: self.error( "{} is not a valid value for {} option, " # noqa "please instead specify either a non-negative integer " "or a boolean value like yes/no or false/true " "which is equivalent to 1/0.".format(val, key) ) elif option.action == "append": val = val.split() val = [self.check_default(option, key, v) for v in val] elif option.action == "callback": assert option.callback is not None late_eval.add(option.dest) opt_str = option.get_opt_string() val = option.convert_value(opt_str, val) # From take_action args = option.callback_args or () kwargs = option.callback_kwargs or {} option.callback(option, opt_str, val, self, *args, **kwargs) else: val = self.check_default(option, key, val) defaults[option.dest] = val for key in late_eval: defaults[key] = getattr(self.values, key) self.values = None return defaults def get_default_values(self) -> optparse.Values: """Overriding to make updating the defaults after instantiation of the option parser possible, _update_defaults() does the dirty work.""" if not self.process_default_values: # Old, pre-Optik 1.5 behaviour. return optparse.Values(self.defaults) # Load the configuration, or error out in case of an error try: self.config.load() except ConfigurationError as err: self.exit(UNKNOWN_ERROR, str(err)) defaults = self._update_defaults(self.defaults.copy()) # ours for option in self._get_all_options(): assert option.dest is not None default = defaults.get(option.dest) if isinstance(default, str): opt_str = option.get_opt_string() defaults[option.dest] = option.check_value(opt_str, default) return optparse.Values(defaults) def error(self, msg: str) -> None: self.print_usage(sys.stderr) self.exit(UNKNOWN_ERROR, f"{msg}\n") PK )�c\$%��� � 6 python3.11/site-packages/pip/_internal/cli/spinners.pynu �[��� import contextlib import itertools import logging import sys import time from typing import IO, Iterator from pip._vendor.progress import HIDE_CURSOR, SHOW_CURSOR from pip._internal.utils.compat import WINDOWS from pip._internal.utils.logging import get_indentation logger = logging.getLogger(__name__) class SpinnerInterface: def spin(self) -> None: raise NotImplementedError() def finish(self, final_status: str) -> None: raise NotImplementedError() class InteractiveSpinner(SpinnerInterface): def __init__( self, message: str, file: IO[str] = None, spin_chars: str = "-\\|/", # Empirically, 8 updates/second looks nice min_update_interval_seconds: float = 0.125, ): self._message = message if file is None: file = sys.stdout self._file = file self._rate_limiter = RateLimiter(min_update_interval_seconds) self._finished = False self._spin_cycle = itertools.cycle(spin_chars) self._file.write(" " * get_indentation() + self._message + " ... ") self._width = 0 def _write(self, status: str) -> None: assert not self._finished # Erase what we wrote before by backspacing to the beginning, writing # spaces to overwrite the old text, and then backspacing again backup = "\b" * self._width self._file.write(backup + " " * self._width + backup) # Now we have a blank slate to add our status self._file.write(status) self._width = len(status) self._file.flush() self._rate_limiter.reset() def spin(self) -> None: if self._finished: return if not self._rate_limiter.ready(): return self._write(next(self._spin_cycle)) def finish(self, final_status: str) -> None: if self._finished: return self._write(final_status) self._file.write("\n") self._file.flush() self._finished = True # Used for dumb terminals, non-interactive installs (no tty), etc. # We still print updates occasionally (once every 60 seconds by default) to # act as a keep-alive for systems like Travis-CI that take lack-of-output as # an indication that a task has frozen. class NonInteractiveSpinner(SpinnerInterface): def __init__(self, message: str, min_update_interval_seconds: float = 60.0) -> None: self._message = message self._finished = False self._rate_limiter = RateLimiter(min_update_interval_seconds) self._update("started") def _update(self, status: str) -> None: assert not self._finished self._rate_limiter.reset() logger.info("%s: %s", self._message, status) def spin(self) -> None: if self._finished: return if not self._rate_limiter.ready(): return self._update("still running...") def finish(self, final_status: str) -> None: if self._finished: return self._update(f"finished with status '{final_status}'") self._finished = True class RateLimiter: def __init__(self, min_update_interval_seconds: float) -> None: self._min_update_interval_seconds = min_update_interval_seconds self._last_update: float = 0 def ready(self) -> bool: now = time.time() delta = now - self._last_update return delta >= self._min_update_interval_seconds def reset(self) -> None: self._last_update = time.time() @contextlib.contextmanager def open_spinner(message: str) -> Iterator[SpinnerInterface]: # Interactive spinner goes directly to sys.stdout rather than being routed # through the logging system, but it acts like it has level INFO, # i.e. it's only displayed if we're at level INFO or better. # Non-interactive spinner goes through the logging system, so it is always # in sync with logging configuration. if sys.stdout.isatty() and logger.getEffectiveLevel() <= logging.INFO: spinner: SpinnerInterface = InteractiveSpinner(message) else: spinner = NonInteractiveSpinner(message) try: with hidden_cursor(sys.stdout): yield spinner except KeyboardInterrupt: spinner.finish("canceled") raise except Exception: spinner.finish("error") raise else: spinner.finish("done") @contextlib.contextmanager def hidden_cursor(file: IO[str]) -> Iterator[None]: # The Windows terminal does not support the hide/show cursor ANSI codes, # even via colorama. So don't even try. if WINDOWS: yield # We don't want to clutter the output with control characters if we're # writing to a file, or if the user is running with --quiet. # See https://github.com/pypa/pip/issues/3418 elif not file.isatty() or logger.getEffectiveLevel() > logging.INFO: yield else: file.write(HIDE_CURSOR) try: yield finally: file.write(SHOW_CURSOR) PK *�c\�o6 6 9 python3.11/site-packages/pip/_internal/cli/main_parser.pynu �[��� """A single place for constructing and exposing the main parser """ import os import sys from typing import List, Tuple from pip._internal.cli import cmdoptions from pip._internal.cli.parser import ConfigOptionParser, UpdatingDefaultsHelpFormatter from pip._internal.commands import commands_dict, get_similar_commands from pip._internal.exceptions import CommandError from pip._internal.utils.misc import get_pip_version, get_prog __all__ = ["create_main_parser", "parse_command"] def create_main_parser() -> ConfigOptionParser: """Creates and returns the main parser for pip's CLI""" parser = ConfigOptionParser( usage="\n%prog <command> [options]", add_help_option=False, formatter=UpdatingDefaultsHelpFormatter(), name="global", prog=get_prog(), ) parser.disable_interspersed_args() parser.version = get_pip_version() # add the general options gen_opts = cmdoptions.make_option_group(cmdoptions.general_group, parser) parser.add_option_group(gen_opts) # so the help formatter knows parser.main = True # type: ignore # create command listing for description description = [""] + [ f"{name:27} {command_info.summary}" for name, command_info in commands_dict.items() ] parser.description = "\n".join(description) return parser def parse_command(args: List[str]) -> Tuple[str, List[str]]: parser = create_main_parser() # Note: parser calls disable_interspersed_args(), so the result of this # call is to split the initial args into the general options before the # subcommand and everything else. # For example: # args: ['--timeout=5', 'install', '--user', 'INITools'] # general_options: ['--timeout==5'] # args_else: ['install', '--user', 'INITools'] general_options, args_else = parser.parse_args(args) # --version if general_options.version: sys.stdout.write(parser.version) sys.stdout.write(os.linesep) sys.exit() # pip || pip help -> print_help() if not args_else or (args_else[0] == "help" and len(args_else) == 1): parser.print_help() sys.exit() # the subcommand name cmd_name = args_else[0] if cmd_name not in commands_dict: guess = get_similar_commands(cmd_name) msg = [f'unknown command "{cmd_name}"'] if guess: msg.append(f'maybe you meant "{guess}"') raise CommandError(" - ".join(msg)) # all the args without the subcommand cmd_args = args[:] cmd_args.remove(cmd_name) return cmd_name, cmd_args PK *�c\ f�� � = python3.11/site-packages/pip/_internal/cli/command_context.pynu �[��� from contextlib import ExitStack, contextmanager from typing import ContextManager, Iterator, TypeVar _T = TypeVar("_T", covariant=True) class CommandContextMixIn: def __init__(self) -> None: super().__init__() self._in_main_context = False self._main_context = ExitStack() @contextmanager def main_context(self) -> Iterator[None]: assert not self._in_main_context self._in_main_context = True try: with self._main_context: yield finally: self._in_main_context = False def enter_context(self, context_provider: ContextManager[_T]) -> _T: assert self._in_main_context return self._main_context.enter_context(context_provider) PK *�c\vn n : python3.11/site-packages/pip/_internal/cli/base_command.pynu �[��� """Base Command class, and related routines""" import functools import logging import logging.config import optparse import os import sys import traceback from optparse import Values from typing import Any, Callable, List, Optional, Tuple from pip._internal.cli import cmdoptions from pip._internal.cli.command_context import CommandContextMixIn from pip._internal.cli.parser import ConfigOptionParser, UpdatingDefaultsHelpFormatter from pip._internal.cli.status_codes import ( ERROR, PREVIOUS_BUILD_DIR_ERROR, UNKNOWN_ERROR, VIRTUALENV_NOT_FOUND, ) from pip._internal.exceptions import ( BadCommand, CommandError, InstallationError, NetworkConnectionError, PreviousBuildDirError, UninstallationError, ) from pip._internal.utils.filesystem import check_path_owner from pip._internal.utils.logging import BrokenStdoutLoggingError, setup_logging from pip._internal.utils.misc import get_prog, normalize_path from pip._internal.utils.temp_dir import TempDirectoryTypeRegistry as TempDirRegistry from pip._internal.utils.temp_dir import global_tempdir_manager, tempdir_registry from pip._internal.utils.virtualenv import running_under_virtualenv __all__ = ["Command"] logger = logging.getLogger(__name__) class Command(CommandContextMixIn): usage: str = "" ignore_require_venv: bool = False def __init__(self, name: str, summary: str, isolated: bool = False) -> None: super().__init__() self.name = name self.summary = summary self.parser = ConfigOptionParser( usage=self.usage, prog=f"{get_prog()} {name}", formatter=UpdatingDefaultsHelpFormatter(), add_help_option=False, name=name, description=self.__doc__, isolated=isolated, ) self.tempdir_registry: Optional[TempDirRegistry] = None # Commands should add options to this option group optgroup_name = f"{self.name.capitalize()} Options" self.cmd_opts = optparse.OptionGroup(self.parser, optgroup_name) # Add the general options gen_opts = cmdoptions.make_option_group( cmdoptions.general_group, self.parser, ) self.parser.add_option_group(gen_opts) self.add_options() def add_options(self) -> None: pass def handle_pip_version_check(self, options: Values) -> None: """ This is a no-op so that commands by default do not do the pip version check. """ # Make sure we do the pip version check if the index_group options # are present. assert not hasattr(options, "no_index") def run(self, options: Values, args: List[str]) -> int: raise NotImplementedError def parse_args(self, args: List[str]) -> Tuple[Values, List[str]]: # factored out for testability return self.parser.parse_args(args) def main(self, args: List[str]) -> int: try: with self.main_context(): return self._main(args) finally: logging.shutdown() def _main(self, args: List[str]) -> int: # We must initialize this before the tempdir manager, otherwise the # configuration would not be accessible by the time we clean up the # tempdir manager. self.tempdir_registry = self.enter_context(tempdir_registry()) # Intentionally set as early as possible so globally-managed temporary # directories are available to the rest of the code. self.enter_context(global_tempdir_manager()) options, args = self.parse_args(args) # Set verbosity so that it can be used elsewhere. self.verbosity = options.verbose - options.quiet level_number = setup_logging( verbosity=self.verbosity, no_color=options.no_color, user_log_file=options.log, ) # TODO: Try to get these passing down from the command? # without resorting to os.environ to hold these. # This also affects isolated builds and it should. if options.no_input: os.environ["PIP_NO_INPUT"] = "1" if options.exists_action: os.environ["PIP_EXISTS_ACTION"] = " ".join(options.exists_action) if options.require_venv and not self.ignore_require_venv: # If a venv is required check if it can really be found if not running_under_virtualenv(): logger.critical("Could not find an activated virtualenv (required).") sys.exit(VIRTUALENV_NOT_FOUND) if options.cache_dir: options.cache_dir = normalize_path(options.cache_dir) if not check_path_owner(options.cache_dir): logger.warning( "The directory '%s' or its parent directory is not owned " "or is not writable by the current user. The cache " "has been disabled. Check the permissions and owner of " "that directory. If executing pip with sudo, you should " "use sudo's -H flag.", options.cache_dir, ) options.cache_dir = None if "2020-resolver" in options.features_enabled: logger.warning( "--use-feature=2020-resolver no longer has any effect, " "since it is now the default dependency resolver in pip. " "This will become an error in pip 21.0." ) def intercepts_unhandled_exc( run_func: Callable[..., int] ) -> Callable[..., int]: @functools.wraps(run_func) def exc_logging_wrapper(*args: Any) -> int: try: status = run_func(*args) assert isinstance(status, int) return status except PreviousBuildDirError as exc: logger.critical(str(exc)) logger.debug("Exception information:", exc_info=True) return PREVIOUS_BUILD_DIR_ERROR except ( InstallationError, UninstallationError, BadCommand, NetworkConnectionError, ) as exc: logger.critical(str(exc)) logger.debug("Exception information:", exc_info=True) return ERROR except CommandError as exc: logger.critical("%s", exc) logger.debug("Exception information:", exc_info=True) return ERROR except BrokenStdoutLoggingError: # Bypass our logger and write any remaining messages to # stderr because stdout no longer works. print("ERROR: Pipe to stdout was broken", file=sys.stderr) if level_number <= logging.DEBUG: traceback.print_exc(file=sys.stderr) return ERROR except KeyboardInterrupt: logger.critical("Operation cancelled by user") logger.debug("Exception information:", exc_info=True) return ERROR except BaseException: logger.critical("Exception:", exc_info=True) return UNKNOWN_ERROR return exc_logging_wrapper try: if not options.debug_mode: run = intercepts_unhandled_exc(self.run) else: run = self.run return run(options, args) finally: self.handle_pip_version_check(options) PK *�c\�� (s s K python3.11/site-packages/pip/_internal/cli/__pycache__/main.cpython-311.pycnu �[��� � ��^i� � � � d Z ddlZddlZddlZddlZddlmZmZ ddlm Z ddl mZ ddlm Z ddlmZ ddlmZ ej e� � Zdd eee d efd�ZdS ) z Primary application entrypoint. � N)�List�Optional)�autocomplete)� parse_command)�create_command)�PipError)�deprecation�args�returnc � � | �t j dd � } t j � � t � � t | � � \ }}nv# t $ ri}t j � d|� �� � t j � t j � � t j d� � Y d }~nd }~ww xY w t j t j d� � n7# t j $ r%}t � d|� � Y d }~nd }~ww xY wt% |d|v �� � }|� |� � S )N� zERROR: � z%Ignoring error %s when setting localez --isolated)�isolated)�sys�argvr �install_warning_loggerr r r �stderr�write�os�linesep�exit�locale� setlocale�LC_ALL�Error�logger�debugr �main)r �cmd_name�cmd_args�exc�e�commands ��/builddir/build/BUILDROOT/alt-python311-pip-21.3.1-4.el8.x86_64/opt/alt/python311/lib/python3.11/site-packages/pip/_internal/cli/main.pyr r - sE � ��|��x����|�� �&�(�(�(��N�N�N��*�4�0�0���(�(��� � � �� ����3���)�)�)�� �����$�$�$��������������������A������+�+�+�+���<� A� A� A����<�a�@�@�@�@�@�@�@�@�����A���� �X���1I�K�K�K�G��<�<��!�!�!s0 �A � B?�AB:�:B?�C# �#D�2D�D)N)�__doc__r �loggingr r �typingr r � pip._internal.cli.autocompletionr �pip._internal.cli.main_parserr �pip._internal.commandsr �pip._internal.exceptionsr �pip._internal.utilsr � getLogger�__name__r �str�intr � � r$ �<module>r3 s� ��� � � � � � ���� � � � � � � � � !� !� !� !� !� !� !� !� 9� 9� 9� 9� 9� 9� 7� 7� 7� 7� 7� 7� 1� 1� 1� 1� 1� 1� -� -� -� -� -� -� +� +� +� +� +� +� �� �8� $� $��<"� "�x��S� �"� "�c� "� "� "� "� "� "r2 PK *�c\� s��"