Source code for multistorageclient.config

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
import inspect
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Sequence
from multiprocessing import current_process
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse

import opentelemetry.metrics as api_metrics
import yaml

from .cache import DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .instrumentation import setup_opentelemetry
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry, TelemetryMode
from .telemetry import init as telemetry_init
from .telemetry.attributes.base import AttributesProvider
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    Replica,
    RetryConfig,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
  53
# Constants related to implicit profiles
# URL protocols for which an implicit profile can be synthesized on the fly.
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
# Maps a URL protocol/scheme to the storage provider "type" used in profile configs.
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}

# Maps short telemetry attributes provider names (usable as "type" in the
# opentelemetry.metrics.attributes config section) to fully qualified class paths.
_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
}
  71
  72
  73# Template for creating implicit profile configurations
  74def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
  75    """
  76    Create a configuration dictionary for an implicit profile.
  77
  78    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
  79    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
  80    :param base_path: The base path (e.g., bucket name) for the storage provider
  81
  82    :return: A configuration dictionary for the implicit profile
  83    """
  84    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
  85    return {
  86        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
  87    }
  88
  89
# Name of the built-in POSIX profile that is always available.
DEFAULT_POSIX_PROFILE_NAME = "default"
# Implicit profile mapping the whole local filesystem ("/") to the default profile.
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps storage provider "type" config values to class names in the ".providers" package.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
}

# Maps credentials provider "type" config values to class names in the ".providers" package.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
}
 110
 111
 112def _find_config_file_paths():
 113    """
 114    Get configuration file search paths.
 115
 116    Returns paths in order of precedence:
 117
 118    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
 119    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
 120    """
 121    paths = []
 122
 123    # 1. User-specific configuration directory
 124    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
 125
 126    if xdg_config_home:
 127        paths.extend(
 128            [
 129                os.path.join(xdg_config_home, "msc", "config.yaml"),
 130                os.path.join(xdg_config_home, "msc", "config.json"),
 131            ]
 132        )
 133
 134    user_home = os.getenv("HOME")
 135
 136    if user_home:
 137        paths.extend(
 138            [
 139                os.path.join(user_home, ".msc_config.yaml"),
 140                os.path.join(user_home, ".msc_config.json"),
 141                os.path.join(user_home, ".config", "msc", "config.yaml"),
 142                os.path.join(user_home, ".config", "msc", "config.json"),
 143            ]
 144        )
 145
 146    # 2. System-wide configuration directories
 147    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
 148    if not xdg_config_dirs:
 149        xdg_config_dirs = "/etc/xdg"
 150
 151    for config_dir in xdg_config_dirs.split(":"):
 152        config_dir = config_dir.strip()
 153        if config_dir:
 154            paths.extend(
 155                [
 156                    os.path.join(config_dir, "msc", "config.yaml"),
 157                    os.path.join(config_dir, "msc", "config.json"),
 158                ]
 159            )
 160
 161    paths.extend(
 162        [
 163            "/etc/msc_config.yaml",
 164            "/etc/msc_config.json",
 165        ]
 166    )
 167
 168    return tuple(paths)
 169
 170
# Root package name, used for relative class imports via import_class(..., PACKAGE_NAME).
PACKAGE_NAME = "multistorageclient"

# Module-level logger.
logger = logging.getLogger(__name__)
 174
 175
 176class SimpleProviderBundle(ProviderBundle):
 177    def __init__(
 178        self,
 179        storage_provider_config: StorageProviderConfig,
 180        credentials_provider: Optional[CredentialsProvider] = None,
 181        metadata_provider: Optional[MetadataProvider] = None,
 182        replicas: Optional[list[Replica]] = None,
 183    ):
 184        if replicas is None:
 185            replicas = []
 186
 187        self._storage_provider_config = storage_provider_config
 188        self._credentials_provider = credentials_provider
 189        self._metadata_provider = metadata_provider
 190        self._replicas = replicas
 191
 192    @property
 193    def storage_provider_config(self) -> StorageProviderConfig:
 194        return self._storage_provider_config
 195
 196    @property
 197    def credentials_provider(self) -> Optional[CredentialsProvider]:
 198        return self._credentials_provider
 199
 200    @property
 201    def metadata_provider(self) -> Optional[MetadataProvider]:
 202        return self._metadata_provider
 203
 204    @property
 205    def replicas(self) -> list[Replica]:
 206        return self._replicas
 207
 208
