Source code for multistorageclient.config

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
import inspect
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Sequence
from multiprocessing import current_process
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse

import opentelemetry.metrics as api_metrics
import yaml

from .cache import DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .instrumentation import setup_opentelemetry
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry, TelemetryMode
from .telemetry import init as telemetry_init
from .telemetry.attributes.base import AttributesProvider
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    Replica,
    RetryConfig,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
  53
# Constants related to implicit profiles
# Protocols for which an implicit (auto-generated) profile may be created.
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
# Maps a URL protocol to the storage provider "type" value used in profile
# configurations (keys of STORAGE_PROVIDER_MAPPING below).
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}

# Maps short telemetry attributes provider names to fully qualified class
# paths; types not present in this mapping are treated as fully qualified
# class paths as-is.
_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
}
  71
  72
  73# Template for creating implicit profile configurations
  74def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
  75    """
  76    Create a configuration dictionary for an implicit profile.
  77
  78    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
  79    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
  80    :param base_path: The base path (e.g., bucket name) for the storage provider
  81
  82    :return: A configuration dictionary for the implicit profile
  83    """
  84    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
  85    return {
  86        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
  87    }
  88
  89
# Name of the built-in POSIX filesystem profile that is always available.
DEFAULT_POSIX_PROFILE_NAME = "default"
# Implicit profile exposing the local filesystem rooted at "/".
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps storage provider "type" values to provider class names resolved from
# the ".providers" module of this package.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
}

# Maps credentials provider "type" values to provider class names in the
# ".providers" module; any other type is interpreted as a fully qualified
# class path (see _build_credentials_provider).
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
}
 110
 111
 112def _find_config_file_paths():
 113    """
 114    Get configuration file search paths.
 115
 116    Returns paths in order of precedence:
 117
 118    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
 119    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
 120    """
 121    paths = []
 122
 123    # 1. User-specific configuration directory
 124    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
 125
 126    if xdg_config_home:
 127        paths.extend(
 128            [
 129                os.path.join(xdg_config_home, "msc", "config.yaml"),
 130                os.path.join(xdg_config_home, "msc", "config.json"),
 131            ]
 132        )
 133
 134    user_home = os.getenv("HOME")
 135
 136    if user_home:
 137        paths.extend(
 138            [
 139                os.path.join(user_home, ".msc_config.yaml"),
 140                os.path.join(user_home, ".msc_config.json"),
 141                os.path.join(user_home, ".config", "msc", "config.yaml"),
 142                os.path.join(user_home, ".config", "msc", "config.json"),
 143            ]
 144        )
 145
 146    # 2. System-wide configuration directories
 147    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
 148    if not xdg_config_dirs:
 149        xdg_config_dirs = "/etc/xdg"
 150
 151    for config_dir in xdg_config_dirs.split(":"):
 152        config_dir = config_dir.strip()
 153        if config_dir:
 154            paths.extend(
 155                [
 156                    os.path.join(config_dir, "msc", "config.yaml"),
 157                    os.path.join(config_dir, "msc", "config.json"),
 158                ]
 159            )
 160
 161    paths.extend(
 162        [
 163            "/etc/msc_config.yaml",
 164            "/etc/msc_config.json",
 165        ]
 166    )
 167
 168    return tuple(paths)
 169
 170
# Package name used when resolving relative module paths (e.g. ".providers").
PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)
 174
 175
 176class SimpleProviderBundle(ProviderBundle):
 177    def __init__(
 178        self,
 179        storage_provider_config: StorageProviderConfig,
 180        credentials_provider: Optional[CredentialsProvider] = None,
 181        metadata_provider: Optional[MetadataProvider] = None,
 182        replicas: Optional[list[Replica]] = None,
 183    ):
 184        if replicas is None:
 185            replicas = []
 186
 187        self._storage_provider_config = storage_provider_config
 188        self._credentials_provider = credentials_provider
 189        self._metadata_provider = metadata_provider
 190        self._replicas = replicas
 191
 192    @property
 193    def storage_provider_config(self) -> StorageProviderConfig:
 194        return self._storage_provider_config
 195
 196    @property
 197    def credentials_provider(self) -> Optional[CredentialsProvider]:
 198        return self._credentials_provider
 199
 200    @property
 201    def metadata_provider(self) -> Optional[MetadataProvider]:
 202        return self._metadata_provider
 203
 204    @property
 205    def replicas(self) -> list[Replica]:
 206        return self._replicas
 207
 208
