Source code for multistorageclient.config

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
import copy
import inspect
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
  53
  54# Constants related to implicit profiles
  55SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
  56PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
  57    "s3": "s3",
  58    "gs": "gcs",
  59    "ais": "ais",
  60    "file": "file",
  61}
  62
  63
  64# Template for creating implicit profile configurations
  65def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
  66    """
  67    Create a configuration dictionary for an implicit profile.
  68
  69    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
  70    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
  71    :param base_path: The base path (e.g., bucket name) for the storage provider
  72
  73    :return: A configuration dictionary for the implicit profile
  74    """
  75    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
  76    return {
  77        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
  78    }
  79
  80
# Historical name for the implicit POSIX profile; still accepted but deprecated
# (see _normalize_profile_name, which warns and maps it to the reserved name).
LEGACY_POSIX_PROFILE_NAME = "default"
# Reserved name for the built-in POSIX filesystem profile; configs may not
# override it with different settings (enforced in StorageClientConfigLoader).
RESERVED_POSIX_PROFILE_NAME = "__filesystem__"
# Implicit profile exposing the local filesystem rooted at "/".
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(RESERVED_POSIX_PROFILE_NAME, "file", "/")
  84
# Maps the "type" value of a storage_provider config entry to the class name
# that is dynamically imported from the ".providers" module
# (see _build_storage_provider).
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}
  97
