Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 69 additions & 28 deletions packages/google-auth/google/auth/compute_engine/_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import json
import logging
import os
from urllib.parse import urljoin
from urllib.parse import urljoin, urlparse

import requests

Expand Down Expand Up @@ -52,39 +52,70 @@
)


def _validate_gce_mds_configured_environment():
"""Validates the GCE metadata server environment configuration for mTLS.
def _validate_gce_mds_configured_environment(mode: _mtls.MdsMtlsMode, mds_url: str):
"""Validates that the environment is properly configured for GCE MDS if mTLS is enabled.

mTLS is only supported when connecting to the default metadata server hosts.
If we are in strict mode (which requires mTLS), ensure that the metadata host
has not been overridden to a custom value (which means mTLS will fail).

Args:
mode (_mtls.MdsMtlsMode): The mTLS mode configured for the metadata server, parsed from the GCE_METADATA_MTLS_MODE environment variable.
mds_url (str): The metadata server URL to which the request will be made.
Raises:
google.auth.exceptions.MutualTLSChannelError: if the environment
configuration is invalid for mTLS.
"""
mode = _mtls._parse_mds_mode()
if mode == _mtls.MdsMtlsMode.STRICT:
# mTLS is only supported when connecting to the default metadata host.
# Raise an exception if we are in strict mode (which requires mTLS)
# but the metadata host has been overridden to a custom MDS. (which means mTLS will fail)
if _GCE_METADATA_HOST not in _GCE_DEFAULT_MDS_HOSTS:
parsed = urlparse(mds_url)
if parsed.hostname not in _GCE_DEFAULT_MDS_HOSTS:
raise exceptions.MutualTLSChannelError(
"Mutual TLS is required, but the metadata host has been overridden. "
"mTLS is only supported when connecting to the default metadata host."
)
if parsed.scheme != "https":
raise exceptions.MutualTLSChannelError(
"Mutual TLS is required, but the metadata URL scheme is not HTTPS. "
"mTLS requires HTTPS."
)


def _get_metadata_root(use_mtls: bool):
"""Returns the metadata server root URL."""
def _get_metadata_root(
mds_mtls_mode: _mtls.MdsMtlsMode, mds_mtls_adapter_mounted: bool
) -> str:
"""Returns the metadata server root URL, with the appropriate scheme based on mTLS configuration.

Args:
mds_mtls_mode (_mtls.MdsMtlsMode): The mTLS mode configured for the metadata server, parsed from the GCE_METADATA_MTLS_MODE environment variable.
mds_mtls_adapter_mounted (bool): Whether the mTLS adapter was successfully mounted to the request's session.
Returns:
str: The metadata server root URL. The URL will use HTTPS if mTLS is enabled or required, and HTTP otherwise.
"""

scheme = "https" if use_mtls else "http"
scheme = "http"
if mds_mtls_adapter_mounted or mds_mtls_mode == _mtls.MdsMtlsMode.STRICT:
scheme = "https"
Comment on lines +98 to +100
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In DEFAULT mTLS mode, the scheme should only be set to https if the metadata host is one of the default hosts for which the mTLS adapter was mounted. If the host has been overridden to a custom value (via the GCE_METADATA_HOST environment variable), the adapter is not mounted for it, and using https will likely result in a connection failure. Checking if the host is in _GCE_DEFAULT_MDS_HOSTS ensures that https is only used when the mTLS adapter is actually applicable.

Suggested change
scheme = "http"
if mds_mtls_adapter_mounted or mds_mtls_mode == _mtls.MdsMtlsMode.STRICT:
scheme = "https"
scheme = "http"
if mds_mtls_mode == _mtls.MdsMtlsMode.STRICT or (
mds_mtls_adapter_mounted and _GCE_METADATA_HOST in _GCE_DEFAULT_MDS_HOSTS
):
scheme = "https"

return "{}://{}/computeMetadata/v1/".format(scheme, _GCE_METADATA_HOST)


def _get_metadata_ip_root(use_mtls: bool):
"""Returns the metadata server IP root URL."""
scheme = "https" if use_mtls else "http"
def _get_metadata_ip_root(
mds_mtls_mode: _mtls.MdsMtlsMode, mds_mtls_adapter_mounted: bool
) -> str:
"""Returns the metadata server IP root URL, with the appropriate scheme based on mTLS configuration.

