Source code for multistorageclient.config

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite

# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }


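# Illustrative example (values are hypothetical): calling
# create_implicit_profile_config("_s3-bucket1", "s3", "bucket1") yields a config
# fragment of the form:
#
#   {"profiles": {"_s3-bucket1": {"storage_provider": {"type": "s3", "options": {"base_path": "bucket1"}}}}}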
DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}


def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
    """
    Resolve include path (absolute or relative to parent config file).

    :param include_path: Path from include keyword (can be absolute or relative)
    :param parent_config_path: Absolute path of the config file containing the include
    :return: Absolute, normalized path to the included config file
    """
    if os.path.isabs(include_path):
        return os.path.abspath(include_path)
    else:
        parent_dir = os.path.dirname(parent_config_path)
        return os.path.abspath(os.path.join(parent_dir, include_path))


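# For example (paths are illustrative): an include of "extra/profiles.yaml" inside
# /etc/msc/config.yaml resolves to /etc/msc/extra/profiles.yaml, while an absolute
# include path such as /opt/msc/common.yaml is returned unchanged (after normalization).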
def _merge_profiles(
    base_profiles: dict[str, Any],
    new_profiles: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge profiles from two config files with conflict detection.

    Profiles with the same name are allowed if their definitions are identical (idempotent).
    If a profile name exists in both configs with different definitions, an error is raised.

    NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
    We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
    profile fields, which could lead to unintended behavior. For example:

    base_config:
        profiles:
            my-profile:
                storage_provider:
                    type: s3
                    options:
                        base_path: bucket

    new_config:
        profiles:
            my-profile:
                credentials_provider:
                    type: S3Credentials
                    options:
                        access_key: foo
                        secret_key: bar

    If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
    and credentials_provider, which might not be the intended configuration.

    :param base_profiles: Profiles from the base config
    :param new_profiles: Profiles from the new config to merge
    :param base_path: Path to the base config file (for error messages)
    :param new_path: Path to the new config file (for error messages)
    :return: Merged profiles dictionary
    :raises ValueError: If any profile name exists in both configs with different definitions
    """
    result = base_profiles.copy()
    conflicting_profiles = []

    for profile_name, new_profile_def in new_profiles.items():
        if profile_name in result:
            if result[profile_name] != new_profile_def:
                conflicting_profiles.append(profile_name)
        else:
            result[profile_name] = new_profile_def

    if conflicting_profiles:
        conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
        raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")

    return result


def _merge_opentelemetry(
    base_otel: dict[str, Any],
    new_otel: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge opentelemetry configurations with special handling for metrics.attributes.

    Merge strategy:
    - metrics.attributes: Concatenate arrays (preserve order for fallback mechanism)
    - Other fields: Use idempotent check (must be identical)

    Attributes providers are concatenated to support fallback scenarios where multiple
    providers of the same type supply the same attribute key with different sources.
    The order matters - later providers override earlier ones.

    :param base_otel: Base opentelemetry config
    :param new_otel: New opentelemetry config to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged opentelemetry configuration
    :raises ValueError: If conflicts are detected
    """
    # Merge metrics.attributes
    base_attrs = base_otel.get("metrics", {}).get("attributes", [])
    new_attrs = new_otel.get("metrics", {}).get("attributes", [])
    merged_attrs = base_attrs + new_attrs

    # Merge other opentelemetry fields
    base_otel_without_attrs = copy.deepcopy(base_otel)
    if "metrics" in base_otel_without_attrs and "attributes" in base_otel_without_attrs["metrics"]:
        del base_otel_without_attrs["metrics"]["attributes"]

    new_otel_without_attrs = copy.deepcopy(new_otel)
    if "metrics" in new_otel_without_attrs and "attributes" in new_otel_without_attrs["metrics"]:
        del new_otel_without_attrs["metrics"]["attributes"]

    merged, conflicts = merge_dictionaries_no_overwrite(
        base_otel_without_attrs, new_otel_without_attrs, allow_idempotent=True
    )

    if conflicts:
        conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
        raise ValueError(
            f"opentelemetry config conflict: {conflicts_list} defined differently in {base_path} and {new_path}"
        )

    if "metrics" in merged:
        merged["metrics"]["attributes"] = merged_attrs
    elif merged_attrs:
        merged["metrics"] = {"attributes": merged_attrs}

    return merged


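# Sketch of the merge behavior above (attribute entries A and B are illustrative):
# merging {"metrics": {"attributes": [A]}} with {"metrics": {"attributes": [B]}}
# yields {"metrics": {"attributes": [A, B]}}, so B can act as a fallback/override
# downstream, while any other opentelemetry field that differs raises a ValueError.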
def _merge_configs(
    base_config: dict[str, Any],
    new_config: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge two config dictionaries with field-specific strategies.

    Different config fields have different merge strategies:
    - profiles: Shallow merge with idempotent check (uses _merge_profiles)
    - path_mapping: Flat dict merge with idempotent check
    - experimental_features: Flat dict merge with idempotent check
    - opentelemetry: Hybrid merge (attributes concatenate, others idempotent)
    - cache, posix: Global configs, idempotent if identical, error if different

    :param base_config: Base configuration dictionary
    :param new_config: New configuration to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged configuration dictionary
    :raises ValueError: If conflicts are detected
    """
    result = {}

    all_keys = set(base_config.keys()) | set(new_config.keys())

    for key in all_keys:
        base_value = base_config.get(key)
        new_value = new_config.get(key)

        # Key only in base
        if key not in new_config:
            result[key] = base_value
            continue

        # Key only in new
        if key not in base_config:
            result[key] = new_value
            continue

        # Key in both - need to merge or detect conflict
        if key == "profiles":
            result["profiles"] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("path_mapping", "experimental_features"):
            merged, conflicts = merge_dictionaries_no_overwrite(
                (base_value or {}).copy(), new_value or {}, allow_idempotent=True
            )
            if conflicts:
                conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
                raise ValueError(
                    f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
                )
            result[key] = merged

        elif key == "opentelemetry":
            result["opentelemetry"] = _merge_opentelemetry(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("cache", "posix"):
            if base_value != new_value:
                raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
            result[key] = base_value

        elif key == "include":
            # 'include' is processed by _load_and_merge_includes, not part of the final config
            pass

        else:
            # This should never happen; all top-level fields must have explicit handling above.
            raise ValueError(f"Unknown field '{key}' in config file.")

    return result


def _load_and_merge_includes(
    main_config_path: str,
    main_config_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Load and merge included config files.

    Processes the 'include' directive in the main config, loading and merging all
    specified config files. Only supports one level of includes - included files
    cannot themselves have 'include' directives.

    :param main_config_path: Absolute path to the main config file
    :param main_config_dict: Dictionary loaded from the main config file
    :return: Merged configuration dictionary (without 'include' field)
    :raises ValueError: If an include file is not found, malformed, or contains nested includes
    """
    include_paths = main_config_dict.get("include", [])

    if not include_paths:
        return {k: v for k, v in main_config_dict.items() if k != "include"}

    merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}

    for include_path in include_paths:
        resolved_path = _resolve_include_path(include_path, main_config_path)

        if not os.path.exists(resolved_path):
            raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")

        try:
            with open(resolved_path) as f:
                if resolved_path.endswith(".json"):
                    included_config = json.load(f)
                else:
                    included_config = yaml.safe_load(f)
        except Exception as e:
            raise ValueError(f"Failed to load included config {resolved_path}: {e}") from e

        validate_config(included_config)
        if "include" in included_config:
            raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")

        merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)

    return merged_config


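# A hypothetical main config exercising the one-level 'include' support above
# (file names and the profile are illustrative):
#
#   include:
#     - /etc/msc/shared-profiles.yaml   # absolute path, used as-is
#     - team-overrides.yaml             # resolved relative to the main config file
#   profiles:
#     my-profile:
#       storage_provider:
#         type: file
#         options:
#           base_path: /data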
def _find_config_file_paths() -> tuple[str, ...]:
    """
    Get configuration file search paths.

    Returns paths in order of precedence:

    1. User-specific configs (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/ with /etc/xdg as the default, then /etc/)
    """
    paths = []

    # 1. User-specific configuration directory
    xdg_config_home = os.getenv("XDG_CONFIG_HOME")

    if xdg_config_home:
        paths.extend(
            [
                os.path.join(xdg_config_home, "msc", "config.yaml"),
                os.path.join(xdg_config_home, "msc", "config.json"),
            ]
        )

    user_home = os.getenv("HOME")

    if user_home:
        paths.extend(
            [
                os.path.join(user_home, ".msc_config.yaml"),
                os.path.join(user_home, ".msc_config.json"),
                os.path.join(user_home, ".config", "msc", "config.yaml"),
                os.path.join(user_home, ".config", "msc", "config.json"),
            ]
        )

    # 2. System-wide configuration directories
    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
    if not xdg_config_dirs:
        xdg_config_dirs = "/etc/xdg"

    for config_dir in xdg_config_dirs.split(":"):
        config_dir = config_dir.strip()
        if config_dir:
            paths.extend(
                [
                    os.path.join(config_dir, "msc", "config.yaml"),
                    os.path.join(config_dir, "msc", "config.json"),
                ]
            )

    paths.extend(
        [
            "/etc/msc_config.yaml",
            "/etc/msc_config.json",
        ]
    )

    return tuple(paths)


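# Illustrative search order, assuming XDG_CONFIG_HOME is unset and HOME=/home/user:
#   /home/user/.msc_config.yaml, /home/user/.msc_config.json,
#   /home/user/.config/msc/config.yaml, /home/user/.config/msc/config.json,
#   /etc/xdg/msc/config.yaml, /etc/xdg/msc/config.json,
#   /etc/msc_config.yaml, /etc/msc_config.json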
PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)


class ImmutableDict(dict):
    """
    Immutable dictionary that raises an error when attempting to modify it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Recursively freeze nested structures
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, self._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Convert list to tuple, freezing nested dicts recursively.
        """
        frozen = []
        for item in lst:
            if isinstance(item, dict):
                frozen.append(ImmutableDict(item))
            elif isinstance(item, list):
                frozen.append(ImmutableDict._freeze_list(item))
            else:
                frozen.append(item)
        return tuple(frozen)

    def __deepcopy__(self, memo):
        """
        Return a regular mutable dict when deepcopy is called.
        """
        return copy.deepcopy(dict(self), memo)

    def __reduce__(self):
        """
        Support for pickle serialization.
        """
        return (self.__class__, (dict(self),))

    def _copy_value(self, value):
        """
        Convert frozen structures back to mutable equivalents.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        elif isinstance(value, tuple):
            # Tuples here were produced by _freeze_list from lists, so thaw them back into lists
            return [self._copy_value(item) for item in value]
        else:
            return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        value = super().__getitem__(key)
        return self._copy_value(value)

    def get(self, key, default=None):
        """
        Return a mutable copy of the value.
        """
        return self[key] if key in self else default

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")


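# Behavior sketch (illustrative):
#
#   frozen = ImmutableDict({"profiles": {"p": {"options": {"base_path": "/data"}}}})
#   profiles = frozen["profiles"]   # lookups return mutable copies
#   profiles["p"] = {}              # mutates only the copy, not the frozen original
#   frozen["new"] = 1               # raises TypeError: ImmutableDict is immutable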
class SimpleProviderBundle(ProviderBundle):
    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        if replicas is None:
            replicas = []

        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        self._replicas = replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        return self._replicas


class SimpleProviderBundleV2(ProviderBundleV2):
    def __init__(
        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
    ):
        self._storage_backends = storage_backends
        self._metadata_provider = metadata_provider

    @staticmethod
    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
        backend = StorageBackend(
            storage_provider_config=v1_bundle.storage_provider_config,
            credentials_provider=v1_bundle.credentials_provider,
            replicas=v1_bundle.replicas,
        )

        return SimpleProviderBundleV2(
            storage_backends={profile_name: backend},
            metadata_provider=v1_bundle.metadata_provider,
        )

    @property
    def storage_backends(self) -> dict[str, StorageBackend]:
        return self._storage_backends

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider


DEFAULT_CACHE_REFRESH_INTERVAL = 300


class StorageClientConfigLoader:
    _provider_bundle: ProviderBundleV2
    _resolved_config_dict: dict[str, Any]
    _profiles: dict[str, Any]
    _profile: str
    _profile_dict: dict[str, Any]
    _opentelemetry_dict: Optional[dict[str, Any]]
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str) -> StorageProvider:
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if a metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support a metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        :param credentials_provider_dict: Dictionary containing the credentials provider configuration.
        :param storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to the credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to a specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If the MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(
        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
    ) -> ProviderBundleV2:
        if provider_bundle:
            bundle = provider_bundle
        else:
            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
            if provider_bundle_dict:
                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
            else:
                bundle = self._build_provider_bundle_from_config(self._profile_dict)

        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)

        return bundle

    def _inject_profiles_from_bundle(self) -> None:
        """
        Inject child profiles and build child configs for multi-backend configurations.

        For ProviderBundleV2 with multiple backends, this method:
        1. Injects child profiles into config dict (needed for replica initialization)
        2. Pre-builds child configs so CompositeStorageClient can use them directly

        Profile injection is required because SingleStorageClient._initialize_replicas()
        uses StorageClientConfig.from_dict() to create replica clients, which looks up
        profiles in the config dict.
        """
        self._child_configs: Optional[dict[str, StorageClientConfig]] = None

        backends = self._provider_bundle.storage_backends
        if len(backends) > 1:
            # First, inject all child profiles into config dict (needed for replica lookup)
            profiles = copy.deepcopy(self._profiles)
            for child_name, backend in backends.items():
                child_profile_dict = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }

                if child_name in profiles:
                    # Profile already exists - check if it matches what we would inject
                    existing = profiles[child_name]
                    if existing.get("storage_provider") != child_profile_dict["storage_provider"]:
                        raise ValueError(
                            f"Profile '{child_name}' already exists in configuration with different settings."
                        )
                else:
                    profiles[child_name] = child_profile_dict

            # Update config dict BEFORE building child configs
            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles
            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

            # Now build child configs (they will get the updated config dict)
            retry_config = self._build_retry_config()
            child_configs: dict[str, StorageClientConfig] = {}
            for child_name, backend in backends.items():
                storage_provider = self._build_storage_provider(
                    backend.storage_provider_config.type,
                    backend.storage_provider_config.options,
                    backend.credentials_provider,
                )

                child_config = StorageClientConfig(
                    profile=child_name,
                    storage_provider=storage_provider,
                    credentials_provider=backend.credentials_provider,
                    storage_provider_profiles=None,
                    child_configs=None,
                    metadata_provider=None,
                    cache_config=None,
                    cache_manager=None,
                    retry_config=retry_config,
                    telemetry_provider=self._telemetry_provider,
                    replicas=backend.replicas,
                    autocommit_config=None,
                )
                child_config._config_dict = self._resolved_config_dict
                child_configs[child_name] = child_config

            self._child_configs = child_configs

    def _build_retry_config(self) -> RetryConfig:
        """Build retry config from profile dict."""
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            return RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )

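    # A hypothetical profile-level retry block consumed by _build_retry_config()
    # (values are illustrative; omitted keys fall back to the defaults above):
    #
    #   retry:
    #     attempts: 5
    #     delay: 1.0
    #     backoff_multiplier: 2.0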
    def _build_autocommit_config(self) -> AutoCommitConfig:
        """Build autocommit config from profile dict."""
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
        return AutoCommitConfig()

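    # Likewise, a hypothetical autocommit block (values are illustrative):
    #
    #   autocommit:
    #     interval_minutes: 5
    #     at_exit: true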
    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported.\n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:         # Optional: The eviction policy to use\n"
                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that the replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    # todo: remove once experimental features are stable
    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
        """
        Validate that experimental features are enabled when used.

        :param eviction_policy: The eviction policy configuration to validate
        :raises ValueError: If experimental features are used without being enabled
        """
        # Validate MRU eviction policy
        if eviction_policy.policy.upper() == "MRU":
            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
                raise ValueError(
                    "MRU eviction policy is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_mru_eviction: true"
                )

        # Validate purge_factor
        if eviction_policy.purge_factor != 0:
            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
                raise ValueError("purge_factor must be between 0 and 100")

            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
                raise ValueError(
                    "purge_factor is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_purge_factor: true"
                )

    def build_config(self) -> "StorageClientConfig":
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # config for the Composite StorageClient
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                child_configs=self._child_configs,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            config._config_dict = self._resolved_config_dict
            return config

        # Single-backend (len == 1)
        _, backend = next(iter(backends.items()))
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified; the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Work on the resolved cache configuration dict
            cache_dict = self._cache_dict

            # Verify the cache config
            self._verify_cache_config(cache_dict)

            # Initialize the eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create the cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            child_configs=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        config._config_dict = self._resolved_config_dict
        return config


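# Minimal usage sketch (illustrative; build_config() constructs real providers):
#
#   loader = StorageClientConfigLoader(config_dict={}, profile="default")
#   config = loader.build_config()  # falls back to the implicit POSIX profile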
class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

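    # A hypothetical path_mapping section this class consumes (names are illustrative;
    # source paths must end with "/" and destinations must be msc:// URLs ending with "/"):
    #
    #   path_mapping:
    #     s3://bucket1/datasets/: msc://s3-profile/
    #     /mnt/shared/: msc://posix-profile/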
1167    def __init__(self):
1168        """Initialize an empty PathMapping."""
1169        self._mapping = defaultdict(lambda: defaultdict(list))
1170
1171    @classmethod
1172    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
1173        """
1174        Create a PathMapping instance from configuration dictionary.
1175
1176        :param config_dict: Configuration dictionary, if None the config will be loaded
1177        :return: A PathMapping instance with processed mappings
1178        """
1179        if config_dict is None:
1180            # Import locally to avoid circular imports
1181            from multistorageclient.config import StorageClientConfig
1182
1183            config_dict, _ = StorageClientConfig.read_msc_config()
1184
1185        if not config_dict:
1186            return cls()
1187
1188        instance = cls()
1189        instance._load_mapping(config_dict)
1190        return instance
1191
1192    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
1193        """
1194        Load path mapping from a configuration dictionary.
1195
1196        :param config_dict: Configuration dictionary containing path mapping
1197        """
1198        # Get the path_mapping section
1199        path_mapping = config_dict.get("path_mapping", {})
1200        if path_mapping is None:
1201            return
1202
1203        # Process each mapping
1204        for source_path, dest_path in path_mapping.items():
1205            # Validate format
1206            if not source_path.endswith("/"):
1207                continue
1208            if not dest_path.startswith(MSC_PROTOCOL):
1209                continue
1210            if not dest_path.endswith("/"):
1211                continue
1212
1213            # Extract the destination profile
1214            pr_dest = urlparse(dest_path)
1215            dest_profile = pr_dest.netloc
1216
1217            # Parse the source path
1218            pr = urlparse(source_path)
1219            protocol = pr.scheme.lower() if pr.scheme else "file"
1220
1221            if protocol == "file" or source_path.startswith("/"):
1222                # For file or absolute paths, use the whole path as the prefix
1223                # and leave bucket empty
1224                bucket = ""
1225                prefix = source_path if source_path.startswith("/") else pr.path
1226            else:
1227                # For object storage, extract bucket and prefix
1228                bucket = pr.netloc
1229                prefix = pr.path
1230                if prefix.startswith("/"):
1231                    prefix = prefix[1:]
1232
1233            # Add the mapping to the nested dict
1234            self._mapping[protocol][bucket].append((prefix, dest_profile))
1235
1236        # Sort each bucket's prefixes by length (longest first) for optimal matching
1237        for protocol, buckets in self._mapping.items():
1238            for bucket, prefixes in buckets.items():
1239                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
1240
1241    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
1242        """
1243        Find the best matching mapping for the given URL.
1244
1245        :param url: URL to find matching mapping for
1246        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
1247        """
1248        # Parse the URL
1249        pr = urlparse(url)
1250        protocol = pr.scheme.lower() if pr.scheme else "file"
1251
1252        # For file paths or absolute paths
1253        if protocol == "file" or url.startswith("/"):
1254            path = url if url.startswith("/") else pr.path
1255
1256            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
1257
1258            # Check each prefix (already sorted by length, longest first)
1259            for prefix, profile in possible_mapping:
1260                if path.startswith(prefix):
1261                    # Calculate the relative path
1262                    rel_path = path[len(prefix) :]
1263                    if not rel_path.startswith("/"):
1264                        rel_path = "/" + rel_path
1265                    return profile, rel_path
1266
1267            return None
1268
1269        # For object storage
1270        bucket = pr.netloc
1271        path = pr.path
1272        if path.startswith("/"):
1273            path = path[1:]
1274
1275        # Check bucket-specific mapping
1276        possible_mapping = (
1277            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
1278        )
1279
1280        # Check each prefix (already sorted by length, longest first)
1281        for prefix, profile in possible_mapping:
1282            # First hit is the longest matching prefix
1283            if path.startswith(prefix):
1284                rel_path = path[len(prefix) :]
1285                # Remove leading slash if present
1286                if rel_path.startswith("/"):
1287                    rel_path = rel_path[1:]
1288
1289                return profile, rel_path
1290
1291        return None
1292
1293
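# A minimal usage sketch for PathMapping (bucket and profile names are illustrative):
#
#     mapping = PathMapping.from_config(
#         {"path_mapping": {"s3://bucket1/datasets/": "msc://profile-a/"}}
#     )
#     mapping.find_mapping("s3://bucket1/datasets/train/part-0")
#     # -> ("profile-a", "train/part-0")
#     mapping.find_mapping("s3://bucket1/other/key")
#     # -> None (no prefix matches)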
1294class StorageClientConfig:
1295    """
1296    Configuration class for the :py:class:`multistorageclient.StorageClient`.
1297    """
1298
1299    profile: str
1300    storage_provider: Optional[StorageProvider]
1301    credentials_provider: Optional[CredentialsProvider]
1302    storage_provider_profiles: Optional[list[str]]
1303    child_configs: Optional[dict[str, "StorageClientConfig"]]
1304    metadata_provider: Optional[MetadataProvider]
1305    cache_config: Optional[CacheConfig]
1306    cache_manager: Optional[CacheManager]
1307    retry_config: Optional[RetryConfig]
1308    telemetry_provider: Optional[Callable[[], Telemetry]]
1309    replicas: list[Replica]
1310    autocommit_config: Optional[AutoCommitConfig]
1311
1312    _config_dict: Optional[dict[str, Any]]
1313
1314    def __init__(
1315        self,
1316        profile: str,
1317        storage_provider: Optional[StorageProvider] = None,
1318        credentials_provider: Optional[CredentialsProvider] = None,
1319        storage_provider_profiles: Optional[list[str]] = None,
1320        child_configs: Optional[dict[str, "StorageClientConfig"]] = None,
1321        metadata_provider: Optional[MetadataProvider] = None,
1322        cache_config: Optional[CacheConfig] = None,
1323        cache_manager: Optional[CacheManager] = None,
1324        retry_config: Optional[RetryConfig] = None,
1325        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1326        replicas: Optional[list[Replica]] = None,
1327        autocommit_config: Optional[AutoCommitConfig] = None,
1328    ):
1329        # Exactly one of storage_provider or storage_provider_profiles must be set
1330        if storage_provider and storage_provider_profiles:
1331            raise ValueError(
1332                "Cannot specify both storage_provider and storage_provider_profiles. "
1333                "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
1334            )
1335        if not storage_provider and not storage_provider_profiles:
1336            raise ValueError("Must specify either storage_provider or storage_provider_profiles.")
1337
1338        if replicas is None:
1339            replicas = []
1340        self.profile = profile
1341        self.storage_provider = storage_provider
1342        self.credentials_provider = credentials_provider
1343        self.storage_provider_profiles = storage_provider_profiles
1344        self.child_configs = child_configs
1345        self.metadata_provider = metadata_provider
1346        self.cache_config = cache_config
1347        self.cache_manager = cache_manager
1348        self.retry_config = retry_config
1349        self.telemetry_provider = telemetry_provider
1350        self.replicas = replicas
1351        self.autocommit_config = autocommit_config
1352
1353    @staticmethod
1354    def from_json(
1355        config_json: str,
1356        profile: str = DEFAULT_POSIX_PROFILE_NAME,
1357        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1358    ) -> "StorageClientConfig":
1359        """
1360        Load a storage client configuration from a JSON string.
1361
1362        :param config_json: Configuration JSON string.
1363        :param profile: Profile to use.
1364        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1365        """
1366        config_dict = json.loads(config_json)
1367        return StorageClientConfig.from_dict(
1368            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1369        )
1370
1371    @staticmethod
1372    def from_yaml(
1373        config_yaml: str,
1374        profile: str = DEFAULT_POSIX_PROFILE_NAME,
1375        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1376    ) -> "StorageClientConfig":
1377        """
1378        Load a storage client configuration from a YAML string.
1379
1380        :param config_yaml: Configuration YAML string.
1381        :param profile: Profile to use.
1382        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1383        """
1384        config_dict = yaml.safe_load(config_yaml)
1385        return StorageClientConfig.from_dict(
1386            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1387        )
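    # A minimal from_yaml sketch (profile and bucket names are illustrative; the same
    # dictionary shape works with from_dict / from_json):
    #
    #     config = StorageClientConfig.from_yaml(
    #         """
    #         profiles:
    #           my-profile:
    #             storage_provider:
    #               type: s3
    #               options:
    #                 base_path: my-bucket
    #         """,
    #         profile="my-profile",
    #     )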
1388
1389    @staticmethod
1390    def from_dict(
1391        config_dict: dict[str, Any],
1392        profile: str = DEFAULT_POSIX_PROFILE_NAME,
1393        skip_validation: bool = False,
1394        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1395    ) -> "StorageClientConfig":
1396        """
1397        Load a storage client configuration from a Python dictionary.
1398
1399        :param config_dict: Configuration Python dictionary.
1400        :param profile: Profile to use.
1401        :param skip_validation: Skip configuration schema validation.
1402        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1403        """
1404        # Validate the config against the predefined JSON schema
1405        if not skip_validation:
1406            validate_config(config_dict)
1407
1408        # Load config
1409        loader = StorageClientConfigLoader(
1410            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1411        )
1412        config = loader.build_config()
1413
1414        return config
1415
1416    @staticmethod
1417    def from_file(
1418        config_file_paths: Optional[Iterable[str]] = None,
1419        profile: str = DEFAULT_POSIX_PROFILE_NAME,
1420        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1421    ) -> "StorageClientConfig":
1422        """
1423        Load a storage client configuration from the first file found.
1424
1425        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
1426        :param profile: Profile to use.
1427        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1428        """
1429        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
1430        # Parse rclone config file.
1431        rclone_config_dict, rclone_config_file = read_rclone_config()
1432
1433        # Merge config files.
1434        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
1435        if conflicted_keys:
1436            raise ValueError(
1437                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
1438            )
1439        merged_profiles = merged_config.get("profiles", {})
1440
1441        # Check if profile is in merged_profiles
1442        if profile in merged_profiles:
1443            return StorageClientConfig.from_dict(
1444                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
1445            )
1446        else:
1447            # Check if profile is the default profile or an implicit profile
1448            if profile == DEFAULT_POSIX_PROFILE_NAME:
1449                implicit_profile_config = DEFAULT_POSIX_PROFILE
1450            elif profile.startswith("_"):
1451                # Handle implicit profiles (e.g. "_s3-bucket1" -> protocol "s3", base path "bucket1")
1452                parts = profile[1:].split("-", 1)
1453                if len(parts) == 2:
1454                    protocol, bucket = parts
1455                    # Verify it's a supported protocol
1456                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
1457                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
1458                    implicit_profile_config = create_implicit_profile_config(
1459                        profile_name=profile, protocol=protocol, base_path=bucket
1460                    )
1461                else:
1462                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
1463            else:
1464                raise ValueError(
1465                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
1466                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
1467                    f"Please verify that the profile exists and that configuration files are correctly located."
1468                )
1469            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited
1470            if "profiles" not in merged_config:
1471                merged_config["profiles"] = implicit_profile_config["profiles"]
1472            else:
1473                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
1474            # The config was already validated when read, so skip validation for implicit profiles (whose names start with "_")
1475            return StorageClientConfig.from_dict(
1476                config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
1477            )
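    # Implicit profile names encode a protocol and a base path, e.g. (illustrative
    # bucket name):
    #
    #     StorageClientConfig.from_file(profile="_s3-bucket1")
    #
    # resolves "_s3-bucket1" to create_implicit_profile_config("_s3-bucket1", "s3", "bucket1")
    # and merges it into whatever MSC/rclone config the search paths yield.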
1478
1479    @staticmethod
1480    def from_provider_bundle(
1481        config_dict: dict[str, Any],
1482        provider_bundle: Union[ProviderBundle, ProviderBundleV2],
1483        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1484    ) -> "StorageClientConfig":
1485        loader = StorageClientConfigLoader(
1486            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
1487        )
1488        config = loader.build_config()
1489        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
1490        return config
1491
1492    @staticmethod
1493    def read_msc_config(
1494        config_file_paths: Optional[Iterable[str]] = None,
1495    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
1496        """Get the MSC configuration dictionary and the path of the first file found.
1497
1498        If no config paths are specified, configs are searched in the following order:
1499
1500        1. ``MSC_CONFIG`` environment variable (highest precedence)
1501        2. Standard search paths (user-specified config and system-wide config)
1502
1503        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
1504        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
1505            dictionary or empty dict if no config was found. ``config_file_path`` is the absolute
1506            path of the config file used, or ``None`` if no config file was found.
1507        """
1508        config_dict: dict[str, Any] = {}
1509        config_file_path: Optional[str] = None
1510
1511        config_file_paths = list(config_file_paths or [])
1512
1513        # Add default paths if none provided.
1514        if len(config_file_paths) == 0:
1515            # Environment variable.
1516            msc_config_env = os.getenv("MSC_CONFIG", None)
1517            if msc_config_env is not None:
1518                config_file_paths.append(msc_config_env)
1519
1520            # Standard search paths.
1521            config_file_paths.extend(_find_config_file_paths())
1522
1523        # Normalize + absolutize paths.
1524        config_file_paths = [os.path.abspath(path) for path in config_file_paths]
1525
1526        # Log the search plan.
1527        logger.debug(f"Searching MSC config file paths: {config_file_paths}")
1528
1529        # Load config.
1530        for path in config_file_paths:
1531            if os.path.exists(path):
1532                try:
1533                    with open(path) as f:
1534                        if path.endswith(".json"):
1535                            config_dict = json.load(f)
1536                        else:
1537                            config_dict = yaml.safe_load(f)
1538                    config_file_path = path
1539                    # Use the first config file found.
1540                    break
1541                except Exception as e:
1542                    raise ValueError(f"Malformed MSC config file: {path}, exception: {e}")
1543
1544        # Log the result.
1545        if config_file_path is None:
1546            logger.debug("No MSC config files found in any of the search locations.")
1547        else:
1548            logger.debug(f"Using MSC config file: {config_file_path}")
1549
1550        if config_dict:
1551            validate_config(config_dict)
1552
1553        if "include" in config_dict and config_file_path:
1554            config_dict = _load_and_merge_includes(config_file_path, config_dict)
1555
1556        return config_dict, config_file_path
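    # A sketch of the lookup order (the path below is hypothetical; MSC_CONFIG, when
    # set, is tried before the standard search paths):
    #
    #     os.environ["MSC_CONFIG"] = "/etc/msc/config.yaml"
    #     config_dict, used_path = StorageClientConfig.read_msc_config()
    #     # used_path == "/etc/msc/config.yaml" if that file exists and parses;
    #     # otherwise the first existing standard path, else None (with config_dict == {}).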
1557
1558    @staticmethod
1559    def read_path_mapping() -> PathMapping:
1560        """
1561        Get the path mapping defined in the MSC configuration.
1562
1563        Path mappings form a nested structure of protocol -> bucket -> [(prefix, profile)],
1564        where entries are sorted by prefix length (longest first) so that longer, more
1565        specific paths take precedence when matching.
1566
1567        :return: A PathMapping instance with translation mappings
1568        """
1569        try:
1570            return PathMapping.from_config()
1571        except Exception:
1572            # Log the error but continue; a bad path_mapping section shouldn't stop the application
1573            logger.error("Failed to load path_mapping from MSC config")
1574            return PathMapping()
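    # The path_mapping section read here maps source prefixes to msc:// URLs; both
    # sides must end with "/" (profile and bucket names are illustrative):
    #
    #     path_mapping:
    #       /mnt/shared/: msc://posix-profile/
    #       s3://bucket1/datasets/: msc://profile-a/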
1575
1576    def __getstate__(self) -> dict[str, Any]:
1577        state = self.__dict__.copy()
1578        if not state.get("_config_dict"):
1579            raise ValueError("StorageClientConfig is not serializable")
1580        del state["credentials_provider"]
1581        del state["storage_provider"]
1582        del state["metadata_provider"]
1583        del state["cache_manager"]
1584        del state["replicas"]
1585        del state["child_configs"]
1586        return state
1587
1588    def __setstate__(self, state: dict[str, Any]) -> None:
1589        # Presence checked by __getstate__.
1590        config_dict = state["_config_dict"]
1591        loader = StorageClientConfigLoader(
1592            config_dict=config_dict,
1593            profile=state["profile"],
1594            telemetry_provider=state["telemetry_provider"],
1595        )
1596        new_config = loader.build_config()
1597        self.profile = new_config.profile
1598        self.storage_provider = new_config.storage_provider
1599        self.credentials_provider = new_config.credentials_provider
1600        self.storage_provider_profiles = new_config.storage_provider_profiles
1601        self.child_configs = new_config.child_configs
1602        self.metadata_provider = new_config.metadata_provider
1603        self.cache_config = new_config.cache_config
1604        self.cache_manager = new_config.cache_manager
1605        self.retry_config = new_config.retry_config
1606        self.telemetry_provider = new_config.telemetry_provider
1607        self._config_dict = config_dict
1608        self.replicas = new_config.replicas
1609        self.autocommit_config = new_config.autocommit_config
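# Pickling round-trips through the original config dict: __getstate__ drops the live
# provider objects and __setstate__ rebuilds them via StorageClientConfigLoader. A
# minimal sketch (config_yaml and the profile name are placeholders):
#
#     import pickle
#     config = StorageClientConfig.from_yaml(config_yaml, profile="my-profile")
#     restored = pickle.loads(pickle.dumps(config))  # rebuilt from _config_dict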