# Maps the "type" value of a credentials_provider config entry to the class name
# imported from the ".providers" module. Types not present in this mapping are
# treated as fully qualified class paths (see _build_credentials_provider).
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "DefaultAzureCredentials": "DefaultAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}
 108
 109
 110def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
 111    """
 112    Resolve include path (absolute or relative to parent config file).
 113
 114    :param include_path: Path from include keyword (can be absolute or relative)
 115    :param parent_config_path: Absolute path of the config file containing the include
 116    :return: Absolute, normalized path to the included config file
 117    """
 118    if os.path.isabs(include_path):
 119        return os.path.abspath(include_path)
 120    else:
 121        parent_dir = os.path.dirname(parent_config_path)
 122        return os.path.abspath(os.path.join(parent_dir, include_path))
 123
 124
 125def _merge_profiles(
 126    base_profiles: dict[str, Any],
 127    new_profiles: dict[str, Any],
 128    base_path: str,
 129    new_path: str,
 130) -> dict[str, Any]:
 131    """
 132    Merge profiles from two config files with conflict detection.
 133
 134    Profiles with the same name are allowed if their definitions are identical (idempotent).
 135    If a profile name exists in both configs with different definitions, an error is raised.
 136
 137    NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
 138    We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
 139    profile fields, which could lead to unintended behavior. For example:
 140
 141    base_config:
 142        profiles:
 143            my-profile:
 144                storage_provider:
 145                    type: s3
 146                    options:
 147                        base_path: bucket
 148
 149    new_config:
 150        profiles:
 151            my-profile:
 152                credentials_provider:
 153                    type: S3Credentials
 154                    options:
 155                        access_key: foo
 156                        secret_key: bar
 157
 158    If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
 159    and credentials_provider, which might not be the intended configuration.
 160
 161    :param base_profiles: Profiles from the base config
 162    :param new_profiles: Profiles from the new config to merge
 163    :param base_path: Path to the base config file (for error messages)
 164    :param new_path: Path to the new config file (for error messages)
 165    :return: Merged profiles dictionary
 166    :raises ValueError: If any profile name exists in both configs with different definitions
 167    """
 168    result = base_profiles.copy()
 169    conflicting_profiles = []
 170
 171    for profile_name, new_profile_def in new_profiles.items():
 172        if profile_name in result:
 173            if result[profile_name] != new_profile_def:
 174                conflicting_profiles.append(profile_name)
 175        else:
 176            result[profile_name] = new_profile_def
 177
 178    if conflicting_profiles:
 179        conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
 180        raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")
 181
 182    return result
 183
 184
 185def _merge_opentelemetry(
 186    base_otel: dict[str, Any],
 187    new_otel: dict[str, Any],
 188    base_path: str,
 189    new_path: str,
 190) -> dict[str, Any]:
 191    """
 192    Merge opentelemetry configurations with special handling for metrics.attributes.
 193
 194    Merge strategy:
 195    - metrics.attributes: Concatenate arrays (preserve order for fallback mechanism)
 196    - Other fields: Use idempotent check (must be identical)
 197
 198    Attributes providers are concatenated to support fallback scenarios where multiple
 199    providers of the same type supply the same attribute key with different sources.
 200    The order matters - later providers override earlier ones.
 201
 202    :param base_otel: Base opentelemetry config
 203    :param new_otel: New opentelemetry config to merge
 204    :param base_path: Path to base config file (for error messages)
 205    :param new_path: Path to new config file (for error messages)
 206    :return: Merged opentelemetry configuration
 207    :raises ValueError: If conflicts are detected
 208    """
 209    # Merge metrics.attributes
 210    base_attrs = base_otel.get("metrics", {}).get("attributes", [])
 211    new_attrs = new_otel.get("metrics", {}).get("attributes", [])
 212    merged_attrs = base_attrs + new_attrs
 213
 214    # Merge other opentelemetry fields
 215    base_otel_without_attrs = copy.deepcopy(base_otel)
 216    if "metrics" in base_otel_without_attrs and "attributes" in base_otel_without_attrs["metrics"]:
 217        del base_otel_without_attrs["metrics"]["attributes"]
 218
 219    new_otel_without_attrs = copy.deepcopy(new_otel)
 220    if "metrics" in new_otel_without_attrs and "attributes" in new_otel_without_attrs["metrics"]:
 221        del new_otel_without_attrs["metrics"]["attributes"]
 222
 223    merged, conflicts = merge_dictionaries_no_overwrite(
 224        base_otel_without_attrs, new_otel_without_attrs, allow_idempotent=True
 225    )
 226
 227    if conflicts:
 228        conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
 229        raise ValueError(
 230            f"opentelemetry config conflict: {conflicts_list} defined differently in {base_path} and {new_path}"
 231        )
 232
 233    if "metrics" in merged:
 234        merged["metrics"]["attributes"] = merged_attrs
 235    elif merged_attrs:
 236        merged["metrics"] = {"attributes": merged_attrs}
 237
 238    return merged
 239
 240
 241def _merge_configs(
 242    base_config: dict[str, Any],
 243    new_config: dict[str, Any],
 244    base_path: str,
 245    new_path: str,
 246) -> dict[str, Any]:
 247    """
 248    Merge two config dictionaries with field-specific strategies.
 249
 250    Different config fields have different merge strategies:
 251    - profiles: Shallow merge with idempotent check (uses _merge_profiles)
 252    - path_mapping: Flat dict merge with idempotent check
 253    - experimental_features: Flat dict merge with idempotent check
 254    - opentelemetry: Hybrid merge (attributes concatenate, others idempotent)
 255    - cache, posix: Global configs, idempotent if identical, error if different
 256
 257    :param base_config: Base configuration dictionary
 258    :param new_config: New configuration to merge
 259    :param base_path: Path to base config file (for error messages)
 260    :param new_path: Path to new config file (for error messages)
 261    :return: Merged configuration dictionary
 262    :raises ValueError: If conflicts are detected
 263    """
 264    result = {}
 265
 266    all_keys = set(base_config.keys()) | set(new_config.keys())
 267
 268    for key in all_keys:
 269        base_value = base_config.get(key)
 270        new_value = new_config.get(key)
 271
 272        # Key only in base
 273        if key not in new_config:
 274            result[key] = base_value
 275            continue
 276
 277        # Key only in new
 278        if key not in base_config:
 279            result[key] = new_value
 280            continue
 281
 282        # Key in both - need to merge or detect conflict
 283        if key == "profiles":
 284            result["profiles"] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)
 285
 286        elif key in ("path_mapping", "experimental_features"):
 287            merged, conflicts = merge_dictionaries_no_overwrite(
 288                (base_value or {}).copy(), new_value or {}, allow_idempotent=True
 289            )
 290            if conflicts:
 291                conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
 292                raise ValueError(
 293                    f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
 294                )
 295            result[key] = merged
 296
 297        elif key == "opentelemetry":
 298            result["opentelemetry"] = _merge_opentelemetry(base_value or {}, new_value or {}, base_path, new_path)
 299
 300        elif key in ("cache", "posix"):
 301            if base_value != new_value:
 302                raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
 303            result[key] = base_value
 304
 305        elif key == "include":
 306            # 'include' is processed by _load_and_merge_includes, not part of final config
 307            pass
 308
 309        else:
 310            # This should never happen and all top level fields must have explicit handling above
 311            raise ValueError(f"Unknown field '{key}' in config file.")
 312
 313    return result
 314
 315
 316def _load_and_merge_includes(
 317    main_config_path: str,
 318    main_config_dict: dict[str, Any],
 319) -> dict[str, Any]:
 320    """
 321    Load and merge included config files.
 322
 323    Processes the 'include' directive in the main config, loading and merging all
 324    specified config files. Only supports one level of includes - included files
 325    cannot themselves have 'include' directives.
 326
 327    :param main_config_path: Absolute path to the main config file
 328    :param main_config_dict: Dictionary loaded from the main config file
 329    :return: Merged configuration dictionary (without 'include' field)
 330    :raises ValueError: If include file not found, malformed, or contains nested includes
 331    """
 332    include_paths = main_config_dict.get("include", [])
 333
 334    if not include_paths:
 335        return {k: v for k, v in main_config_dict.items() if k != "include"}
 336
 337    merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}
 338
 339    for include_path in include_paths:
 340        resolved_path = _resolve_include_path(include_path, main_config_path)
 341
 342        if not os.path.exists(resolved_path):
 343            raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")
 344
 345        try:
 346            with open(resolved_path) as f:
 347                if resolved_path.endswith(".json"):
 348                    included_config = json.load(f)
 349                else:
 350                    included_config = yaml.safe_load(f)
 351        except Exception as e:
 352            raise ValueError(f"Failed to load included config {resolved_path}: {e}")
 353
 354        validate_config(included_config)
 355        if "include" in included_config:
 356            raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")
 357
 358        merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)
 359
 360    return merged_config
 361
 362
 363def _find_config_file_paths() -> tuple[str]:
 364    """
 365    Get configuration file search paths.
 366
 367    Returns paths in order of precedence:
 368
 369    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
 370    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
 371    """
 372    paths = []
 373
 374    # 1. User-specific configuration directory
 375    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
 376
 377    if xdg_config_home:
 378        paths.extend(
 379            [
 380                os.path.join(xdg_config_home, "msc", "config.yaml"),
 381                os.path.join(xdg_config_home, "msc", "config.json"),
 382            ]
 383        )
 384
 385    user_home = os.getenv("HOME")
 386
 387    if user_home:
 388        paths.extend(
 389            [
 390                os.path.join(user_home, ".msc_config.yaml"),
 391                os.path.join(user_home, ".msc_config.json"),
 392                os.path.join(user_home, ".config", "msc", "config.yaml"),
 393                os.path.join(user_home, ".config", "msc", "config.json"),
 394            ]
 395        )
 396
 397    # 2. System-wide configuration directories
 398    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
 399    if not xdg_config_dirs:
 400        xdg_config_dirs = "/etc/xdg"
 401
 402    for config_dir in xdg_config_dirs.split(":"):
 403        config_dir = config_dir.strip()
 404        if config_dir:
 405            paths.extend(
 406                [
 407                    os.path.join(config_dir, "msc", "config.yaml"),
 408                    os.path.join(config_dir, "msc", "config.json"),
 409                ]
 410            )
 411
 412    paths.extend(
 413        [
 414            "/etc/msc_config.yaml",
 415            "/etc/msc_config.json",
 416        ]
 417    )
 418
 419    return tuple(paths)
 420
 421
 422def _normalize_profile_name(profile: str, config_dict: dict[str, Any]) -> str:
 423    """
 424    Normalize the profile name to the reserved POSIX profile name if the legacy "default" POSIX profile is used.
 425
 426    :param profile: The profile name to normalize
 427    :param config_dict: The configuration dictionary
 428    :return: The normalized profile name
 429    """
 430    if profile == LEGACY_POSIX_PROFILE_NAME and profile not in config_dict.get("profiles", {}):
 431        logger.warning(
 432            f"The profile name '{LEGACY_POSIX_PROFILE_NAME}' is deprecated and will be removed in a future version. Please use '{RESERVED_POSIX_PROFILE_NAME}' instead."
 433        )
 434        return RESERVED_POSIX_PROFILE_NAME
 435    return profile
 436
 437
