Server IP : 2a02:4780:11:1361:0:bf7:7935:10 / Your IP : 3.145.11.190 Web Server : LiteSpeed System : Linux in-mum-web1261.main-hosting.eu 4.18.0-553.37.1.lve.el8.x86_64 #1 SMP Mon Feb 10 22:45:17 UTC 2025 x86_64 User : u200767797 ( 200767797) PHP Version : 8.1.31 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : OFF | Python : ON Directory (0755) : /opt/.wp-cli/../gsutil/gslib/ |
[ Home ] | [ C0mmand ] | [ Upload File ] |
---|
# -*- coding: utf-8 -*- # Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Credentials logic for JSON CloudApi implementation.""" # This module duplicates some logic in third_party gcs_oauth2_boto_plugin # because apitools credentials lib has its own mechanisms for file-locking # and credential storage. As such, it doesn't require most of the # gcs_oauth2_boto_plugin logic. from __future__ import absolute_import from __future__ import print_function from __future__ import division from __future__ import unicode_literals import base64 import json import logging import os import io import six import traceback # pylint: disable=g-bad-import-order from apitools.base.py import credentials_lib from apitools.base.py import exceptions as apitools_exceptions from boto import config from gslib.cred_types import CredTypes from gslib.exception import CommandException from gslib.impersonation_credentials import ImpersonationCredentials from gslib.no_op_credentials import NoOpCredentials from gslib.utils import constants from gslib.utils import system_util from gslib.utils.boto_util import GetFriendlyConfigFilePaths from gslib.utils.boto_util import GetCredentialStoreFilename from gslib.utils.boto_util import GetGceCredentialCacheFilename from gslib.utils.boto_util import GetGcsJsonApiVersion from gslib.utils.constants import UTF8 from gslib.utils.wrapped_credentials import WrappedCredentials from google.auth import _helpers from google.auth.crypt import base as crypt_base from google_auth_httplib2 import AuthorizedHttp from google.oauth2 import service_account import oauth2client from oauth2client.client import HAS_CRYPTO from oauth2client.contrib import devshell from oauth2client.service_account import ServiceAccountCredentials from google_reauth import reauth_creds from oauth2client.contrib import multiprocess_file_storage from six import BytesIO DEFAULT_GOOGLE_OAUTH2_PROVIDER_AUTHORIZATION_URI = ( 'https://accounts.google.com/o/oauth2/auth') DEFAULT_GOOGLE_OAUTH2_PROVIDER_TOKEN_URI = ( 'https://oauth2.googleapis.com/token') DEFAULT_SCOPES = [ constants.Scopes.CLOUD_PLATFORM, constants.Scopes.CLOUD_PLATFORM_READ_ONLY, constants.Scopes.FULL_CONTROL, constants.Scopes.READ_ONLY, constants.Scopes.READ_WRITE, ] GOOGLE_OAUTH2_DEFAULT_FILE_PASSWORD = 'notasecret' def isP12Credentials(credentials): return isinstance(credentials, P12Credentials) class PKCS12Signer(crypt_base.Signer, crypt_base.FromServiceAccountMixin): """Signer for a p12 service account key.""" def __init__(self, key): self._key = key # Defined in crypt_base.Signer interface. # It is not used by p12 service account keys. @property def key_id(self): return None def sign(self, message): message = _helpers.to_bytes(message) from google.auth.crypt import _cryptography_rsa # pylint: disable=g-import-not-at-top return self._key.sign( message, _cryptography_rsa._PADDING, # pylint: disable=protected-access _cryptography_rsa._SHA256) # pylint: disable=protected-access @classmethod def from_string(cls, key_strings, key_id=None): del key_id #Unused key_string, password = (_helpers.to_bytes(k) for k in key_strings) # Cryptography package is not bundled with gsutil, Keeping it at top would throw error. from cryptography.hazmat.primitives.serialization import pkcs12 # pylint: disable=g-import-not-at-top try: key, _, _ = pkcs12.load_key_and_certificates(key_string, password) return cls(key) except: raise CommandException('Unable to load the keyfile, Invalid password or PKCS12 data.') class P12Credentials(service_account.Credentials): """google-auth service account credentials for p12 keys. p12 keys are not supported by the google-auth service account credentials. gsutil uses oauth2client to support p12 key users. Since oauth2client was deprecated and bundling it is security concern, we decided to support p12 in gsutil codebase. We prefer not adding it to the google-auth library because p12 is not supported from the beginning by google-auth. GCP strongly suggests users to use the JSON format. gsutil has to support it to not break users. """ _REQUIRED_FIELDS = ('service_account_email', 'token_uri', 'scopes') def authorize(self, http): return AuthorizedHttp(self, http=http) @classmethod def from_service_account_pkcs12_keystring(cls, key_string, password=None, **kwargs): password = password or GOOGLE_OAUTH2_DEFAULT_FILE_PASSWORD signer = PKCS12Signer.from_string((key_string, password)) missing_fields = [f for f in cls._REQUIRED_FIELDS if f not in kwargs] if missing_fields: raise CommandException('Missing fields: {}.'.format( ', '.join(missing_fields))) creds = cls(signer, **kwargs) return creds def CreateP12ServiceAccount(key_string, password=None, **kwargs): """Creates a service account from a p12 key and handles import errors.""" try: return P12Credentials.from_service_account_pkcs12_keystring( key_string, password, **kwargs) except ImportError: raise CommandException( ('pyca/cryptography is not available. Either install it, or please consider using the .json keyfile')) def GetCredentialStoreKey(credentials, api_version): """Disambiguates a credential for caching in a credential store. Different credential types have different fields that identify them. This function assembles relevant information in a string to be used as the key for accessing a credential. Note that in addition to uniquely identifying the entity to which a credential corresponds, we must differentiate between two or more of that entity's credentials that have different attributes such that the credentials should not be treated as interchangeable, e.g. if they target different API versions (happens for developers targeting different test environments), have different private key IDs (for service account JSON keyfiles), or target different provider token (refresh) URIs. Args: credentials: An OAuth2Credentials object. api_version: JSON API version being used. Returns: A string that can be used as the key to identify a credential, e.g. "v1-909320924072.apps.googleusercontent.com-1/rEfrEshtOkEn-https://..." """ # Note: We don't include the scopes as part of the key. For a user credential # object, we always construct it with manually added scopes that are necessary # to negotiate reauth challenges - those scopes don't necessarily correspond # to the scopes the refresh token was created with. We avoid key name # mismatches for the same refresh token by not including scopes in the key # string. key_parts = [api_version] if isinstance(credentials, devshell.DevshellCredentials): key_parts.append(credentials.user_email) elif isinstance(credentials, ServiceAccountCredentials): # pylint: disable=protected-access key_parts.append(credentials._service_account_email) if getattr(credentials, '_private_key_id', None): # JSON keyfile. # Differentiate between two different JSON keyfiles for the same service # account. key_parts.append(credentials._private_key_id) elif getattr(credentials, '_private_key_pkcs12', None): # P12 keyfile # Use a prefix of the Base64-encoded PEM string to differentiate it from # others. Using a prefix of reasonable length prevents the key from being # unnecessarily large, and the likelihood of having two PEM strings with # the same prefixes is sufficiently low. key_parts.append(base64.b64encode(credentials._private_key_pkcs12)[:20]) # pylint: enable=protected-access elif isinstance(credentials, oauth2client.client.OAuth2Credentials): if credentials.client_id and credentials.client_id != 'null': key_parts.append(credentials.client_id) else: key_parts.append('noclientid') key_parts.append(credentials.refresh_token or 'norefreshtoken') # If a cached credential is targeting provider token URI 'A' for token refresh # requests, then the user changes their boto file or private key file to # target URI 'B', we don't want to treat the cached and the new credential as # interchangeable. This applies for all credentials that store a token URI. if getattr(credentials, 'token_uri', None): key_parts.append(credentials.token_uri) key_parts = [six.ensure_text(part) for part in key_parts] return '-'.join(key_parts) def SetUpJsonCredentialsAndCache(api, logger, credentials=None): """Helper to ensure each GCS API client shares the same credentials.""" api.credentials = (credentials or _CheckAndGetCredentials(logger) or NoOpCredentials()) # Notify the user that impersonation credentials are in effect. if isinstance(api.credentials, ImpersonationCredentials): logger.warn( 'WARNING: This command is using service account impersonation. All ' 'API calls will be executed as [%s].', _GetImpersonateServiceAccount()) # Do not store p12 credentials (Supported by google-auth), as google-auth does not support storing service account credentials. if not isP12Credentials(api.credentials): # Set credential cache so that we don't have to get a new access token for # every call we make. All GCS APIs use the same credentials as the JSON API, # so we use its version in the key for caching access tokens. credential_store_key = (GetCredentialStoreKey(api.credentials, GetGcsJsonApiVersion())) api.credentials.set_store( multiprocess_file_storage.MultiprocessFileStorage( GetCredentialStoreFilename(), credential_store_key)) # The cached entry for this credential often contains more context than what # we can construct from boto config attributes (e.g. for a user credential, # the cached version might also contain a RAPT token and expiry info). # Prefer the cached credential if present. cached_cred = None if not isinstance(api.credentials, NoOpCredentials): # A NoOpCredentials object doesn't actually have a store attribute. cached_cred = api.credentials.store.get() # As of gsutil 4.31, we never use the OAuth2Credentials class for # credentials directly; rather, we use subclasses (user credentials were # the only ones left using it, but they now use # Oauth2WithReauthCredentials). If we detect that a cached credential is # an instance of OAuth2Credentials and not a subclass of it (as might # happen when transitioning to version v4.31+), we don't fetch it from the # cache. This results in our new-style credential being refreshed and # overwriting the old credential cache entry in our credstore. if (cached_cred and type(cached_cred) != oauth2client.client.OAuth2Credentials): api.credentials = cached_cred def _CheckAndGetCredentials(logger): """Returns credentials from the configuration file, if any are present. Args: logger: logging.Logger instance for outputting messages. Returns: OAuth2Credentials object if any valid ones are found, otherwise None. """ configured_cred_types = [] failed_cred_type = None try: if _HasOauth2UserAccountCreds(): configured_cred_types.append(CredTypes.OAUTH2_USER_ACCOUNT) if _HasOauth2ServiceAccountCreds(): configured_cred_types.append(CredTypes.OAUTH2_SERVICE_ACCOUNT) if len(configured_cred_types) > 1: # We only allow one set of configured credentials. Otherwise, we're # choosing one arbitrarily, which can be very confusing to the user # (e.g., if only one is authorized to perform some action) and can # also mask errors. # Because boto merges config files, GCE credentials show up by default # for GCE VMs. We don't want to fail when a user creates a boto file # with their own credentials, so in this case we'll use the OAuth2 # user credentials. raise CommandException( ('You have multiple types of configured credentials (%s), which is ' 'not supported. One common way this happens is if you run gsutil ' 'config to create credentials and later run gcloud auth, and ' 'create a second set of credentials. Your boto config path is: ' '%s. For more help, see "gsutil help creds".') % (configured_cred_types, GetFriendlyConfigFilePaths())) failed_cred_type = CredTypes.OAUTH2_USER_ACCOUNT user_creds = _GetOauth2UserAccountCredentials() failed_cred_type = CredTypes.OAUTH2_SERVICE_ACCOUNT service_account_creds = _GetOauth2ServiceAccountCredentials() failed_cred_type = CredTypes.EXTERNAL_ACCOUNT external_account_creds = _GetExternalAccountCredentials() failed_cred_type = CredTypes.EXTERNAL_ACCOUNT_AUTHORIZED_USER external_account_authorized_user_creds = _GetExternalAccountAuthorizedUserCredentials( ) failed_cred_type = CredTypes.GCE gce_creds = _GetGceCreds() failed_cred_type = CredTypes.DEVSHELL devshell_creds = _GetDevshellCreds() creds = user_creds or service_account_creds or gce_creds or external_account_creds or external_account_authorized_user_creds or devshell_creds # Use one of the above credential types to impersonate, if configured. if _HasImpersonateServiceAccount() and creds: failed_cred_type = CredTypes.IMPERSONATION return _GetImpersonationCredentials(creds, logger) else: return creds except Exception as e: # If we didn't actually try to authenticate because there were multiple # types of configured credentials, don't emit this warning. if failed_cred_type: if logger.isEnabledFor(logging.DEBUG): logger.debug(traceback.format_exc()) # If impersonation fails, show the user the actual error, since we handle # errors in iamcredentials_api. if failed_cred_type == CredTypes.IMPERSONATION: raise e elif system_util.InvokedViaCloudSdk(): logger.warn( 'Your "%s" credentials are invalid. Please run\n' ' $ gcloud auth login', failed_cred_type) else: logger.warn( 'Your "%s" credentials are invalid. For more help, see ' '"gsutil help creds", or re-run the gsutil config command (see ' '"gsutil help config").', failed_cred_type) # If there's any set of configured credentials, we'll fail if they're # invalid, rather than silently falling back to anonymous config (as # boto does). That approach leads to much confusion if users don't # realize their credentials are invalid. raise def _GetProviderTokenUri(): return config.get('OAuth2', 'provider_token_uri', DEFAULT_GOOGLE_OAUTH2_PROVIDER_TOKEN_URI) def _HasOauth2ServiceAccountCreds(): return config.has_option('Credentials', 'gs_service_key_file') def _HasOauth2UserAccountCreds(): return config.has_option('Credentials', 'gs_oauth2_refresh_token') def _HasGceCreds(): return config.has_option('GoogleCompute', 'service_account') def _HasImpersonateServiceAccount(): return _GetImpersonateServiceAccount() not in (None, '') def _GetExternalAccountCredentials(): external_account_filename = config.get('Credentials', 'gs_external_account_file', None) if not external_account_filename: return None return WrappedCredentials.for_external_account(external_account_filename) def _GetExternalAccountAuthorizedUserCredentials(): external_account_authorized_user_filename = config.get( 'Credentials', 'gs_external_account_authorized_user_file', None) if not external_account_authorized_user_filename: return None return WrappedCredentials.for_external_account_authorized_user( external_account_authorized_user_filename) def _GetImpersonateServiceAccount(): return (constants.IMPERSONATE_SERVICE_ACCOUNT or config.get( 'Credentials', 'gs_impersonate_service_account', os.environ.get('CLOUDSDK_AUTH_IMPERSONATE_SERVICE_ACCOUNT'))) def _GetOauth2ServiceAccountCredentials(): """Retrieves OAuth2 service account credentials for a private key file.""" if not _HasOauth2ServiceAccountCreds(): return provider_token_uri = _GetProviderTokenUri() service_client_id = config.get('Credentials', 'gs_service_client_id', '') private_key_filename = config.get('Credentials', 'gs_service_key_file', '') with io.open(private_key_filename, 'rb') as private_key_file: private_key = private_key_file.read() keyfile_is_utf8 = False try: private_key = private_key.decode(UTF8) # P12 keys won't be encoded as UTF8 bytes. keyfile_is_utf8 = True except UnicodeDecodeError: pass if keyfile_is_utf8: try: json_key_dict = json.loads(private_key) except ValueError: raise Exception('Could not parse JSON keyfile "%s" as valid JSON' % private_key_filename) # Key file is in JSON format. for json_entry in ('client_id', 'client_email', 'private_key_id', 'private_key'): if json_entry not in json_key_dict: raise Exception('The JSON private key file at %s ' 'did not contain the required entry: %s' % (private_key_filename, json_entry)) return ServiceAccountCredentials.from_json_keyfile_dict( json_key_dict, scopes=DEFAULT_SCOPES, token_uri=provider_token_uri) else: # Key file is in P12 format. key_file_pass = config.get('Credentials', 'gs_service_key_file_password', GOOGLE_OAUTH2_DEFAULT_FILE_PASSWORD) return CreateP12ServiceAccount(private_key, key_file_pass, scopes=DEFAULT_SCOPES, service_account_email=service_client_id, token_uri=provider_token_uri) def _GetOauth2UserAccountCredentials(): """Retrieves OAuth2 service account credentials for a refresh token.""" if not _HasOauth2UserAccountCreds(): return provider_token_uri = _GetProviderTokenUri() gsutil_client_id, gsutil_client_secret = ( system_util.GetGsutilClientIdAndSecret()) client_id = config.get('OAuth2', 'client_id', os.environ.get('OAUTH2_CLIENT_ID', gsutil_client_id)) client_secret = config.get( 'OAuth2', 'client_secret', os.environ.get('OAUTH2_CLIENT_SECRET', gsutil_client_secret)) # Note that these scopes don't necessarily correspond to the refresh token # being used. This list is is used for obtaining the RAPT in the reauth flow, # to determine which challenges should be used. scopes_for_reauth_challenge = [ constants.Scopes.CLOUD_PLATFORM, constants.Scopes.REAUTH ] return reauth_creds.Oauth2WithReauthCredentials( None, # access_token client_id, client_secret, config.get('Credentials', 'gs_oauth2_refresh_token'), None, # token_expiry provider_token_uri, None, # user_agent scopes=scopes_for_reauth_challenge) def _GetGceCreds(): if not _HasGceCreds(): return try: return credentials_lib.GceAssertionCredentials( service_account_name=config.get('GoogleCompute', 'service_account', 'default'), cache_filename=GetGceCredentialCacheFilename()) except apitools_exceptions.ResourceUnavailableError as e: if 'service account' in str(e) and 'does not exist' in str(e): return None raise def _GetDevshellCreds(): try: return devshell.DevshellCredentials() except devshell.NoDevshellServer: return None except: raise def _GetImpersonationCredentials(credentials, logger): """Retrieves temporary credentials impersonating a service account""" # We don't use impersoned credentials to impersonate. if isinstance(credentials, ImpersonationCredentials): return return ImpersonationCredentials(_GetImpersonateServiceAccount(), [constants.Scopes.CLOUD_PLATFORM], credentials, logger)