Source code for multistorageclient.config

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite

# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }
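
# Example (annotation, not part of the original module): for a hypothetical S3
# bucket "bucket1", the implicit profile config looks like this:
#
#     >>> create_implicit_profile_config("_s3-bucket1", "s3", "bucket1")
#     {'profiles': {'_s3-bucket1': {'storage_provider': {'type': 's3', 'options': {'base_path': 'bucket1'}}}}}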


LEGACY_POSIX_PROFILE_NAME = "default"
RESERVED_POSIX_PROFILE_NAME = "__filesystem__"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(RESERVED_POSIX_PROFILE_NAME, "file", "/")

STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}


def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
    """
    Resolve include path (absolute or relative to parent config file).

    :param include_path: Path from include keyword (can be absolute or relative)
    :param parent_config_path: Absolute path of the config file containing the include
    :return: Absolute, normalized path to the included config file
    """
    if os.path.isabs(include_path):
        return os.path.abspath(include_path)
    else:
        parent_dir = os.path.dirname(parent_config_path)
        return os.path.abspath(os.path.join(parent_dir, include_path))
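
# Example (annotation, not part of the original module): with a hypothetical
# parent config at /etc/msc/config.yaml:
#
#     >>> _resolve_include_path("credentials.yaml", "/etc/msc/config.yaml")
#     '/etc/msc/credentials.yaml'
#     >>> _resolve_include_path("/opt/msc/extra.yaml", "/etc/msc/config.yaml")
#     '/opt/msc/extra.yaml'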


def _merge_profiles(
    base_profiles: dict[str, Any],
    new_profiles: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge profiles from two config files with conflict detection.

    Profiles with the same name are allowed if their definitions are identical (idempotent).
    If a profile name exists in both configs with different definitions, an error is raised.

    NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
    We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
    profile fields, which could lead to unintended behavior. For example:

    base_config:
        profiles:
            my-profile:
                storage_provider:
                    type: s3
                    options:
                        base_path: bucket

    new_config:
        profiles:
            my-profile:
                credentials_provider:
                    type: S3Credentials
                    options:
                        access_key: foo
                        secret_key: bar

    If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
    and credentials_provider, which might not be the intended configuration.

    :param base_profiles: Profiles from the base config
    :param new_profiles: Profiles from the new config to merge
    :param base_path: Path to the base config file (for error messages)
    :param new_path: Path to the new config file (for error messages)
    :return: Merged profiles dictionary
    :raises ValueError: If any profile name exists in both configs with different definitions
    """
    result = base_profiles.copy()
    conflicting_profiles = []

    for profile_name, new_profile_def in new_profiles.items():
        if profile_name in result:
            if result[profile_name] != new_profile_def:
                conflicting_profiles.append(profile_name)
        else:
            result[profile_name] = new_profile_def

    if conflicting_profiles:
        conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
        raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")

    return result
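
# Example (annotation, not part of the original module): a minimal sketch of the
# merge semantics, using hypothetical file paths for the error messages:
#
#     >>> base = {"p1": {"storage_provider": {"type": "s3"}}}
#     >>> _merge_profiles(base, {"p1": {"storage_provider": {"type": "s3"}}}, "a.yaml", "b.yaml")
#     {'p1': {'storage_provider': {'type': 's3'}}}
#     >>> _merge_profiles(base, {"p1": {"storage_provider": {"type": "gcs"}}}, "a.yaml", "b.yaml")
#     Traceback (most recent call last):
#         ...
#     ValueError: Profile conflict: 'p1' defined differently in a.yaml and b.yaml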


def _merge_opentelemetry(
    base_otel: dict[str, Any],
    new_otel: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge opentelemetry configurations with special handling for metrics.attributes.

    Merge strategy:
    - metrics.attributes: Concatenate arrays (preserve order for fallback mechanism)
    - Other fields: Use idempotent check (must be identical)

    Attributes providers are concatenated to support fallback scenarios where multiple
    providers of the same type supply the same attribute key with different sources.
    The order matters - later providers override earlier ones.

    :param base_otel: Base opentelemetry config
    :param new_otel: New opentelemetry config to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged opentelemetry configuration
    :raises ValueError: If conflicts are detected
    """
    # Merge metrics.attributes
    base_attrs = base_otel.get("metrics", {}).get("attributes", [])
    new_attrs = new_otel.get("metrics", {}).get("attributes", [])
    merged_attrs = base_attrs + new_attrs

    # Merge other opentelemetry fields
    base_otel_without_attrs = copy.deepcopy(base_otel)
    if "metrics" in base_otel_without_attrs and "attributes" in base_otel_without_attrs["metrics"]:
        del base_otel_without_attrs["metrics"]["attributes"]

    new_otel_without_attrs = copy.deepcopy(new_otel)
    if "metrics" in new_otel_without_attrs and "attributes" in new_otel_without_attrs["metrics"]:
        del new_otel_without_attrs["metrics"]["attributes"]

    merged, conflicts = merge_dictionaries_no_overwrite(
        base_otel_without_attrs, new_otel_without_attrs, allow_idempotent=True
    )

    if conflicts:
        conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
        raise ValueError(
            f"opentelemetry config conflict: {conflicts_list} defined differently in {base_path} and {new_path}"
        )

    if "metrics" in merged:
        merged["metrics"]["attributes"] = merged_attrs
    elif merged_attrs:
        merged["metrics"] = {"attributes": merged_attrs}

    return merged
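
# Example (annotation, not part of the original module): attribute provider
# lists are concatenated in order. With two hypothetical configs (the provider
# dict shapes here are illustrative assumptions):
#
#     >>> base = {"metrics": {"attributes": [{"type": "static", "options": {"env": "prod"}}]}}
#     >>> new = {"metrics": {"attributes": [{"type": "host"}]}}
#     >>> _merge_opentelemetry(base, new, "a.yaml", "b.yaml")
#     {'metrics': {'attributes': [{'type': 'static', 'options': {'env': 'prod'}}, {'type': 'host'}]}}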


def _merge_configs(
    base_config: dict[str, Any],
    new_config: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge two config dictionaries with field-specific strategies.

    Different config fields have different merge strategies:
    - profiles: Shallow merge with idempotent check (uses _merge_profiles)
    - path_mapping: Flat dict merge with idempotent check
    - experimental_features: Flat dict merge with idempotent check
    - opentelemetry: Hybrid merge (attributes concatenate, others idempotent)
    - cache, posix: Global configs, idempotent if identical, error if different

    :param base_config: Base configuration dictionary
    :param new_config: New configuration to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged configuration dictionary
    :raises ValueError: If conflicts are detected
    """
    result = {}

    all_keys = set(base_config.keys()) | set(new_config.keys())

    for key in all_keys:
        base_value = base_config.get(key)
        new_value = new_config.get(key)

        # Key only in base
        if key not in new_config:
            result[key] = base_value
            continue

        # Key only in new
        if key not in base_config:
            result[key] = new_value
            continue

        # Key in both - need to merge or detect conflict
        if key == "profiles":
            result["profiles"] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("path_mapping", "experimental_features"):
            merged, conflicts = merge_dictionaries_no_overwrite(
                (base_value or {}).copy(), new_value or {}, allow_idempotent=True
            )
            if conflicts:
                conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
                raise ValueError(
                    f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
                )
            result[key] = merged

        elif key == "opentelemetry":
            result["opentelemetry"] = _merge_opentelemetry(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("cache", "posix"):
            if base_value != new_value:
                raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
            result[key] = base_value

        elif key == "include":
            # 'include' is processed by _load_and_merge_includes, not part of final config
            pass

        else:
            # Should never happen: every top-level field must have explicit handling above.
            raise ValueError(f"Unknown field '{key}' in config file.")

    return result


def _load_and_merge_includes(
    main_config_path: str,
    main_config_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Load and merge included config files.

    Processes the 'include' directive in the main config, loading and merging all
    specified config files. Only supports one level of includes - included files
    cannot themselves have 'include' directives.

    :param main_config_path: Absolute path to the main config file
    :param main_config_dict: Dictionary loaded from the main config file
    :return: Merged configuration dictionary (without 'include' field)
    :raises ValueError: If include file not found, malformed, or contains nested includes
    """
    include_paths = main_config_dict.get("include", [])

    if not include_paths:
        return {k: v for k, v in main_config_dict.items() if k != "include"}

    merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}

    for include_path in include_paths:
        resolved_path = _resolve_include_path(include_path, main_config_path)

        if not os.path.exists(resolved_path):
            raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")

        try:
            with open(resolved_path) as f:
                if resolved_path.endswith(".json"):
                    included_config = json.load(f)
                else:
                    included_config = yaml.safe_load(f)
        except Exception as e:
            raise ValueError(f"Failed to load included config {resolved_path}: {e}") from e

        validate_config(included_config)
        if "include" in included_config:
            raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")

        merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)

    return merged_config
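
# Example (annotation, not part of the original module): a hypothetical main
# config that pulls shared settings in via the 'include' directive. Relative
# paths resolve against the including file, and included files may not contain
# their own 'include':
#
#     # /etc/msc/config.yaml
#     include:
#       - profiles-team-a.yaml
#       - /etc/msc/shared/telemetry.yaml
#     profiles:
#       my-profile:
#         storage_provider:
#           type: s3
#           options:
#             base_path: my-bucket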


def _find_config_file_paths() -> tuple[str, ...]:
    """
    Get configuration file search paths.

    Returns paths in order of precedence:

    1. User-specific configs (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, defaulting to /etc/xdg/msc/, then /etc/)
    """
    paths = []

    # 1. User-specific configuration directory
    xdg_config_home = os.getenv("XDG_CONFIG_HOME")

    if xdg_config_home:
        paths.extend(
            [
                os.path.join(xdg_config_home, "msc", "config.yaml"),
                os.path.join(xdg_config_home, "msc", "config.json"),
            ]
        )

    user_home = os.getenv("HOME")

    if user_home:
        paths.extend(
            [
                os.path.join(user_home, ".msc_config.yaml"),
                os.path.join(user_home, ".msc_config.json"),
                os.path.join(user_home, ".config", "msc", "config.yaml"),
                os.path.join(user_home, ".config", "msc", "config.json"),
            ]
        )

    # 2. System-wide configuration directories
    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
    if not xdg_config_dirs:
        xdg_config_dirs = "/etc/xdg"

    for config_dir in xdg_config_dirs.split(":"):
        config_dir = config_dir.strip()
        if config_dir:
            paths.extend(
                [
                    os.path.join(config_dir, "msc", "config.yaml"),
                    os.path.join(config_dir, "msc", "config.json"),
                ]
            )

    paths.extend(
        [
            "/etc/msc_config.yaml",
            "/etc/msc_config.json",
        ]
    )

    return tuple(paths)
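
# Example (annotation, not part of the original module): with XDG_CONFIG_HOME
# and XDG_CONFIG_DIRS unset and a hypothetical HOME=/home/alice, the search
# order is:
#
#     /home/alice/.msc_config.yaml
#     /home/alice/.msc_config.json
#     /home/alice/.config/msc/config.yaml
#     /home/alice/.config/msc/config.json
#     /etc/xdg/msc/config.yaml
#     /etc/xdg/msc/config.json
#     /etc/msc_config.yaml
#     /etc/msc_config.json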


def _normalize_profile_name(profile: str, config_dict: dict[str, Any]) -> str:
    """
    Normalize the profile name to the reserved POSIX profile name if the legacy "default" POSIX profile is used.

    :param profile: The profile name to normalize
    :param config_dict: The configuration dictionary
    :return: The normalized profile name
    """
    if profile == LEGACY_POSIX_PROFILE_NAME and profile not in config_dict.get("profiles", {}):
        logger.warning(
            f"The profile name '{LEGACY_POSIX_PROFILE_NAME}' is deprecated and will be removed in a future version. Please use '{RESERVED_POSIX_PROFILE_NAME}' instead."
        )
        return RESERVED_POSIX_PROFILE_NAME
    return profile
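
# Example (annotation, not part of the original module): the legacy name is
# rewritten (with a deprecation warning) only when the config defines no
# explicit "default" profile:
#
#     >>> _normalize_profile_name("default", {"profiles": {}})
#     '__filesystem__'
#     >>> _normalize_profile_name("default", {"profiles": {"default": {}}})
#     'default'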


PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)


class ImmutableDict(dict):
    """
    Immutable dictionary that raises an error when attempting to modify it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Recursively freeze nested structures
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, self._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Convert list to tuple, freezing nested dicts recursively.
        """
        frozen = []
        for item in lst:
            if isinstance(item, dict):
                frozen.append(ImmutableDict(item))
            elif isinstance(item, list):
                frozen.append(ImmutableDict._freeze_list(item))
            else:
                frozen.append(item)
        return tuple(frozen)

    def __deepcopy__(self, memo):
        """
        Return a regular mutable dict when deepcopy is called.
        """
        return copy.deepcopy(dict(self), memo)

    def __reduce__(self):
        """
        Support for pickle serialization.
        """
        return (self.__class__, (dict(self),))

    def _copy_value(self, value):
        """
        Convert frozen structures back to mutable equivalents.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        elif isinstance(value, tuple):
            # Check if it was originally a list (frozen by _freeze_list)
            return [self._copy_value(item) for item in value]
        else:
            return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        value = super().__getitem__(key)
        return self._copy_value(value)

    def get(self, key, default=None):
        """
        Return a mutable copy of the value.
        """
        return self[key] if key in self else default

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")
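
# Example (annotation, not part of the original module): nested structures are
# frozen on construction, while reads hand back mutable copies so callers can't
# corrupt the shared config:
#
#     >>> d = ImmutableDict({"profiles": {"p1": {"caching_enabled": True}}})
#     >>> d["profiles"]["p1"]["caching_enabled"] = False  # mutates only the copy
#     >>> d["profiles"]["p1"]["caching_enabled"]
#     True
#     >>> d["profiles"] = {}
#     Traceback (most recent call last):
#         ...
#     TypeError: ImmutableDict is immutable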


class SimpleProviderBundle(ProviderBundle):
    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        if replicas is None:
            replicas = []

        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        self._replicas = replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        return self._replicas


class SimpleProviderBundleV2(ProviderBundleV2):
    def __init__(
        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
    ):
        self._storage_backends = storage_backends
        self._metadata_provider = metadata_provider

    @staticmethod
    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
        backend = StorageBackend(
            storage_provider_config=v1_bundle.storage_provider_config,
            credentials_provider=v1_bundle.credentials_provider,
            replicas=v1_bundle.replicas,
        )

        return SimpleProviderBundleV2(
            storage_backends={profile_name: backend},
            metadata_provider=v1_bundle.metadata_provider,
        )

    @property
    def storage_backends(self) -> dict[str, StorageBackend]:
        return self._storage_backends

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider


DEFAULT_CACHE_REFRESH_INTERVAL = 300


class StorageClientConfigLoader:
    _provider_bundle: ProviderBundleV2
    _resolved_config_dict: dict[str, Any]
    _profiles: dict[str, Any]
    _profile: str
    _profile_dict: dict[str, Any]
    _opentelemetry_dict: Optional[dict[str, Any]]
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = RESERVED_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if RESERVED_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[RESERVED_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            if (
                self._profiles[RESERVED_POSIX_PROFILE_NAME]
                != DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
            ):
                raise ValueError(f'Cannot override "{RESERVED_POSIX_PROFILE_NAME}" profile with different settings.')

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not currently support a metadata provider.
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")
        # Initialize CredentialsProvider
        # In some cases, the credentials provider issues credentials scoped to a
        # specific base_path, so pass the storage_options along to it.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(
        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
    ) -> ProviderBundleV2:
        if provider_bundle:
            bundle = provider_bundle
        else:
            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
            if provider_bundle_dict:
                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
            else:
                bundle = self._build_provider_bundle_from_config(self._profile_dict)

        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)

        return bundle

    def _inject_profiles_from_bundle(self) -> None:
        """
        Inject child profiles and build child configs for multi-backend configurations.

        For ProviderBundleV2 with multiple backends, this method:
        1. Injects child profiles into config dict (needed for replica initialization)
        2. Pre-builds child configs so CompositeStorageClient can use them directly

        Profile injection is required because SingleStorageClient._initialize_replicas()
        uses StorageClientConfig.from_dict() to create replica clients, which looks up
        profiles in the config dict.
        """
        self._child_configs: Optional[dict[str, StorageClientConfig]] = None

        backends = self._provider_bundle.storage_backends
        if len(backends) > 1:
            # First, inject all child profiles into config dict (needed for replica lookup)
            profiles = copy.deepcopy(self._profiles)
            for child_name, backend in backends.items():
                child_profile_dict = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }

                if child_name in profiles:
                    # Profile already exists - check if it matches what we would inject
                    existing = profiles[child_name]
                    if existing.get("storage_provider") != child_profile_dict["storage_provider"]:
                        raise ValueError(
                            f"Profile '{child_name}' already exists in configuration with different settings."
                        )
                else:
                    profiles[child_name] = child_profile_dict

            # Update config dict BEFORE building child configs
            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles
            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

            # Now build child configs (they will get the updated config dict)
            retry_config = self._build_retry_config()
            child_configs: dict[str, StorageClientConfig] = {}
            for child_name, backend in backends.items():
                storage_provider = self._build_storage_provider(
                    backend.storage_provider_config.type,
                    backend.storage_provider_config.options,
                    backend.credentials_provider,
                )

                child_config = StorageClientConfig(
                    profile=child_name,
                    storage_provider=storage_provider,
                    credentials_provider=backend.credentials_provider,
                    storage_provider_profiles=None,
                    child_configs=None,
                    metadata_provider=None,
                    cache_config=None,
                    cache_manager=None,
                    retry_config=retry_config,
                    telemetry_provider=self._telemetry_provider,
                    replicas=backend.replicas,
                    autocommit_config=None,
                )
                child_config._config_dict = self._resolved_config_dict
                child_configs[child_name] = child_config

            self._child_configs = child_configs

    def _build_retry_config(self) -> RetryConfig:
        """Build retry config from profile dict."""
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            return RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )
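
# Example (annotation, not part of the original module): a hypothetical profile
# with an explicit retry policy; omitted keys fall back to the DEFAULT_RETRY_*
# constants imported from .types (the literal values below are illustrative):
#
#     profiles:
#       my-profile:
#         storage_provider:
#           type: s3
#           options:
#             base_path: my-bucket
#         retry:
#           attempts: 5
#           delay: 1.0
#           backoff_multiplier: 2.0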
 932
 933    def _build_autocommit_config(self) -> AutoCommitConfig:
 934        """Build autocommit config from profile dict."""
 935        autocommit_dict = self._profile_dict.get("autocommit", None)
 936        if autocommit_dict:
 937            interval_minutes = autocommit_dict.get("interval_minutes", None)
 938            at_exit = autocommit_dict.get("at_exit", False)
 939            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
 940        return AutoCommitConfig()
 941
 942    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
 943        if "size_mb" in cache_dict:
 944            raise ValueError(
 945                "The 'size_mb' property is no longer supported. \n"
 946                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
 947                "Example configuration:\n"
 948                "cache:\n"
 949                "  size: 500G                    # Optional: Maximum cache size (default: 10G)\n"
 950                "  cache_line_size: 64M          # Optional: Chunk size for partial file caching (default: 64M)\n"
 951                "  check_source_version: true    # Optional: Use ETag for cache validation (default: true)\n"
 952                "  location: /tmp/msc_cache      # Optional: Cache directory path (default: system tempdir + '/msc_cache')\n"
 953                "  eviction_policy:               # Optional: Cache eviction policy\n"
 954                "    policy: fifo                 # Optional: Policy type: lru, mru, fifo, random, no_eviction (default: fifo)\n"
 955                "    refresh_interval: 300        # Optional: Cache refresh interval in seconds (default: 300)\n"
 956            )
 957
 958        # Validate that cache_line_size doesn't exceed cache size
 959        cache_size_str = cache_dict.get("size", DEFAULT_CACHE_SIZE)
 960        cache_line_size_str = cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE)
 961
 962        # Use CacheConfig to convert sizes to bytes for comparison
 963        temp_cache_config = CacheConfig(
 964            size=cache_size_str,
 965            cache_line_size=cache_line_size_str,
 966        )
 967        cache_size_bytes = temp_cache_config.size_bytes()
 968        cache_line_size_bytes = temp_cache_config.cache_line_size_bytes()
 969
 970        if cache_line_size_bytes > cache_size_bytes:
 971            raise ValueError(
 972                f"cache_line_size ({cache_line_size_str}) exceeds cache size ({cache_size_str}). "
 973                f"cache_line_size must be less than or equal to cache size. "
 974                f"Consider increasing cache size or decreasing cache_line_size."
 975            )
 976
 977    def _validate_replicas(self, replicas: list[Replica]) -> None:
 978        """
 979        Validates that replica profiles do not have their own replicas configuration.
 980
 981        This prevents circular references where a replica profile could reference
 982        another profile that also has replicas, creating an infinite loop.
 983
 984        :param replicas: The list of Replica objects to validate
 985        :raises ValueError: If any replica profile has its own replicas configuration
 986        """
 987        for replica in replicas:
 988            replica_profile_name = replica.replica_profile
 989
 990            # Check that replica profile is not the same as the current profile
 991            if replica_profile_name == self._profile:
 992                raise ValueError(
 993                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
 994                )
 995
 996            # Check if the replica profile exists in the configuration
 997            if replica_profile_name not in self._profiles:
 998                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
 999
1000            # Get the replica profile configuration
1001            replica_profile_dict = self._profiles[replica_profile_name]
1002
1003            # Check if the replica profile has its own replicas configuration
1004            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
1005                raise ValueError(
1006                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
1007                    f"This creates a circular reference which is not allowed."
1008                )
1009
1010    # todo: remove once experimental features are stable
1011    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
1012        """
1013        Validate that experimental features are enabled when used.
1014
1015        :param eviction_policy: The eviction policy configuration to validate
1016        :raises ValueError: If experimental features are used without being enabled
1017        """
1018        # Validate MRU eviction policy
1019        if eviction_policy.policy.upper() == "MRU":
1020            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
1021                raise ValueError(
1022                    "MRU eviction policy is experimental and not enabled.\n"
1023                    "Enable it by adding to config:\n"
1024                    "  experimental_features:\n"
1025                    "    cache_mru_eviction: true"
1026                )
1027
1028        # Validate purge_factor
1029        if eviction_policy.purge_factor != 0:
1030            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
1031                raise ValueError("purge_factor must be between 0 and 100")
1032
1033            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
1034                raise ValueError(
1035                    "purge_factor is experimental and not enabled.\n"
1036                    "Enable it by adding to config:\n"
1037                    "  experimental_features:\n"
1038                    "    cache_purge_factor: true"
1039                )
1040
1041    def build_config(self) -> "StorageClientConfig":
1042        bundle = self._provider_bundle
1043        backends = bundle.storage_backends
1044
1045        if len(backends) > 1:
1046            if not bundle.metadata_provider:
1047                raise ValueError(
1048                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
1049                    "for routing between storage locations."
1050                )
1051
1052            child_profile_names = list(backends.keys())
1053            retry_config = self._build_retry_config()
1054            autocommit_config = self._build_autocommit_config()
1055
1056            # config for Composite StorageClient
1057            config = StorageClientConfig(
1058                profile=self._profile,
1059                storage_provider=None,
1060                credentials_provider=None,
1061                storage_provider_profiles=child_profile_names,
1062                child_configs=self._child_configs,
1063                metadata_provider=bundle.metadata_provider,
1064                cache_config=None,
1065                cache_manager=None,
1066                retry_config=retry_config,
1067                telemetry_provider=self._telemetry_provider,
1068                replicas=[],
1069                autocommit_config=autocommit_config,
1070            )
1071
1072            config._config_dict = self._resolved_config_dict
1073            return config
1074
1075        # Single-backend (len == 1)
1076        _, backend = list(backends.items())[0]
1077        storage_provider = self._build_storage_provider(
1078            backend.storage_provider_config.type,
1079            backend.storage_provider_config.options,
1080            backend.credentials_provider,
1081        )
1082        credentials_provider = backend.credentials_provider
1083        metadata_provider = bundle.metadata_provider
1084        replicas = backend.replicas
1085
1086        # Validate replicas to prevent circular references
1087        if replicas:
1088            self._validate_replicas(replicas)
1089
1090        cache_config: Optional[CacheConfig] = None
1091        cache_manager: Optional[CacheManager] = None
1092
1093        # Check if caching is enabled for this profile
1094        caching_enabled = self._profile_dict.get("caching_enabled", False)
1095
1096        if self._cache_dict is not None and caching_enabled:
1097            tempdir = tempfile.gettempdir()
1098            default_location = os.path.join(tempdir, "msc_cache")
1099            location = self._cache_dict.get("location", default_location)
1100
1101            # Check if cache_backend.cache_path is defined
1102            cache_backend = self._cache_dict.get("cache_backend", {})
1103            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None
1104
1105            # Warn if both location and cache_backend.cache_path are defined
1106            if cache_backend_path and self._cache_dict.get("location") is not None:
1107                logger.warning(
1108                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
1109                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
1110                )
1111            elif cache_backend_path:
1112                # Use cache_backend.cache_path only if location is not explicitly defined
1113                location = cache_backend_path
1114
1115            # Resolve the effective flag while preserving explicit ``False``.
1116            if "check_source_version" in self._cache_dict:
1117                check_source_version = self._cache_dict["check_source_version"]
1118            else:
1119                check_source_version = self._cache_dict.get("use_etag", True)
1120
1121            # Warn if both keys are specified – the new one wins.
1122            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
1123                logger.warning(
1124                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
1125                    "Using 'check_source_version' and ignoring 'use_etag'."
1126                )
1127
1128            if not Path(location).is_absolute():
1129                raise ValueError(f"Cache location must be an absolute path: {location}")
1130
1131            # Initialize cache_dict with default values
1132            cache_dict = self._cache_dict
1133
1134            # Verify cache config
1135            self._verify_cache_config(cache_dict)
1136
1137            # Initialize eviction policy
1138            if "eviction_policy" in cache_dict:
1139                policy = cache_dict["eviction_policy"]["policy"].lower()
1140                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
1141                eviction_policy = EvictionPolicyConfig(
1142                    policy=policy,
1143                    refresh_interval=cache_dict["eviction_policy"].get(
1144                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
1145                    ),
1146                    purge_factor=purge_factor,
1147                )
1148            else:
1149                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)
1150
1151            # todo: remove once experimental features are stable
1152            # Validate experimental features before creating cache_config
1153            self._validate_experimental_features(eviction_policy)
1154
1155            # Create cache config from the standardized format
1156            cache_config = CacheConfig(
1157                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
1158                location=cache_dict.get("location", location),
1159                check_source_version=check_source_version,
1160                eviction_policy=eviction_policy,
1161                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
1162            )
1163
1164            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
1165        elif self._cache_dict is not None and not caching_enabled:
1166            logger.debug(f"Caching is disabled for profile '{self._profile}'")
1167        elif self._cache_dict is None and caching_enabled:
1168            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")
1169
1170        retry_config = self._build_retry_config()
1171        autocommit_config = self._build_autocommit_config()
1172
1173        config = StorageClientConfig(
1174            profile=self._profile,
1175            storage_provider=storage_provider,
1176            credentials_provider=credentials_provider,
1177            storage_provider_profiles=None,
1178            child_configs=None,
1179            metadata_provider=metadata_provider,
1180            cache_config=cache_config,
1181            cache_manager=cache_manager,
1182            retry_config=retry_config,
1183            telemetry_provider=self._telemetry_provider,
1184            replicas=replicas,
1185            autocommit_config=autocommit_config,
1186        )
1187
1188        config._config_dict = self._resolved_config_dict
1189        return config
1190
1191
1192class PathMapping:
1193    """
1194    Class to handle path mappings defined in the MSC configuration.
1195
1196    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1197    where entries are sorted by prefix length (longest first) for optimal matching.
1198    Longer paths take precedence when matching.
1199    """
1200
1201    def __init__(self):
1202        """Initialize an empty PathMapping."""
1203        self._mapping = defaultdict(lambda: defaultdict(list))
1204
1205    @classmethod
1206    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
1207        """
1208        Create a PathMapping instance from configuration dictionary.
1209
1210        :param config_dict: Configuration dictionary, if None the config will be loaded
1211        :return: A PathMapping instance with processed mappings
1212        """
1213        if config_dict is None:
1214            # Import locally to avoid circular imports
1215            from multistorageclient.config import StorageClientConfig
1216
1217            config_dict, _ = StorageClientConfig.read_msc_config()
1218
1219        if not config_dict:
1220            return cls()
1221
1222        instance = cls()
1223        instance._load_mapping(config_dict)
1224        return instance
1225
1226    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
1227        """
1228        Load path mapping from a configuration dictionary.
1229
1230        :param config_dict: Configuration dictionary containing path mapping
1231        """
1232        # Get the path_mapping section
1233        path_mapping = config_dict.get("path_mapping", {})
1234        if path_mapping is None:
1235            return
1236
1237        # Process each mapping
1238        for source_path, dest_path in path_mapping.items():
1239            # Validate format
1240            if not source_path.endswith("/"):
1241                continue
1242            if not dest_path.startswith(MSC_PROTOCOL):
1243                continue
1244            if not dest_path.endswith("/"):
1245                continue
1246
1247            # Extract the destination profile
1248            pr_dest = urlparse(dest_path)
1249            dest_profile = pr_dest.netloc
1250
1251            # Parse the source path
1252            pr = urlparse(source_path)
1253            protocol = pr.scheme.lower() if pr.scheme else "file"
1254
1255            if protocol == "file" or source_path.startswith("/"):
1256                # For file or absolute paths, use the whole path as the prefix
1257                # and leave bucket empty
1258                bucket = ""
1259                prefix = source_path if source_path.startswith("/") else pr.path
1260            else:
1261                # For object storage, extract bucket and prefix
1262                bucket = pr.netloc
1263                prefix = pr.path
1264                if prefix.startswith("/"):
1265                    prefix = prefix[1:]
1266
1267            # Add the mapping to the nested dict
1268            self._mapping[protocol][bucket].append((prefix, dest_profile))
1269
1270        # Sort each bucket's prefixes by length (longest first) for optimal matching
1271        for protocol, buckets in self._mapping.items():
1272            for bucket, prefixes in buckets.items():
1273                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
1274
1275    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
1276        """
1277        Find the best matching mapping for the given URL.
1278
1279        :param url: URL to find matching mapping for
1280        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
1281        """
1282        # Parse the URL
1283        pr = urlparse(url)
1284        protocol = pr.scheme.lower() if pr.scheme else "file"
1285
1286        # For file paths or absolute paths
1287        if protocol == "file" or url.startswith("/"):
1288            path = url if url.startswith("/") else pr.path
1289
1290            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
1291
1292            # Check each prefix (already sorted by length, longest first)
1293            for prefix, profile in possible_mapping:
1294                if path.startswith(prefix):
1295                    # Calculate the relative path
1296                    rel_path = path[len(prefix) :]
1297                    if not rel_path.startswith("/"):
1298                        rel_path = "/" + rel_path
1299                    return profile, rel_path
1300
1301            return None
1302
1303        # For object storage
1304        bucket = pr.netloc
1305        path = pr.path
1306        if path.startswith("/"):
1307            path = path[1:]
1308
1309        # Check bucket-specific mapping
1310        possible_mapping = (
1311            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
1312        )
1313
1314        # Check each prefix (already sorted by length, longest first)
1315        for prefix, profile in possible_mapping:
1316            # Found a match; it is the longest matching prefix
1317            if path.startswith(prefix):
1318                rel_path = path[len(prefix) :]
1319                # Remove leading slash if present
1320                if rel_path.startswith("/"):
1321                    rel_path = rel_path[1:]
1322
1323                return profile, rel_path
1324
1325        return None
1326
1327
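# ---------------------------------------------------------------------------
# Illustrative sketch (editor's addition): how PathMapping resolves a URL.
# The "path_mapping" entries and msc:// profile names below are hypothetical.
#
#   mapping = PathMapping.from_config(
#       {
#           "path_mapping": {
#               "s3://bucket1/datasets/": "msc://profile-data/",
#               "s3://bucket1/datasets/images/": "msc://profile-images/",
#           }
#       }
#   )
#   # Longest prefix wins: the images/ rule shadows the broader datasets/ rule.
#   assert mapping.find_mapping("s3://bucket1/datasets/images/cat.jpg") == (
#       "profile-images",
#       "cat.jpg",
#   )
#   assert mapping.find_mapping("s3://bucket1/datasets/train.csv") == (
#       "profile-data",
#       "train.csv",
#   )
# ---------------------------------------------------------------------------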
1328class StorageClientConfig:
1329    """
1330    Configuration class for the :py:class:`multistorageclient.StorageClient`.
1331    """
1332
1333    profile: str
1334    storage_provider: Optional[StorageProvider]
1335    credentials_provider: Optional[CredentialsProvider]
1336    storage_provider_profiles: Optional[list[str]]
1337    child_configs: Optional[dict[str, "StorageClientConfig"]]
1338    metadata_provider: Optional[MetadataProvider]
1339    cache_config: Optional[CacheConfig]
1340    cache_manager: Optional[CacheManager]
1341    retry_config: Optional[RetryConfig]
1342    telemetry_provider: Optional[Callable[[], Telemetry]]
1343    replicas: list[Replica]
1344    autocommit_config: Optional[AutoCommitConfig]
1345
1346    _config_dict: Optional[dict[str, Any]]
1347
1348    def __init__(
1349        self,
1350        profile: str,
1351        storage_provider: Optional[StorageProvider] = None,
1352        credentials_provider: Optional[CredentialsProvider] = None,
1353        storage_provider_profiles: Optional[list[str]] = None,
1354        child_configs: Optional[dict[str, "StorageClientConfig"]] = None,
1355        metadata_provider: Optional[MetadataProvider] = None,
1356        cache_config: Optional[CacheConfig] = None,
1357        cache_manager: Optional[CacheManager] = None,
1358        retry_config: Optional[RetryConfig] = None,
1359        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1360        replicas: Optional[list[Replica]] = None,
1361        autocommit_config: Optional[AutoCommitConfig] = None,
1362    ):
1363        # exactly one of storage_provider or storage_provider_profiles must be set
1364        if storage_provider and storage_provider_profiles:
1365            raise ValueError(
1366                "Cannot specify both storage_provider and storage_provider_profiles. "
1367                "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
1368            )
1369        if not storage_provider and not storage_provider_profiles:
1370            raise ValueError("Must specify either storage_provider or storage_provider_profiles.")
1371
1372        if replicas is None:
1373            replicas = []
1374        self.profile = profile
1375        self.storage_provider = storage_provider
1376        self.credentials_provider = credentials_provider
1377        self.storage_provider_profiles = storage_provider_profiles
1378        self.child_configs = child_configs
1379        self.metadata_provider = metadata_provider
1380        self.cache_config = cache_config
1381        self.cache_manager = cache_manager
1382        self.retry_config = retry_config
1383        self.telemetry_provider = telemetry_provider
1384        self.replicas = replicas
1385        self.autocommit_config = autocommit_config
1386
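    # Illustrative note (editor's addition): the constructor enforces that a
    # config is either "single" (one storage_provider) or "composite" (a list
    # of storage_provider_profiles), never both. For example, passing both
    # raises ValueError; the provider object below is hypothetical.
    #
    #   StorageClientConfig(
    #       profile="demo",
    #       storage_provider=some_provider,        # hypothetical provider
    #       storage_provider_profiles=["a", "b"],  # conflicting input
    #   )  # -> ValueError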
1387    @staticmethod
1388    def from_json(
1389        config_json: str,
1390        profile: str = RESERVED_POSIX_PROFILE_NAME,
1391        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1392    ) -> "StorageClientConfig":
1393        """
1394        Load a storage client configuration from a JSON string.
1395
1396        :param config_json: Configuration JSON string.
1397        :param profile: Profile to use.
1398        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1399        """
1400        config_dict = json.loads(config_json)
1401        return StorageClientConfig.from_dict(
1402            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1403        )
1404
1405    @staticmethod
1406    def from_yaml(
1407        config_yaml: str,
1408        profile: str = RESERVED_POSIX_PROFILE_NAME,
1409        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1410    ) -> "StorageClientConfig":
1411        """
1412        Load a storage client configuration from a YAML string.
1413
1414        :param config_yaml: Configuration YAML string.
1415        :param profile: Profile to use.
1416        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1417        """
1418        config_dict = yaml.safe_load(config_yaml) or {}
1419        return StorageClientConfig.from_dict(
1420            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1421        )
1422
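    # Illustrative sketch (editor's addition): from_json() and from_yaml() are
    # thin wrappers over from_dict(), so equivalent inputs yield equivalent
    # configs. The profile name and base_path below are assumptions.
    #
    #   yaml_text = """
    #   profiles:
    #     demo:
    #       storage_provider:
    #         type: file
    #         options:
    #           base_path: /tmp/msc-demo
    #   """
    #   config = StorageClientConfig.from_yaml(yaml_text, profile="demo")
    #   assert config.profile == "demo"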
1423    @staticmethod
1424    def from_dict(
1425        config_dict: dict[str, Any],
1426        profile: str = RESERVED_POSIX_PROFILE_NAME,
1427        skip_validation: bool = False,
1428        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1429    ) -> "StorageClientConfig":
1430        """
1431        Load a storage client configuration from a Python dictionary.
1432
1433        :param config_dict: Configuration Python dictionary.
1434        :param profile: Profile to use.
1435        :param skip_validation: Skip configuration schema validation.
1436        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1437        """
1438        # Validate the config dictionary against the predefined JSON schema
1439        if not skip_validation:
1440            validate_config(config_dict)
1441
1442        # Load config
1443        loader = StorageClientConfigLoader(
1444            config_dict=config_dict,
1445            profile=_normalize_profile_name(profile, config_dict),
1446            telemetry_provider=telemetry_provider,
1447        )
1448        config = loader.build_config()
1449
1450        return config
1451
1452    @staticmethod
1453    def from_file(
1454        config_file_paths: Optional[Iterable[str]] = None,
1455        profile: str = RESERVED_POSIX_PROFILE_NAME,
1456        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1457    ) -> "StorageClientConfig":
1458        """
1459        Load a storage client configuration from the first file found.
1460
1461        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
1462        :param profile: Profile to use.
1463        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1464        """
1465        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
1466        # Parse rclone config file.
1467        rclone_config_dict, rclone_config_file = read_rclone_config()
1468
1469        # Merge config files.
1470        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
1471        if conflicted_keys:
1472            raise ValueError(
1473                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
1474            )
1475        merged_profiles = merged_config.get("profiles", {})
1476
1477        # Check if profile is in merged_profiles
1478        if profile in merged_profiles:
1479            return StorageClientConfig.from_dict(
1480                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
1481            )
1482        else:
1483            # Check if profile is the default POSIX profile or an implicit profile
1484            if profile == RESERVED_POSIX_PROFILE_NAME or profile == LEGACY_POSIX_PROFILE_NAME:
1485                implicit_profile_config = DEFAULT_POSIX_PROFILE
1486            elif profile.startswith("_"):
1487                # Handle implicit profiles
1488                parts = profile[1:].split("-", 1)
1489                if len(parts) == 2:
1490                    protocol, bucket = parts
1491                    # Verify it's a supported protocol
1492                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
1493                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
1494                    implicit_profile_config = create_implicit_profile_config(
1495                        profile_name=profile, protocol=protocol, base_path=bucket
1496                    )
1497                else:
1498                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
1499            else:
1500                raise ValueError(
1501                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
1502                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
1503                    f"Please verify that the profile exists and that configuration files are correctly located."
1504                )
1505            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited.
1506            if "profiles" not in merged_config:
1507                merged_config["profiles"] = implicit_profile_config["profiles"]
1508            else:
1509                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
1510            # The config was already validated when read; skip validation for implicit profiles, whose names start with "_".
1511            return StorageClientConfig.from_dict(
1512                config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
1513            )
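    # Illustrative sketch (editor's addition): from_file() falls back to an
    # implicit profile when the requested profile is absent from the merged
    # config. An implicit profile name is "_" + protocol + "-" + base path,
    # so "_s3-bucket1" resolves to an s3 provider rooted at bucket1:
    #
    #   config = StorageClientConfig.from_file(profile="_s3-bucket1")
    #
    # Protocols outside SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS raise ValueError.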
1514
1515    @staticmethod
1516    def from_provider_bundle(
1517        config_dict: dict[str, Any],
1518        provider_bundle: Union[ProviderBundle, ProviderBundleV2],
1519        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1520    ) -> "StorageClientConfig":
1521        loader = StorageClientConfigLoader(
1522            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
1523        )
1524        config = loader.build_config()
1525        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
1526        return config
1527
1528    @staticmethod
1529    def read_msc_config(
1530        config_file_paths: Optional[Iterable[str]] = None,
1531    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
1532        """Get the MSC configuration dictionary and the path of the first file found.
1533
1534        If no config paths are specified, configs are searched in the following order:
1535
1536        1. ``MSC_CONFIG`` environment variable (highest precedence)
1537        2. Standard search paths (user-specified config and system-wide config)
1538
1539        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
1540        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
1541            dictionary or an empty dict if no config was found. ``config_file_path`` is the absolute
1542            path of the config file used, or ``None`` if no config file was found.
1543        """
1544        config_dict: dict[str, Any] = {}
1545        config_file_path: Optional[str] = None
1546
1547        config_file_paths = list(config_file_paths or [])
1548
1549        # Add default paths if none provided.
1550        if len(config_file_paths) == 0:
1551            # Environment variable.
1552            msc_config_env = os.getenv("MSC_CONFIG", None)
1553            if msc_config_env is not None:
1554                config_file_paths.append(msc_config_env)
1555
1556            # Standard search paths.
1557            config_file_paths.extend(_find_config_file_paths())
1558
1559        # Normalize + absolutize paths.
1560        config_file_paths = [os.path.abspath(path) for path in config_file_paths]
1561
1562        # Log plan.
1563        logger.debug(f"Searching MSC config file paths: {config_file_paths}")
1564
1565        # Load config.
1566        for path in config_file_paths:
1567            if os.path.exists(path):
1568                try:
1569                    with open(path) as f:
1570                        if path.endswith(".json"):
1571                            config_dict = json.load(f)
1572                        else:
1573                            config_dict = yaml.safe_load(f) or {}
1574                    config_file_path = path
1575                    # Use the first config file found.
1576                    break
1577                except Exception as e:
1578                    raise ValueError(f"Malformed MSC config file: {path}, exception: {e}")
1579
1580        # Log result.
1581        if config_file_path is None:
1582            logger.debug("No MSC config files found in any of the search locations.")
1583        else:
1584            logger.debug(f"Using MSC config file: {config_file_path}")
1585
1586        if config_dict:
1587            validate_config(config_dict)
1588
1589        if "include" in config_dict and config_file_path:
1590            config_dict = _load_and_merge_includes(config_file_path, config_dict)
1591
1592        return config_dict, config_file_path
1593
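    # Illustrative sketch (editor's addition): read_msc_config() honors the
    # MSC_CONFIG environment variable before falling back to the standard
    # search paths. The file path below is hypothetical and must point at a
    # valid config for validation to pass.
    #
    #   os.environ["MSC_CONFIG"] = "/etc/my-app/msc_config.yaml"
    #   config_dict, config_file = StorageClientConfig.read_msc_config()
    #   # config_file == "/etc/my-app/msc_config.yaml" if that file exists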
1594    @staticmethod
1595    def read_path_mapping() -> PathMapping:
1596        """
1597        Get the path mapping defined in the MSC configuration.
1598
1599        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1600        where entries are sorted by prefix length (longest first), so longer,
1601        more specific prefixes take precedence during matching.
1602
1603        :return: A PathMapping instance with translation mappings
1604        """
1605        try:
1606            return PathMapping.from_config()
1607        except Exception:
1608            # Log the error but continue; a bad path_mapping section shouldn't stop the application.
1609            logger.exception("Failed to load path_mapping from MSC config")
1610            return PathMapping()
1611
1612    def __getstate__(self) -> dict[str, Any]:
1613        state = self.__dict__.copy()
1614        if not state.get("_config_dict"):
1615            raise ValueError("StorageClientConfig is not serializable")
1616        del state["credentials_provider"]
1617        del state["storage_provider"]
1618        del state["metadata_provider"]
1619        del state["cache_manager"]
1620        del state["replicas"]
1621        del state["child_configs"]
1622        return state
1623
1624    def __setstate__(self, state: dict[str, Any]) -> None:
1625        # Presence checked by __getstate__.
1626        config_dict = state["_config_dict"]
1627        loader = StorageClientConfigLoader(
1628            config_dict=config_dict,
1629            profile=state["profile"],
1630            telemetry_provider=state["telemetry_provider"],
1631        )
1632        new_config = loader.build_config()
1633        self.profile = new_config.profile
1634        self.storage_provider = new_config.storage_provider
1635        self.credentials_provider = new_config.credentials_provider
1636        self.storage_provider_profiles = new_config.storage_provider_profiles
1637        self.child_configs = new_config.child_configs
1638        self.metadata_provider = new_config.metadata_provider
1639        self.cache_config = new_config.cache_config
1640        self.cache_manager = new_config.cache_manager
1641        self.retry_config = new_config.retry_config
1642        self.telemetry_provider = new_config.telemetry_provider
1643        self._config_dict = config_dict
1644        self.replicas = new_config.replicas
1645        self.autocommit_config = new_config.autocommit_config
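# ---------------------------------------------------------------------------
# Illustrative sketch (editor's addition): __getstate__/__setstate__ make a
# config picklable by carrying only the raw config dict, the profile name, and
# the telemetry provider, then rebuilding the providers on unpickle. This only
# works for configs backed by a config dict (from_dict/from_yaml/from_file);
# from_provider_bundle() sets _config_dict to None, so pickling those raises
# ValueError. The profile name and base_path below are assumptions.
#
#   import pickle
#
#   config = StorageClientConfig.from_dict(
#       {"profiles": {"demo": {"storage_provider": {"type": "file", "options": {"base_path": "/tmp/msc-demo"}}}}},
#       profile="demo",
#   )
#   restored = pickle.loads(pickle.dumps(config))
#   assert restored.profile == "demo"
# ---------------------------------------------------------------------------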