# Source code for multistorageclient.config

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
import copy
import inspect
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
  53
  54# Constants related to implicit profiles
  55SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
  56PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
  57    "s3": "s3",
  58    "gs": "gcs",
  59    "ais": "ais",
  60    "file": "file",
  61}
  62
  63
  64# Template for creating implicit profile configurations
  65def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
  66    """
  67    Create a configuration dictionary for an implicit profile.
  68
  69    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
  70    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
  71    :param base_path: The base path (e.g., bucket name) for the storage provider
  72
  73    :return: A configuration dictionary for the implicit profile
  74    """
  75    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
  76    return {
  77        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
  78    }
  79
  80
  81DEFAULT_POSIX_PROFILE_NAME = "default"
  82DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")
  83
# Maps config "storage_provider.type" values to provider class names
# resolved from the ".providers" module (see _build_storage_provider).
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

# Maps config "credentials_provider.type" shorthand values to provider class
# names resolved from the ".providers" module; unmapped values are treated as
# fully qualified class paths (see _build_credentials_provider).
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}
 106
 107
 108def _find_config_file_paths() -> tuple[str]:
 109    """
 110    Get configuration file search paths.
 111
 112    Returns paths in order of precedence:
 113
 114    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
 115    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
 116    """
 117    paths = []
 118
 119    # 1. User-specific configuration directory
 120    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
 121
 122    if xdg_config_home:
 123        paths.extend(
 124            [
 125                os.path.join(xdg_config_home, "msc", "config.yaml"),
 126                os.path.join(xdg_config_home, "msc", "config.json"),
 127            ]
 128        )
 129
 130    user_home = os.getenv("HOME")
 131
 132    if user_home:
 133        paths.extend(
 134            [
 135                os.path.join(user_home, ".msc_config.yaml"),
 136                os.path.join(user_home, ".msc_config.json"),
 137                os.path.join(user_home, ".config", "msc", "config.yaml"),
 138                os.path.join(user_home, ".config", "msc", "config.json"),
 139            ]
 140        )
 141
 142    # 2. System-wide configuration directories
 143    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
 144    if not xdg_config_dirs:
 145        xdg_config_dirs = "/etc/xdg"
 146
 147    for config_dir in xdg_config_dirs.split(":"):
 148        config_dir = config_dir.strip()
 149        if config_dir:
 150            paths.extend(
 151                [
 152                    os.path.join(config_dir, "msc", "config.yaml"),
 153                    os.path.join(config_dir, "msc", "config.json"),
 154                ]
 155            )
 156
 157    paths.extend(
 158        [
 159            "/etc/msc_config.yaml",
 160            "/etc/msc_config.json",
 161        ]
 162    )
 163
 164    return tuple(paths)
 165
 166
