Skip to content
243 changes: 243 additions & 0 deletions google/auth/_agent_identity_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for Agent Identity credentials."""

import base64
import hashlib
import logging
import os
import re
import time
from urllib.parse import urlparse

from google.auth import environment_vars
from google.auth import exceptions


_LOGGER = logging.getLogger(__name__)

CRYPTOGRAPHY_NOT_FOUND_ERROR = (
"The cryptography library is required for certificate-based authentication."
"Please install it with `pip install google-auth[cryptography]`."
)

# SPIFFE trust domain patterns for Agent Identities.
_AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS = [
r"^agents\.global\.org-\d+\.system\.id\.goog$",
r"^agents\.global\.proj-\d+\.system\.id\.goog$",
]

# Constants for polling the certificate file.
_FAST_POLL_CYCLES = 50
_FAST_POLL_INTERVAL = 0.1 # 100ms
_SLOW_POLL_INTERVAL = 0.5 # 500ms
_TOTAL_TIMEOUT = 30 # seconds

# Calculate the number of slow poll cycles based on the total timeout.
_SLOW_POLL_CYCLES = int(
(_TOTAL_TIMEOUT - (_FAST_POLL_CYCLES * _FAST_POLL_INTERVAL)) / _SLOW_POLL_INTERVAL
)

_POLLING_INTERVALS = ([_FAST_POLL_INTERVAL] * _FAST_POLL_CYCLES) + (
[_SLOW_POLL_INTERVAL] * _SLOW_POLL_CYCLES
)


def get_agent_identity_certificate_path():
"""Gets the certificate path from the certificate config file.

The path to the certificate config file is read from the
GOOGLE_API_CERTIFICATE_CONFIG environment variable. This function
implements a retry mechanism to handle cases where the environment
variable is set before the files are available on the filesystem.

Returns:
str: The path to the leaf certificate file.

Raises:
google.auth.exceptions.RefreshError: If the certificate config file
or the certificate file cannot be found after retries.
"""
import json

cert_config_path = os.environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG)
if not cert_config_path:
return None

has_logged_warning = False

for interval in _POLLING_INTERVALS:
try:
with open(cert_config_path, "r") as f:
cert_config = json.load(f)
cert_path = (
cert_config.get("cert_configs", {})
.get("workload", {})
.get("cert_path")
)
if cert_path and os.path.exists(cert_path):
return cert_path
except (IOError, ValueError, KeyError):
if not has_logged_warning:
_LOGGER.warning(
"Certificate config file not found at %s (from %s environment "
"variable). Retrying for up to %s seconds.",
cert_config_path,
environment_vars.GOOGLE_API_CERTIFICATE_CONFIG,
_TOTAL_TIMEOUT,
)
has_logged_warning = True
pass

# A sleep is required in two cases:
# 1. The config file is not found (the except block).
# 2. The config file is found, but the certificate is not yet available.
# In both cases, we need to poll, so we sleep on every iteration
# that doesn't return a certificate.
time.sleep(interval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't matter at all, but I think the sleep could be inside the except, to remove that pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I've kept the time.sleep() outside the except block to handle two
distinct scenarios while polling for the certificate:

  1. The certificate configuration file (GOOGLE_API_CERTIFICATE_CONFIG) is not yet available. This
    triggers the except block.
  2. The configuration file is available, but the certificate file path specified inside it is not
    yet on the filesystem. This case does not trigger an exception.

In both scenarios, we need to pause and retry. If we move the sleep into the except block, there won't be any pause and sleep for the second scenario would .

I've added a comment to the code to make this polling logic clearer for future developers.


raise exceptions.RefreshError(
"Certificate config or certificate file not found after multiple retries. "
f"Token binding protection is failing. You can turn off this protection by setting "
f"{environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES} to false "
"to fall back to unbound tokens."
)


def get_and_parse_agent_identity_certificate():
"""Gets and parses the agent identity certificate if not opted out.

Checks if the user has opted out of certificate-bound tokens. If not,
it gets the certificate path, reads the file, and parses it.

