Source code for multistorageclient.config

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
  16import copy
  17import json
  18import logging
  19import os
  20import tempfile
  21from collections import defaultdict
  22from collections.abc import Callable
  23from pathlib import Path
  24from typing import Any, Iterable, Optional
  25from urllib.parse import urlparse
  26
  27import yaml
  28
  29from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
  30from .caching.cache_config import CacheConfig, EvictionPolicyConfig
  31from .instrumentation import setup_opentelemetry
  32from .providers.manifest_metadata import ManifestMetadataProvider
  33from .rclone import read_rclone_config
  34from .schema import validate_config
  35from .telemetry import Telemetry
  36from .telemetry import init as telemetry_init
  37from .types import (
  38    DEFAULT_RETRY_ATTEMPTS,
  39    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
  40    DEFAULT_RETRY_DELAY,
  41    MSC_PROTOCOL,
  42    AutoCommitConfig,
  43    CredentialsProvider,
  44    MetadataProvider,
  45    ProviderBundle,
  46    Replica,
  47    RetryConfig,
  48    StorageProvider,
  49    StorageProviderConfig,
  50)
  51from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
  52
  53# Constants related to implicit profiles
  54SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
  55PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
  56    "s3": "s3",
  57    "gs": "gcs",
  58    "ais": "ais",
  59    "file": "file",
  60}
  61
  62
  63# Template for creating implicit profile configurations
  64def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
  65    """
  66    Create a configuration dictionary for an implicit profile.
  67
  68    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
  69    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
  70    :param base_path: The base path (e.g., bucket name) for the storage provider
  71
  72    :return: A configuration dictionary for the implicit profile
  73    """
  74    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
  75    return {
  76        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
  77    }
  78
  79
# Name of the implicit profile that maps the local filesystem root.
DEFAULT_POSIX_PROFILE_NAME = "default"
# Fallback profile injected when the user config does not define a "default" profile.
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps the "type" value of a storage_provider config entry to the provider
# class name resolved from the ".providers" module of this package.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

# Maps short credentials_provider "type" names to the credentials provider
# class name resolved from the ".providers" module of this package.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
}
 102
 103
 104def _find_config_file_paths() -> tuple[str]:
 105    """
 106    Get configuration file search paths.
 107
 108    Returns paths in order of precedence:
 109
 110    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
 111    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
 112    """
 113    paths = []
 114
 115    # 1. User-specific configuration directory
 116    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
 117
 118    if xdg_config_home:
 119        paths.extend(
 120            [
 121                os.path.join(xdg_config_home, "msc", "config.yaml"),
 122                os.path.join(xdg_config_home, "msc", "config.json"),
 123            ]
 124        )
 125
 126    user_home = os.getenv("HOME")
 127
 128    if user_home:
 129        paths.extend(
 130            [
 131                os.path.join(user_home, ".msc_config.yaml"),
 132                os.path.join(user_home, ".msc_config.json"),
 133                os.path.join(user_home, ".config", "msc", "config.yaml"),
 134                os.path.join(user_home, ".config", "msc", "config.json"),
 135            ]
 136        )
 137
 138    # 2. System-wide configuration directories
 139    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
 140    if not xdg_config_dirs:
 141        xdg_config_dirs = "/etc/xdg"
 142
 143    for config_dir in xdg_config_dirs.split(":"):
 144        config_dir = config_dir.strip()
 145        if config_dir:
 146            paths.extend(
 147                [
 148                    os.path.join(config_dir, "msc", "config.yaml"),
 149                    os.path.join(config_dir, "msc", "config.json"),
 150                ]
 151            )
 152
 153    paths.extend(
 154        [
 155            "/etc/msc_config.yaml",
 156            "/etc/msc_config.json",
 157        ]
 158    )
 159
 160    return tuple(paths)
 161
 162
# Package name used as the anchor for relative imports of provider classes.
PACKAGE_NAME = "multistorageclient"