Args:
mds_mtls_mode (_mtls.MdsMtlsMode): The mTLS mode configured for the metadata server, parsed from the GCE_METADATA_MTLS_MODE environment variable.
mds_mtls_adapter_mounted (bool): Whether the mTLS adapter was successfully mounted to the request's session.
Returns:
str: The metadata server IP root URL. The URL will use HTTPS if mTLS is enabled or required, and HTTP otherwise.
"""

scheme = "http"
if mds_mtls_adapter_mounted or mds_mtls_mode == _mtls.MdsMtlsMode.STRICT:
scheme = "https"
return "{}://{}".format(
scheme, os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP)
)
Comment on lines +116 to 121
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to _get_metadata_root, we should only use https in DEFAULT mode if the metadata IP is one of the default hosts. If GCE_METADATA_IP is overridden to a custom value, the code should fall back to http (unless in STRICT mode) because the mTLS adapter is not mounted for custom IPs. This suggestion also consolidates the os.getenv call for efficiency.

Suggested change
scheme = "http"
if mds_mtls_adapter_mounted or mds_mtls_mode == _mtls.MdsMtlsMode.STRICT:
scheme = "https"
return "{}://{}".format(
scheme, os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP)
)
mds_ip = os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP)
scheme = "http"
if mds_mtls_mode == _mtls.MdsMtlsMode.STRICT or (
mds_mtls_adapter_mounted and mds_ip in _GCE_DEFAULT_MDS_HOSTS
):
scheme = "https"
return "{}://{}".format(scheme, mds_ip)

Expand Down Expand Up @@ -159,30 +190,38 @@ def detect_gce_residency_linux():
return content.startswith(_GOOGLE)


def _prepare_request_for_mds(request, use_mtls=False) -> None:
"""Prepares a request for the metadata server.

This will check if mTLS should be used and mount the mTLS adapter if needed.
def _try_mount_mds_mtls_adapter(request, mode: _mtls.MdsMtlsMode) -> bool:
"""Tries to mount the mTLS adapter to the request's session if mTLS is enabled and certificates are present.

Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests. If mTLS is enabled, and the request supports sessions,
the request will have the mTLS adapter mounted. Otherwise, there
will be no change.
use_mtls (bool): Whether to use mTLS for the request.
mode (_mtls.MdsMtlsMode): The mTLS mode configured for the metadata server, parsed from the GCE_METADATA_MTLS_MODE environment variable.
Returns:
bool: True if the mTLS adapter was mounted, False otherwise.
"""

mds_mtls_config = _mtls.MdsMtlsConfig()
should_mount_adapter = _mtls.should_use_mds_mtls(
mode, mds_mtls_config=mds_mtls_config
)

"""
# Only modify the request if mTLS is enabled, and request supports sessions.
if use_mtls and hasattr(request, "session"):
mds_mtls_adapter_mounted = False
if should_mount_adapter and hasattr(request, "session"):
# Ensure the request has a session to mount the adapter to.
if not request.session:
request.session = requests.Session()

adapter = _mtls.MdsMtlsAdapter()
adapter = _mtls.MdsMtlsAdapter(mds_mtls_config=mds_mtls_config)
# Mount the adapter for all default GCE metadata hosts.
for host in _GCE_DEFAULT_MDS_HOSTS:
request.session.mount(f"https://{host}/", adapter)
mds_mtls_adapter_mounted = True

return mds_mtls_adapter_mounted


def ping(
Expand All @@ -200,8 +239,12 @@ def ping(
Returns:
bool: True if the metadata server is reachable, False otherwise.
"""
use_mtls = _mtls.should_use_mds_mtls()
_prepare_request_for_mds(request, use_mtls=use_mtls)
mds_mtls_mode = _mtls._parse_mds_mode()
mds_mtls_adapter_mounted = _try_mount_mds_mtls_adapter(request, mds_mtls_mode)