Returns:
The parsed certificate object if found and not opted out, otherwise None.
"""
# If the user has opted out of cert bound tokens, there is no need to
# look up the certificate.
is_opted_out = (
os.environ.get(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
).lower()
== "false"
)
if is_opted_out:
return None

cert_path = get_agent_identity_certificate_path()
if not cert_path:
return None

with open(cert_path, "rb") as cert_file:
cert_bytes = cert_file.read()

return parse_certificate(cert_bytes)


def parse_certificate(cert_bytes):
"""Parses a PEM-encoded certificate.

Args:
cert_bytes (bytes): The PEM-encoded certificate bytes.

Returns:
cryptography.x509.Certificate: The parsed certificate object.
"""
try:
from cryptography import x509

return x509.load_pem_x509_certificate(cert_bytes)
except ImportError as e:
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e


def _is_agent_identity_certificate(cert):
"""Checks if a certificate is an Agent Identity certificate.

This is determined by checking the Subject Alternative Name (SAN) for a
SPIFFE ID with a trust domain matching Agent Identity patterns.

Args:
cert (cryptography.x509.Certificate): The parsed certificate object.

Returns:
bool: True if the certificate is an Agent Identity certificate,
False otherwise.
"""
try:
from cryptography import x509
from cryptography.x509.oid import ExtensionOID

try:
ext = cert.extensions.get_extension_for_oid(
ExtensionOID.SUBJECT_ALTERNATIVE_NAME
)
except x509.ExtensionNotFound:
return False
uris = ext.value.get_values_for_type(x509.UniformResourceIdentifier)

for uri in uris:
parsed_uri = urlparse(uri)
if parsed_uri.scheme == "spiffe":
trust_domain = parsed_uri.netloc
for pattern in _AGENT_IDENTITY_SPIFFE_TRUST_DOMAIN_PATTERNS:
if re.match(pattern, trust_domain):
return True
return False
except ImportError as e:
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e


def calculate_certificate_fingerprint(cert):
"""Calculates the base64-encoded SHA256 hash of a DER-encoded certificate.

Args:
cert (cryptography.x509.Certificate): The parsed certificate object.

Returns:
str: The base64-encoded SHA256 fingerprint.
"""
try:
from cryptography.hazmat.primitives import serialization

der_cert = cert.public_bytes(serialization.Encoding.DER)
fingerprint = hashlib.sha256(der_cert).digest()
return base64.urlsafe_b64encode(fingerprint).rstrip(b"=").decode("utf-8")
except ImportError as e:
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e


def should_request_bound_token(cert):
"""Determines if a bound token should be requested.

This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
environment variable and whether the certificate is an agent identity cert.

Args:
cert (cryptography.x509.Certificate): The parsed certificate object.