# Package name used as the anchor for relative class imports (import_class).
PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)
 170
 171
 172class ImmutableDict(dict):
 173    """
 174    Immutable dictionary that raises an error when attempting to modify it.
 175    """
 176
 177    def __init__(self, *args, **kwargs):
 178        super().__init__(*args, **kwargs)
 179
 180        # Recursively freeze nested structures
 181        for key, value in list(super().items()):
 182            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
 183                super().__setitem__(key, ImmutableDict(value))
 184            elif isinstance(value, list):
 185                super().__setitem__(key, self._freeze_list(value))
 186
 187    @staticmethod
 188    def _freeze_list(lst):
 189        """
 190        Convert list to tuple, freezing nested dicts recursively.
 191        """
 192        frozen = []
 193        for item in lst:
 194            if isinstance(item, dict):
 195                frozen.append(ImmutableDict(item))
 196            elif isinstance(item, list):
 197                frozen.append(ImmutableDict._freeze_list(item))
 198            else:
 199                frozen.append(item)
 200        return tuple(frozen)
 201
 202    def __deepcopy__(self, memo):
 203        """
 204        Return a regular mutable dict when deepcopy is called.
 205        """
 206        return copy.deepcopy(dict(self), memo)
 207
 208    def __reduce__(self):
 209        """
 210        Support for pickle serialization.
 211        """
 212        return (self.__class__, (dict(self),))
 213
 214    def _copy_value(self, value):
 215        """
 216        Convert frozen structures back to mutable equivalents.
 217        """
 218        if isinstance(value, ImmutableDict):
 219            return {k: self._copy_value(v) for k, v in value.items()}
 220        elif isinstance(value, tuple):
 221            # Check if it was originally a list (frozen by _freeze_list)
 222            return [self._copy_value(item) for item in value]
 223        else:
 224            return value
 225
 226    def __getitem__(self, key):
 227        """
 228        Return a mutable copy of the value.
 229        """
 230        value = super().__getitem__(key)
 231        return self._copy_value(value)
 232
 233    def get(self, key, default=None):
 234        """
 235        Return a mutable copy of the value.
 236        """
 237        return self[key] if key in self else default
 238
 239    def __setitem__(self, key, value):
 240        raise TypeError("ImmutableDict is immutable")
 241
 242    def __delitem__(self, key):
 243        raise TypeError("ImmutableDict is immutable")
 244
 245    def clear(self):
 246        raise TypeError("ImmutableDict is immutable")
 247
 248    def pop(self, *args):
 249        raise TypeError("ImmutableDict is immutable")
 250
 251    def popitem(self):
 252        raise TypeError("ImmutableDict is immutable")
 253
 254    def setdefault(self, key, default=None):
 255        raise TypeError("ImmutableDict is immutable")
 256
 257    def update(self, *args, **kwargs):
 258        raise TypeError("ImmutableDict is immutable")
 259
 260
 261class SimpleProviderBundle(ProviderBundle):
 262    def __init__(
 263        self,
 264        storage_provider_config: StorageProviderConfig,
 265        credentials_provider: Optional[CredentialsProvider] = None,
 266        metadata_provider: Optional[MetadataProvider] = None,
 267        replicas: Optional[list[Replica]] = None,
 268    ):
 269        if replicas is None:
 270            replicas = []
 271
 272        self._storage_provider_config = storage_provider_config
 273        self._credentials_provider = credentials_provider
 274        self._metadata_provider = metadata_provider
 275        self._replicas = replicas
 276
 277    @property
 278    def storage_provider_config(self) -> StorageProviderConfig:
 279        return self._storage_provider_config
 280
 281    @property
 282    def credentials_provider(self) -> Optional[CredentialsProvider]:
 283        return self._credentials_provider
 284
 285    @property
 286    def metadata_provider(self) -> Optional[MetadataProvider]:
 287        return self._metadata_provider
 288
 289    @property
 290    def replicas(self) -> list[Replica]:
 291        return self._replicas
 292
 293
 294class SimpleProviderBundleV2(ProviderBundleV2):
 295    def __init__(
 296        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
 297    ):
 298        self._storage_backends = storage_backends
 299        self._metadata_provider = metadata_provider
 300
 301    @staticmethod
 302    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
 303        backend = StorageBackend(
 304            storage_provider_config=v1_bundle.storage_provider_config,
 305            credentials_provider=v1_bundle.credentials_provider,
 306            replicas=v1_bundle.replicas,
 307        )
 308
 309        return SimpleProviderBundleV2(
 310            storage_backends={profile_name: backend},
 311            metadata_provider=v1_bundle.metadata_provider,
 312        )
 313
 314    @property
 315    def storage_backends(self) -> dict[str, StorageBackend]:
 316        return self._storage_backends
 317
 318    @property
 319    def metadata_provider(self) -> Optional[MetadataProvider]:
 320        return self._metadata_provider
 321
 322
# Default cache refresh interval in seconds (matches the documented default for
# cache.eviction_policy.refresh_interval).
DEFAULT_CACHE_REFRESH_INTERVAL = 300
 324
 325