metadata_ip_root = _get_metadata_ip_root(mds_mtls_mode, mds_mtls_adapter_mounted)
_validate_gce_mds_configured_environment(mds_mtls_mode, metadata_ip_root)

# NOTE: The explicit ``timeout`` is a workaround. The underlying
# issue is that resolving an unknown host on some networks will take
# 20-30 seconds; making this timeout short fixes the issue, but
Expand All @@ -216,7 +259,7 @@ def ping(
for attempt in backoff:
try:
response = request(
url=_get_metadata_ip_root(use_mtls),
url=metadata_ip_root,
method="GET",
headers=headers,
timeout=timeout,
Expand Down Expand Up @@ -285,18 +328,16 @@ def get(
has been overridden in strict mTLS mode).

"""
use_mtls = _mtls.should_use_mds_mtls()
# Prepare the request object for mTLS if needed.
# This will create a new request object with the mTLS session.
_prepare_request_for_mds(request, use_mtls=use_mtls)
mds_mtls_mode = _mtls._parse_mds_mode()
mds_mtls_adapter_mounted = _try_mount_mds_mtls_adapter(request, mds_mtls_mode)

if root is None:
root = _get_metadata_root(use_mtls)
root = _get_metadata_root(mds_mtls_mode, mds_mtls_adapter_mounted)

# mTLS is only supported when connecting to the default metadata host.
# If we are in strict mode (which requires mTLS), ensure that the metadata host
# has not been overridden to a non-default host value (which means mTLS will fail).
_validate_gce_mds_configured_environment()
_validate_gce_mds_configured_environment(mds_mtls_mode, root)

base_url = urljoin(root, path)
query_params = {} if params is None else params
Expand Down
40 changes: 31 additions & 9 deletions packages/google-auth/google/auth/compute_engine/_mtls.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,16 @@ class MdsMtlsConfig:
) # path to file containing client certificate and key


def _certs_exist(mds_mtls_config: MdsMtlsConfig):
"""Checks if the mTLS certificates exist."""
def mds_mtls_certificates_exist(mds_mtls_config: MdsMtlsConfig):
"""Checks if the mTLS certificates exist.

Args:
mds_mtls_config (MdsMtlsConfig): The mTLS configuration containing the
paths to the CA and client certificates.

Returns:
bool: True if both certificates exist, False otherwise.
"""
return os.path.exists(mds_mtls_config.ca_cert_path) and os.path.exists(
mds_mtls_config.client_combined_cert_path
)
Expand Down Expand Up @@ -98,19 +106,33 @@ def _parse_mds_mode():
)


def should_use_mds_mtls(mds_mtls_config: MdsMtlsConfig = MdsMtlsConfig()):
"""Determines if mTLS should be used for the metadata server."""
mode = _parse_mds_mode()
def should_use_mds_mtls(
mode: MdsMtlsMode, mds_mtls_config: MdsMtlsConfig = MdsMtlsConfig()
) -> bool:
"""Determines if mTLS should be used for the metadata server.

Args:
mode (MdsMtlsMode): The mTLS mode configured for the metadata server,
parsed from the GCE_METADATA_MTLS_MODE environment variable.
mds_mtls_config (MdsMtlsConfig): The mTLS configuration containing the
paths to the CA and client certificates.

Returns:
bool: True if mTLS should be used, False otherwise.

Raises:
google.auth.exceptions.MutualTLSChannelError: if mTLS is required (STRICT mode)
but certificates are missing.
"""
if mode == MdsMtlsMode.STRICT:
if not _certs_exist(mds_mtls_config):
if not mds_mtls_certificates_exist(mds_mtls_config):
raise exceptions.MutualTLSChannelError(
"mTLS certificates not found in strict mode."
)
return True
elif mode == MdsMtlsMode.NONE:
if mode == MdsMtlsMode.NONE:
return False
else: # Default mode
return _certs_exist(mds_mtls_config)
return mds_mtls_certificates_exist(mds_mtls_config)


class MdsMtlsAdapter(HTTPAdapter):
Expand Down
Loading
Loading