Source code for multistorageclient.config

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite

# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }

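# Example for create_implicit_profile_config() above (illustrative):
# create_implicit_profile_config("_s3-bucket1", "s3", "bucket1") returns
#   {"profiles": {"_s3-bucket1": {"storage_provider": {"type": "s3", "options": {"base_path": "bucket1"}}}}}
# DEFAULT_POSIX_PROFILE below is built the same way, rooted at "/".
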
DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}


def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
    """
    Resolve include path (absolute or relative to parent config file).

    :param include_path: Path from include keyword (can be absolute or relative)
    :param parent_config_path: Absolute path of the config file containing the include
    :return: Absolute, normalized path to the included config file
    """
    if os.path.isabs(include_path):
        return os.path.abspath(include_path)
    else:
        parent_dir = os.path.dirname(parent_config_path)
        return os.path.abspath(os.path.join(parent_dir, include_path))

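# Example for _resolve_include_path() above (illustrative):
# _resolve_include_path("extra.yaml", "/etc/msc/config.yaml") -> "/etc/msc/extra.yaml",
# while an absolute include path such as "/opt/msc/extra.yaml" is returned unchanged.
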
def _merge_profiles(
    base_profiles: dict[str, Any],
    new_profiles: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge profiles from two config files with conflict detection.

    Profiles with the same name are allowed if their definitions are identical (idempotent).
    If a profile name exists in both configs with different definitions, an error is raised.

    NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
    We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
    profile fields, which could lead to unintended behavior. For example:

    base_config:
        profiles:
            my-profile:
                storage_provider:
                    type: s3
                    options:
                        base_path: bucket

    new_config:
        profiles:
            my-profile:
                credentials_provider:
                    type: S3Credentials
                    options:
                        access_key: foo
                        secret_key: bar

    If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
    and credentials_provider, which might not be the intended configuration.

    :param base_profiles: Profiles from the base config
    :param new_profiles: Profiles from the new config to merge
    :param base_path: Path to the base config file (for error messages)
    :param new_path: Path to the new config file (for error messages)
    :return: Merged profiles dictionary
    :raises ValueError: If any profile name exists in both configs with different definitions
    """
    result = base_profiles.copy()
    conflicting_profiles = []

    for profile_name, new_profile_def in new_profiles.items():
        if profile_name in result:
            if result[profile_name] != new_profile_def:
                conflicting_profiles.append(profile_name)
        else:
            result[profile_name] = new_profile_def

    if conflicting_profiles:
        conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
        raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")

    return result


def _merge_configs(
    base_config: dict[str, Any],
    new_config: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge two config dictionaries with field-specific strategies.

    Different config fields have different merge strategies:
    - profiles: Shallow merge with idempotent check (uses _merge_profiles)
    - path_mapping: Flat dict merge with idempotent check
    - experimental_features: Flat dict merge with idempotent check
    - cache, opentelemetry, posix: Global configs, idempotent if identical, error if different

    :param base_config: Base configuration dictionary
    :param new_config: New configuration to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged configuration dictionary
    :raises ValueError: If conflicts are detected
    """
    result = {}

    all_keys = set(base_config.keys()) | set(new_config.keys())

    for key in all_keys:
        base_value = base_config.get(key)
        new_value = new_config.get(key)

        # Key only in base
        if key not in new_config:
            result[key] = base_value
            continue

        # Key only in new
        if key not in base_config:
            result[key] = new_value
            continue

        # Key in both - need to merge or detect conflict
        if key == "profiles":
            result["profiles"] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("path_mapping", "experimental_features"):
            merged, conflicts = merge_dictionaries_no_overwrite(
                (base_value or {}).copy(), new_value or {}, allow_idempotent=True
            )
            if conflicts:
                conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
                raise ValueError(
                    f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
                )
            result[key] = merged

        elif key in ("cache", "opentelemetry", "posix"):
            if base_value != new_value:
                raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
            result[key] = base_value

        elif key == "include":
            # 'include' is processed by _load_and_merge_includes, not part of final config
            pass

        else:
            # This should never happen: all top-level config fields must have explicit handling above.
            raise ValueError(f"Unknown field '{key}' in config file.")

    return result


def _load_and_merge_includes(
    main_config_path: str,
    main_config_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Load and merge included config files.

    Processes the 'include' directive in the main config, loading and merging all
    specified config files. Only supports one level of includes - included files
    cannot themselves have 'include' directives.

    :param main_config_path: Absolute path to the main config file
    :param main_config_dict: Dictionary loaded from the main config file
    :return: Merged configuration dictionary (without 'include' field)
    :raises ValueError: If include file not found, malformed, or contains nested includes
    """
    include_paths = main_config_dict.get("include", [])

    if not include_paths:
        return {k: v for k, v in main_config_dict.items() if k != "include"}

    merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}

    for include_path in include_paths:
        resolved_path = _resolve_include_path(include_path, main_config_path)

        if not os.path.exists(resolved_path):
            raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")

        try:
            with open(resolved_path) as f:
                if resolved_path.endswith(".json"):
                    included_config = json.load(f)
                else:
                    included_config = yaml.safe_load(f)
        except Exception as e:
            raise ValueError(f"Failed to load included config {resolved_path}: {e}")

        validate_config(included_config)
        if "include" in included_config:
            raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")

        merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)

    return merged_config

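# Example for _load_and_merge_includes() above (illustrative) - a main config that
# pulls in shared profiles; relative paths resolve against the including file and
# nested 'include' directives are rejected:
#
#   include:
#     - /etc/msc/shared-profiles.yaml
#     - team-profiles.yaml
#   profiles:
#     my-profile:
#       storage_provider:
#         type: s3
#         options:
#           base_path: my-bucket
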
def _find_config_file_paths() -> tuple[str, ...]:
    """
    Get configuration file search paths.

    Returns paths in order of precedence:

    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
    """
    paths = []

    # 1. User-specific configuration directory
    xdg_config_home = os.getenv("XDG_CONFIG_HOME")

    if xdg_config_home:
        paths.extend(
            [
                os.path.join(xdg_config_home, "msc", "config.yaml"),
                os.path.join(xdg_config_home, "msc", "config.json"),
            ]
        )

    user_home = os.getenv("HOME")

    if user_home:
        paths.extend(
            [
                os.path.join(user_home, ".msc_config.yaml"),
                os.path.join(user_home, ".msc_config.json"),
                os.path.join(user_home, ".config", "msc", "config.yaml"),
                os.path.join(user_home, ".config", "msc", "config.json"),
            ]
        )

    # 2. System-wide configuration directories
    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
    if not xdg_config_dirs:
        xdg_config_dirs = "/etc/xdg"

    for config_dir in xdg_config_dirs.split(":"):
        config_dir = config_dir.strip()
        if config_dir:
            paths.extend(
                [
                    os.path.join(config_dir, "msc", "config.yaml"),
                    os.path.join(config_dir, "msc", "config.json"),
                ]
            )

    paths.extend(
        [
            "/etc/msc_config.yaml",
            "/etc/msc_config.json",
        ]
    )

    return tuple(paths)

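# Example for _find_config_file_paths() above (illustrative): with XDG_CONFIG_HOME
# unset and HOME=/home/alice, the search order starts with
#   /home/alice/.msc_config.yaml, /home/alice/.msc_config.json,
#   /home/alice/.config/msc/config.yaml, /home/alice/.config/msc/config.json,
# then the XDG_CONFIG_DIRS default /etc/xdg/msc/config.{yaml,json}, and finally
# /etc/msc_config.yaml and /etc/msc_config.json.
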
PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)


class ImmutableDict(dict):
    """
    Immutable dictionary that raises an error when attempting to modify it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Recursively freeze nested structures
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, self._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Convert list to tuple, freezing nested dicts recursively.
        """
        frozen = []
        for item in lst:
            if isinstance(item, dict):
                frozen.append(ImmutableDict(item))
            elif isinstance(item, list):
                frozen.append(ImmutableDict._freeze_list(item))
            else:
                frozen.append(item)
        return tuple(frozen)

    def __deepcopy__(self, memo):
        """
        Return a regular mutable dict when deepcopy is called.
        """
        return copy.deepcopy(dict(self), memo)

    def __reduce__(self):
        """
        Support for pickle serialization.
        """
        return (self.__class__, (dict(self),))

    def _copy_value(self, value):
        """
        Convert frozen structures back to mutable equivalents.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        elif isinstance(value, tuple):
            # Check if it was originally a list (frozen by _freeze_list)
            return [self._copy_value(item) for item in value]
        else:
            return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        value = super().__getitem__(key)
        return self._copy_value(value)

    def get(self, key, default=None):
        """
        Return a mutable copy of the value.
        """
        return self[key] if key in self else default

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")

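# Example for ImmutableDict above (illustrative):
#   d = ImmutableDict({"cache": {"size": "10G"}})
#   d["cache"]["size"] = "20G"  # mutates only the copy returned by __getitem__
#   d["cache"] = {}             # raises TypeError: ImmutableDict is immutable
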
class SimpleProviderBundle(ProviderBundle):
    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        if replicas is None:
            replicas = []

        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        self._replicas = replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        return self._replicas


class SimpleProviderBundleV2(ProviderBundleV2):
    def __init__(
        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
    ):
        self._storage_backends = storage_backends
        self._metadata_provider = metadata_provider

    @staticmethod
    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
        backend = StorageBackend(
            storage_provider_config=v1_bundle.storage_provider_config,
            credentials_provider=v1_bundle.credentials_provider,
            replicas=v1_bundle.replicas,
        )

        return SimpleProviderBundleV2(
            storage_backends={profile_name: backend},
            metadata_provider=v1_bundle.metadata_provider,
        )

    @property
    def storage_backends(self) -> dict[str, StorageBackend]:
        return self._storage_backends

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider


DEFAULT_CACHE_REFRESH_INTERVAL = 300


class StorageClientConfigLoader:
    _provider_bundle: ProviderBundleV2
    _resolved_config_dict: dict[str, Any]
    _profiles: dict[str, Any]
    _profile: str
    _profile_dict: dict[str, Any]
    _opentelemetry_dict: Optional[dict[str, Any]]
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

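    # Example for _build_provider_bundle_from_config() above (illustrative) - a
    # profile with replicas; entries are sorted by ascending read_priority:
    #
    #   profiles:
    #     primary:
    #       storage_provider:
    #         type: s3
    #         options:
    #           base_path: bucket-a
    #       replicas:
    #         - replica_profile: backup
    #           read_priority: 1
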
    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(
        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
    ) -> ProviderBundleV2:
        if provider_bundle:
            bundle = provider_bundle
        else:
            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
            if provider_bundle_dict:
                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
            else:
                bundle = self._build_provider_bundle_from_config(self._profile_dict)

        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)

        return bundle

    def _inject_profiles_from_bundle(self) -> None:
        if len(self._provider_bundle.storage_backends) > 1:
            profiles = copy.deepcopy(self._profiles)

            for child_name, backend in self._provider_bundle.storage_backends.items():
                child_config = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }

                if child_name in self._profiles:
                    # Profile already exists - check if it matches what we would inject
                    existing = self._profiles[child_name]
                    if existing.get("storage_provider") == child_config["storage_provider"]:
                        continue
                    else:
                        raise ValueError(
                            f"Profile '{child_name}' already exists in configuration with different settings."
                        )

                profiles[child_name] = child_config

            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles

            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

    def _build_retry_config(self) -> RetryConfig:
        """Build retry config from profile dict."""
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            return RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )

    def _build_autocommit_config(self) -> AutoCommitConfig:
        """Build autocommit config from profile dict."""
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
        return AutoCommitConfig()

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:         # Optional: The eviction policy to use\n"
                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    # todo: remove once experimental features are stable
    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
        """
        Validate that experimental features are enabled when used.

        :param eviction_policy: The eviction policy configuration to validate
        :raises ValueError: If experimental features are used without being enabled
        """
        # Validate MRU eviction policy
        if eviction_policy.policy.upper() == "MRU":
            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
                raise ValueError(
                    "MRU eviction policy is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_mru_eviction: true"
                )

        # Validate purge_factor
        if eviction_policy.purge_factor != 0:
            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
                raise ValueError("purge_factor must be between 0 and 100")

            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
                raise ValueError(
                    "purge_factor is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_purge_factor: true"
                )

    def build_config(self) -> "StorageClientConfig":
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # config for Composite StorageClient
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            config._config_dict = self._resolved_config_dict
            return config

        # Single-backend (len == 1)
        _, backend = list(backends.items())[0]
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified; the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        config._config_dict = self._resolved_config_dict
        return config

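# Example for StorageClientConfigLoader above (illustrative); it is normally driven
# through the StorageClientConfig factory methods defined further below:
#   loader = StorageClientConfigLoader({"profiles": {...}}, profile="my-profile")
#   config = loader.build_config()
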
class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary, if None the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        instance = cls()
        instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        # Get the path_mapping section
        path_mapping = config_dict.get("path_mapping", {})
        if path_mapping is None:
            return

        # Process each mapping
        for source_path, dest_path in path_mapping.items():
            # Validate format
            if not source_path.endswith("/"):
                continue
            if not dest_path.startswith(MSC_PROTOCOL):
                continue
            if not dest_path.endswith("/"):
                continue

            # Extract the destination profile
            pr_dest = urlparse(dest_path)
            dest_profile = pr_dest.netloc

            # Parse the source path
            pr = urlparse(source_path)
            protocol = pr.scheme.lower() if pr.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # For file or absolute paths, use the whole path as the prefix
                # and leave bucket empty
                bucket = ""
                prefix = source_path if source_path.startswith("/") else pr.path
            else:
                # For object storage, extract bucket and prefix
                bucket = pr.netloc
                prefix = pr.path
                if prefix.startswith("/"):
                    prefix = prefix[1:]

            # Add the mapping to the nested dict
            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Sort each bucket's prefixes by length (longest first) for optimal matching
        for protocol, buckets in self._mapping.items():
            for bucket, prefixes in buckets.items():
                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        # Parse the URL
        pr = urlparse(url)
        protocol = pr.scheme.lower() if pr.scheme else "file"

        # For file paths or absolute paths
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else pr.path

            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []

            # Check each prefix (already sorted by length, longest first)
            for prefix, profile in possible_mapping:
                if path.startswith(prefix):
                    # Calculate the relative path
                    rel_path = path[len(prefix) :]
                    if not rel_path.startswith("/"):
                        rel_path = "/" + rel_path
                    return profile, rel_path

            return None

        # For object storage
        bucket = pr.netloc
        path = pr.path
        if path.startswith("/"):
            path = path[1:]

        # Check bucket-specific mapping
        possible_mapping = (
            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
        )

        # Check each prefix (already sorted by length, longest first)
        for prefix, profile in possible_mapping:
            # matching prefix
            if path.startswith(prefix):
                rel_path = path[len(prefix) :]
                # Remove leading slash if present
                if rel_path.startswith("/"):
                    rel_path = rel_path[1:]

                return profile, rel_path

        return None

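# Example for PathMapping above (illustrative): with the config
#   path_mapping:
#     s3://bucket/data/: msc://my-profile/
# PathMapping.from_config(...).find_mapping("s3://bucket/data/file.bin")
# returns ("my-profile", "file.bin").
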
[docs] 1191class StorageClientConfig: 1192 """ 1193 Configuration class for the :py:class:`multistorageclient.StorageClient`. 1194 """ 1195 1196 profile: str 1197 storage_provider: Optional[StorageProvider] 1198 credentials_provider: Optional[CredentialsProvider] 1199 storage_provider_profiles: Optional[list[str]] 1200 metadata_provider: Optional[MetadataProvider] 1201 cache_config: Optional[CacheConfig] 1202 cache_manager: Optional[CacheManager] 1203 retry_config: Optional[RetryConfig] 1204 telemetry_provider: Optional[Callable[[], Telemetry]] 1205 replicas: list[Replica] 1206 autocommit_config: Optional[AutoCommitConfig] 1207 1208 _config_dict: Optional[dict[str, Any]] 1209 1210 def __init__( 1211 self, 1212 profile: str, 1213 storage_provider: Optional[StorageProvider] = None, 1214 credentials_provider: Optional[CredentialsProvider] = None, 1215 storage_provider_profiles: Optional[list[str]] = None, 1216 metadata_provider: Optional[MetadataProvider] = None, 1217 cache_config: Optional[CacheConfig] = None, 1218 cache_manager: Optional[CacheManager] = None, 1219 retry_config: Optional[RetryConfig] = None, 1220 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1221 replicas: Optional[list[Replica]] = None, 1222 autocommit_config: Optional[AutoCommitConfig] = None, 1223 ): 1224 # exactly one of storage_provider or storage_provider_profiles must be set 1225 if storage_provider and storage_provider_profiles: 1226 raise ValueError( 1227 "Cannot specify both storage_provider and storage_provider_profiles. " 1228 "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient." 1229 ) 1230 if not storage_provider and not storage_provider_profiles: 1231 raise ValueError("Must specify either storage_provider or storage_provider_profiles.") 1232 1233 if replicas is None: 1234 replicas = [] 1235 self.profile = profile 1236 self.storage_provider = storage_provider 1237 self.credentials_provider = credentials_provider 1238 self.storage_provider_profiles = storage_provider_profiles 1239 self.metadata_provider = metadata_provider 1240 self.cache_config = cache_config 1241 self.cache_manager = cache_manager 1242 self.retry_config = retry_config 1243 self.telemetry_provider = telemetry_provider 1244 self.replicas = replicas 1245 self.autocommit_config = autocommit_config 1246
[docs] 1247 @staticmethod 1248 def from_json( 1249 config_json: str, 1250 profile: str = DEFAULT_POSIX_PROFILE_NAME, 1251 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1252 ) -> "StorageClientConfig": 1253 """ 1254 Load a storage client configuration from a JSON string. 1255 1256 :param config_json: Configuration JSON string. 1257 :param profile: Profile to use. 1258 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1259 """ 1260 config_dict = json.loads(config_json) 1261 return StorageClientConfig.from_dict( 1262 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 1263 )
1264
[docs] 1265 @staticmethod 1266 def from_yaml( 1267 config_yaml: str, 1268 profile: str = DEFAULT_POSIX_PROFILE_NAME, 1269 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1270 ) -> "StorageClientConfig": 1271 """ 1272 Load a storage client configuration from a YAML string. 1273 1274 :param config_yaml: Configuration YAML string. 1275 :param profile: Profile to use. 1276 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1277 """ 1278 config_dict = yaml.safe_load(config_yaml) 1279 return StorageClientConfig.from_dict( 1280 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 1281 )
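    # Illustrative example (assumed profile/bucket names): the YAML equivalent
    # of the JSON example above.
    #
    #   config = StorageClientConfig.from_yaml(
    #       """
    #       profiles:
    #         my-s3:
    #           storage_provider:
    #             type: s3
    #             options:
    #               base_path: my-bucket
    #       """,
    #       profile="my-s3",
    #   )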
    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a Python dictionary.

        :param config_dict: Configuration Python dictionary.
        :param profile: Profile to use.
        :param skip_validation: Skip configuration schema validation.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Validate the config file with the predefined JSON schema.
        if not skip_validation:
            validate_config(config_dict)

        # Load config.
        loader = StorageClientConfigLoader(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()

        return config
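    # Illustrative example (assumed names): from_json and from_yaml both funnel
    # into from_dict, which can also be called directly with an already-parsed
    # dictionary; skip_validation=True bypasses the JSON-schema check.
    #
    #   config = StorageClientConfig.from_dict(
    #       {"profiles": {"my-s3": {"storage_provider": {"type": "s3", "options": {"base_path": "my-bucket"}}}}},
    #       profile="my-s3",
    #   )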
    @staticmethod
    def from_file(
        config_file_paths: Optional[Iterable[str]] = None,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from the first file found.

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if the profile is in merged_profiles.
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
            )
        else:
            # Check if the profile is the default profile or an implicit profile.
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles.
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol.
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited.
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # The config was already validated when it was read, so skip validation
            # for implicit profiles (profile names starting with "_").
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
            )
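    # Illustrative example (assumed bucket name): implicit profiles use an
    # underscore-prefixed "<protocol>-<bucket>" name and need no entry in any
    # config file; cache and observability settings are still inherited from
    # the merged config.
    #
    #   config = StorageClientConfig.from_file(profile="_s3-my-bucket")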
    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any],
        provider_bundle: Union[ProviderBundle, ProviderBundleV2],
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config
    @staticmethod
    def read_msc_config(
        config_file_paths: Optional[Iterable[str]] = None,
    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the first file found.

        If no config paths are specified, configs are searched in the following order:

        1. ``MSC_CONFIG`` environment variable (highest precedence)
        2. Standard search paths (user-specified config and system-wide config)

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
            dictionary, or an empty dict if no config was found. ``config_file_path`` is the absolute
            path of the config file used, or ``None`` if no config file was found.
        """
        config_dict: dict[str, Any] = {}
        config_file_path: Optional[str] = None

        config_file_paths = list(config_file_paths or [])

        # Add default paths if none provided.
        if len(config_file_paths) == 0:
            # Environment variable.
            msc_config_env = os.getenv("MSC_CONFIG", None)
            if msc_config_env is not None:
                config_file_paths.append(msc_config_env)

            # Standard search paths.
            config_file_paths.extend(_find_config_file_paths())

        # Normalize + absolutize paths.
        config_file_paths = [os.path.abspath(path) for path in config_file_paths]

        # Log plan.
        logger.debug(f"Searching MSC config file paths: {config_file_paths}")

        # Load config.
        for path in config_file_paths:
            if os.path.exists(path):
                try:
                    with open(path) as f:
                        if path.endswith(".json"):
                            config_dict = json.load(f)
                        else:
                            config_dict = yaml.safe_load(f)
                        config_file_path = path
                        # Use the first config file found.
                        break
                except Exception as e:
                    raise ValueError(f"Malformed MSC config file: {path}, exception: {e}")

        # Log result.
        if config_file_path is None:
            logger.debug("No MSC config files found in any of the search locations.")
        else:
            logger.debug(f"Using MSC config file: {config_file_path}")

        if config_dict:
            validate_config(config_dict)

        if "include" in config_dict and config_file_path:
            config_dict = _load_and_merge_includes(config_file_path, config_dict)

        return config_dict, config_file_path
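    # Illustrative example (assumed path): with no explicit search paths, the
    # MSC_CONFIG environment variable is consulted first, then the standard
    # search locations; the first existing file wins.
    #
    #   os.environ["MSC_CONFIG"] = "/etc/msc/config.yaml"
    #   config_dict, config_path = StorageClientConfig.read_msc_config()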
    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings form a nested structure of protocol -> bucket -> [(prefix, profile)],
        with entries sorted by prefix length (longest first) so that the most specific
        prefix wins when matching.

        :return: A PathMapping instance with translation mappings.
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working.
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()
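    # Illustrative example: this accessor never raises; on any load error it
    # logs and falls back to an empty PathMapping.
    #
    #   mapping = StorageClientConfig.read_path_mapping()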
    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            telemetry_provider=state["telemetry_provider"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.storage_provider_profiles = new_config.storage_provider_profiles
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.telemetry_provider = new_config.telemetry_provider
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config
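    # Illustrative example (yaml_text is an assumed config string): configs built
    # from a dict, JSON, or YAML retain _config_dict, so they can be pickled;
    # provider objects are dropped in __getstate__ and rebuilt from the stored
    # dict in __setstate__.
    #
    #   import pickle
    #   config = StorageClientConfig.from_yaml(yaml_text, profile="my-s3")
    #   restored = pickle.loads(pickle.dumps(config))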