# Root package name, used when dynamically importing provider classes
# relative to this package (see import_class calls below).
PACKAGE_NAME = "multistorageclient"

# Module-level logger for config loading diagnostics.
logger = logging.getLogger(__name__)
 441
 442
 443class ImmutableDict(dict):
 444    """
 445    Immutable dictionary that raises an error when attempting to modify it.
 446    """
 447
 448    def __init__(self, *args, **kwargs):
 449        super().__init__(*args, **kwargs)
 450
 451        # Recursively freeze nested structures
 452        for key, value in list(super().items()):
 453            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
 454                super().__setitem__(key, ImmutableDict(value))
 455            elif isinstance(value, list):
 456                super().__setitem__(key, self._freeze_list(value))
 457
 458    @staticmethod
 459    def _freeze_list(lst):
 460        """
 461        Convert list to tuple, freezing nested dicts recursively.
 462        """
 463        frozen = []
 464        for item in lst:
 465            if isinstance(item, dict):
 466                frozen.append(ImmutableDict(item))
 467            elif isinstance(item, list):
 468                frozen.append(ImmutableDict._freeze_list(item))
 469            else:
 470                frozen.append(item)
 471        return tuple(frozen)
 472
 473    def __deepcopy__(self, memo):
 474        """
 475        Return a regular mutable dict when deepcopy is called.
 476        """
 477        return copy.deepcopy(dict(self), memo)
 478
 479    def __reduce__(self):
 480        """
 481        Support for pickle serialization.
 482        """
 483        return (self.__class__, (dict(self),))
 484
 485    def _copy_value(self, value):
 486        """
 487        Convert frozen structures back to mutable equivalents.
 488        """
 489        if isinstance(value, ImmutableDict):
 490            return {k: self._copy_value(v) for k, v in value.items()}
 491        elif isinstance(value, tuple):
 492            # Check if it was originally a list (frozen by _freeze_list)
 493            return [self._copy_value(item) for item in value]
 494        else:
 495            return value
 496
 497    def __getitem__(self, key):
 498        """
 499        Return a mutable copy of the value.
 500        """
 501        value = super().__getitem__(key)
 502        return self._copy_value(value)
 503
 504    def get(self, key, default=None):
 505        """
 506        Return a mutable copy of the value.
 507        """
 508        return self[key] if key in self else default
 509
 510    def __setitem__(self, key, value):
 511        raise TypeError("ImmutableDict is immutable")
 512
 513    def __delitem__(self, key):
 514        raise TypeError("ImmutableDict is immutable")
 515
 516    def clear(self):
 517        raise TypeError("ImmutableDict is immutable")
 518
 519    def pop(self, *args):
 520        raise TypeError("ImmutableDict is immutable")
 521
 522    def popitem(self):
 523        raise TypeError("ImmutableDict is immutable")
 524
 525    def setdefault(self, key, default=None):
 526        raise TypeError("ImmutableDict is immutable")
 527
 528    def update(self, *args, **kwargs):
 529        raise TypeError("ImmutableDict is immutable")
 530
 531
 532class SimpleProviderBundle(ProviderBundle):
 533    def __init__(
 534        self,
 535        storage_provider_config: StorageProviderConfig,
 536        credentials_provider: Optional[CredentialsProvider] = None,
 537        metadata_provider: Optional[MetadataProvider] = None,
 538        replicas: Optional[list[Replica]] = None,
 539    ):
 540        if replicas is None:
 541            replicas = []
 542
 543        self._storage_provider_config = storage_provider_config
 544        self._credentials_provider = credentials_provider
 545        self._metadata_provider = metadata_provider
 546        self._replicas = replicas
 547
 548    @property
 549    def storage_provider_config(self) -> StorageProviderConfig:
 550        return self._storage_provider_config
 551
 552    @property
 553    def credentials_provider(self) -> Optional[CredentialsProvider]:
 554        return self._credentials_provider
 555
 556    @property
 557    def metadata_provider(self) -> Optional[MetadataProvider]:
 558        return self._metadata_provider
 559
 560    @property
 561    def replicas(self) -> list[Replica]:
 562        return self._replicas
 563
 564
 565class SimpleProviderBundleV2(ProviderBundleV2):
 566    def __init__(
 567        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
 568    ):
 569        self._storage_backends = storage_backends
 570        self._metadata_provider = metadata_provider
 571
 572    @staticmethod
 573    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
 574        backend = StorageBackend(
 575            storage_provider_config=v1_bundle.storage_provider_config,
 576            credentials_provider=v1_bundle.credentials_provider,
 577            replicas=v1_bundle.replicas,
 578        )
 579
 580        return SimpleProviderBundleV2(
 581            storage_backends={profile_name: backend},
 582            metadata_provider=v1_bundle.metadata_provider,
 583        )
 584
 585    @property
 586    def storage_backends(self) -> dict[str, StorageBackend]:
 587        return self._storage_backends
 588
 589    @property
 590    def metadata_provider(self) -> Optional[MetadataProvider]:
 591        return self._metadata_provider
 592
 593
 594DEFAULT_CACHE_REFRESH_INTERVAL = 300
 595
 596