# Default cache eviction refresh interval in seconds.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
 210
 211
class StorageClientConfigLoader:
    """
    Parses an MSC configuration dictionary for one profile and builds the
    components (providers, credentials, metadata, cache, telemetry metrics)
    needed to construct a StorageClientConfig.
    """

    # Pre-built bundle supplied by the caller; takes precedence over config_dict.
    _provider_bundle: Optional[ProviderBundle]
    # All profile sections from the configuration, including the implicit
    # default POSIX profile.
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Configuration section of the selected profile.
    _profile_dict: dict[str, Any]
    # Raw "opentelemetry" section of the configuration, if present.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Metric instruments resolved from the telemetry instance (or passed in).
    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # Raw "cache" section of the configuration, if present.
    _cache_dict: Optional[dict[str, Any]]
 222
    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry: Optional[Telemetry] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry: Optional telemetry instance to use, takes precedence over ``metric_gauges``, ``metric_counters``, and ``metric_attributes_providers``.
        :param metric_gauges: Optional metric gauges to use.
        :param metric_counters: Optional metric counters to use.
        :param metric_attributes_providers: Optional metric attributes providers to use.
        :raises ValueError: If the "default" profile is overridden with a
            non-file storage provider, or if ``profile`` is not found.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)

        # Explicitly-passed instruments are the fallback if no telemetry
        # instance is available (or is created) below.
        self._metric_gauges = metric_gauges
        self._metric_counters = metric_counters
        self._metric_attributes_providers = metric_attributes_providers
        if self._opentelemetry_dict is not None:
            if telemetry is None:
                try:
                    # Try to create a telemetry instance with the default mode heuristic.
                    telemetry = telemetry_init()
                except (AssertionError, OSError, RuntimeError, ValueError):
                    try:
                        if current_process().daemon:
                            # Daemons can't have child processes.
                            #
                            # Try to create a telemetry instance in local mode.
                            #
                            # ⚠️ This may cause CPU contention if the current process is compute-intensive
                            # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
                            telemetry = telemetry_init(mode=TelemetryMode.LOCAL)
                        else:
                            # Try to create a telemetry instance in server mode.
                            telemetry = telemetry_init(mode=TelemetryMode.SERVER)
                    except (AssertionError, OSError, RuntimeError, ValueError):
                        # Don't throw on telemetry init failures.
                        logger.error("Failed to automatically create telemetry instance!", exc_info=True)

            # Build metric instruments only when a metrics section is configured.
            if "metrics" in self._opentelemetry_dict:
                if telemetry is not None:
                    # Keep only the instruments the telemetry config enables
                    # (gauge()/counter() may return None for disabled metrics).
                    self._metric_gauges = {}
                    for name in Telemetry.GaugeName:
                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
                        if gauge is not None:
                            self._metric_gauges[name] = gauge
                    self._metric_counters = {}
                    for name in Telemetry.CounterName:
                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
                        if counter is not None:
                            self._metric_counters[name] = counter

                    if "attributes" in self._opentelemetry_dict["metrics"]:
                        attributes_providers: list[AttributesProvider] = []
                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
                            "attributes"
                        ]
                        for config in attributes_provider_configs:
                            # Providers may be referenced by short name or fully
                            # qualified class path; unknown names pass through as-is.
                            attributes_provider_type: str = config["type"]
                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
                                attributes_provider_type, attributes_provider_type
                            )
                            attributes_provider_module_name, attributes_provider_class_name = (
                                attributes_provider_fully_qualified_name.rsplit(".", 1)
                            )
                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
                            attributes_provider_options = config.get("options", {})
                            # The msc_config provider needs the full configuration
                            # dictionary to compute its attributes.
                            if (
                                attributes_provider_fully_qualified_name
                                == _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING["msc_config"]
                            ):
                                attributes_provider_options["config_dict"] = config_dict
                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
                            attributes_providers.append(attributes_provider)
                        self._metric_attributes_providers = tuple(attributes_providers)
                elif not any([metric_gauges, metric_counters, metric_attributes_providers]):
                    # TODO: Remove "beta" from the log once legacy metrics are removed.
                    logger.error("No telemetry instance! Disabling beta metrics.")

        self._cache_dict = config_dict.get("cache", None)
 344
 345    def _build_storage_provider(
 346        self,
 347        storage_provider_name: str,
 348        storage_options: Optional[dict[str, Any]] = None,
 349        credentials_provider: Optional[CredentialsProvider] = None,
 350    ) -> StorageProvider:
 351        if storage_options is None:
 352            storage_options = {}
 353        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
 354            raise ValueError(
 355                f"Storage provider {storage_provider_name} is not supported. "
 356                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
 357            )
 358        if credentials_provider:
 359            storage_options["credentials_provider"] = credentials_provider
 360        if self._metric_gauges is not None:
 361            storage_options["metric_gauges"] = self._metric_gauges
 362        if self._metric_counters is not None:
 363            storage_options["metric_counters"] = self._metric_counters
 364        if self._metric_attributes_providers is not None:
 365            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
 366        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
 367        module_name = ".providers"
 368        cls = import_class(class_name, module_name, PACKAGE_NAME)
 369        return cls(**storage_options)
 370
 371    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
 372        storage_profile_dict = self._profiles.get(storage_provider_profile)
 373        if not storage_profile_dict:
 374            raise ValueError(
 375                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
 376            )
 377
 378        # Check if metadata provider is configured for this profile
 379        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
 380        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
 381        if local_metadata_provider_dict:
 382            raise ValueError(
 383                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
 384            )
 385
 386        # Initialize CredentialsProvider
 387        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
 388        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
 389
 390        # Initialize StorageProvider
 391        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
 392        if local_storage_provider_dict:
 393            local_name = local_storage_provider_dict["type"]
 394            local_storage_options = local_storage_provider_dict.get("options", {})
 395        else:
 396            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
 397
 398        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
 399        return storage_provider
 400
 401    def _build_credentials_provider(
 402        self,
 403        credentials_provider_dict: Optional[dict[str, Any]],
 404        storage_options: Optional[dict[str, Any]] = None,
 405    ) -> Optional[CredentialsProvider]:
 406        """
 407        Initializes the CredentialsProvider based on the provided dictionary.
 408
 409        Args:
 410            credentials_provider_dict: Dictionary containing credentials provider configuration
 411            storage_options: Storage provider options required by some credentials providers to scope the credentials.
 412        """
 413        if not credentials_provider_dict:
 414            return None
 415
 416        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
 417            # Fully qualified class path case
 418            class_type = credentials_provider_dict["type"]
 419            module_name, class_name = class_type.rsplit(".", 1)
 420            cls = import_class(class_name, module_name)
 421        else:
 422            # Mapped class name case
 423            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
 424            module_name = ".providers"
 425            cls = import_class(class_name, module_name, PACKAGE_NAME)
 426
 427        # Propagate storage provider options to credentials provider since they may be
 428        # required by some credentials providers to scope the credentials.
 429        import inspect
 430
 431        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
 432        options = credentials_provider_dict.get("options", {})
 433        if storage_options:
 434            for storage_provider_option in storage_options.keys():
 435                if storage_provider_option in init_params and storage_provider_option not in options:
 436                    options[storage_provider_option] = storage_options[storage_provider_option]
 437
 438        return cls(**options)
 439
 440    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
 441        # Initialize StorageProvider
 442        storage_provider_dict = profile_dict.get("storage_provider", None)
 443        if storage_provider_dict:
 444            storage_provider_name = storage_provider_dict["type"]
 445            storage_options = storage_provider_dict.get("options", {})
 446        else:
 447            raise ValueError("Missing storage_provider in the config.")
 448
 449        # Initialize CredentialsProvider
 450        # It is prudent to assume that in some cases, the credentials provider
 451        # will provide credentials scoped to specific base_path.
 452        # So we need to pass the storage_options to the credentials provider.
 453        credentials_provider_dict = profile_dict.get("credentials_provider", None)
 454        credentials_provider = self._build_credentials_provider(
 455            credentials_provider_dict=credentials_provider_dict,
 456            storage_options=storage_options,
 457        )
 458
 459        # Initialize MetadataProvider
 460        metadata_provider_dict = profile_dict.get("metadata_provider", None)
 461        metadata_provider = None
 462        if metadata_provider_dict:
 463            if metadata_provider_dict["type"] == "manifest":
 464                metadata_options = metadata_provider_dict.get("options", {})
 465                # If MetadataProvider has a reference to a different storage provider profile
 466                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
 467                if storage_provider_profile:
 468                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
 469                else:
 470                    storage_provider = self._build_storage_provider(
 471                        storage_provider_name, storage_options, credentials_provider
 472                    )
 473
 474                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
 475            else:
 476                class_type = metadata_provider_dict["type"]
 477                if "." not in class_type:
 478                    raise ValueError(
 479                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
 480                    )
 481                module_name, class_name = class_type.rsplit(".", 1)
 482                cls = import_class(class_name, module_name)
 483                options = metadata_provider_dict.get("options", {})
 484                metadata_provider = cls(**options)
 485
 486        # Build replicas if configured
 487        replicas_config = profile_dict.get("replicas", [])
 488        replicas = []
 489        if replicas_config:
 490            for replica_dict in replicas_config:
 491                replicas.append(
 492                    Replica(
 493                        replica_profile=replica_dict["replica_profile"],
 494                        read_priority=replica_dict["read_priority"],
 495                    )
 496                )
 497
 498            # Sort replicas by read_priority
 499            replicas.sort(key=lambda r: r.read_priority)
 500
 501        return SimpleProviderBundle(
 502            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
 503            credentials_provider=credentials_provider,
 504            metadata_provider=metadata_provider,
 505            replicas=replicas,
 506        )
 507
 508    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
 509        class_type = provider_bundle_dict["type"]
 510        module_name, class_name = class_type.rsplit(".", 1)
 511        cls = import_class(class_name, module_name)
 512        options = provider_bundle_dict.get("options", {})
 513        return cls(**options)
 514
 515    def _build_provider_bundle(self) -> ProviderBundle:
 516        if self._provider_bundle:
 517            return self._provider_bundle  # Return if previously provided.
 518
 519        # Load 3rd party extension
 520        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
 521        if provider_bundle_dict:
 522            return self._build_provider_bundle_from_extension(provider_bundle_dict)
 523
 524        return self._build_provider_bundle_from_config(self._profile_dict)
 525
    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Reject cache configurations that use the removed 'size_mb' property.

        :param cache_dict: The "cache" section of the configuration.
        :raises ValueError: If the deprecated 'size_mb' key is present.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:         # Optional: The eviction policy to use\n"
                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )
 540
 541    def _validate_replicas(self, replicas: list[Replica]) -> None:
 542        """
 543        Validates that replica profiles do not have their own replicas configuration.
 544
 545        This prevents circular references where a replica profile could reference
 546        another profile that also has replicas, creating an infinite loop.
 547
 548        :param replicas: The list of Replica objects to validate
 549        :raises ValueError: If any replica profile has its own replicas configuration
 550        """
 551        for replica in replicas:
 552            replica_profile_name = replica.replica_profile
 553
 554            # Check that replica profile is not the same as the current profile
 555            if replica_profile_name == self._profile:
 556                raise ValueError(
 557                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
 558                )
 559
 560            # Check if the replica profile exists in the configuration
 561            if replica_profile_name not in self._profiles:
 562                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
 563
 564            # Get the replica profile configuration
 565            replica_profile_dict = self._profiles[replica_profile_name]
 566
 567            # Check if the replica profile has its own replicas configuration
 568            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
 569                raise ValueError(
 570                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
 571                    f"This creates a circular reference which is not allowed."
 572                )
 573
 574    def build_config(self) -> "StorageClientConfig":
 575        bundle = self._build_provider_bundle()
 576
 577        # Validate replicas to prevent circular references
 578        self._validate_replicas(bundle.replicas)
 579
 580        storage_provider = self._build_storage_provider(
 581            bundle.storage_provider_config.type,
 582            bundle.storage_provider_config.options,
 583            bundle.credentials_provider,
 584        )
 585
 586        cache_config: Optional[CacheConfig] = None
 587        cache_manager: Optional[CacheManager] = None
 588
 589        # Check if caching is enabled for this profile
 590        caching_enabled = self._profile_dict.get("caching_enabled", False)
 591
 592        if self._cache_dict is not None and caching_enabled:
 593            tempdir = tempfile.gettempdir()
 594            default_location = os.path.join(tempdir, "msc_cache")
 595            location = self._cache_dict.get("location", default_location)
 596
 597            # Check if cache_backend.cache_path is defined
 598            cache_backend = self._cache_dict.get("cache_backend", {})
 599            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None
 600
 601            # Warn if both location and cache_backend.cache_path are defined
 602            if cache_backend_path and self._cache_dict.get("location") is not None:
 603                logger.warning(
 604                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
 605                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
 606                )
 607            elif cache_backend_path:
 608                # Use cache_backend.cache_path only if location is not explicitly defined
 609                location = cache_backend_path
 610
 611            # Resolve the effective flag while preserving explicit ``False``.
 612            if "check_source_version" in self._cache_dict:
 613                check_source_version = self._cache_dict["check_source_version"]
 614            else:
 615                check_source_version = self._cache_dict.get("use_etag", True)
 616
 617            # Warn if both keys are specified – the new one wins.
 618            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
 619                logger.warning(
 620                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
 621                    "Using 'check_source_version' and ignoring 'use_etag'."
 622                )
 623
 624            if not Path(location).is_absolute():
 625                raise ValueError(f"Cache location must be an absolute path: {location}")
 626
 627            # Initialize cache_dict with default values
 628            cache_dict = self._cache_dict
 629
 630            # Verify cache config
 631            self._verify_cache_config(cache_dict)
 632
 633            # Initialize eviction policy
 634            if "eviction_policy" in cache_dict:
 635                policy = cache_dict["eviction_policy"]["policy"].lower()
 636                eviction_policy = EvictionPolicyConfig(
 637                    policy=policy,
 638                    refresh_interval=cache_dict["eviction_policy"].get(
 639                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
 640                    ),
 641                )
 642            else:
 643                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)
 644
 645            # Create cache config from the standardized format
 646            cache_config = CacheConfig(
 647                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
 648                location=cache_dict.get("location", location),
 649                check_source_version=check_source_version,
 650                eviction_policy=eviction_policy,
 651            )
 652
 653            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
 654        elif self._cache_dict is not None and not caching_enabled:
 655            logger.debug(f"Caching is disabled for profile '{self._profile}'")
 656        elif self._cache_dict is None and caching_enabled:
 657            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")
 658
 659        # retry options
 660        retry_config_dict = self._profile_dict.get("retry", None)
 661        if retry_config_dict:
 662            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
 663            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
 664            retry_config = RetryConfig(attempts=attempts, delay=delay)
 665        else:
 666            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)
 667
 668        # autocommit options
 669        autocommit_config = AutoCommitConfig()
 670        autocommit_dict = self._profile_dict.get("autocommit", None)
 671        if autocommit_dict:
 672            interval_minutes = autocommit_dict.get("interval_minutes", None)
 673            at_exit = autocommit_dict.get("at_exit", False)
 674            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
 675
 676        # set up OpenTelemetry providers once per process
 677        #
 678        # TODO: Legacy, need to remove.
 679        if self._opentelemetry_dict:
 680            setup_opentelemetry(self._opentelemetry_dict)
 681
 682        return StorageClientConfig(
 683            profile=self._profile,
 684            storage_provider=storage_provider,
 685            credentials_provider=bundle.credentials_provider,
 686            metadata_provider=bundle.metadata_provider,
 687            cache_config=cache_config,
 688            cache_manager=cache_manager,
 689            retry_config=retry_config,
 690            metric_gauges=self._metric_gauges,
 691            metric_counters=self._metric_counters,
 692            metric_attributes_providers=self._metric_attributes_providers,
 693            replicas=bundle.replicas,
 694            autocommit_config=autocommit_config,
 695        )
 696
 697
 698class PathMapping:
 699    """
 700    Class to handle path mappings defined in the MSC configuration.
 701
 702    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
 703    where entries are sorted by prefix length (longest first) for optimal matching.
 704    Longer paths take precedence when matching.
 705    """
 706
 707    def __init__(self):
 708        """Initialize an empty PathMapping."""
 709        self._mapping = defaultdict(lambda: defaultdict(list))
 710
 711    @classmethod
 712    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
 713        """
 714        Create a PathMapping instance from configuration dictionary.
 715
 716        :param config_dict: Configuration dictionary, if None the config will be loaded
 717        :return: A PathMapping instance with processed mappings
 718        """
 719        if config_dict is None:
 720            # Import locally to avoid circular imports
 721            from multistorageclient.config import StorageClientConfig
 722
 723            config_dict, _ = StorageClientConfig.read_msc_config()
 724
 725        if not config_dict:
 726            return cls()
 727
 728        instance = cls()
 729        instance._load_mapping(config_dict)
 730        return instance
 731
 732    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
 733        """
 734        Load path mapping from a configuration dictionary.
 735
 736        :param config_dict: Configuration dictionary containing path mapping
 737        """
 738        # Get the path_mapping section
 739        path_mapping = config_dict.get("path_mapping", {})
 740        if path_mapping is None:
 741            return
 742
 743        # Process each mapping
 744        for source_path, dest_path in path_mapping.items():
 745            # Validate format
 746            if not source_path.endswith("/"):
 747                continue
 748            if not dest_path.startswith(MSC_PROTOCOL):
 749                continue
 750            if not dest_path.endswith("/"):
 751                continue
 752
 753            # Extract the destination profile
 754            pr_dest = urlparse(dest_path)
 755            dest_profile = pr_dest.netloc
 756
 757            # Parse the source path
 758            pr = urlparse(source_path)
 759            protocol = pr.scheme.lower() if pr.scheme else "file"
 760
 761            if protocol == "file" or source_path.startswith("/"):
 762                # For file or absolute paths, use the whole path as the prefix
 763                # and leave bucket empty
 764                bucket = ""
 765                prefix = source_path if source_path.startswith("/") else pr.path
 766            else:
 767                # For object storage, extract bucket and prefix
 768                bucket = pr.netloc
 769                prefix = pr.path
 770                if prefix.startswith("/"):
 771                    prefix = prefix[1:]
 772
 773            # Add the mapping to the nested dict
 774            self._mapping[protocol][bucket].append((prefix, dest_profile))
 775
 776        # Sort each bucket's prefixes by length (longest first) for optimal matching
 777        for protocol, buckets in self._mapping.items():
 778            for bucket, prefixes in buckets.items():
 779                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
 780
 781    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
 782        """
 783        Find the best matching mapping for the given URL.
 784
 785        :param url: URL to find matching mapping for
 786        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
 787        """
 788        # Parse the URL
 789        pr = urlparse(url)
 790        protocol = pr.scheme.lower() if pr.scheme else "file"
 791
 792        # For file paths or absolute paths
 793        if protocol == "file" or url.startswith("/"):
 794            path = url if url.startswith("/") else pr.path
 795
 796            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
 797
 798            # Check each prefix (already sorted by length, longest first)
 799            for prefix, profile in possible_mapping:
 800                if path.startswith(prefix):
 801                    # Calculate the relative path
 802                    rel_path = path[len(prefix) :]
 803                    if not rel_path.startswith("/"):
 804                        rel_path = "/" + rel_path
 805                    return profile, rel_path
 806
 807            return None
 808
 809        # For object storage
 810        bucket = pr.netloc
 811        path = pr.path
 812        if path.startswith("/"):
 813            path = path[1:]
 814
 815        # Check bucket-specific mapping
 816        possible_mapping = (
 817            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
 818        )
 819
 820        # Check each prefix (already sorted by length, longest first)
 821        for prefix, profile in possible_mapping:
 822            # matching prefix
 823            if path.startswith(prefix):
 824                rel_path = path[len(prefix) :]
 825                # Remove leading slash if present
 826                if rel_path.startswith("/"):
 827                    rel_path = rel_path[1:]
 828
 829                return profile, rel_path
 830
 831        return None
 832
 833
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: StorageProvider
    credentials_provider: Optional[CredentialsProvider]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    # Raw config dict used to rebuild providers on unpickling; None when the
    # config was built from a provider bundle (see from_provider_bundle).
    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: StorageProvider,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        # None sentinel avoids a shared mutable default for replicas.
        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.metric_gauges = metric_gauges
        self.metric_counters = metric_counters
        self.metric_attributes_providers = metric_attributes_providers
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config for ``profile`` from a JSON configuration string."""
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_yaml(
        config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config for ``profile`` from a YAML configuration string."""
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry: Optional[Telemetry] = None,
    ) -> "StorageClientConfig":
        """Build a config for ``profile`` from a configuration dictionary.

        :param skip_validation: skip JSON-schema validation (used for already-validated
            configs, e.g. implicit profiles).
        """
        # Validate the config file with predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
        config = loader.build_config()
        # Keep the raw dict so the config can be pickled and rebuilt (__getstate__).
        config._config_dict = config_dict

        return config

    @staticmethod
    def from_file(
        profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config for ``profile`` from the MSC and rclone config files on disk.

        Falls back to the default POSIX profile or an implicit ``_<protocol>-<bucket>``
        profile when ``profile`` is not defined in any configuration file.

        :raises ValueError: if the MSC and rclone configs define conflicting keys,
            or the profile cannot be found or parsed as an implicit profile.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config()
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files; overlapping keys are an error, not an override.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
        else:
            # Check if profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles of the form "_<protocol>-<bucket>"
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # the config is already validated while reading; skip validation for implicit profiles (names starting with "_")
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
            )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config from a pre-built :py:class:`ProviderBundle`.

        Configs built this way are not picklable (see ``__getstate__``).
        """
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def _load_config_file(path: str) -> dict[str, Any]:
        """Parse a single MSC config file (JSON by extension, YAML otherwise).

        :raises ValueError: if the file cannot be opened or parsed.
        """
        try:
            with open(path) as f:
                if path.endswith(".json"):
                    return json.load(f)
                return yaml.safe_load(f)
        except Exception as e:
            raise ValueError(f"malformed msc config file: {path}, exception: {e}")

    @staticmethod
    def read_msc_config() -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the config file used.

        Configs are searched in the following order:

        1. MSC_CONFIG environment variable (highest precedence)
        2. Standard search paths (user-specified config and system-wide config)

        :return: Tuple of (config_dict, config_file_path). config_dict is the MSC configuration
            dictionary or empty dict if no config was found. config_file_path is the absolute
            path of the config file used, or None if no config file was found.
        """
        config_dict: dict[str, Any] = {}
        found_config_files = []
        used_config_file = None

        # Check for environment variable first
        msc_config = os.getenv("MSC_CONFIG", None)
        if msc_config and os.path.exists(msc_config):
            config_dict = StorageClientConfig._load_config_file(msc_config)
            used_config_file = msc_config
            found_config_files.append(msc_config)

        # Check all standard search paths
        for path in _find_config_file_paths():
            if os.path.exists(path):
                found_config_files.append(path)
                # Only load the first found config file (environment variable takes precedence)
                if used_config_file is None:
                    config_dict = StorageClientConfig._load_config_file(path)
                    used_config_file = path

        # Log warnings and info messages
        if len(found_config_files) > 1:
            found_config_files_str = ", ".join(found_config_files)
            logger.warning(f"Multiple MSC config files found: {found_config_files_str}. ")

        if len(found_config_files) == 0:
            logger.warning("No MSC config files found in any of the search locations.")
        else:
            logger.info(f"Using MSC config file: {used_config_file}")

        if config_dict:
            validate_config(config_dict)
        return config_dict, used_config_file

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        """Drop the live provider objects; they are rebuilt from ``_config_dict``."""
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild the full config (providers included) from the pickled config dict."""
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            metric_gauges=state["metric_gauges"],
            metric_counters=state["metric_counters"],
            metric_attributes_providers=state["metric_attributes_providers"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.metric_gauges = new_config.metric_gauges
        self.metric_counters = new_config.metric_counters
        self.metric_attributes_providers = new_config.metric_attributes_providers
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config