MMCT TEAM
Server IP : 2a02:4780:11:1361:0:bf7:7935:10  /  Your IP : 3.17.9.170
Web Server : LiteSpeed
System : Linux in-mum-web1261.main-hosting.eu 4.18.0-553.37.1.lve.el8.x86_64 #1 SMP Mon Feb 10 22:45:17 UTC 2025 x86_64
User : u200767797 ( 200767797)
PHP Version : 8.1.31
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : OFF  |  Python : ON
Directory (0755) :  /opt/.wp-cli/../gsutil/gslib/

[  Home  ][  C0mmand  ][  Upload File  ]

Current File : //opt/.wp-cli/../gsutil/gslib/command_runner.py
# -*- coding: utf-8 -*-
# Copyright 2011 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Class that runs a named gsutil command."""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals

import difflib
import logging
import os
import pkgutil
import sys
import textwrap
import time

import six
from six.moves import input
import boto
from boto import config
from boto.storage_uri import BucketStorageUri
import gslib
from gslib import metrics
from gslib.cloud_api_delegator import CloudApiDelegator
from gslib.command import Command
from gslib.command import CreateOrGetGsutilLogger
from gslib.command import GetFailureCount
from gslib.command import OLD_ALIAS_MAP
from gslib.command import ShutDownGsutil
import gslib.commands
from gslib.cs_api_map import ApiSelector
from gslib.cs_api_map import GsutilApiClassMapFactory
from gslib.cs_api_map import GsutilApiMapFactory
from gslib.discard_messages_queue import DiscardMessagesQueue
from gslib.exception import CommandException
from gslib.gcs_json_api import GcsJsonApi
from gslib.no_op_credentials import NoOpCredentials
from gslib.tab_complete import MakeCompleter
from gslib.utils import boto_util
from gslib.utils import shim_util
from gslib.utils import system_util
from gslib.utils.constants import RELEASE_NOTES_URL
from gslib.utils.constants import UTF8
from gslib.utils.metadata_util import IsCustomMetadataHeader
from gslib.utils.parallelism_framework_util import CheckMultiprocessingAvailableAndInit
from gslib.utils.text_util import CompareVersions
from gslib.utils.text_util import InsistAsciiHeader
from gslib.utils.text_util import InsistAsciiHeaderValue
from gslib.utils.text_util import print_to_fd
from gslib.utils.unit_util import SECONDS_PER_DAY
from gslib.utils.update_util import LookUpGsutilVersion
from gslib.utils.update_util import GsutilPubTarball


def HandleHeaderCoding(headers):
  """Handles coding of headers and their values. Alters the dict in-place.

  Converts a dict of headers and their values to their appropriate types. We
  ensure that all headers and their values will contain only ASCII characters,
  with the exception of custom metadata header values; these values may contain
  Unicode characters, and thus if they are not already unicode-type objects,
  we attempt to decode them to Unicode using UTF-8 encoding.

  Args:
    headers: A dict mapping headers to their values. All keys and values must
        be either str or unicode objects.

  Raises:
    CommandException: If a header or its value cannot be encoded in the
        required encoding.
  """
  if not headers:
    return

  for key in headers:
    InsistAsciiHeader(key)
    if IsCustomMetadataHeader(key):
      if not isinstance(headers[key], six.text_type):
        try:
          headers[key] = headers[key].decode(UTF8)
        except UnicodeDecodeError:
          raise CommandException('\n'.join(
              textwrap.wrap(
                  'Invalid encoding for header value (%s: %s). Values must be '
                  'decodable as Unicode. NOTE: the value printed above '
                  'replaces the problematic characters with a hex-encoded '
                  'printable representation. For more details (including how to '
                  'convert to a gsutil-compatible encoding) see `gsutil help '
                  'encoding`.' % (repr(key), repr(headers[key])))))
    else:
      # Non-custom-metadata headers and their values must be ASCII characters.
      InsistAsciiHeaderValue(key, headers[key])