class StorageClientConfigLoader:
    """
    Builds storage client components (provider bundle, cache/telemetry settings)
    from a configuration dictionary and a selected profile.
    """

    # Resolved provider bundle (built from config, or supplied pre-built by the caller).
    _provider_bundle: ProviderBundleV2
    # Fully expanded config (environment variables interpolated), frozen as ImmutableDict.
    _resolved_config_dict: dict[str, Any]
    # Profile-name -> profile-definition mapping from the config (mutable working copy).
    _profiles: dict[str, Any]
    # Name of the active profile.
    _profile: str
    # Frozen definition of the active profile.
    _profile_dict: dict[str, Any]
    # Raw "opentelemetry" section of the config, if present.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Thunk yielding a Telemetry instance; kept lazy for multiprocessing pickling.
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    # Raw "cache" section of the config, if present.
    _cache_dict: Optional[dict[str, Any]]
 606
    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = RESERVED_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.

        :raises ValueError: If the requested profile is missing, or if the config
            overrides the reserved POSIX profile with different settings.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        # Frozen snapshot of the fully-resolved config, later handed to providers.
        self._resolved_config_dict = ImmutableDict(config_dict)

        # Mutable working copy of the profiles section (the implicit POSIX
        # profile may be injected below).
        self._profiles = config_dict.get("profiles", {})

        if RESERVED_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[RESERVED_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            if (
                self._profiles[RESERVED_POSIX_PROFILE_NAME]
                != DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
            ):
                raise ValueError(f'Cannot override "{RESERVED_POSIX_PROFILE_NAME}" profile with different settings.')

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        # A caller-supplied bundle takes precedence over config-built providers.
        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()
 661
 662    def _build_storage_provider(
 663        self,
 664        storage_provider_name: str,
 665        storage_options: Optional[dict[str, Any]] = None,
 666        credentials_provider: Optional[CredentialsProvider] = None,
 667    ) -> StorageProvider:
 668        if storage_options is None:
 669            storage_options = {}
 670        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
 671            raise ValueError(
 672                f"Storage provider {storage_provider_name} is not supported. "
 673                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
 674            )
 675        if credentials_provider:
 676            storage_options["credentials_provider"] = credentials_provider
 677        if self._resolved_config_dict is not None:
 678            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
 679            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
 680        if self._telemetry_provider is not None:
 681            storage_options["telemetry_provider"] = self._telemetry_provider
 682        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
 683        module_name = ".providers"
 684        cls = import_class(class_name, module_name, PACKAGE_NAME)
 685        return cls(**storage_options)
 686
 687    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
 688        storage_profile_dict = self._profiles.get(storage_provider_profile)
 689        if not storage_profile_dict:
 690            raise ValueError(
 691                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
 692            )
 693
 694        # Check if metadata provider is configured for this profile
 695        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
 696        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
 697        if local_metadata_provider_dict:
 698            raise ValueError(
 699                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
 700            )
 701
 702        # Initialize CredentialsProvider
 703        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
 704        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
 705
 706        # Initialize StorageProvider
 707        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
 708        if local_storage_provider_dict:
 709            local_name = local_storage_provider_dict["type"]
 710            local_storage_options = local_storage_provider_dict.get("options", {})
 711        else:
 712            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
 713
 714        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
 715        return storage_provider
 716
 717    def _build_credentials_provider(
 718        self,
 719        credentials_provider_dict: Optional[dict[str, Any]],
 720        storage_options: Optional[dict[str, Any]] = None,
 721    ) -> Optional[CredentialsProvider]:
 722        """
 723        Initializes the CredentialsProvider based on the provided dictionary.
 724
 725        Args:
 726            credentials_provider_dict: Dictionary containing credentials provider configuration
 727            storage_options: Storage provider options required by some credentials providers to scope the credentials.
 728        """
 729        if not credentials_provider_dict:
 730            return None
 731
 732        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
 733            # Fully qualified class path case
 734            class_type = credentials_provider_dict["type"]
 735            module_name, class_name = class_type.rsplit(".", 1)
 736            cls = import_class(class_name, module_name)
 737        else:
 738            # Mapped class name case
 739            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
 740            module_name = ".providers"
 741            cls = import_class(class_name, module_name, PACKAGE_NAME)
 742
 743        # Propagate storage provider options to credentials provider since they may be
 744        # required by some credentials providers to scope the credentials.
 745        import inspect
 746
 747        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
 748        options = credentials_provider_dict.get("options", {})
 749        if storage_options:
 750            for storage_provider_option in storage_options.keys():
 751                if storage_provider_option in init_params and storage_provider_option not in options:
 752                    options[storage_provider_option] = storage_options[storage_provider_option]
 753
 754        return cls(**options)
 755
 756    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
 757        # Initialize StorageProvider
 758        storage_provider_dict = profile_dict.get("storage_provider", None)
 759        if storage_provider_dict:
 760            storage_provider_name = storage_provider_dict["type"]
 761            storage_options = storage_provider_dict.get("options", {})
 762        else:
 763            raise ValueError("Missing storage_provider in the config.")
 764
 765        # Initialize CredentialsProvider
 766        # It is prudent to assume that in some cases, the credentials provider
 767        # will provide credentials scoped to specific base_path.
 768        # So we need to pass the storage_options to the credentials provider.
 769        credentials_provider_dict = profile_dict.get("credentials_provider", None)
 770        credentials_provider = self._build_credentials_provider(
 771            credentials_provider_dict=credentials_provider_dict,
 772            storage_options=storage_options,
 773        )
 774
 775        # Initialize MetadataProvider
 776        metadata_provider_dict = profile_dict.get("metadata_provider", None)
 777        metadata_provider = None
 778        if metadata_provider_dict:
 779            if metadata_provider_dict["type"] == "manifest":
 780                metadata_options = metadata_provider_dict.get("options", {})
 781                # If MetadataProvider has a reference to a different storage provider profile
 782                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
 783                if storage_provider_profile:
 784                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
 785                else:
 786                    storage_provider = self._build_storage_provider(
 787                        storage_provider_name, storage_options, credentials_provider
 788                    )
 789
 790                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
 791            else:
 792                class_type = metadata_provider_dict["type"]
 793                if "." not in class_type:
 794                    raise ValueError(
 795                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
 796                    )
 797                module_name, class_name = class_type.rsplit(".", 1)
 798                cls = import_class(class_name, module_name)
 799                options = metadata_provider_dict.get("options", {})
 800                metadata_provider = cls(**options)
 801
 802        # Build replicas if configured
 803        replicas_config = profile_dict.get("replicas", [])
 804        replicas = []
 805        if replicas_config:
 806            for replica_dict in replicas_config:
 807                replicas.append(
 808                    Replica(
 809                        replica_profile=replica_dict["replica_profile"],
 810                        read_priority=replica_dict["read_priority"],
 811                    )
 812                )
 813
 814            # Sort replicas by read_priority
 815            replicas.sort(key=lambda r: r.read_priority)
 816
 817        return SimpleProviderBundle(
 818            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
 819            credentials_provider=credentials_provider,
 820            metadata_provider=metadata_provider,
 821            replicas=replicas,
 822        )
 823
 824    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
 825        class_type = provider_bundle_dict["type"]
 826        module_name, class_name = class_type.rsplit(".", 1)
 827        cls = import_class(class_name, module_name)
 828        options = provider_bundle_dict.get("options", {})
 829        return cls(**options)
 830
 831    def _build_provider_bundle(
 832        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
 833    ) -> ProviderBundleV2:
 834        if provider_bundle:
 835            bundle = provider_bundle
 836        else:
 837            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
 838            if provider_bundle_dict:
 839                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
 840            else:
 841                bundle = self._build_provider_bundle_from_config(self._profile_dict)
 842
 843        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
 844            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)
 845
 846        return bundle
 847
    def _inject_profiles_from_bundle(self) -> None:
        """
        Inject child profiles and build child configs for multi-backend configurations.

        For ProviderBundleV2 with multiple backends, this method:
        1. Injects child profiles into config dict (needed for replica initialization)
        2. Pre-builds child configs so CompositeStorageClient can use them directly

        Profile injection is required because SingleStorageClient._initialize_replicas()
        uses StorageClientConfig.from_dict() to create replica clients, which looks up
        profiles in the config dict.
        """
        # None means "no pre-built children"; single-backend profiles fall through below.
        self._child_configs: Optional[dict[str, StorageClientConfig]] = None

        backends = self._provider_bundle.storage_backends
        if len(backends) > 1:
            # First, inject all child profiles into config dict (needed for replica lookup)
            # Deep-copy so the current (immutable) profiles mapping is never mutated in place.
            profiles = copy.deepcopy(self._profiles)
            for child_name, backend in backends.items():
                # Minimal synthetic profile describing just the backend's storage provider.
                child_profile_dict = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }

                if child_name in profiles:
                    # Profile already exists - check if it matches what we would inject
                    existing = profiles[child_name]
                    if existing.get("storage_provider") != child_profile_dict["storage_provider"]:
                        raise ValueError(
                            f"Profile '{child_name}' already exists in configuration with different settings."
                        )
                else:
                    profiles[child_name] = child_profile_dict

            # Update config dict BEFORE building child configs
            # (child configs capture self._resolved_config_dict, so ordering matters here).
            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles
            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

            # Now build child configs (they will get the updated config dict)
            retry_config = self._build_retry_config()
            child_configs: dict[str, StorageClientConfig] = {}
            for child_name, backend in backends.items():
                storage_provider = self._build_storage_provider(
                    backend.storage_provider_config.type,
                    backend.storage_provider_config.options,
                    backend.credentials_provider,
                )

                # Each child behaves like a standalone single-backend client config;
                # cache/metadata/autocommit settings stay on the composite parent.
                child_config = StorageClientConfig(
                    profile=child_name,
                    storage_provider=storage_provider,
                    credentials_provider=backend.credentials_provider,
                    storage_provider_profiles=None,
                    child_configs=None,
                    metadata_provider=None,
                    cache_config=None,
                    cache_manager=None,
                    retry_config=retry_config,
                    telemetry_provider=self._telemetry_provider,
                    replicas=backend.replicas,
                    autocommit_config=None,
                )
                # Children share the parent's fully-resolved config dict.
                child_config._config_dict = self._resolved_config_dict
                child_configs[child_name] = child_config

            self._child_configs = child_configs
 918
 919    def _build_retry_config(self) -> RetryConfig:
 920        """Build retry config from profile dict."""
 921        retry_config_dict = self._profile_dict.get("retry", None)
 922        if retry_config_dict:
 923            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
 924            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
 925            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
 926            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
 927        else:
 928            return RetryConfig(
 929                attempts=DEFAULT_RETRY_ATTEMPTS,
 930                delay=DEFAULT_RETRY_DELAY,
 931                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
 932            )
 933
 934    def _build_autocommit_config(self) -> AutoCommitConfig:
 935        """Build autocommit config from profile dict."""
 936        autocommit_dict = self._profile_dict.get("autocommit", None)
 937        if autocommit_dict:
 938            interval_minutes = autocommit_dict.get("interval_minutes", None)
 939            at_exit = autocommit_dict.get("at_exit", False)
 940            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
 941        return AutoCommitConfig()
 942
 943    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
 944        if "size_mb" in cache_dict:
 945            raise ValueError(
 946                "The 'size_mb' property is no longer supported. \n"
 947                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
 948                "Example configuration:\n"
 949                "cache:\n"
 950                "  size: 500G                    # Optional: Maximum cache size (default: 10G)\n"
 951                "  cache_line_size: 64M          # Optional: Chunk size for partial file caching (default: 64M)\n"
 952                "  check_source_version: true    # Optional: Use ETag for cache validation (default: true)\n"
 953                "  location: /tmp/msc_cache      # Optional: Cache directory path (default: system tempdir + '/msc_cache')\n"
 954                "  eviction_policy:               # Optional: Cache eviction policy\n"
 955                "    policy: fifo                 # Optional: Policy type: lru, mru, fifo, random, no_eviction (default: fifo)\n"
 956                "    refresh_interval: 300        # Optional: Cache refresh interval in seconds (default: 300)\n"
 957            )
 958
 959        # Validate that cache_line_size doesn't exceed cache size
 960        cache_size_str = cache_dict.get("size", DEFAULT_CACHE_SIZE)
 961        cache_line_size_str = cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE)
 962
 963        # Use CacheConfig to convert sizes to bytes for comparison
 964        temp_cache_config = CacheConfig(
 965            size=cache_size_str,
 966            cache_line_size=cache_line_size_str,
 967        )
 968        cache_size_bytes = temp_cache_config.size_bytes()
 969        cache_line_size_bytes = temp_cache_config.cache_line_size_bytes()
 970
 971        if cache_line_size_bytes > cache_size_bytes:
 972            raise ValueError(
 973                f"cache_line_size ({cache_line_size_str}) exceeds cache size ({cache_size_str}). "
 974                f"cache_line_size must be less than or equal to cache size. "
 975                f"Consider increasing cache size or decreasing cache_line_size."
 976            )
 977
 978    def _validate_replicas(self, replicas: list[Replica]) -> None:
 979        """
 980        Validates that replica profiles do not have their own replicas configuration.
 981
 982        This prevents circular references where a replica profile could reference
 983        another profile that also has replicas, creating an infinite loop.
 984
 985        :param replicas: The list of Replica objects to validate
 986        :raises ValueError: If any replica profile has its own replicas configuration
 987        """
 988        for replica in replicas:
 989            replica_profile_name = replica.replica_profile
 990
 991            # Check that replica profile is not the same as the current profile
 992            if replica_profile_name == self._profile:
 993                raise ValueError(
 994                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
 995                )
 996
 997            # Check if the replica profile exists in the configuration
 998            if replica_profile_name not in self._profiles:
 999                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