class StorageClientConfigLoader:
    """
    Builds a StorageClientConfig from a configuration dictionary and profile name.
    """

    # Resolved provider bundle (always v2 after _build_provider_bundle normalizes).
    _provider_bundle: ProviderBundleV2
    # Full config dict after environment-variable expansion (frozen ImmutableDict).
    _resolved_config_dict: dict[str, Any]
    # Profile-name -> profile-config mapping from the config dict.
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Frozen config dict of the selected profile.
    _profile_dict: dict[str, Any]
    # Optional "opentelemetry" section of the config, if present.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Lazily-invoked thunk returning a Telemetry instance (pickle-friendly).
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    # Optional "cache" section of the config, if present.
    _cache_dict: Optional[dict[str, Any]]
    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.

        :raises ValueError: If ``profile`` is missing from the config, or if the
            reserved default POSIX profile is overridden with a non-"file" provider.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        # NOTE(review): this is the raw mutable profiles dict; the default-profile
        # assignment below mutates it but NOT the frozen _resolved_config_dict
        # snapshot taken above — presumably intentional; confirm.
        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        # Resolve the bundle (normalized to v2), then surface any multi-backend
        # child profiles into the profile map.
        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()
 393
 394    def _build_storage_provider(
 395        self,
 396        storage_provider_name: str,
 397        storage_options: Optional[dict[str, Any]] = None,
 398        credentials_provider: Optional[CredentialsProvider] = None,
 399    ) -> StorageProvider:
 400        if storage_options is None:
 401            storage_options = {}
 402        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
 403            raise ValueError(
 404                f"Storage provider {storage_provider_name} is not supported. "
 405                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
 406            )
 407        if credentials_provider:
 408            storage_options["credentials_provider"] = credentials_provider
 409        if self._resolved_config_dict is not None:
 410            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
 411            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
 412        if self._telemetry_provider is not None:
 413            storage_options["telemetry_provider"] = self._telemetry_provider
 414        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
 415        module_name = ".providers"
 416        cls = import_class(class_name, module_name, PACKAGE_NAME)
 417        return cls(**storage_options)
 418
 419    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
 420        storage_profile_dict = self._profiles.get(storage_provider_profile)
 421        if not storage_profile_dict:
 422            raise ValueError(
 423                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
 424            )
 425
 426        # Check if metadata provider is configured for this profile
 427        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
 428        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
 429        if local_metadata_provider_dict:
 430            raise ValueError(
 431                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
 432            )
 433
 434        # Initialize CredentialsProvider
 435        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
 436        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
 437
 438        # Initialize StorageProvider
 439        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
 440        if local_storage_provider_dict:
 441            local_name = local_storage_provider_dict["type"]
 442            local_storage_options = local_storage_provider_dict.get("options", {})
 443        else:
 444            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
 445
 446        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
 447        return storage_provider
 448
 449    def _build_credentials_provider(
 450        self,
 451        credentials_provider_dict: Optional[dict[str, Any]],
 452        storage_options: Optional[dict[str, Any]] = None,
 453    ) -> Optional[CredentialsProvider]:
 454        """
 455        Initializes the CredentialsProvider based on the provided dictionary.
 456
 457        Args:
 458            credentials_provider_dict: Dictionary containing credentials provider configuration
 459            storage_options: Storage provider options required by some credentials providers to scope the credentials.
 460        """
 461        if not credentials_provider_dict:
 462            return None
 463
 464        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
 465            # Fully qualified class path case
 466            class_type = credentials_provider_dict["type"]
 467            module_name, class_name = class_type.rsplit(".", 1)
 468            cls = import_class(class_name, module_name)
 469        else:
 470            # Mapped class name case
 471            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
 472            module_name = ".providers"
 473            cls = import_class(class_name, module_name, PACKAGE_NAME)
 474
 475        # Propagate storage provider options to credentials provider since they may be
 476        # required by some credentials providers to scope the credentials.
 477        import inspect
 478
 479        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
 480        options = credentials_provider_dict.get("options", {})
 481        if storage_options:
 482            for storage_provider_option in storage_options.keys():
 483                if storage_provider_option in init_params and storage_provider_option not in options:
 484                    options[storage_provider_option] = storage_options[storage_provider_option]
 485
 486        return cls(**options)
 487
 488    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
 489        # Initialize StorageProvider
 490        storage_provider_dict = profile_dict.get("storage_provider", None)
 491        if storage_provider_dict:
 492            storage_provider_name = storage_provider_dict["type"]
 493            storage_options = storage_provider_dict.get("options", {})
 494        else:
 495            raise ValueError("Missing storage_provider in the config.")
 496
 497        # Initialize CredentialsProvider
 498        # It is prudent to assume that in some cases, the credentials provider
 499        # will provide credentials scoped to specific base_path.
 500        # So we need to pass the storage_options to the credentials provider.
 501        credentials_provider_dict = profile_dict.get("credentials_provider", None)
 502        credentials_provider = self._build_credentials_provider(
 503            credentials_provider_dict=credentials_provider_dict,
 504            storage_options=storage_options,
 505        )
 506
 507        # Initialize MetadataProvider
 508        metadata_provider_dict = profile_dict.get("metadata_provider", None)
 509        metadata_provider = None
 510        if metadata_provider_dict:
 511            if metadata_provider_dict["type"] == "manifest":
 512                metadata_options = metadata_provider_dict.get("options", {})
 513                # If MetadataProvider has a reference to a different storage provider profile
 514                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
 515                if storage_provider_profile:
 516                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
 517                else:
 518                    storage_provider = self._build_storage_provider(
 519                        storage_provider_name, storage_options, credentials_provider
 520                    )
 521
 522                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
 523            else:
 524                class_type = metadata_provider_dict["type"]
 525                if "." not in class_type:
 526                    raise ValueError(
 527                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
 528                    )
 529                module_name, class_name = class_type.rsplit(".", 1)
 530                cls = import_class(class_name, module_name)
 531                options = metadata_provider_dict.get("options", {})
 532                metadata_provider = cls(**options)
 533
 534        # Build replicas if configured
 535        replicas_config = profile_dict.get("replicas", [])
 536        replicas = []
 537        if replicas_config:
 538            for replica_dict in replicas_config:
 539                replicas.append(
 540                    Replica(
 541                        replica_profile=replica_dict["replica_profile"],
 542                        read_priority=replica_dict["read_priority"],
 543                    )
 544                )
 545
 546            # Sort replicas by read_priority
 547            replicas.sort(key=lambda r: r.read_priority)
 548
 549        return SimpleProviderBundle(
 550            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
 551            credentials_provider=credentials_provider,
 552            metadata_provider=metadata_provider,
 553            replicas=replicas,
 554        )
 555
 556    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
 557        class_type = provider_bundle_dict["type"]
 558        module_name, class_name = class_type.rsplit(".", 1)
 559        cls = import_class(class_name, module_name)
 560        options = provider_bundle_dict.get("options", {})
 561        return cls(**options)
 562
 563    def _build_provider_bundle(
 564        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
 565    ) -> ProviderBundleV2:
 566        if provider_bundle:
 567            bundle = provider_bundle
 568        else:
 569            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
 570            if provider_bundle_dict:
 571                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
 572            else:
 573                bundle = self._build_provider_bundle_from_config(self._profile_dict)
 574
 575        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
 576            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)
 577
 578        return bundle
 579
 580    def _inject_profiles_from_bundle(self) -> None:
 581        if len(self._provider_bundle.storage_backends) > 1:
 582            profiles = copy.deepcopy(self._profiles)
 583
 584            for child_name, backend in self._provider_bundle.storage_backends.items():
 585                if child_name in self._profiles:
 586                    raise ValueError(f"Profile '{child_name}' already exists in configuration.")
 587                child_config = {
 588                    "storage_provider": {
 589                        "type": backend.storage_provider_config.type,
 590                        "options": backend.storage_provider_config.options,
 591                    }
 592                }
 593                profiles[child_name] = child_config
 594
 595            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
 596            resolved_config_dict["profiles"] = profiles
 597
 598            self._profiles = ImmutableDict(profiles)
 599            self._resolved_config_dict = ImmutableDict(resolved_config_dict)
 600
 601    def _build_retry_config(self) -> RetryConfig:
 602        """Build retry config from profile dict."""
 603        retry_config_dict = self._profile_dict.get("retry", None)
 604        if retry_config_dict:
 605            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
 606            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
 607            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
 608            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
 609        else:
 610            return RetryConfig(
 611                attempts=DEFAULT_RETRY_ATTEMPTS,
 612                delay=DEFAULT_RETRY_DELAY,
 613                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
 614            )
 615
 616    def _build_autocommit_config(self) -> AutoCommitConfig:
 617        """Build autocommit config from profile dict."""
 618        autocommit_dict = self._profile_dict.get("autocommit", None)
 619        if autocommit_dict:
 620            interval_minutes = autocommit_dict.get("interval_minutes", None)
 621            at_exit = autocommit_dict.get("at_exit", False)
 622            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
 623        return AutoCommitConfig()
 624
 625    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
 626        if "size_mb" in cache_dict:
 627            raise ValueError(
 628                "The 'size_mb' property is no longer supported. \n"
 629                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
 630                "Example configuration:\n"
 631                "cache:\n"
 632                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
 633                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
 634                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
 635                "  eviction_policy:         # Optional: The eviction policy to use\n"
 636                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
 637                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
 638            )
 639
 640    def _validate_replicas(self, replicas: list[Replica]) -> None:
 641        """
 642        Validates that replica profiles do not have their own replicas configuration.
 643
 644        This prevents circular references where a replica profile could reference
 645        another profile that also has replicas, creating an infinite loop.
 646
 647        :param replicas: The list of Replica objects to validate
 648        :raises ValueError: If any replica profile has its own replicas configuration
 649        """
 650        for replica in replicas:
 651            replica_profile_name = replica.replica_profile
 652
 653            # Check that replica profile is not the same as the current profile
 654            if replica_profile_name == self._profile:
 655                raise ValueError(
 656                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
 657                )
 658
 659            # Check if the replica profile exists in the configuration
 660            if replica_profile_name not in self._profiles:
 661                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
 662
 663            # Get the replica profile configuration
 664            replica_profile_dict = self._profiles[replica_profile_name]
 665
 666            # Check if the replica profile has its own replicas configuration
 667            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
 668                raise ValueError(
 669                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
 670                    f"This creates a circular reference which is not allowed."
 671                )
 672
 673    # todo: remove once experimental features are stable
 674    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
 675        """
 676        Validate that experimental features are enabled when used.
 677
 678        :param eviction_policy: The eviction policy configuration to validate
 679        :raises ValueError: If experimental features are used without being enabled
 680        """
 681        # Validate MRU eviction policy
 682        if eviction_policy.policy.upper() == "MRU":
 683            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
 684                raise ValueError(
 685                    "MRU eviction policy is experimental and not enabled.\n"
 686                    "Enable it by adding to config:\n"
 687                    "  experimental_features:\n"
 688                    "    cache_mru_eviction: true"
 689                )
 690
 691        # Validate purge_factor
 692        if eviction_policy.purge_factor != 0:
 693            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
 694                raise ValueError("purge_factor must be between 0 and 100")
 695
 696            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
 697                raise ValueError(
 698                    "purge_factor is experimental and not enabled.\n"
 699                    "Enable it by adding to config:\n"
 700                    "  experimental_features:\n"
 701                    "    cache_purge_factor: true"
 702                )
 703
    def build_config(self) -> "StorageClientConfig":
        """
        Assemble a :py:class:`StorageClientConfig` from the loaded provider bundle.

        A multi-backend bundle yields a composite config that routes requests to
        child profiles via the metadata provider; a single-backend bundle yields
        a fully wired config with optional caching, retry, replica, and
        auto-commit settings.

        :raises ValueError: If a multi-backend profile lacks a metadata provider,
            a replica profile defines replicas of its own, the cache location is
            not an absolute path, or an experimental cache feature is used
            without being enabled.
        """
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            # Composite path: the metadata provider decides which child backend
            # serves each object, so it is mandatory for multi-backend profiles.
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # config for Composite StorageClient
            # Note: no storage/credentials provider or cache at this level —
            # those belong to the child profiles listed in storage_provider_profiles.
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            # Keep the resolved dict so the config can be pickled and rebuilt
            # (see StorageClientConfig.__getstate__/__setstate__).
            config._config_dict = self._resolved_config_dict
            return config

        # Single-backend (len == 1)
        _, backend = list(backends.items())[0]
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined (legacy/alternate key)
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined;
            # an explicit 'location' always wins.
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            # 'check_source_version' supersedes the older 'use_etag' key.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified – the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            # Relative cache paths are rejected outright rather than resolved.
            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy (defaults to FIFO when not configured)
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        # Keep the resolved dict so the config can be pickled and rebuilt.
        config._config_dict = self._resolved_config_dict
        return config
 851
 852
 853class PathMapping:
 854    """
 855    Class to handle path mappings defined in the MSC configuration.
 856
 857    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
 858    where entries are sorted by prefix length (longest first) for optimal matching.
 859    Longer paths take precedence when matching.
 860    """
 861
 862    def __init__(self):
 863        """Initialize an empty PathMapping."""
 864        self._mapping = defaultdict(lambda: defaultdict(list))
 865
 866    @classmethod
 867    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
 868        """
 869        Create a PathMapping instance from configuration dictionary.
 870
 871        :param config_dict: Configuration dictionary, if None the config will be loaded
 872        :return: A PathMapping instance with processed mappings
 873        """
 874        if config_dict is None:
 875            # Import locally to avoid circular imports
 876            from multistorageclient.config import StorageClientConfig
 877
 878            config_dict, _ = StorageClientConfig.read_msc_config()
 879
 880        if not config_dict:
 881            return cls()
 882
 883        instance = cls()
 884        instance._load_mapping(config_dict)
 885        return instance
 886
 887    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
 888        """
 889        Load path mapping from a configuration dictionary.
 890
 891        :param config_dict: Configuration dictionary containing path mapping
 892        """
 893        # Get the path_mapping section
 894        path_mapping = config_dict.get("path_mapping", {})
 895        if path_mapping is None:
 896            return
 897
 898        # Process each mapping
 899        for source_path, dest_path in path_mapping.items():
 900            # Validate format
 901            if not source_path.endswith("/"):
 902                continue
 903            if not dest_path.startswith(MSC_PROTOCOL):
 904                continue
 905            if not dest_path.endswith("/"):
 906                continue
 907
 908            # Extract the destination profile
 909            pr_dest = urlparse(dest_path)
 910            dest_profile = pr_dest.netloc
 911
 912            # Parse the source path
 913            pr = urlparse(source_path)
 914            protocol = pr.scheme.lower() if pr.scheme else "file"
 915
 916            if protocol == "file" or source_path.startswith("/"):
 917                # For file or absolute paths, use the whole path as the prefix
 918                # and leave bucket empty
 919                bucket = ""
 920                prefix = source_path if source_path.startswith("/") else pr.path
 921            else:
 922                # For object storage, extract bucket and prefix
 923                bucket = pr.netloc
 924                prefix = pr.path
 925                if prefix.startswith("/"):
 926                    prefix = prefix[1:]
 927
 928            # Add the mapping to the nested dict
 929            self._mapping[protocol][bucket].append((prefix, dest_profile))
 930
 931        # Sort each bucket's prefixes by length (longest first) for optimal matching
 932        for protocol, buckets in self._mapping.items():
 933            for bucket, prefixes in buckets.items():
 934                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
 935
 936    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
 937        """
 938        Find the best matching mapping for the given URL.
 939
 940        :param url: URL to find matching mapping for
 941        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
 942        """
 943        # Parse the URL
 944        pr = urlparse(url)
 945        protocol = pr.scheme.lower() if pr.scheme else "file"
 946
 947        # For file paths or absolute paths
 948        if protocol == "file" or url.startswith("/"):
 949            path = url if url.startswith("/") else pr.path
 950
 951            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
 952
 953            # Check each prefix (already sorted by length, longest first)
 954            for prefix, profile in possible_mapping:
 955                if path.startswith(prefix):
 956                    # Calculate the relative path
 957                    rel_path = path[len(prefix) :]
 958                    if not rel_path.startswith("/"):
 959                        rel_path = "/" + rel_path
 960                    return profile, rel_path
 961
 962            return None
 963
 964        # For object storage
 965        bucket = pr.netloc
 966        path = pr.path
 967        if path.startswith("/"):
 968            path = path[1:]
 969
 970        # Check bucket-specific mapping
 971        possible_mapping = (
 972            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
 973        )
 974
 975        # Check each prefix (already sorted by length, longest first)
 976        for prefix, profile in possible_mapping:
 977            # matching prefix
 978            if path.startswith(prefix):
 979                rel_path = path[len(prefix) :]
 980                # Remove leading slash if present
 981                if rel_path.startswith("/"):
 982                    rel_path = rel_path[1:]
 983
 984                return profile, rel_path
 985
 986        return None
 987
 988
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: Optional[StorageProvider]
    credentials_provider: Optional[CredentialsProvider]
    storage_provider_profiles: Optional[list[str]]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    telemetry_provider: Optional[Callable[[], Telemetry]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    # Resolved config dict retained for pickling; None when built from a
    # provider bundle (not serializable in that case).
    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: Optional[StorageProvider] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
        storage_provider_profiles: Optional[list[str]] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        # A config drives either a single-backend client (storage_provider) or
        # a composite client (storage_provider_profiles) — exactly one of them.
        has_provider = bool(storage_provider)
        has_provider_profiles = bool(storage_provider_profiles)
        if has_provider and has_provider_profiles:
            raise ValueError(
                "Cannot specify both storage_provider and storage_provider_profiles. "
                "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
            )
        if not has_provider and not has_provider_profiles:
            raise ValueError("Must specify either storage_provider or storage_provider_profiles.")

        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.storage_provider_profiles = storage_provider_profiles
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.telemetry_provider = telemetry_provider
        self.replicas = [] if replicas is None else replicas
        self.autocommit_config = autocommit_config
[docs] 1045 @staticmethod 1046 def from_json( 1047 config_json: str, 1048 profile: str = DEFAULT_POSIX_PROFILE_NAME, 1049 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1050 ) -> "StorageClientConfig": 1051 """ 1052 Load a storage client configuration from a JSON string. 1053 1054 :param config_json: Configuration JSON string. 1055 :param profile: Profile to use. 1056 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1057 """ 1058 config_dict = json.loads(config_json) 1059 return StorageClientConfig.from_dict( 1060 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 1061 )
1062
[docs] 1063 @staticmethod 1064 def from_yaml( 1065 config_yaml: str, 1066 profile: str = DEFAULT_POSIX_PROFILE_NAME, 1067 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1068 ) -> "StorageClientConfig": 1069 """ 1070 Load a storage client configuration from a YAML string. 1071 1072 :param config_yaml: Configuration YAML string. 1073 :param profile: Profile to use. 1074 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1075 """ 1076 config_dict = yaml.safe_load(config_yaml) 1077 return StorageClientConfig.from_dict( 1078 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 1079 )
1080
[docs] 1081 @staticmethod 1082 def from_dict( 1083 config_dict: dict[str, Any], 1084 profile: str = DEFAULT_POSIX_PROFILE_NAME, 1085 skip_validation: bool = False, 1086 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1087 ) -> "StorageClientConfig": 1088 """ 1089 Load a storage client configuration from a Python dictionary. 1090 1091 :param config_dict: Configuration Python dictionary. 1092 :param profile: Profile to use. 1093 :param skip_validation: Skip configuration schema validation. 1094 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1095 """ 1096 # Validate the config file with predefined JSON schema 1097 if not skip_validation: 1098 validate_config(config_dict) 1099 1100 # Load config 1101 loader = StorageClientConfigLoader( 1102 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 1103 ) 1104 config = loader.build_config() 1105 1106 return config
1107
[docs] 1108 @staticmethod 1109 def from_file( 1110 config_file_paths: Optional[Iterable[str]] = None, 1111 profile: str = DEFAULT_POSIX_PROFILE_NAME, 1112 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1113 ) -> "StorageClientConfig": 1114 """ 1115 Load a storage client configuration from the first file found. 1116 1117 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`). 1118 :param profile: Profile to use. 1119 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1120 """ 1121 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths) 1122 # Parse rclone config file. 1123 rclone_config_dict, rclone_config_file = read_rclone_config() 1124 1125 # Merge config files. 1126 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict) 1127 if conflicted_keys: 1128 raise ValueError( 1129 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}' 1130 ) 1131 merged_profiles = merged_config.get("profiles", {}) 1132 1133 # Check if profile is in merged_profiles 1134 if profile in merged_profiles: 1135 return StorageClientConfig.from_dict( 1136 config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider 1137 ) 1138 else: 1139 # Check if profile is the default profile or an implicit profile 1140 if profile == DEFAULT_POSIX_PROFILE_NAME: 1141 implicit_profile_config = DEFAULT_POSIX_PROFILE 1142 elif profile.startswith("_"): 1143 # Handle implicit profiles 1144 parts = profile[1:].split("-", 1) 1145 if len(parts) == 2: 1146 protocol, bucket = parts 1147 # Verify it's a supported protocol 1148 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS: 1149 raise ValueError(f'Unsupported protocol 
in implicit profile: "{protocol}"') 1150 implicit_profile_config = create_implicit_profile_config( 1151 profile_name=profile, protocol=protocol, base_path=bucket 1152 ) 1153 else: 1154 raise ValueError(f'Invalid implicit profile format: "{profile}"') 1155 else: 1156 raise ValueError( 1157 f'Profile "{profile}" not found in configuration files. Configuration was checked in ' 1158 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. " 1159 f"Please verify that the profile exists and that configuration files are correctly located." 1160 ) 1161 # merge the implicit profile config into the merged config so the cache & observability config can be inherited 1162 if "profiles" not in merged_config: 1163 merged_config["profiles"] = implicit_profile_config["profiles"] 1164 else: 1165 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile] 1166 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_" 1167 return StorageClientConfig.from_dict( 1168 config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider 1169 )
1170 1171 @staticmethod 1172 def from_provider_bundle( 1173 config_dict: dict[str, Any], 1174 provider_bundle: Union[ProviderBundle, ProviderBundleV2], 1175 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1176 ) -> "StorageClientConfig": 1177 loader = StorageClientConfigLoader( 1178 config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider 1179 ) 1180 config = loader.build_config() 1181 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors 1182 return config 1183
[docs] 1184 @staticmethod 1185 def read_msc_config( 1186 config_file_paths: Optional[Iterable[str]] = None, 1187 ) -> tuple[Optional[dict[str, Any]], Optional[str]]: 1188 """Get the MSC configuration dictionary and the path of the first file found. 1189 1190 If no config paths are specified, configs are searched in the following order: 1191 1192 1. ``MSC_CONFIG`` environment variable (highest precedence) 1193 2. Standard search paths (user-specified config and system-wide config) 1194 1195 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used. 1196 :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration 1197 dictionary or empty dict if no config was found. ``config_file_path`` is the absolute 1198 path of the config file used, or ``None`` if no config file was found. 1199 """ 1200 config_dict: dict[str, Any] = {} 1201 config_file_path: Optional[str] = None 1202 1203 config_file_paths = list(config_file_paths or []) 1204 1205 # Add default paths if none provided. 1206 if len(config_file_paths) == 0: 1207 # Environment variable. 1208 msc_config_env = os.getenv("MSC_CONFIG", None) 1209 if msc_config_env is not None: 1210 config_file_paths.append(msc_config_env) 1211 1212 # Standard search paths. 1213 config_file_paths.extend(_find_config_file_paths()) 1214 1215 # Normalize + absolutize paths. 1216 config_file_paths = [os.path.abspath(path) for path in config_file_paths] 1217 1218 # Log plan. 1219 logger.debug(f"Searching MSC config file paths: {config_file_paths}") 1220 1221 # Load config. 1222 for path in config_file_paths: 1223 if os.path.exists(path): 1224 try: 1225 with open(path) as f: 1226 if path.endswith(".json"): 1227 config_dict = json.load(f) 1228 else: 1229 config_dict = yaml.safe_load(f) 1230 config_file_path = path 1231 # Use the first config file. 
1232 break 1233 except Exception as e: 1234 raise ValueError(f"malformed MSC config file: {path}, exception: {e}") 1235 1236 # Log result. 1237 if config_file_path is None: 1238 logger.debug("No MSC config files found in any of the search locations.") 1239 else: 1240 logger.debug(f"Using MSC config file: {config_file_path}") 1241 1242 if config_dict: 1243 validate_config(config_dict) 1244 return config_dict, config_file_path
1245
[docs] 1246 @staticmethod 1247 def read_path_mapping() -> PathMapping: 1248 """ 1249 Get the path mapping defined in the MSC configuration. 1250 1251 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)] 1252 where entries are sorted by prefix length (longest first) for optimal matching. 1253 Longer paths take precedence when matching. 1254 1255 :return: A PathMapping instance with translation mappings 1256 """ 1257 try: 1258 return PathMapping.from_config() 1259 except Exception: 1260 # Log the error but continue - this shouldn't stop the application from working 1261 logger.error("Failed to load path_mapping from MSC config") 1262 return PathMapping()
1263 1264 def __getstate__(self) -> dict[str, Any]: 1265 state = self.__dict__.copy() 1266 if not state.get("_config_dict"): 1267 raise ValueError("StorageClientConfig is not serializable") 1268 del state["credentials_provider"] 1269 del state["storage_provider"] 1270 del state["metadata_provider"] 1271 del state["cache_manager"] 1272 del state["replicas"] 1273 return state 1274 1275 def __setstate__(self, state: dict[str, Any]) -> None: 1276 # Presence checked by __getstate__. 1277 config_dict = state["_config_dict"] 1278 loader = StorageClientConfigLoader( 1279 config_dict=config_dict, 1280 profile=state["profile"], 1281 telemetry_provider=state["telemetry_provider"], 1282 ) 1283 new_config = loader.build_config() 1284 self.profile = new_config.profile 1285 self.storage_provider = new_config.storage_provider 1286 self.credentials_provider = new_config.credentials_provider 1287 self.storage_provider_profiles = new_config.storage_provider_profiles 1288 self.metadata_provider = new_config.metadata_provider 1289 self.cache_config = new_config.cache_config 1290 self.cache_manager = new_config.cache_manager 1291 self.retry_config = new_config.retry_config 1292 self.telemetry_provider = new_config.telemetry_provider 1293 self._config_dict = config_dict 1294 self.replicas = new_config.replicas 1295 self.autocommit_config = new_config.autocommit_config