def HandleArgCoding(args):
  """Handles coding of command-line args. Alters the list in-place.

  Args:
    args: A list of command-line args.

  Raises:
    CommandException: if errors encountered.
  """
  # Python passes arguments from the command line as byte strings. To
  # correctly interpret them, we decode them as utf-8.
  for i in range(len(args)):
    arg = args[i]
    if not isinstance(arg, six.text_type):
      try:
        args[i] = arg.decode(UTF8)
      except UnicodeDecodeError:
        raise CommandException('\n'.join(
            textwrap.wrap(
                'Invalid encoding for argument (%s). Arguments must be decodable '
                'as Unicode. NOTE: the argument printed above replaces the '
                'problematic characters with a hex-encoded printable '
                'representation. For more details (including how to convert to a '
                'gsutil-compatible encoding) see `gsutil help encoding`.' %
                repr(arg))))


def _StringToSysArgType(unicode_str):
  """Converts a string literal (unicode) to the same type as sys.argv[0]."""
  # TODO(PY3-ONLY): If we remove the PY2 code branch, this method becomes
  # a no-op, so we can just remove the whole method when we move to PY3.
  if six.PY2:
    return unicode_str.encode(UTF8)
  return unicode_str


class CommandRunner(object):
  """Runs gsutil commands and does some top-level argument handling."""

  def __init__(self,
               bucket_storage_uri_class=BucketStorageUri,
               gsutil_api_class_map_factory=GsutilApiClassMapFactory,
               command_map=None):
    """Instantiates a CommandRunner.

    Args:
      bucket_storage_uri_class: Class to instantiate for cloud StorageUris.
                                Settable for testing/mocking.
      gsutil_api_class_map_factory: Creates map of cloud storage interfaces.
                                    Settable for testing/mocking.
      command_map: Map of command names to their implementations for
                   testing/mocking. If not set, the map is built dynamically.
    """
    self.bucket_storage_uri_class = bucket_storage_uri_class
    self.gsutil_api_class_map_factory = gsutil_api_class_map_factory
    if command_map:
      self.command_map = command_map
    else:
      self.command_map = self._LoadCommandMap()

  def _LoadCommandMap(self):
    """Returns dict mapping each command_name to implementing class."""
    # Import all gslib.commands submodules.
    for _, module_name, _ in pkgutil.iter_modules(gslib.commands.__path__):
      __import__('gslib.commands.%s' % module_name)

    command_map = {}
    # Only include Command subclasses in the dict.
    for command in Command.__subclasses__():
      command_map[command.command_spec.command_name] = command
      for command_name_aliases in command.command_spec.command_name_aliases:
        command_map[command_name_aliases] = command
    return command_map

  def _GetTabCompleteLogger(self):
    """Returns a logger for tab completion."""
    return CreateOrGetGsutilLogger('tab_complete')

  def _ConfigureCommandArgumentParserArguments(self, parser,
                                               subcommands_or_arguments,
                                               gsutil_api):
    """Creates parsers recursively for potentially nested subcommands.

    Args:
      parser: argparse parser object.
      subcommands_or_arguments: list of CommandArgument objects, or recursive
          dict with subcommand names as keys.
      gsutil_api: gsutil Cloud API instance to use.

    Raises:
      RuntimeError: if argument is configured with unsupported completer
      TypeError: if subcommands_or_arguments is not a dict or list

    """
    logger = self._GetTabCompleteLogger()

    def HandleList():
      for command_argument in subcommands_or_arguments:
        action = parser.add_argument(*command_argument.args,
                                     **command_argument.kwargs)
        if command_argument.completer:
          action.completer = MakeCompleter(command_argument.completer,
                                           gsutil_api)

    def HandleDict():
      subparsers = parser.add_subparsers()
      for subcommand_name, subcommand_value in subcommands_or_arguments.items():
        cur_subcommand_parser = subparsers.add_parser(subcommand_name,
                                                      add_help=False)
        logger.info(
            'Constructing argument parsers for {}'.format(subcommand_name))
        self._ConfigureCommandArgumentParserArguments(cur_subcommand_parser,
                                                      subcommand_value,
                                                      gsutil_api)

    if isinstance(subcommands_or_arguments, list):
      HandleList()
    elif isinstance(subcommands_or_arguments, dict):
      HandleDict()
    else:
      error_format = ('subcommands_or_arguments {} should be list or dict, '
                      'found type {}')
      raise TypeError(
          error_format.format(subcommands_or_arguments,
                              type(subcommands_or_arguments)))

  def GetGsutilApiForTabComplete(self):
    """Builds and returns a gsutil_api based off gsutil_api_class_map_factory.

    Returns:
      the gsutil_api instance
    """
    # This should match the support map for the "ls" command.
    support_map = {
        'gs': [ApiSelector.XML, ApiSelector.JSON],
        's3': [ApiSelector.XML]
    }
    default_map = {'gs': ApiSelector.JSON, 's3': ApiSelector.XML}
    gsutil_api_map = GsutilApiMapFactory.GetApiMap(
        self.gsutil_api_class_map_factory, support_map, default_map)

    gsutil_api = CloudApiDelegator(self.bucket_storage_uri_class,
                                   gsutil_api_map,
                                   self._GetTabCompleteLogger(),
                                   DiscardMessagesQueue(),
                                   debug=0)
    return gsutil_api

  def ConfigureCommandArgumentParsers(self, main_parser):
    """Configures argparse arguments and argcomplete completers for commands.

    Args:
      main_parser: argparse object that can be called to get subparsers to add
      subcommands (called just 'commands' in gsutil)
    """
    gsutil_api = self.GetGsutilApiForTabComplete()

    # build a dict mapping from command name to the argparse arguments.
    # this dict has values with either a recursive dictionary or a list of
    # CommandArgument objects.
    command_to_argparse_arguments = {
        command.command_spec.command_name:
            command.command_spec.argparse_arguments
        for command in self.command_map.values()
    }

    # At this point command_to_argparse_arguments looks like
    # {
    #   'retention': {
    #     'set' : `set arguments array`
    #     'get' : `set arguments array`
    #     ...
    #     'event': {
    #       'set': `event set arguments array`
    #       'release': `event release arguments array`
    #     }
    #   },
    #   ... other commands here
    # }
    #
    # Which will be passed into the helper and called recursively on the items
    # in the dict, with the base case being the arguments arrays, where the
    # arguments are added to the subparser for the lowest level command.

    self._ConfigureCommandArgumentParserArguments(
        main_parser, command_to_argparse_arguments, gsutil_api)

  def RunNamedCommand(self,
                      command_name,
                      args=None,
                      headers=None,
                      debug=0,
                      trace_token=None,
                      parallel_operations=False,
                      skip_update_check=False,
                      logging_filters=None,
                      do_shutdown=True,
                      perf_trace_token=None,
                      user_project=None,
                      collect_analytics=False):
    """Runs the named command.

    Used by gsutil main, commands built atop other commands, and tests.

    Args:
      command_name: The name of the command being run.
      args: Command-line args (arg0 = actual arg, not command name ala bash).
      headers: Dictionary containing optional HTTP headers to pass to boto.
      debug: Debug level to pass in to boto connection (range 0..3).
      trace_token: Trace token to pass to the underlying API.
      parallel_operations: Should command operations be executed in parallel?
      skip_update_check: Set to True to disable checking for gsutil updates.
      logging_filters: Optional list of logging.Filters to apply to this
          command's logger.
      do_shutdown: Stop all parallelism framework workers iff this is True.
      perf_trace_token: Performance measurement trace token to pass to the
          underlying API.
      user_project: The project to bill this request to.
      collect_analytics: Set to True to collect an analytics metric logging this
          command.

    Raises:
      CommandException: if errors encountered.

    Returns:
      Return value(s) from Command that was run.
    """
    command_changed_to_update = False
    if (not skip_update_check and
        self.MaybeCheckForAndOfferSoftwareUpdate(command_name, debug)):
      command_name = 'update'
      command_changed_to_update = True
      args = [_StringToSysArgType('-n')]

      # Check for opt-in analytics.
      if system_util.IsRunningInteractively() and collect_analytics:
        metrics.CheckAndMaybePromptForAnalyticsEnabling()

    self.MaybePromptForPythonUpdate(command_name)

    if not args:
      args = []

    # Include api_version header in all commands.
    api_version = boto.config.get_value('GSUtil', 'default_api_version', '1')
    if not headers:
      headers = {}
    headers['x-goog-api-version'] = api_version

    if command_name not in self.command_map:
      close_matches = difflib.get_close_matches(command_name,
                                                self.command_map.keys(),
                                                n=1)
      if close_matches:
        # Instead of suggesting a deprecated command alias, suggest the new
        # name for that command.
        translated_command_name = (OLD_ALIAS_MAP.get(close_matches[0],
                                                     close_matches)[0])
        print('Did you mean this?', file=sys.stderr)
        print('\t%s' % translated_command_name, file=sys.stderr)
      elif command_name == 'update' and gslib.IS_PACKAGE_INSTALL:
        sys.stderr.write(
            'Update command is not supported for package installs; '
            'please instead update using your package manager.')

      raise CommandException('Invalid command "%s".' % command_name)
    # Call str() on this string because the type of objects in `args` differ
    # on Python 2 vs 3 (bytes vs unicode), and we want to compare using the
    # same as whatever is in `args`.
    if _StringToSysArgType('--help') in args:
      new_args = [command_name]
      original_command_class = self.command_map[command_name]
      subcommands = original_command_class.help_spec.subcommand_help_text.keys()
      for arg in args:
        if arg in subcommands:
          new_args.append(arg)
          break  # Take the first match and throw away the rest.
      args = new_args
      command_name = 'help'

    HandleArgCoding(args)
    HandleHeaderCoding(headers)

    command_class = self.command_map[command_name]
    command_inst = command_class(self,
                                 args,
                                 headers,
                                 debug,
                                 trace_token,
                                 parallel_operations,
                                 self.bucket_storage_uri_class,
                                 self.gsutil_api_class_map_factory,
                                 logging_filters,
                                 command_alias_used=command_name,
                                 perf_trace_token=perf_trace_token,
                                 user_project=user_project)

    # Log the command name, command alias, and sub-options after being parsed by
    # RunCommand and the command constructor. For commands with subcommands and
    # suboptions, we need to log the suboptions again within the command itself
    # because the command constructor will not parse the suboptions fully.
    if collect_analytics:
      metrics.LogCommandParams(command_name=command_inst.command_name,
                               sub_opts=command_inst.sub_opts,
                               command_alias=command_name)

    if command_inst.translate_to_gcloud_storage_if_requested():
      # This does not mean that the gcloud storage command worked.
      # It only means that we succesfully attempted running gcloud storage.
      # The command itself might have failed.
      return_code = command_inst.run_gcloud_storage()
    else:
      # Run gsutil.
      return_code = command_inst.RunCommand()

    if CheckMultiprocessingAvailableAndInit().is_available and do_shutdown:
      ShutDownGsutil()
    if GetFailureCount() > 0:
      return_code = 1
    if command_changed_to_update:
      # If the command changed to update, the user's original command was
      # not executed.
      return_code = 1
      print('\n'.join(
          textwrap.wrap(
              'Update was successful. Exiting with code 1 as the original command '
              'issued prior to the update was not executed and should be re-run.'
          )))
    return return_code

  def SkipUpdateCheck(self):
    """Helper function that will determine if update checks should be skipped.

    Args:
      command_name: The name of the command being run.

    Returns:
      True if:
      - gsutil is not connected to a tty (e.g., if being run from cron);
      - user is running gsutil -q
      - user specified gs_host (which could be a non-production different
        service instance, in which case credentials won't work for checking
        gsutil tarball)."""
    logger = logging.getLogger()
    if (not system_util.IsRunningInteractively() or
        not logger.isEnabledFor(logging.INFO) or
        boto_util.HasUserSpecifiedGsHost()):
      return True
    return False

  def MaybePromptForPythonUpdate(self, command_name):
    """Alert the user that they should install Python 3.

    Args:
      command_name: The name of the command being run.

    Returns:
      True if a prompt was output.
    """
    if (sys.version_info.major != 2 or self.SkipUpdateCheck() or
        command_name not in ('update', 'ver', 'version') or
        boto.config.getbool('GSUtil', 'skip_python_update_prompt', False)):
      return False

    # Notify the user about Python 2 deprecation.
    print_to_fd(
        'Gsutil 5 drops Python 2 support. Please install Python 3 to update '
        'to the latest version of gsutil. https://goo.gle/py3\n')
    return True

  def MaybeCheckForAndOfferSoftwareUpdate(self, command_name, debug):
    """Checks the last time we checked for an update and offers one if needed.

    Offer is made if the time since the last update check is longer
    than the configured threshold offers the user to update gsutil.

    Args:
      command_name: The name of the command being run.
      debug: Debug level to pass in to boto connection (range 0..3).

    Returns:
      True if the user decides to update.
    """
    # Don't try to interact with user if:
    # - SkipUpdateChecks returns True.
    # - user is running the config command (which could otherwise attempt to
    #   check for an update for a user running behind a proxy, who has not yet
    #   configured gsutil to go through the proxy; for such users we need the
    #   first connection attempt to be made by the gsutil config command).
    # - user is running the version command (which gets run when using
    #   gsutil -D, which would prevent users with proxy config problems from
    #   sending us gsutil -D output).
    # - user is running the update command (which could otherwise cause an
    #   additional note that an update is available when user is already trying
    #   to perform an update);
    # - user is using a Cloud SDK install (which should only be updated via
    #   gcloud components update)
    logger = logging.getLogger()
    if (self.SkipUpdateCheck() or
        command_name in ('config', 'update', 'ver', 'version') or
        system_util.InvokedViaCloudSdk()):
      return False

    software_update_check_period = boto.config.getint(
        'GSUtil', 'software_update_check_period', 30)
    # Setting software_update_check_period to 0 means periodic software
    # update checking is disabled.
    if software_update_check_period == 0:
      return False

    last_checked_for_gsutil_update_timestamp_file = (
        boto_util.GetLastCheckedForGsutilUpdateTimestampFile())

    cur_ts = int(time.time())
    if not os.path.isfile(last_checked_for_gsutil_update_timestamp_file):
      # Set last_checked_ts from date of VERSION file, so if the user installed
      # an old copy of gsutil it will get noticed (and an update offered) the
      # first time they try to run it.
      last_checked_ts = gslib.GetGsutilVersionModifiedTime()
      with open(last_checked_for_gsutil_update_timestamp_file, 'w') as f:
        f.write(str(last_checked_ts))
    else:
      try:
        with open(last_checked_for_gsutil_update_timestamp_file, 'r') as f:
          last_checked_ts = int(f.readline())
      except (TypeError, ValueError):
        return False

    if (cur_ts - last_checked_ts
        > software_update_check_period * SECONDS_PER_DAY):
      # Create a credential-less gsutil API to check for the public
      # update tarball.
      gsutil_api = GcsJsonApi(self.bucket_storage_uri_class,
                              logger,
                              DiscardMessagesQueue(),
                              credentials=NoOpCredentials(),
                              debug=debug)

      cur_ver = gslib.VERSION
      try:
        cur_ver = LookUpGsutilVersion(gsutil_api, GsutilPubTarball())
      except Exception:
        return False

      with open(last_checked_for_gsutil_update_timestamp_file, 'w') as f:
        f.write(str(cur_ts))
      (g, m) = CompareVersions(cur_ver, gslib.VERSION)
      if m:
        print_to_fd('\n'.join(
            textwrap.wrap(
                'A newer version of gsutil (%s) is available than the version you '
                'are running (%s). NOTE: This is a major new version, so it is '
                'strongly recommended that you review the release note details at '
                '%s before updating to this version, especially if you use gsutil '
                'in scripts.' % (cur_ver, gslib.VERSION, RELEASE_NOTES_URL))))
        if gslib.IS_PACKAGE_INSTALL:
          return False
        print_to_fd('\n')
        answer = input('Would you like to update [y/N]? ')
        return answer and answer.lower()[0] == 'y'
      elif g:
        print_to_fd('\n'.join(
            textwrap.wrap(
                'A newer version of gsutil (%s) is available than the version you '
                'are running (%s). A detailed log of gsutil release changes is '
                'available at %s if you would like to read them before updating.'
                % (cur_ver, gslib.VERSION, RELEASE_NOTES_URL))))
        if gslib.IS_PACKAGE_INSTALL:
          return False
        print_to_fd('\n')
        answer = input('Would you like to update [Y/n]? ')
        return not answer or answer.lower()[0] != 'n'
    return False

MMCT - 2023