# Module-level logger for configuration loading.
logger = logging.getLogger(__name__)
 166
 167
 168class SimpleProviderBundle(ProviderBundle):
 169    def __init__(
 170        self,
 171        storage_provider_config: StorageProviderConfig,
 172        credentials_provider: Optional[CredentialsProvider] = None,
 173        metadata_provider: Optional[MetadataProvider] = None,
 174        replicas: Optional[list[Replica]] = None,
 175    ):
 176        if replicas is None:
 177            replicas = []
 178
 179        self._storage_provider_config = storage_provider_config
 180        self._credentials_provider = credentials_provider
 181        self._metadata_provider = metadata_provider
 182        self._replicas = replicas
 183
 184    @property
 185    def storage_provider_config(self) -> StorageProviderConfig:
 186        return self._storage_provider_config
 187
 188    @property
 189    def credentials_provider(self) -> Optional[CredentialsProvider]:
 190        return self._credentials_provider
 191
 192    @property
 193    def metadata_provider(self) -> Optional[MetadataProvider]:
 194        return self._metadata_provider
 195
 196    @property
 197    def replicas(self) -> list[Replica]:
 198        return self._replicas
 199
 200
# Default cache eviction-policy refresh interval, in seconds.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
 202
 203
class StorageClientConfigLoader:
    """
    Builds the components of a :py:class:`StorageClientConfig` — storage provider,
    credentials provider, metadata provider, cache, retry, and autocommit settings —
    from a resolved configuration dictionary and a profile name. A pre-built
    :py:class:`ProviderBundle` may be supplied to bypass config-driven construction.
    """

    # Pre-built bundle; when set it takes precedence over config-driven construction.
    _provider_bundle: Optional[ProviderBundle]
    # Full configuration with environment variables already expanded.
    _resolved_config_dict: dict[str, Any]
    # All profiles from the configuration (always contains the default POSIX profile).
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Configuration section for the profile being loaded.
    _profile_dict: dict[str, Any]
    # Optional legacy OpenTelemetry settings (see TODO in build_config).
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Thunk for lazy telemetry initialization (must be pickle-friendly).
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    # Optional top-level "cache" settings from the configuration.
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        :raises ValueError: If the profile does not exist, or if the "default"
            profile is overridden with a non-"file" storage provider.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = config_dict

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider class by its mapped type name.

        :param storage_provider_name: Key into ``STORAGE_PROVIDER_MAPPING`` (e.g. "s3").
        :param storage_options: Keyword arguments forwarded to the provider constructor.
        :param credentials_provider: Optional credentials provider injected into the options.
        :return: A newly constructed storage provider instance.
        :raises ValueError: If the provider type is not supported.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str) -> StorageProvider:
        """
        Build a storage provider from another profile, used to read manifests.

        :param storage_provider_profile: Name of the profile that hosts the manifests.
        :return: A storage provider built from that profile's configuration.
        :raises ValueError: If the profile does not exist, has a metadata provider,
            or is missing a storage_provider section.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.

        Returns:
            A credentials provider instance, or ``None`` when no configuration is given.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        # Inspect the constructor so only options the provider actually accepts are
        # forwarded; explicit credentials options always win over storage options.
        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Assemble a :py:class:`SimpleProviderBundle` from a profile's configuration.

        :param profile_dict: The profile's configuration section.
        :return: Bundle holding the storage provider config, credentials provider,
            metadata provider, and replicas (sorted by read priority).
        :raises ValueError: If storage_provider is missing or a non-manifest
            metadata provider type is not a fully qualified class name.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """
        Instantiate a third-party provider bundle from a fully qualified class name.

        :param provider_bundle_dict: Dict with "type" (module.ClassName) and optional "options".
        :return: The instantiated provider bundle.
        """
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        """
        Resolve the provider bundle: pre-built > extension ("provider_bundle"
        section) > config-driven construction, in that order of precedence.
        """
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load 3rd party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Reject deprecated cache settings.

        :param cache_dict: The "cache" section of the configuration.
        :raises ValueError: If the removed 'size_mb' property is present.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:         # Optional: The eviction policy to use\n"
                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    def build_config(self) -> "StorageClientConfig":
        """
        Build and return the fully assembled :py:class:`StorageClientConfig`
        for this loader's profile: provider bundle, storage provider, cache,
        retry, and autocommit configuration.

        :raises ValueError: On invalid replica, cache, or provider configuration.
        """
        bundle = self._build_provider_bundle()

        # Validate replicas to prevent circular references
        self._validate_replicas(bundle.replicas)

        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified – the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        # retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            retry_config = RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            retry_config = RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )

        # autocommit options
        autocommit_config = AutoCommitConfig()
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)

        # set up OpenTelemetry providers once per process
        #
        # TODO: Legacy, need to remove.
        if self._opentelemetry_dict:
            setup_opentelemetry(self._opentelemetry_dict)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=bundle.replicas,
            autocommit_config=autocommit_config,
        )
 625
 626
 627class PathMapping:
 628    """
 629    Class to handle path mappings defined in the MSC configuration.
 630
 631    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
 632    where entries are sorted by prefix length (longest first) for optimal matching.
 633    Longer paths take precedence when matching.
 634    """
 635
 636    def __init__(self):
 637        """Initialize an empty PathMapping."""
 638        self._mapping = defaultdict(lambda: defaultdict(list))
 639
 640    @classmethod
 641    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
 642        """
 643        Create a PathMapping instance from configuration dictionary.
 644
 645        :param config_dict: Configuration dictionary, if None the config will be loaded
 646        :return: A PathMapping instance with processed mappings
 647        """
 648        if config_dict is None:
 649            # Import locally to avoid circular imports
 650            from multistorageclient.config import StorageClientConfig
 651
 652            config_dict, _ = StorageClientConfig.read_msc_config()
 653
 654        if not config_dict:
 655            return cls()
 656
 657        instance = cls()
 658        instance._load_mapping(config_dict)
 659        return instance
 660
 661    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
 662        """
 663        Load path mapping from a configuration dictionary.
 664
 665        :param config_dict: Configuration dictionary containing path mapping
 666        """
 667        # Get the path_mapping section
 668        path_mapping = config_dict.get("path_mapping", {})
 669        if path_mapping is None:
 670            return
 671
 672        # Process each mapping
 673        for source_path, dest_path in path_mapping.items():
 674            # Validate format
 675            if not source_path.endswith("/"):
 676                continue
 677            if not dest_path.startswith(MSC_PROTOCOL):
 678                continue
 679            if not dest_path.endswith("/"):
 680                continue
 681
 682            # Extract the destination profile
 683            pr_dest = urlparse(dest_path)
 684            dest_profile = pr_dest.netloc
 685
 686            # Parse the source path
 687            pr = urlparse(source_path)
 688            protocol = pr.scheme.lower() if pr.scheme else "file"
 689
 690            if protocol == "file" or source_path.startswith("/"):
 691                # For file or absolute paths, use the whole path as the prefix
 692                # and leave bucket empty
 693                bucket = ""
 694                prefix = source_path if source_path.startswith("/") else pr.path
 695            else:
 696                # For object storage, extract bucket and prefix
 697                bucket = pr.netloc
 698                prefix = pr.path
 699                if prefix.startswith("/"):
 700                    prefix = prefix[1:]
 701
 702            # Add the mapping to the nested dict
 703            self._mapping[protocol][bucket].append((prefix, dest_profile))
 704
 705        # Sort each bucket's prefixes by length (longest first) for optimal matching
 706        for protocol, buckets in self._mapping.items():
 707            for bucket, prefixes in buckets.items():
 708                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
 709
 710    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
 711        """
 712        Find the best matching mapping for the given URL.
 713
 714        :param url: URL to find matching mapping for
 715        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
 716        """
 717        # Parse the URL
 718        pr = urlparse(url)
 719        protocol = pr.scheme.lower() if pr.scheme else "file"
 720
 721        # For file paths or absolute paths
 722        if protocol == "file" or url.startswith("/"):
 723            path = url if url.startswith("/") else pr.path
 724
 725            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
 726
 727            # Check each prefix (already sorted by length, longest first)
 728            for prefix, profile in possible_mapping:
 729                if path.startswith(prefix):
 730                    # Calculate the relative path
 731                    rel_path = path[len(prefix) :]
 732                    if not rel_path.startswith("/"):
 733                        rel_path = "/" + rel_path
 734                    return profile, rel_path
 735
 736            return None
 737
 738        # For object storage
 739        bucket = pr.netloc
 740        path = pr.path
 741        if path.startswith("/"):
 742            path = path[1:]
 743
 744        # Check bucket-specific mapping
 745        possible_mapping = (
 746            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
 747        )
 748
 749        # Check each prefix (already sorted by length, longest first)
 750        for prefix, profile in possible_mapping:
 751            # matching prefix
 752            if path.startswith(prefix):
 753                rel_path = path[len(prefix) :]
 754                # Remove leading slash if present
 755                if rel_path.startswith("/"):
 756                    rel_path = rel_path[1:]
 757
 758                return profile, rel_path
 759
 760        return None
 761
 762
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: StorageProvider
    credentials_provider: Optional[CredentialsProvider]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    telemetry_provider: Optional[Callable[[], Telemetry]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    # Original config dictionary; required for pickling (see __getstate__).
    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: StorageProvider,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        # Avoid a shared mutable default for replicas.
        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.telemetry_provider = telemetry_provider
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a JSON string.

        :param config_json: Configuration JSON string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_yaml(
        config_yaml: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a YAML string.

        :param config_yaml: Configuration YAML string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a Python dictionary.

        :param config_dict: Configuration Python dictionary.
        :param profile: Profile to use.
        :param skip_validation: Skip configuration schema validation.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Validate the config file with predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        # Keep the raw dict around so the config can be pickled (see __getstate__).
        config._config_dict = config_dict

        return config

    @staticmethod
    def from_file(
        config_file_paths: Optional[Iterable[str]] = None,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from the first file found.

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            # NOTE: closing quote after the rclone file name was missing previously.
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
            )
        else:
            # Check if profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles of the form "_<protocol>-<bucket>"
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
            )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any],
        provider_bundle: ProviderBundle,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """Build a configuration from a pre-constructed provider bundle; the result is not picklable."""
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def read_msc_config(
        config_file_paths: Optional[Iterable[str]] = None,
    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the first file found.

        If no config paths are specified, configs are searched in the following order:

        1. ``MSC_CONFIG`` environment variable (highest precedence)
        2. Standard search paths (user-specified config and system-wide config)

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
            dictionary or empty dict if no config was found. ``config_file_path`` is the absolute
            path of the config file used, or ``None`` if no config file was found.
        """
        config_dict: dict[str, Any] = {}
        config_file_path: Optional[str] = None

        config_file_paths = list(config_file_paths or [])

        # Add default paths if none provided.
        if len(config_file_paths) == 0:
            # Environment variable.
            msc_config_env = os.getenv("MSC_CONFIG", None)
            if msc_config_env is not None:
                config_file_paths.append(msc_config_env)

            # Standard search paths.
            config_file_paths.extend(_find_config_file_paths())

        # Normalize + absolutize paths.
        config_file_paths = [os.path.abspath(path) for path in config_file_paths]

        # Log plan.
        logger.debug(f"Searching MSC config file paths: {config_file_paths}")

        # Load config.
        for path in config_file_paths:
            if os.path.exists(path):
                try:
                    with open(path) as f:
                        if path.endswith(".json"):
                            config_dict = json.load(f)
                        else:
                            config_dict = yaml.safe_load(f)
                    config_file_path = path
                    # Use the first config file.
                    break
                except Exception as e:
                    raise ValueError(f"malformed MSC config file: {path}, exception: {e}")

        # Log result.
        if config_file_path is None:
            logger.debug("No MSC config files found in any of the search locations.")
        else:
            logger.info(f"Using MSC config file: {config_file_path}")

        if config_dict:
            validate_config(config_dict)
        return config_dict, config_file_path

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        # Only configs built from a dict are picklable; the providers are
        # dropped and rebuilt from the dict on unpickling.
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            telemetry_provider=state["telemetry_provider"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.telemetry_provider = new_config.telemetry_provider
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config