# Default eviction-policy refresh interval (seconds) when not set in the cache config.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
 211
class StorageClientConfigLoader:
    """
    Resolves the components of a StorageClientConfig (providers, telemetry
    instruments, cache settings) from a configuration dictionary and a profile name.
    """

    # Pre-built bundle; when supplied it takes precedence over config-derived providers.
    _provider_bundle: Optional[ProviderBundle]
    # All profile configurations keyed by profile name.
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Configuration dictionary of the profile being loaded.
    _profile_dict: dict[str, Any]
    # The "opentelemetry" section of the config, if present.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Metric instruments and attributes providers resolved from telemetry config.
    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # The "cache" section of the config, if present.
    _cache_dict: Optional[dict[str, Any]]
 222
    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry: Optional[Telemetry] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry: Optional telemetry instance to use, takes precedence over ``metric_gauges``, ``metric_counters``, and ``metric_attributes_providers``.
        :param metric_gauges: Optional metric gauges to use.
        :param metric_counters: Optional metric counters to use.
        :param metric_attributes_providers: Optional metric attributes providers to use.
        :raises ValueError: If ``profile`` is not defined in ``config_dict``, or if the
            default POSIX profile is overridden with a non-"file" storage provider type.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)

        # Explicitly supplied instruments are kept unless a telemetry instance replaces them below.
        self._metric_gauges = metric_gauges
        self._metric_counters = metric_counters
        self._metric_attributes_providers = metric_attributes_providers
        if self._opentelemetry_dict is not None:
            # Try to create a telemetry instance only if no telemetry instance or OpenTelemetry instruments are provided (e.g. when unpickling).
            #
            # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
            # New processes (e.g. multiprocessing manager server) can't be created during this phase.
            if telemetry is None and not any(
                (self._metric_gauges, self._metric_counters, self._metric_attributes_providers)
            ):
                try:
                    # Try to create a telemetry instance with the default mode heuristic.
                    telemetry = telemetry_init()
                except (AssertionError, OSError, RuntimeError, ValueError):
                    try:
                        if current_process().daemon:
                            # Daemons can't have child processes.
                            #
                            # Try to create a telemetry instance in local mode.
                            #
                            # ⚠️ This may cause CPU contention if the current process is compute-intensive
                            # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
                            telemetry = telemetry_init(mode=TelemetryMode.LOCAL)
                        else:
                            # Try to create a telemetry instance in server mode.
                            telemetry = telemetry_init(mode=TelemetryMode.SERVER)
                    except (AssertionError, OSError, RuntimeError, ValueError):
                        # Don't throw on telemetry init failures.
                        logger.error("Failed to automatically create telemetry instance!", exc_info=True)

            if "metrics" in self._opentelemetry_dict:
                if telemetry is not None:
                    # Build gauge/counter instruments from the telemetry instance; instruments
                    # the telemetry factory declines to create (returns None) are skipped.
                    self._metric_gauges = {}
                    for name in Telemetry.GaugeName:
                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
                        if gauge is not None:
                            self._metric_gauges[name] = gauge
                    self._metric_counters = {}
                    for name in Telemetry.CounterName:
                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
                        if counter is not None:
                            self._metric_counters[name] = counter

                    if "attributes" in self._opentelemetry_dict["metrics"]:
                        attributes_providers: list[AttributesProvider] = []
                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
                            "attributes"
                        ]
                        for config in attributes_provider_configs:
                            # "type" may be a short mapped name or a fully qualified class path.
                            attributes_provider_type: str = config["type"]
                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
                                attributes_provider_type, attributes_provider_type
                            )
                            attributes_provider_module_name, attributes_provider_class_name = (
                                attributes_provider_fully_qualified_name.rsplit(".", 1)
                            )
                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
                            attributes_provider_options = config.get("options", {})
                            # The MSC config attributes provider needs the full config dictionary.
                            if (
                                attributes_provider_fully_qualified_name
                                == _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING["msc_config"]
                            ):
                                attributes_provider_options["config_dict"] = config_dict
                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
                            attributes_providers.append(attributes_provider)
                        self._metric_attributes_providers = tuple(attributes_providers)
                elif not any([metric_gauges, metric_counters, metric_attributes_providers]):
                    # TODO: Remove "beta" from the log once legacy metrics are removed.
                    logger.error("No telemetry instance! Disabling beta metrics.")

        self._cache_dict = config_dict.get("cache", None)
 350
 351    def _build_storage_provider(
 352        self,
 353        storage_provider_name: str,
 354        storage_options: Optional[dict[str, Any]] = None,
 355        credentials_provider: Optional[CredentialsProvider] = None,
 356    ) -> StorageProvider:
 357        if storage_options is None:
 358            storage_options = {}
 359        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
 360            raise ValueError(
 361                f"Storage provider {storage_provider_name} is not supported. "
 362                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
 363            )
 364        if credentials_provider:
 365            storage_options["credentials_provider"] = credentials_provider
 366        if self._metric_gauges is not None:
 367            storage_options["metric_gauges"] = self._metric_gauges
 368        if self._metric_counters is not None:
 369            storage_options["metric_counters"] = self._metric_counters
 370        if self._metric_attributes_providers is not None:
 371            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
 372        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
 373        module_name = ".providers"
 374        cls = import_class(class_name, module_name, PACKAGE_NAME)
 375        return cls(**storage_options)
 376
 377    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
 378        storage_profile_dict = self._profiles.get(storage_provider_profile)
 379        if not storage_profile_dict:
 380            raise ValueError(
 381                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
 382            )
 383
 384        # Check if metadata provider is configured for this profile
 385        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
 386        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
 387        if local_metadata_provider_dict:
 388            raise ValueError(
 389                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
 390            )
 391
 392        # Initialize CredentialsProvider
 393        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
 394        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
 395
 396        # Initialize StorageProvider
 397        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
 398        if local_storage_provider_dict:
 399            local_name = local_storage_provider_dict["type"]
 400            local_storage_options = local_storage_provider_dict.get("options", {})
 401        else:
 402            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
 403
 404        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
 405        return storage_provider
 406
 407    def _build_credentials_provider(
 408        self,
 409        credentials_provider_dict: Optional[dict[str, Any]],
 410        storage_options: Optional[dict[str, Any]] = None,
 411    ) -> Optional[CredentialsProvider]:
 412        """
 413        Initializes the CredentialsProvider based on the provided dictionary.
 414
 415        Args:
 416            credentials_provider_dict: Dictionary containing credentials provider configuration
 417            storage_options: Storage provider options required by some credentials providers to scope the credentials.
 418        """
 419        if not credentials_provider_dict:
 420            return None
 421
 422        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
 423            # Fully qualified class path case
 424            class_type = credentials_provider_dict["type"]
 425            module_name, class_name = class_type.rsplit(".", 1)
 426            cls = import_class(class_name, module_name)
 427        else:
 428            # Mapped class name case
 429            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
 430            module_name = ".providers"
 431            cls = import_class(class_name, module_name, PACKAGE_NAME)
 432
 433        # Propagate storage provider options to credentials provider since they may be
 434        # required by some credentials providers to scope the credentials.
 435        import inspect
 436
 437        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
 438        options = credentials_provider_dict.get("options", {})
 439        if storage_options:
 440            for storage_provider_option in storage_options.keys():
 441                if storage_provider_option in init_params and storage_provider_option not in options:
 442                    options[storage_provider_option] = storage_options[storage_provider_option]
 443
 444        return cls(**options)
 445
 446    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
 447        # Initialize StorageProvider
 448        storage_provider_dict = profile_dict.get("storage_provider", None)
 449        if storage_provider_dict:
 450            storage_provider_name = storage_provider_dict["type"]
 451            storage_options = storage_provider_dict.get("options", {})
 452        else:
 453            raise ValueError("Missing storage_provider in the config.")
 454
 455        # Initialize CredentialsProvider
 456        # It is prudent to assume that in some cases, the credentials provider
 457        # will provide credentials scoped to specific base_path.
 458        # So we need to pass the storage_options to the credentials provider.
 459        credentials_provider_dict = profile_dict.get("credentials_provider", None)
 460        credentials_provider = self._build_credentials_provider(
 461            credentials_provider_dict=credentials_provider_dict,
 462            storage_options=storage_options,
 463        )
 464
 465        # Initialize MetadataProvider
 466        metadata_provider_dict = profile_dict.get("metadata_provider", None)
 467        metadata_provider = None
 468        if metadata_provider_dict:
 469            if metadata_provider_dict["type"] == "manifest":
 470                metadata_options = metadata_provider_dict.get("options", {})
 471                # If MetadataProvider has a reference to a different storage provider profile
 472                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
 473                if storage_provider_profile:
 474                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
 475                else:
 476                    storage_provider = self._build_storage_provider(
 477                        storage_provider_name, storage_options, credentials_provider
 478                    )
 479
 480                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
 481            else:
 482                class_type = metadata_provider_dict["type"]
 483                if "." not in class_type:
 484                    raise ValueError(
 485                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
 486                    )
 487                module_name, class_name = class_type.rsplit(".", 1)
 488                cls = import_class(class_name, module_name)
 489                options = metadata_provider_dict.get("options", {})
 490                metadata_provider = cls(**options)
 491
 492        # Build replicas if configured
 493        replicas_config = profile_dict.get("replicas", [])
 494        replicas = []
 495        if replicas_config:
 496            for replica_dict in replicas_config:
 497                replicas.append(
 498                    Replica(
 499                        replica_profile=replica_dict["replica_profile"],
 500                        read_priority=replica_dict["read_priority"],
 501                    )
 502                )
 503
 504            # Sort replicas by read_priority
 505            replicas.sort(key=lambda r: r.read_priority)
 506
 507        return SimpleProviderBundle(
 508            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
 509            credentials_provider=credentials_provider,
 510            metadata_provider=metadata_provider,
 511            replicas=replicas,
 512        )
 513
 514    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
 515        class_type = provider_bundle_dict["type"]
 516        module_name, class_name = class_type.rsplit(".", 1)
 517        cls = import_class(class_name, module_name)
 518        options = provider_bundle_dict.get("options", {})
 519        return cls(**options)
 520
 521    def _build_provider_bundle(self) -> ProviderBundle:
 522        if self._provider_bundle:
 523            return self._provider_bundle  # Return if previously provided.
 524
 525        # Load 3rd party extension
 526        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
 527        if provider_bundle_dict:
 528            return self._build_provider_bundle_from_extension(provider_bundle_dict)
 529
 530        return self._build_provider_bundle_from_config(self._profile_dict)
 531
 532    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
 533        if "size_mb" in cache_dict:
 534            raise ValueError(
 535                "The 'size_mb' property is no longer supported. \n"
 536                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
 537                "Example configuration:\n"
 538                "cache:\n"
 539                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
 540                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
 541                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
 542                "  eviction_policy:         # Optional: The eviction policy to use\n"
 543                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
 544                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
 545            )
 546
 547    def _validate_replicas(self, replicas: list[Replica]) -> None:
 548        """
 549        Validates that replica profiles do not have their own replicas configuration.
 550
 551        This prevents circular references where a replica profile could reference
 552        another profile that also has replicas, creating an infinite loop.
 553
 554        :param replicas: The list of Replica objects to validate
 555        :raises ValueError: If any replica profile has its own replicas configuration
 556        """
 557        for replica in replicas:
 558            replica_profile_name = replica.replica_profile
 559
 560            # Check that replica profile is not the same as the current profile
 561            if replica_profile_name == self._profile:
 562                raise ValueError(
 563                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
 564                )
 565
 566            # Check if the replica profile exists in the configuration
 567            if replica_profile_name not in self._profiles:
 568                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
 569
 570            # Get the replica profile configuration
 571            replica_profile_dict = self._profiles[replica_profile_name]
 572
 573            # Check if the replica profile has its own replicas configuration
 574            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
 575                raise ValueError(
 576                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
 577                    f"This creates a circular reference which is not allowed."
 578                )
 579
 580    def build_config(self) -> "StorageClientConfig":
 581        bundle = self._build_provider_bundle()
 582
 583        # Validate replicas to prevent circular references
 584        self._validate_replicas(bundle.replicas)
 585
 586        storage_provider = self._build_storage_provider(
 587            bundle.storage_provider_config.type,
 588            bundle.storage_provider_config.options,
 589            bundle.credentials_provider,
 590        )
 591
 592        cache_config: Optional[CacheConfig] = None
 593        cache_manager: Optional[CacheManager] = None
 594
 595        # Check if caching is enabled for this profile
 596        caching_enabled = self._profile_dict.get("caching_enabled", False)
 597
 598        if self._cache_dict is not None and caching_enabled:
 599            tempdir = tempfile.gettempdir()
 600            default_location = os.path.join(tempdir, "msc_cache")
 601            location = self._cache_dict.get("location", default_location)
 602
 603            # Check if cache_backend.cache_path is defined
 604            cache_backend = self._cache_dict.get("cache_backend", {})
 605            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None
 606
 607            # Warn if both location and cache_backend.cache_path are defined
 608            if cache_backend_path and self._cache_dict.get("location") is not None:
 609                logger.warning(
 610                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
 611                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
 612                )
 613            elif cache_backend_path:
 614                # Use cache_backend.cache_path only if location is not explicitly defined
 615                location = cache_backend_path
 616
 617            # Resolve the effective flag while preserving explicit ``False``.
 618            if "check_source_version" in self._cache_dict:
 619                check_source_version = self._cache_dict["check_source_version"]
 620            else:
 621                check_source_version = self._cache_dict.get("use_etag", True)
 622
 623            # Warn if both keys are specified – the new one wins.
 624            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
 625                logger.warning(
 626                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
 627                    "Using 'check_source_version' and ignoring 'use_etag'."
 628                )
 629
 630            if not Path(location).is_absolute():
 631                raise ValueError(f"Cache location must be an absolute path: {location}")
 632
 633            # Initialize cache_dict with default values
 634            cache_dict = self._cache_dict
 635
 636            # Verify cache config
 637            self._verify_cache_config(cache_dict)
 638
 639            # Initialize eviction policy
 640            if "eviction_policy" in cache_dict:
 641                policy = cache_dict["eviction_policy"]["policy"].lower()
 642                eviction_policy = EvictionPolicyConfig(
 643                    policy=policy,
 644                    refresh_interval=cache_dict["eviction_policy"].get(
 645                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
 646                    ),
 647                )
 648            else:
 649                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)
 650
 651            # Create cache config from the standardized format
 652            cache_config = CacheConfig(
 653                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
 654                location=cache_dict.get("location", location),
 655                check_source_version=check_source_version,
 656                eviction_policy=eviction_policy,
 657            )
 658
 659            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
 660        elif self._cache_dict is not None and not caching_enabled:
 661            logger.debug(f"Caching is disabled for profile '{self._profile}'")
 662        elif self._cache_dict is None and caching_enabled:
 663            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")
 664
 665        # retry options
 666        retry_config_dict = self._profile_dict.get("retry", None)
 667        if retry_config_dict:
 668            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
 669            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
 670            retry_config = RetryConfig(attempts=attempts, delay=delay)
 671        else:
 672            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)
 673
 674        # autocommit options
 675        autocommit_config = AutoCommitConfig()
 676        autocommit_dict = self._profile_dict.get("autocommit", None)
 677        if autocommit_dict:
 678            interval_minutes = autocommit_dict.get("interval_minutes", None)
 679            at_exit = autocommit_dict.get("at_exit", False)
 680            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
 681
 682        # set up OpenTelemetry providers once per process
 683        #
 684        # TODO: Legacy, need to remove.
 685        if self._opentelemetry_dict:
 686            setup_opentelemetry(self._opentelemetry_dict)
 687
 688        return StorageClientConfig(
 689            profile=self._profile,
 690            storage_provider=storage_provider,
 691            credentials_provider=bundle.credentials_provider,
 692            metadata_provider=bundle.metadata_provider,
 693            cache_config=cache_config,
 694            cache_manager=cache_manager,
 695            retry_config=retry_config,
 696            metric_gauges=self._metric_gauges,
 697            metric_counters=self._metric_counters,
 698            metric_attributes_providers=self._metric_attributes_providers,
 699            replicas=bundle.replicas,
 700            autocommit_config=autocommit_config,
 701        )
 702
 703
 704class PathMapping:
 705    """
 706    Class to handle path mappings defined in the MSC configuration.
 707
 708    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
 709    where entries are sorted by prefix length (longest first) for optimal matching.
 710    Longer paths take precedence when matching.
 711    """
 712
 713    def __init__(self):
 714        """Initialize an empty PathMapping."""
 715        self._mapping = defaultdict(lambda: defaultdict(list))
 716
 717    @classmethod
 718    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
 719        """
 720        Create a PathMapping instance from configuration dictionary.
 721
 722        :param config_dict: Configuration dictionary, if None the config will be loaded
 723        :return: A PathMapping instance with processed mappings
 724        """
 725        if config_dict is None:
 726            # Import locally to avoid circular imports
 727            from multistorageclient.config import StorageClientConfig
 728
 729            config_dict, _ = StorageClientConfig.read_msc_config()
 730
 731        if not config_dict:
 732            return cls()
 733
 734        instance = cls()
 735        instance._load_mapping(config_dict)
 736        return instance
 737
 738    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
 739        """
 740        Load path mapping from a configuration dictionary.
 741
 742        :param config_dict: Configuration dictionary containing path mapping
 743        """
 744        # Get the path_mapping section
 745        path_mapping = config_dict.get("path_mapping", {})
 746        if path_mapping is None:
 747            return
 748
 749        # Process each mapping
 750        for source_path, dest_path in path_mapping.items():
 751            # Validate format
 752            if not source_path.endswith("/"):
 753                continue
 754            if not dest_path.startswith(MSC_PROTOCOL):
 755                continue
 756            if not dest_path.endswith("/"):
 757                continue
 758
 759            # Extract the destination profile
 760            pr_dest = urlparse(dest_path)
 761            dest_profile = pr_dest.netloc
 762
 763            # Parse the source path
 764            pr = urlparse(source_path)
 765            protocol = pr.scheme.lower() if pr.scheme else "file"
 766
 767            if protocol == "file" or source_path.startswith("/"):
 768                # For file or absolute paths, use the whole path as the prefix
 769                # and leave bucket empty
 770                bucket = ""
 771                prefix = source_path if source_path.startswith("/") else pr.path
 772            else:
 773                # For object storage, extract bucket and prefix
 774                bucket = pr.netloc
 775                prefix = pr.path
 776                if prefix.startswith("/"):
 777                    prefix = prefix[1:]
 778
 779            # Add the mapping to the nested dict
 780            self._mapping[protocol][bucket].append((prefix, dest_profile))
 781
 782        # Sort each bucket's prefixes by length (longest first) for optimal matching
 783        for protocol, buckets in self._mapping.items():
 784            for bucket, prefixes in buckets.items():
 785                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
 786
 787    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
 788        """
 789        Find the best matching mapping for the given URL.
 790
 791        :param url: URL to find matching mapping for
 792        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
 793        """
 794        # Parse the URL
 795        pr = urlparse(url)
 796        protocol = pr.scheme.lower() if pr.scheme else "file"
 797
 798        # For file paths or absolute paths
 799        if protocol == "file" or url.startswith("/"):
 800            path = url if url.startswith("/") else pr.path
 801
 802            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
 803
 804            # Check each prefix (already sorted by length, longest first)
 805            for prefix, profile in possible_mapping:
 806                if path.startswith(prefix):
 807                    # Calculate the relative path
 808                    rel_path = path[len(prefix) :]
 809                    if not rel_path.startswith("/"):
 810                        rel_path = "/" + rel_path
 811                    return profile, rel_path
 812
 813            return None
 814
 815        # For object storage
 816        bucket = pr.netloc
 817        path = pr.path
 818        if path.startswith("/"):
 819            path = path[1:]
 820
 821        # Check bucket-specific mapping
 822        possible_mapping = (
 823            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
 824        )
 825
 826        # Check each prefix (already sorted by length, longest first)
 827        for prefix, profile in possible_mapping:
 828            # matching prefix
 829            if path.startswith(prefix):
 830                rel_path = path[len(prefix) :]
 831                # Remove leading slash if present
 832                if rel_path.startswith("/"):
 833                    rel_path = rel_path[1:]
 834
 835                return profile, rel_path
 836
 837        return None
 838
 839
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: StorageProvider
    credentials_provider: Optional[CredentialsProvider]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    # Original config dictionary; required by __getstate__ for pickling support.
    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: StorageProvider,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.metric_gauges = metric_gauges
        self.metric_counters = metric_counters
        self.metric_attributes_providers = metric_attributes_providers
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a :py:class:`StorageClientConfig` from a JSON string."""
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_yaml(
        config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a :py:class:`StorageClientConfig` from a YAML string."""
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry: Optional[Telemetry] = None,
    ) -> "StorageClientConfig":
        """Build a :py:class:`StorageClientConfig` from a configuration dictionary.

        :param config_dict: Full MSC configuration dictionary.
        :param profile: Name of the profile to build the config for.
        :param skip_validation: Skip JSON-schema validation (used for implicit profiles
            whose config is generated internally and already known to be valid).
        :param telemetry: Optional telemetry instance passed through to the loader.
        """
        # Validate the config file with predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
        config = loader.build_config()
        # Keep the raw dict so the config can be pickled (see __getstate__).
        config._config_dict = config_dict

        return config

    @staticmethod
    def from_file(
        profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a :py:class:`StorageClientConfig` from the MSC and rclone config files on disk.

        Explicit profiles come from the merged MSC + rclone configs; the default POSIX
        profile and implicit profiles (names starting with "_") are synthesized.

        :raises ValueError: If the two config files define conflicting keys, the profile
            cannot be found, or an implicit profile name is malformed/unsupported.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config()
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            # NOTE: closing quote after the rclone file name was previously missing.
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
        else:
            # Check if profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles of the form "_<protocol>-<bucket>"
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
            )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a :py:class:`StorageClientConfig` from a pre-built :py:class:`ProviderBundle`."""
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def read_msc_config() -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the config file used.

        Configs are searched in the following order:

        1. ``MSC_CONFIG`` environment variable (highest precedence)
        2. Standard search paths (user-specified config and system-wide config)

        :return: Tuple of (config_dict, config_file_path). config_dict is the MSC configuration
            dictionary or empty dict if no config was found. config_file_path is the absolute
            path of the config file used, or None if no config file was found.
        :raises ValueError: If a config file exists but cannot be parsed.
        """
        config_dict = {}
        found_config_files = []
        used_config_file = None

        # Check for environment variable first
        msc_config = os.getenv("MSC_CONFIG", None)
        if msc_config and os.path.exists(msc_config):
            try:
                with open(msc_config) as f:
                    # File extension decides the parser; anything not ".json" is YAML.
                    if msc_config.endswith(".json"):
                        config_dict = json.load(f)
                        used_config_file = msc_config
                    else:
                        config_dict = yaml.safe_load(f)
                        used_config_file = msc_config
                found_config_files.append(msc_config)
            except Exception as e:
                raise ValueError(f"malformed msc config file: {msc_config}, exception: {e}")

        # Check all standard search paths
        for path in _find_config_file_paths():
            if os.path.exists(path):
                found_config_files.append(path)
                # Only load the first found config file (environment variable takes precedence)
                if used_config_file is None:
                    try:
                        with open(path) as f:
                            if path.endswith(".json"):
                                config_dict = json.load(f)
                                used_config_file = path
                            else:
                                config_dict = yaml.safe_load(f)
                                used_config_file = path
                    except Exception as e:
                        raise ValueError(f"malformed msc config file: {path}, exception: {e}")

        # Log debug and info messages
        if len(found_config_files) > 1:
            found_config_files_str = ", ".join(found_config_files)
            logger.debug(f"Multiple MSC config files found: {found_config_files_str}. ")

        if len(found_config_files) == 0:
            logger.debug("No MSC config files found in any of the search locations.")
        else:
            logger.info(f"Using MSC config file: {used_config_file}")

        if config_dict:
            validate_config(config_dict)
        return config_dict, used_config_file

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        """Drop unpicklable provider/manager objects; they are rebuilt in __setstate__.

        :raises ValueError: If the config was not built from a dict (no ``_config_dict``).
        """
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild the full config (providers, cache, replicas) from the pickled config dict."""
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            metric_gauges=state["metric_gauges"],
            metric_counters=state["metric_counters"],
            metric_attributes_providers=state["metric_attributes_providers"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.metric_gauges = new_config.metric_gauges
        self.metric_counters = new_config.metric_counters
        self.metric_attributes_providers = new_config.metric_attributes_providers
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config