1000
1001            # Get the replica profile configuration
1002            replica_profile_dict = self._profiles[replica_profile_name]
1003
1004            # Check if the replica profile has its own replicas configuration
1005            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
1006                raise ValueError(
1007                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
1008                    f"This creates a circular reference which is not allowed."
1009                )
1010
1011    # todo: remove once experimental features are stable
1012    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
1013        """
1014        Validate that experimental features are enabled when used.
1015
1016        :param eviction_policy: The eviction policy configuration to validate
1017        :raises ValueError: If experimental features are used without being enabled
1018        """
1019        # Validate MRU eviction policy
1020        if eviction_policy.policy.upper() == "MRU":
1021            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
1022                raise ValueError(
1023                    "MRU eviction policy is experimental and not enabled.\n"
1024                    "Enable it by adding to config:\n"
1025                    "  experimental_features:\n"
1026                    "    cache_mru_eviction: true"
1027                )
1028
1029        # Validate purge_factor
1030        if eviction_policy.purge_factor != 0:
1031            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
1032                raise ValueError("purge_factor must be between 0 and 100")
1033
1034            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
1035                raise ValueError(
1036                    "purge_factor is experimental and not enabled.\n"
1037                    "Enable it by adding to config:\n"
1038                    "  experimental_features:\n"
1039                    "    cache_purge_factor: true"
1040                )
1041
    def build_config(self) -> "StorageClientConfig":
        """
        Build the final :py:class:`StorageClientConfig` for this profile.

        Multi-backend bundles produce a composite config that routes requests through
        the metadata provider; single-backend bundles produce a regular config with
        optional cache, retry, replica, and autocommit settings.

        :raises ValueError: If a multi-backend profile lacks a metadata provider, the
            cache location is not an absolute path, the cache config is invalid, or a
            replica configuration is circular.
        """
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            # Multi-backend requires a metadata provider to decide which backend serves a path.
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # config for Composite StorageClient
            # (no storage_provider of its own; it delegates to the pre-built child configs)
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                child_configs=self._child_configs,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            config._config_dict = self._resolved_config_dict
            return config

        # Single-backend (len == 1)
        _, backend = list(backends.items())[0]
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        # Cache is only built when a cache section exists AND the profile opts in.
        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            # ('location' takes precedence over the legacy cache_backend.cache_path key).
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified – the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        # config for Single StorageClient
        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            child_configs=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        config._config_dict = self._resolved_config_dict
        return config