Returns:
bool: True if a bound token should be requested, False otherwise.
"""
is_agent_cert = _is_agent_identity_certificate(cert)
is_opted_in = (
os.environ.get(
environment_vars.GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES,
"true",
).lower()
== "true"
)
return is_agent_cert and is_opted_in
2 changes: 1 addition & 1 deletion google/auth/_oauth2client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def _convert_appengine_app_assertion_credentials(credentials):
oauth2client.contrib.gce.AppAssertionCredentials: _convert_gce_app_assertion_credentials,
}

if _HAS_APPENGINE:
if _HAS_APPENGINE: # pragma: no cover
_CLASS_CONVERSION_MAP[
oauth2client.contrib.appengine.AppAssertionCredentials
] = _convert_appengine_app_assertion_credentials
Expand Down
13 changes: 10 additions & 3 deletions google/auth/compute_engine/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,12 +360,19 @@ def get_service_account_token(request, service_account="default", scopes=None):
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
from google.auth import _agent_identity_utils

params = {}
if scopes:
if not isinstance(scopes, str):
scopes = ",".join(scopes)
params = {"scopes": scopes}
else:
params = None
params["scopes"] = scopes

cert = _agent_identity_utils.get_and_parse_agent_identity_certificate()
if cert:
if _agent_identity_utils.should_request_bound_token(cert):
fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(cert)
params["bindCertificateFingerprint"] = fingerprint

metrics_header = {
metrics.API_CLIENT_HEADER: metrics.token_request_access_token_mds()
Expand Down
2 changes: 1 addition & 1 deletion google/auth/compute_engine/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,9 @@ def _refresh_token(self, request):
service can't be reached if if the instance has not
credentials.
"""
scopes = self._scopes if self._scopes is not None else self._default_scopes
try:
self._retrieve_info(request)
scopes = self._scopes if self._scopes is not None else self._default_scopes
# Always fetch token with default service account email.
self.token, self.expiry = _metadata.get_service_account_token(
request, service_account="default", scopes=scopes
Expand Down
9 changes: 9 additions & 0 deletions google/auth/environment_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,12 @@
GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED = "GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED"
"""Environment variable controlling whether to enable trust boundary feature.
The default value is false. Users have to explicitly set this value to true."""

GOOGLE_API_CERTIFICATE_CONFIG = "GOOGLE_API_CERTIFICATE_CONFIG"
"""Environment variable defining the location of Google API certificate config
file."""

GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES = (
"GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES"
)
"""Environment variable to prevent agent token sharing for GCP services."""
15 changes: 11 additions & 4 deletions google/auth/external_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,9 @@ def refresh(self, request):
credentials, it will refresh the access token and the trust boundary.
"""
self._refresh_token(request)
self._handle_trust_boundary(request)

def _handle_trust_boundary(self, request):
# If we are impersonating, the trust boundary is handled by the
# impersonated credentials object. We need to get it from there.
if self._service_account_impersonation_url:
Expand All @@ -428,7 +431,7 @@ def refresh(self, request):
# Otherwise, refresh the trust boundary for the external account.
self._refresh_trust_boundary(request)

def _refresh_token(self, request):
def _refresh_token(self, request, cert_fingerprint=None):
scopes = self._scopes if self._scopes is not None else self._default_scopes

# Inject client certificate into request.
Expand All @@ -446,11 +449,15 @@ def _refresh_token(self, request):
self.expiry = self._impersonated_credentials.expiry
else:
now = _helpers.utcnow()
additional_options = None
additional_options = {}
# Do not pass workforce_pool_user_project when client authentication
# is used. The client ID is sufficient for determining the user project.
if self._workforce_pool_user_project and not self._client_id:
additional_options = {"userProject": self._workforce_pool_user_project}
additional_options["userProject"] = self._workforce_pool_user_project

if cert_fingerprint:
additional_options["bindCertFingerprint"] = cert_fingerprint

additional_headers = {
metrics.API_CLIENT_HEADER: metrics.byoid_metrics_header(
self._metrics_options
Expand All @@ -464,7 +471,7 @@ def _refresh_token(self, request):
audience=self._audience,
scopes=scopes,
requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
additional_options=additional_options,
additional_options=additional_options if additional_options else None,
additional_headers=additional_headers,
)
self.token = response_data.get("access_token")
Expand Down
22 changes: 22 additions & 0 deletions google/auth/identity_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,3 +550,25 @@ def from_file(cls, filename, **kwargs):
credentials.
"""
return super(Credentials, cls).from_file(filename, **kwargs)

def refresh(self, request):
"""Refreshes the access token.

Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
"""
from google.auth import _agent_identity_utils

cert_fingerprint = None
# Check if the credential is X.509 based.
if self._credential_source_certificate is not None:
cert_bytes = self._get_cert_bytes()
cert = _agent_identity_utils.parse_certificate(cert_bytes)
if _agent_identity_utils.should_request_bound_token(cert):
cert_fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(
cert
)

self._refresh_token(request, cert_fingerprint=cert_fingerprint)
self._handle_trust_boundary(request)
4 changes: 2 additions & 2 deletions google/auth/transport/_mtls_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import re
import subprocess

from google.auth import environment_vars
from google.auth import exceptions

CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json"
CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json"
_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG"
_CERT_PROVIDER_COMMAND = "cert_provider_command"
_CERT_REGEX = re.compile(
b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL
Expand Down Expand Up @@ -132,7 +132,7 @@ def _get_cert_config_path(certificate_config_path=None):
"""

if certificate_config_path is None:
env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None)
env_path = environ.get(environment_vars.GOOGLE_API_CERTIFICATE_CONFIG, None)
if env_path is not None and env_path != "":
certificate_config_path = env_path
else:
Expand Down
2 changes: 1 addition & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[mypy]
python_version = 3.7
python_version = 3.9
namespace_packages = True
Loading