1191
1192
class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        # protocol -> bucket -> list of (prefix, profile); sorted longest-prefix-first
        # after _load_mapping runs. File/absolute-path entries use "" as the bucket.
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary, if None the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        instance = cls()
        instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        # Get the path_mapping section
        path_mapping = config_dict.get("path_mapping", {})
        if path_mapping is None:
            return

        # Process each mapping; malformed entries are skipped silently:
        # both sides must end with "/" and the destination must be an msc:// URL.
        for source_path, dest_path in path_mapping.items():
            if not source_path.endswith("/"):
                continue
            if not dest_path.startswith(MSC_PROTOCOL):
                continue
            if not dest_path.endswith("/"):
                continue

            # Extract the destination profile (the netloc of the msc:// URL)
            dest_profile = urlparse(dest_path).netloc

            # Parse the source path
            pr = urlparse(source_path)
            protocol = pr.scheme.lower() if pr.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # For file or absolute paths, use the whole path as the prefix
                # and leave bucket empty
                bucket = ""
                prefix = source_path if source_path.startswith("/") else pr.path
            else:
                # For object storage, extract bucket and prefix (without the leading slash)
                bucket = pr.netloc
                prefix = pr.path.removeprefix("/")

            # Add the mapping to the nested dict
            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Sort each bucket's prefixes by length (longest first) for optimal matching
        for buckets in self._mapping.values():
            for bucket, prefixes in buckets.items():
                buckets[bucket] = sorted(prefixes, key=lambda entry: len(entry[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        # Parse the URL
        pr = urlparse(url)
        protocol = pr.scheme.lower() if pr.scheme else "file"

        # For file paths or absolute paths
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else pr.path

            # Use .get() so lookups never insert empty buckets into the defaultdict
            # (plain indexing would grow the mapping as a side effect of every miss).
            possible_mapping = self._mapping.get(protocol, {}).get("", [])

            # Check each prefix (already sorted by length, longest first)
            for prefix, profile in possible_mapping:
                if path.startswith(prefix):
                    # Calculate the relative path, keeping a leading slash for file paths
                    rel_path = path[len(prefix) :]
                    if not rel_path.startswith("/"):
                        rel_path = "/" + rel_path
                    return profile, rel_path

            return None

        # For object storage: match within the bucket, with the leading slash stripped
        bucket = pr.netloc
        path = pr.path.removeprefix("/")

        # Check bucket-specific mapping (non-mutating lookup, see above)
        possible_mapping = self._mapping.get(protocol, {}).get(bucket, [])

        # Check each prefix (already sorted by length, longest first)
        for prefix, profile in possible_mapping:
            if path.startswith(prefix):
                # Object-storage relative paths drop the leading slash
                rel_path = path[len(prefix) :].removeprefix("/")
                return profile, rel_path

        return None
1327
1328
[docs] 1329class StorageClientConfig: 1330 """ 1331 Configuration class for the :py:class:`multistorageclient.StorageClient`. 1332 """ 1333 1334 profile: str 1335 storage_provider: Optional[StorageProvider] 1336 credentials_provider: Optional[CredentialsProvider] 1337 storage_provider_profiles: Optional[list[str]] 1338 child_configs: Optional[dict[str, "StorageClientConfig"]] 1339 metadata_provider: Optional[MetadataProvider] 1340 cache_config: Optional[CacheConfig] 1341 cache_manager: Optional[CacheManager] 1342 retry_config: Optional[RetryConfig] 1343 telemetry_provider: Optional[Callable[[], Telemetry]] 1344 replicas: list[Replica] 1345 autocommit_config: Optional[AutoCommitConfig] 1346 1347 _config_dict: Optional[dict[str, Any]] 1348 1349 def __init__( 1350 self, 1351 profile: str, 1352 storage_provider: Optional[StorageProvider] = None, 1353 credentials_provider: Optional[CredentialsProvider] = None, 1354 storage_provider_profiles: Optional[list[str]] = None, 1355 child_configs: Optional[dict[str, "StorageClientConfig"]] = None, 1356 metadata_provider: Optional[MetadataProvider] = None, 1357 cache_config: Optional[CacheConfig] = None, 1358 cache_manager: Optional[CacheManager] = None, 1359 retry_config: Optional[RetryConfig] = None, 1360 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1361 replicas: Optional[list[Replica]] = None, 1362 autocommit_config: Optional[AutoCommitConfig] = None, 1363 ): 1364 # exactly one of storage_provider or storage_provider_profiles must be set 1365 if storage_provider and storage_provider_profiles: 1366 raise ValueError( 1367 "Cannot specify both storage_provider and storage_provider_profiles. " 1368 "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient." 
1369 ) 1370 if not storage_provider and not storage_provider_profiles: 1371 raise ValueError("Must specify either storage_provider or storage_provider_profiles.") 1372 1373 if replicas is None: 1374 replicas = [] 1375 self.profile = profile 1376 self.storage_provider = storage_provider 1377 self.credentials_provider = credentials_provider 1378 self.storage_provider_profiles = storage_provider_profiles 1379 self.child_configs = child_configs 1380 self.metadata_provider = metadata_provider 1381 self.cache_config = cache_config 1382 self.cache_manager = cache_manager 1383 self.retry_config = retry_config 1384 self.telemetry_provider = telemetry_provider 1385 self.replicas = replicas 1386 self.autocommit_config = autocommit_config 1387
[docs] 1388 @staticmethod 1389 def from_json( 1390 config_json: str, 1391 profile: str = RESERVED_POSIX_PROFILE_NAME, 1392 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1393 ) -> "StorageClientConfig": 1394 """ 1395 Load a storage client configuration from a JSON string. 1396 1397 :param config_json: Configuration JSON string. 1398 :param profile: Profile to use. 1399 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1400 """ 1401 config_dict = json.loads(config_json) 1402 return StorageClientConfig.from_dict( 1403 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 1404 )
1405
[docs] 1406 @staticmethod 1407 def from_yaml( 1408 config_yaml: str, 1409 profile: str = RESERVED_POSIX_PROFILE_NAME, 1410 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1411 ) -> "StorageClientConfig": 1412 """ 1413 Load a storage client configuration from a YAML string. 1414 1415 :param config_yaml: Configuration YAML string. 1416 :param profile: Profile to use. 1417 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1418 """ 1419 config_dict = yaml.safe_load(config_yaml) or {} 1420 return StorageClientConfig.from_dict( 1421 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 1422 )
1423
[docs] 1424 @staticmethod 1425 def from_dict( 1426 config_dict: dict[str, Any], 1427 profile: str = RESERVED_POSIX_PROFILE_NAME, 1428 skip_validation: bool = False, 1429 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1430 ) -> "StorageClientConfig": 1431 """ 1432 Load a storage client configuration from a Python dictionary. 1433 1434 :param config_dict: Configuration Python dictionary. 1435 :param profile: Profile to use. 1436 :param skip_validation: Skip configuration schema validation. 1437 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1438 """ 1439 # Validate the config file with predefined JSON schema 1440 if not skip_validation: 1441 validate_config(config_dict) 1442 1443 # Load config 1444 loader = StorageClientConfigLoader( 1445 config_dict=config_dict, 1446 profile=_normalize_profile_name(profile, config_dict), 1447 telemetry_provider=telemetry_provider, 1448 ) 1449 config = loader.build_config() 1450 1451 return config
1452
[docs] 1453 @staticmethod 1454 def from_file( 1455 config_file_paths: Optional[Iterable[str]] = None, 1456 profile: str = RESERVED_POSIX_PROFILE_NAME, 1457 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1458 ) -> "StorageClientConfig": 1459 """ 1460 Load a storage client configuration from the first file found. 1461 1462 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`). 1463 :param profile: Profile to use. 1464 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 1465 """ 1466 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths) 1467 # Parse rclone config file. 1468 rclone_config_dict, rclone_config_file = read_rclone_config() 1469 1470 # Merge config files. 1471 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict) 1472 if conflicted_keys: 1473 raise ValueError( 1474 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}' 1475 ) 1476 merged_profiles = merged_config.get("profiles", {}) 1477 1478 # Check if profile is in merged_profiles 1479 if profile in merged_profiles: 1480 return StorageClientConfig.from_dict( 1481 config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider 1482 ) 1483 else: 1484 # Check if profile is the default POSIX profile or an implicit profile 1485 if profile == RESERVED_POSIX_PROFILE_NAME or profile == LEGACY_POSIX_PROFILE_NAME: 1486 implicit_profile_config = DEFAULT_POSIX_PROFILE 1487 elif profile.startswith("_"): 1488 # Handle implicit profiles 1489 parts = profile[1:].split("-", 1) 1490 if len(parts) == 2: 1491 protocol, bucket = parts 1492 # Verify it's a supported protocol 1493 if protocol not in 
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS: 1494 raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"') 1495 implicit_profile_config = create_implicit_profile_config( 1496 profile_name=profile, protocol=protocol, base_path=bucket 1497 ) 1498 else: 1499 raise ValueError(f'Invalid implicit profile format: "{profile}"') 1500 else: 1501 raise ValueError( 1502 f'Profile "{profile}" not found in configuration files. Configuration was checked in ' 1503 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. " 1504 f"Please verify that the profile exists and that configuration files are correctly located." 1505 ) 1506 # merge the implicit profile config into the merged config so the cache & observability config can be inherited 1507 if "profiles" not in merged_config: 1508 merged_config["profiles"] = implicit_profile_config["profiles"] 1509 else: 1510 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile] 1511 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_" 1512 return StorageClientConfig.from_dict( 1513 config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider 1514 )
1515 1516 @staticmethod 1517 def from_provider_bundle( 1518 config_dict: dict[str, Any], 1519 provider_bundle: Union[ProviderBundle, ProviderBundleV2], 1520 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1521 ) -> "StorageClientConfig": 1522 loader = StorageClientConfigLoader( 1523 config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider 1524 ) 1525 config = loader.build_config() 1526 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors 1527 return config 1528
[docs] 1529 @staticmethod 1530 def read_msc_config( 1531 config_file_paths: Optional[Iterable[str]] = None, 1532 ) -> tuple[Optional[dict[str, Any]], Optional[str]]: 1533 """Get the MSC configuration dictionary and the path of the first file found. 1534 1535 If no config paths are specified, configs are searched in the following order: 1536 1537 1. ``MSC_CONFIG`` environment variable (highest precedence) 1538 2. Standard search paths (user-specified config and system-wide config) 1539 1540 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used. 1541 :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration 1542 dictionary or empty dict if no config was found. ``config_file_path`` is the absolute 1543 path of the config file used, or ``None`` if no config file was found. 1544 """ 1545 config_dict: dict[str, Any] = {} 1546 config_file_path: Optional[str] = None 1547 1548 config_file_paths = list(config_file_paths or []) 1549 1550 # Add default paths if none provided. 1551 if len(config_file_paths) == 0: 1552 # Environment variable. 1553 msc_config_env = os.getenv("MSC_CONFIG", None) 1554 if msc_config_env is not None: 1555 config_file_paths.append(msc_config_env) 1556 1557 # Standard search paths. 1558 config_file_paths.extend(_find_config_file_paths()) 1559 1560 # Normalize + absolutize paths. 1561 config_file_paths = [os.path.abspath(path) for path in config_file_paths] 1562 1563 # Log plan. 1564 logger.debug(f"Searching MSC config file paths: {config_file_paths}") 1565 1566 # Load config. 1567 for path in config_file_paths: 1568 if os.path.exists(path): 1569 try: 1570 with open(path) as f: 1571 if path.endswith(".json"): 1572 config_dict = json.load(f) 1573 else: 1574 config_dict = yaml.safe_load(f) 1575 config_file_path = path 1576 # Use the first config file. 
1577 break 1578 except Exception as e: 1579 raise ValueError(f"malformed MSC config file: {path}, exception: {e}") 1580 1581 # Log result. 1582 if config_file_path is None: 1583 logger.debug("No MSC config files found in any of the search locations.") 1584 else: 1585 logger.debug(f"Using MSC config file: {config_file_path}") 1586 1587 if config_dict: 1588 validate_config(config_dict) 1589 1590 if "include" in config_dict and config_file_path: 1591 config_dict = _load_and_merge_includes(config_file_path, config_dict) 1592 1593 return config_dict, config_file_path
1594
[docs] 1595 @staticmethod 1596 def read_path_mapping() -> PathMapping: 1597 """ 1598 Get the path mapping defined in the MSC configuration. 1599 1600 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)] 1601 where entries are sorted by prefix length (longest first) for optimal matching. 1602 Longer paths take precedence when matching. 1603 1604 :return: A PathMapping instance with translation mappings 1605 """ 1606 try: 1607 return PathMapping.from_config() 1608 except Exception: 1609 # Log the error but continue - this shouldn't stop the application from working 1610 logger.error("Failed to load path_mapping from MSC config") 1611 return PathMapping()
    def __getstate__(self) -> dict[str, Any]:
        """Return picklable state: the raw config dict plus simple fields.

        Provider/manager objects are dropped; __setstate__ rebuilds them from
        ``_config_dict`` via StorageClientConfigLoader.

        :raises ValueError: If this config has no ``_config_dict`` (e.g. built
            via ``from_provider_bundle``), since it cannot be reconstructed.
        """
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        # Remove objects that are rebuilt on unpickle (and may not be picklable).
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        del state["child_configs"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild the full configuration from the pickled ``_config_dict``."""
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            telemetry_provider=state["telemetry_provider"],
        )
        new_config = loader.build_config()
        # Copy every attribute from the freshly built config onto self.
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.storage_provider_profiles = new_config.storage_provider_profiles
        self.child_configs = new_config.child_configs
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.telemetry_provider = new_config.telemetry_provider
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config