# Source code for multistorageclient.config

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
  16import copy
  17import json
  18import logging
  19import os
  20import tempfile
  21from collections import defaultdict
  22from collections.abc import Callable
  23from pathlib import Path
  24from typing import Any, Iterable, Optional
  25from urllib.parse import urlparse
  26
  27import yaml
  28
  29from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
  30from .caching.cache_config import CacheConfig, EvictionPolicyConfig
  31from .providers.manifest_metadata import ManifestMetadataProvider
  32from .rclone import read_rclone_config
  33from .schema import validate_config
  34from .telemetry import Telemetry
  35from .telemetry import init as telemetry_init
  36from .types import (
  37    DEFAULT_RETRY_ATTEMPTS,
  38    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
  39    DEFAULT_RETRY_DELAY,
  40    MSC_PROTOCOL,
  41    AutoCommitConfig,
  42    CredentialsProvider,
  43    MetadataProvider,
  44    ProviderBundle,
  45    Replica,
  46    RetryConfig,
  47    StorageProvider,
  48    StorageProviderConfig,
  49)
  50from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
  51
  52# Constants related to implicit profiles
  53SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
  54PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
  55    "s3": "s3",
  56    "gs": "gcs",
  57    "ais": "ais",
  58    "file": "file",
  59}
  60
  61
  62# Template for creating implicit profile configurations
  63def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
  64    """
  65    Create a configuration dictionary for an implicit profile.
  66
  67    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
  68    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
  69    :param base_path: The base path (e.g., bucket name) for the storage provider
  70
  71    :return: A configuration dictionary for the implicit profile
  72    """
  73    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
  74    return {
  75        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
  76    }
  77
  78
# The built-in "default" profile: local POSIX filesystem rooted at "/".
DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps the storage provider "type" value from the config to the class name
# resolved from the ".providers" module by _build_storage_provider.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

# Maps the credentials provider "type" value from the config to the class name
# resolved from the ".providers" module by _build_credentials_provider.
# Types not listed here are treated as fully qualified class paths.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
}
 102
 103
 104def _find_config_file_paths() -> tuple[str]:
 105    """
 106    Get configuration file search paths.
 107
 108    Returns paths in order of precedence:
 109
 110    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
 111    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
 112    """
 113    paths = []
 114
 115    # 1. User-specific configuration directory
 116    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
 117
 118    if xdg_config_home:
 119        paths.extend(
 120            [
 121                os.path.join(xdg_config_home, "msc", "config.yaml"),
 122                os.path.join(xdg_config_home, "msc", "config.json"),
 123            ]
 124        )
 125
 126    user_home = os.getenv("HOME")
 127
 128    if user_home:
 129        paths.extend(
 130            [
 131                os.path.join(user_home, ".msc_config.yaml"),
 132                os.path.join(user_home, ".msc_config.json"),
 133                os.path.join(user_home, ".config", "msc", "config.yaml"),
 134                os.path.join(user_home, ".config", "msc", "config.json"),
 135            ]
 136        )
 137
 138    # 2. System-wide configuration directories
 139    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
 140    if not xdg_config_dirs:
 141        xdg_config_dirs = "/etc/xdg"
 142
 143    for config_dir in xdg_config_dirs.split(":"):
 144        config_dir = config_dir.strip()
 145        if config_dir:
 146            paths.extend(
 147                [
 148                    os.path.join(config_dir, "msc", "config.yaml"),
 149                    os.path.join(config_dir, "msc", "config.json"),
 150                ]
 151            )
 152
 153    paths.extend(
 154        [
 155            "/etc/msc_config.yaml",
 156            "/etc/msc_config.json",
 157        ]
 158    )
 159
 160    return tuple(paths)
 161
 162
# Root package name, used when resolving relative module paths via import_class.
PACKAGE_NAME = "multistorageclient"

# Module-level logger, per stdlib logging convention.
logger = logging.getLogger(__name__)
 166
 167
 168class ImmutableDict(dict):
 169    """
 170    Immutable dictionary that raises an error when attempting to modify it.
 171    """
 172
 173    def __init__(self, *args, **kwargs):
 174        super().__init__(*args, **kwargs)
 175
 176        # Recursively freeze nested structures
 177        for key, value in list(super().items()):
 178            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
 179                super().__setitem__(key, ImmutableDict(value))
 180            elif isinstance(value, list):
 181                super().__setitem__(key, self._freeze_list(value))
 182
 183    @staticmethod
 184    def _freeze_list(lst):
 185        """
 186        Convert list to tuple, freezing nested dicts recursively.
 187        """
 188        frozen = []
 189        for item in lst:
 190            if isinstance(item, dict):
 191                frozen.append(ImmutableDict(item))
 192            elif isinstance(item, list):
 193                frozen.append(ImmutableDict._freeze_list(item))
 194            else:
 195                frozen.append(item)
 196        return tuple(frozen)
 197
 198    def __deepcopy__(self, memo):
 199        """
 200        Return a regular mutable dict when deepcopy is called.
 201        """
 202        return copy.deepcopy(dict(self), memo)
 203
 204    def _copy_value(self, value):
 205        """
 206        Convert frozen structures back to mutable equivalents.
 207        """
 208        if isinstance(value, ImmutableDict):
 209            return {k: self._copy_value(v) for k, v in value.items()}
 210        elif isinstance(value, tuple):
 211            # Check if it was originally a list (frozen by _freeze_list)
 212            return [self._copy_value(item) for item in value]
 213        else:
 214            return value
 215
 216    def __getitem__(self, key):
 217        """
 218        Return a mutable copy of the value.
 219        """
 220        value = super().__getitem__(key)
 221        return self._copy_value(value)
 222
 223    def get(self, key, default=None):
 224        """
 225        Return a mutable copy of the value.
 226        """
 227        return self[key] if key in self else default
 228
 229    def __setitem__(self, key, value):
 230        raise TypeError("ImmutableDict is immutable")
 231
 232    def __delitem__(self, key):
 233        raise TypeError("ImmutableDict is immutable")
 234
 235    def clear(self):
 236        raise TypeError("ImmutableDict is immutable")
 237
 238    def pop(self, *args):
 239        raise TypeError("ImmutableDict is immutable")
 240
 241    def popitem(self):
 242        raise TypeError("ImmutableDict is immutable")
 243
 244    def setdefault(self, key, default=None):
 245        raise TypeError("ImmutableDict is immutable")
 246
 247    def update(self, *args, **kwargs):
 248        raise TypeError("ImmutableDict is immutable")
 249
 250
 251class SimpleProviderBundle(ProviderBundle):
 252    def __init__(
 253        self,
 254        storage_provider_config: StorageProviderConfig,
 255        credentials_provider: Optional[CredentialsProvider] = None,
 256        metadata_provider: Optional[MetadataProvider] = None,
 257        replicas: Optional[list[Replica]] = None,
 258    ):
 259        if replicas is None:
 260            replicas = []
 261
 262        self._storage_provider_config = storage_provider_config
 263        self._credentials_provider = credentials_provider
 264        self._metadata_provider = metadata_provider
 265        self._replicas = replicas
 266
 267    @property
 268    def storage_provider_config(self) -> StorageProviderConfig:
 269        return self._storage_provider_config
 270
 271    @property
 272    def credentials_provider(self) -> Optional[CredentialsProvider]:
 273        return self._credentials_provider
 274
 275    @property
 276    def metadata_provider(self) -> Optional[MetadataProvider]:
 277        return self._metadata_provider
 278
 279    @property
 280    def replicas(self) -> list[Replica]:
 281        return self._replicas
 282
 283
DEFAULT_CACHE_REFRESH_INTERVAL = 300  # Default cache eviction refresh interval, in seconds.
 285
 286
class StorageClientConfigLoader:
    """
    Builds a :py:class:`StorageClientConfig` for a single profile from a
    configuration dictionary (optionally overridden by a pre-built provider
    bundle).
    """

    # Pre-built bundle passed by the caller; takes precedence over config.
    _provider_bundle: Optional[ProviderBundle]
    # Full config after environment-variable expansion (frozen).
    _resolved_config_dict: dict[str, Any]
    # All profile sections keyed by profile name (mutable; the default POSIX profile may be injected).
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # The selected profile's config section (frozen).
    _profile_dict: dict[str, Any]
    # Optional "opentelemetry" config section.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Lazy factory for a Telemetry instance (must be picklable for multiprocessing).
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    # Optional "cache" config section.
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.

        :raises ValueError: If the requested profile does not exist, or if the
            "default" profile is overridden with a non-"file" storage provider.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        # Freeze the resolved config so later consumers cannot mutate it.
        self._resolved_config_dict = ImmutableDict(config_dict)

        # NOTE: _profiles stays a plain (mutable) dict so the default profile
        # can be injected below.
        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)
 354
    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider class mapped from its type name.

        NOTE: ``storage_options`` is mutated in place — ``credentials_provider``,
        ``config_dict``, and ``telemetry_provider`` entries are injected before
        the dict is passed as keyword arguments to the provider constructor.

        :param storage_provider_name: Provider type key in ``STORAGE_PROVIDER_MAPPING``.
        :param storage_options: Keyword arguments forwarded to the provider constructor.
        :param credentials_provider: Optional credentials provider to inject.
        :raises ValueError: If the provider type is not supported.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)
 379
 380    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
 381        storage_profile_dict = self._profiles.get(storage_provider_profile)
 382        if not storage_profile_dict:
 383            raise ValueError(
 384                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
 385            )
 386
 387        # Check if metadata provider is configured for this profile
 388        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
 389        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
 390        if local_metadata_provider_dict:
 391            raise ValueError(
 392                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
 393            )
 394
 395        # Initialize CredentialsProvider
 396        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
 397        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
 398
 399        # Initialize StorageProvider
 400        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
 401        if local_storage_provider_dict:
 402            local_name = local_storage_provider_dict["type"]
 403            local_storage_options = local_storage_provider_dict.get("options", {})
 404        else:
 405            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
 406
 407        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
 408        return storage_provider
 409
 410    def _build_credentials_provider(
 411        self,
 412        credentials_provider_dict: Optional[dict[str, Any]],
 413        storage_options: Optional[dict[str, Any]] = None,
 414    ) -> Optional[CredentialsProvider]:
 415        """
 416        Initializes the CredentialsProvider based on the provided dictionary.
 417
 418        Args:
 419            credentials_provider_dict: Dictionary containing credentials provider configuration
 420            storage_options: Storage provider options required by some credentials providers to scope the credentials.
 421        """
 422        if not credentials_provider_dict:
 423            return None
 424
 425        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
 426            # Fully qualified class path case
 427            class_type = credentials_provider_dict["type"]
 428            module_name, class_name = class_type.rsplit(".", 1)
 429            cls = import_class(class_name, module_name)
 430        else:
 431            # Mapped class name case
 432            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
 433            module_name = ".providers"
 434            cls = import_class(class_name, module_name, PACKAGE_NAME)
 435
 436        # Propagate storage provider options to credentials provider since they may be
 437        # required by some credentials providers to scope the credentials.
 438        import inspect
 439
 440        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
 441        options = credentials_provider_dict.get("options", {})
 442        if storage_options:
 443            for storage_provider_option in storage_options.keys():
 444                if storage_provider_option in init_params and storage_provider_option not in options:
 445                    options[storage_provider_option] = storage_options[storage_provider_option]
 446
 447        return cls(**options)
 448
    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Assemble a :py:class:`SimpleProviderBundle` from a profile's config
        section: storage provider config, optional credentials provider,
        optional metadata provider, and replicas sorted by read priority.

        :param profile_dict: The profile's configuration dictionary.
        :raises ValueError: If ``storage_provider`` is missing, or a
            non-manifest metadata provider type is not a fully qualified
            class name.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                # (pop removes the key so it is not forwarded to ManifestMetadataProvider).
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    # Manifests live alongside the data: reuse this profile's provider.
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                # Non-manifest providers are loaded from a fully qualified class path.
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )
 516
 517    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
 518        class_type = provider_bundle_dict["type"]
 519        module_name, class_name = class_type.rsplit(".", 1)
 520        cls = import_class(class_name, module_name)
 521        options = provider_bundle_dict.get("options", {})
 522        return cls(**options)
 523
 524    def _build_provider_bundle(self) -> ProviderBundle:
 525        if self._provider_bundle:
 526            return self._provider_bundle  # Return if previously provided.
 527
 528        # Load 3rd party extension
 529        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
 530        if provider_bundle_dict:
 531            return self._build_provider_bundle_from_extension(provider_bundle_dict)
 532
 533        return self._build_provider_bundle_from_config(self._profile_dict)
 534
 535    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
 536        if "size_mb" in cache_dict:
 537            raise ValueError(
 538                "The 'size_mb' property is no longer supported. \n"
 539                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
 540                "Example configuration:\n"
 541                "cache:\n"
 542                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
 543                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
 544                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
 545                "  eviction_policy:         # Optional: The eviction policy to use\n"
 546                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
 547                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
 548            )
 549
 550    def _validate_replicas(self, replicas: list[Replica]) -> None:
 551        """
 552        Validates that replica profiles do not have their own replicas configuration.
 553
 554        This prevents circular references where a replica profile could reference
 555        another profile that also has replicas, creating an infinite loop.
 556
 557        :param replicas: The list of Replica objects to validate
 558        :raises ValueError: If any replica profile has its own replicas configuration
 559        """
 560        for replica in replicas:
 561            replica_profile_name = replica.replica_profile
 562
 563            # Check that replica profile is not the same as the current profile
 564            if replica_profile_name == self._profile:
 565                raise ValueError(
 566                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
 567                )
 568
 569            # Check if the replica profile exists in the configuration
 570            if replica_profile_name not in self._profiles:
 571                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
 572
 573            # Get the replica profile configuration
 574            replica_profile_dict = self._profiles[replica_profile_name]
 575
 576            # Check if the replica profile has its own replicas configuration
 577            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
 578                raise ValueError(
 579                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
 580                    f"This creates a circular reference which is not allowed."
 581                )
 582
 583    # todo: remove once experimental features are stable
 584    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
 585        """
 586        Validate that experimental features are enabled when used.
 587
 588        :param eviction_policy: The eviction policy configuration to validate
 589        :raises ValueError: If experimental features are used without being enabled
 590        """
 591        # Validate MRU eviction policy
 592        if eviction_policy.policy.upper() == "MRU":
 593            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
 594                raise ValueError(
 595                    "MRU eviction policy is experimental and not enabled.\n"
 596                    "Enable it by adding to config:\n"
 597                    "  experimental_features:\n"
 598                    "    cache_mru_eviction: true"
 599                )
 600
 601        # Validate purge_factor
 602        if eviction_policy.purge_factor != 0:
 603            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
 604                raise ValueError("purge_factor must be between 0 and 100")
 605
 606            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
 607                raise ValueError(
 608                    "purge_factor is experimental and not enabled.\n"
 609                    "Enable it by adding to config:\n"
 610                    "  experimental_features:\n"
 611                    "    cache_purge_factor: true"
 612                )
 613
 614    def build_config(self) -> "StorageClientConfig":
 615        bundle = self._build_provider_bundle()
 616
 617        # Validate replicas to prevent circular references
 618        self._validate_replicas(bundle.replicas)
 619
 620        storage_provider = self._build_storage_provider(
 621            bundle.storage_provider_config.type,
 622            bundle.storage_provider_config.options,
 623            bundle.credentials_provider,
 624        )
 625
 626        cache_config: Optional[CacheConfig] = None
 627        cache_manager: Optional[CacheManager] = None
 628
 629        # Check if caching is enabled for this profile
 630        caching_enabled = self._profile_dict.get("caching_enabled", False)
 631
 632        if self._cache_dict is not None and caching_enabled:
 633            tempdir = tempfile.gettempdir()
 634            default_location = os.path.join(tempdir, "msc_cache")
 635            location = self._cache_dict.get("location", default_location)
 636
 637            # Check if cache_backend.cache_path is defined
 638            cache_backend = self._cache_dict.get("cache_backend", {})
 639            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None
 640
 641            # Warn if both location and cache_backend.cache_path are defined
 642            if cache_backend_path and self._cache_dict.get("location") is not None:
 643                logger.warning(
 644                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
 645                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
 646                )
 647            elif cache_backend_path:
 648                # Use cache_backend.cache_path only if location is not explicitly defined
 649                location = cache_backend_path
 650
 651            # Resolve the effective flag while preserving explicit ``False``.
 652            if "check_source_version" in self._cache_dict:
 653                check_source_version = self._cache_dict["check_source_version"]
 654            else:
 655                check_source_version = self._cache_dict.get("use_etag", True)
 656
 657            # Warn if both keys are specified – the new one wins.
 658            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
 659                logger.warning(
 660                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
 661                    "Using 'check_source_version' and ignoring 'use_etag'."
 662                )
 663
 664            if not Path(location).is_absolute():
 665                raise ValueError(f"Cache location must be an absolute path: {location}")
 666
 667            # Initialize cache_dict with default values
 668            cache_dict = self._cache_dict
 669
 670            # Verify cache config
 671            self._verify_cache_config(cache_dict)
 672
 673            # Initialize eviction policy
 674            if "eviction_policy" in cache_dict:
 675                policy = cache_dict["eviction_policy"]["policy"].lower()
 676                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
 677                eviction_policy = EvictionPolicyConfig(
 678                    policy=policy,
 679                    refresh_interval=cache_dict["eviction_policy"].get(
 680                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
 681                    ),
 682                    purge_factor=purge_factor,
 683                )
 684            else:
 685                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)
 686
 687            # todo: remove once experimental features are stable
 688            # Validate experimental features before creating cache_config
 689            self._validate_experimental_features(eviction_policy)
 690
 691            # Create cache config from the standardized format
 692            cache_config = CacheConfig(
 693                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
 694                location=cache_dict.get("location", location),
 695                check_source_version=check_source_version,
 696                eviction_policy=eviction_policy,
 697                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
 698            )
 699
 700            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
 701        elif self._cache_dict is not None and not caching_enabled:
 702            logger.debug(f"Caching is disabled for profile '{self._profile}'")
 703        elif self._cache_dict is None and caching_enabled:
 704            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")
 705
 706        # retry options
 707        retry_config_dict = self._profile_dict.get("retry", None)
 708        if retry_config_dict:
 709            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
 710            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
 711            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
 712            retry_config = RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
 713        else:
 714            retry_config = RetryConfig(
 715                attempts=DEFAULT_RETRY_ATTEMPTS,
 716                delay=DEFAULT_RETRY_DELAY,
 717                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
 718            )
 719
 720        # autocommit options
 721        autocommit_config = AutoCommitConfig()
 722        autocommit_dict = self._profile_dict.get("autocommit", None)
 723        if autocommit_dict:
 724            interval_minutes = autocommit_dict.get("interval_minutes", None)
 725            at_exit = autocommit_dict.get("at_exit", False)
 726            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
 727
 728        return StorageClientConfig(
 729            profile=self._profile,
 730            storage_provider=storage_provider,
 731            credentials_provider=bundle.credentials_provider,
 732            metadata_provider=bundle.metadata_provider,
 733            cache_config=cache_config,
 734            cache_manager=cache_manager,
 735            retry_config=retry_config,
 736            telemetry_provider=self._telemetry_provider,
 737            replicas=bundle.replicas,
 738            autocommit_config=autocommit_config,
 739        )
 740
 741
 742class PathMapping:
 743    """
 744    Class to handle path mappings defined in the MSC configuration.
 745
 746    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
 747    where entries are sorted by prefix length (longest first) for optimal matching.
 748    Longer paths take precedence when matching.
 749    """
 750
 751    def __init__(self):
 752        """Initialize an empty PathMapping."""
 753        self._mapping = defaultdict(lambda: defaultdict(list))
 754
 755    @classmethod
 756    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
 757        """
 758        Create a PathMapping instance from configuration dictionary.
 759
 760        :param config_dict: Configuration dictionary, if None the config will be loaded
 761        :return: A PathMapping instance with processed mappings
 762        """
 763        if config_dict is None:
 764            # Import locally to avoid circular imports
 765            from multistorageclient.config import StorageClientConfig
 766
 767            config_dict, _ = StorageClientConfig.read_msc_config()
 768
 769        if not config_dict:
 770            return cls()
 771
 772        instance = cls()
 773        instance._load_mapping(config_dict)
 774        return instance
 775
 776    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
 777        """
 778        Load path mapping from a configuration dictionary.
 779
 780        :param config_dict: Configuration dictionary containing path mapping
 781        """
 782        # Get the path_mapping section
 783        path_mapping = config_dict.get("path_mapping", {})
 784        if path_mapping is None:
 785            return
 786
 787        # Process each mapping
 788        for source_path, dest_path in path_mapping.items():
 789            # Validate format
 790            if not source_path.endswith("/"):
 791                continue
 792            if not dest_path.startswith(MSC_PROTOCOL):
 793                continue
 794            if not dest_path.endswith("/"):
 795                continue
 796
 797            # Extract the destination profile
 798            pr_dest = urlparse(dest_path)
 799            dest_profile = pr_dest.netloc
 800
 801            # Parse the source path
 802            pr = urlparse(source_path)
 803            protocol = pr.scheme.lower() if pr.scheme else "file"
 804
 805            if protocol == "file" or source_path.startswith("/"):
 806                # For file or absolute paths, use the whole path as the prefix
 807                # and leave bucket empty
 808                bucket = ""
 809                prefix = source_path if source_path.startswith("/") else pr.path
 810            else:
 811                # For object storage, extract bucket and prefix
 812                bucket = pr.netloc
 813                prefix = pr.path
 814                if prefix.startswith("/"):
 815                    prefix = prefix[1:]
 816
 817            # Add the mapping to the nested dict
 818            self._mapping[protocol][bucket].append((prefix, dest_profile))
 819
 820        # Sort each bucket's prefixes by length (longest first) for optimal matching
 821        for protocol, buckets in self._mapping.items():
 822            for bucket, prefixes in buckets.items():
 823                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
 824
 825    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
 826        """
 827        Find the best matching mapping for the given URL.
 828
 829        :param url: URL to find matching mapping for
 830        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
 831        """
 832        # Parse the URL
 833        pr = urlparse(url)
 834        protocol = pr.scheme.lower() if pr.scheme else "file"
 835
 836        # For file paths or absolute paths
 837        if protocol == "file" or url.startswith("/"):
 838            path = url if url.startswith("/") else pr.path
 839
 840            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
 841
 842            # Check each prefix (already sorted by length, longest first)
 843            for prefix, profile in possible_mapping:
 844                if path.startswith(prefix):
 845                    # Calculate the relative path
 846                    rel_path = path[len(prefix) :]
 847                    if not rel_path.startswith("/"):
 848                        rel_path = "/" + rel_path
 849                    return profile, rel_path
 850
 851            return None
 852
 853        # For object storage
 854        bucket = pr.netloc
 855        path = pr.path
 856        if path.startswith("/"):
 857            path = path[1:]
 858
 859        # Check bucket-specific mapping
 860        possible_mapping = (
 861            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
 862        )
 863
 864        # Check each prefix (already sorted by length, longest first)
 865        for prefix, profile in possible_mapping:
 866            # matching prefix
 867            if path.startswith(prefix):
 868                rel_path = path[len(prefix) :]
 869                # Remove leading slash if present
 870                if rel_path.startswith("/"):
 871                    rel_path = rel_path[1:]
 872
 873                return profile, rel_path
 874
 875        return None
 876
 877
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    # Resolved configuration for a single profile.
    profile: str
    storage_provider: StorageProvider
    credentials_provider: Optional[CredentialsProvider]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    telemetry_provider: Optional[Callable[[], Telemetry]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    # Original config dict retained for pickling; None when built from a provider bundle.
    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: StorageProvider,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.telemetry_provider = telemetry_provider
        # Normalize None to a fresh list (avoids a shared mutable default).
        self.replicas = replicas if replicas is not None else []
        self.autocommit_config = autocommit_config
[docs] 922 @staticmethod 923 def from_json( 924 config_json: str, 925 profile: str = DEFAULT_POSIX_PROFILE_NAME, 926 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 927 ) -> "StorageClientConfig": 928 """ 929 Load a storage client configuration from a JSON string. 930 931 :param config_json: Configuration JSON string. 932 :param profile: Profile to use. 933 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 934 """ 935 config_dict = json.loads(config_json) 936 return StorageClientConfig.from_dict( 937 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 938 )
939
[docs] 940 @staticmethod 941 def from_yaml( 942 config_yaml: str, 943 profile: str = DEFAULT_POSIX_PROFILE_NAME, 944 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 945 ) -> "StorageClientConfig": 946 """ 947 Load a storage client configuration from a YAML string. 948 949 :param config_yaml: Configuration YAML string. 950 :param profile: Profile to use. 951 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 952 """ 953 config_dict = yaml.safe_load(config_yaml) 954 return StorageClientConfig.from_dict( 955 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 956 )
957
[docs] 958 @staticmethod 959 def from_dict( 960 config_dict: dict[str, Any], 961 profile: str = DEFAULT_POSIX_PROFILE_NAME, 962 skip_validation: bool = False, 963 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 964 ) -> "StorageClientConfig": 965 """ 966 Load a storage client configuration from a Python dictionary. 967 968 :param config_dict: Configuration Python dictionary. 969 :param profile: Profile to use. 970 :param skip_validation: Skip configuration schema validation. 971 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 972 """ 973 # Validate the config file with predefined JSON schema 974 if not skip_validation: 975 validate_config(config_dict) 976 977 # Load config 978 loader = StorageClientConfigLoader( 979 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider 980 ) 981 config = loader.build_config() 982 config._config_dict = config_dict 983 984 return config
985
[docs] 986 @staticmethod 987 def from_file( 988 config_file_paths: Optional[Iterable[str]] = None, 989 profile: str = DEFAULT_POSIX_PROFILE_NAME, 990 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 991 ) -> "StorageClientConfig": 992 """ 993 Load a storage client configuration from the first file found. 994 995 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`). 996 :param profile: Profile to use. 997 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling. 998 """ 999 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths) 1000 # Parse rclone config file. 1001 rclone_config_dict, rclone_config_file = read_rclone_config() 1002 1003 # Merge config files. 1004 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict) 1005 if conflicted_keys: 1006 raise ValueError( 1007 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}' 1008 ) 1009 merged_profiles = merged_config.get("profiles", {}) 1010 1011 # Check if profile is in merged_profiles 1012 if profile in merged_profiles: 1013 return StorageClientConfig.from_dict( 1014 config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider 1015 ) 1016 else: 1017 # Check if profile is the default profile or an implicit profile 1018 if profile == DEFAULT_POSIX_PROFILE_NAME: 1019 implicit_profile_config = DEFAULT_POSIX_PROFILE 1020 elif profile.startswith("_"): 1021 # Handle implicit profiles 1022 parts = profile[1:].split("-", 1) 1023 if len(parts) == 2: 1024 protocol, bucket = parts 1025 # Verify it's a supported protocol 1026 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS: 1027 raise ValueError(f'Unsupported protocol in implicit 
profile: "{protocol}"') 1028 implicit_profile_config = create_implicit_profile_config( 1029 profile_name=profile, protocol=protocol, base_path=bucket 1030 ) 1031 else: 1032 raise ValueError(f'Invalid implicit profile format: "{profile}"') 1033 else: 1034 raise ValueError( 1035 f'Profile "{profile}" not found in configuration files. Configuration was checked in ' 1036 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. " 1037 f"Please verify that the profile exists and that configuration files are correctly located." 1038 ) 1039 # merge the implicit profile config into the merged config so the cache & observability config can be inherited 1040 if "profiles" not in merged_config: 1041 merged_config["profiles"] = implicit_profile_config["profiles"] 1042 else: 1043 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile] 1044 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_" 1045 return StorageClientConfig.from_dict( 1046 config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider 1047 )
1048 1049 @staticmethod 1050 def from_provider_bundle( 1051 config_dict: dict[str, Any], 1052 provider_bundle: ProviderBundle, 1053 telemetry_provider: Optional[Callable[[], Telemetry]] = None, 1054 ) -> "StorageClientConfig": 1055 loader = StorageClientConfigLoader( 1056 config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider 1057 ) 1058 config = loader.build_config() 1059 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors 1060 return config 1061
[docs] 1062 @staticmethod 1063 def read_msc_config( 1064 config_file_paths: Optional[Iterable[str]] = None, 1065 ) -> tuple[Optional[dict[str, Any]], Optional[str]]: 1066 """Get the MSC configuration dictionary and the path of the first file found. 1067 1068 If no config paths are specified, configs are searched in the following order: 1069 1070 1. ``MSC_CONFIG`` environment variable (highest precedence) 1071 2. Standard search paths (user-specified config and system-wide config) 1072 1073 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used. 1074 :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration 1075 dictionary or empty dict if no config was found. ``config_file_path`` is the absolute 1076 path of the config file used, or ``None`` if no config file was found. 1077 """ 1078 config_dict: dict[str, Any] = {} 1079 config_file_path: Optional[str] = None 1080 1081 config_file_paths = list(config_file_paths or []) 1082 1083 # Add default paths if none provided. 1084 if len(config_file_paths) == 0: 1085 # Environment variable. 1086 msc_config_env = os.getenv("MSC_CONFIG", None) 1087 if msc_config_env is not None: 1088 config_file_paths.append(msc_config_env) 1089 1090 # Standard search paths. 1091 config_file_paths.extend(_find_config_file_paths()) 1092 1093 # Normalize + absolutize paths. 1094 config_file_paths = [os.path.abspath(path) for path in config_file_paths] 1095 1096 # Log plan. 1097 logger.debug(f"Searching MSC config file paths: {config_file_paths}") 1098 1099 # Load config. 1100 for path in config_file_paths: 1101 if os.path.exists(path): 1102 try: 1103 with open(path) as f: 1104 if path.endswith(".json"): 1105 config_dict = json.load(f) 1106 else: 1107 config_dict = yaml.safe_load(f) 1108 config_file_path = path 1109 # Use the first config file. 
1110 break 1111 except Exception as e: 1112 raise ValueError(f"malformed MSC config file: {path}, exception: {e}") 1113 1114 # Log result. 1115 if config_file_path is None: 1116 logger.debug("No MSC config files found in any of the search locations.") 1117 else: 1118 logger.debug(f"Using MSC config file: {config_file_path}") 1119 1120 if config_dict: 1121 validate_config(config_dict) 1122 return config_dict, config_file_path
1123
[docs] 1124 @staticmethod 1125 def read_path_mapping() -> PathMapping: 1126 """ 1127 Get the path mapping defined in the MSC configuration. 1128 1129 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)] 1130 where entries are sorted by prefix length (longest first) for optimal matching. 1131 Longer paths take precedence when matching. 1132 1133 :return: A PathMapping instance with translation mappings 1134 """ 1135 try: 1136 return PathMapping.from_config() 1137 except Exception: 1138 # Log the error but continue - this shouldn't stop the application from working 1139 logger.error("Failed to load path_mapping from MSC config") 1140 return PathMapping()
1141 1142 def __getstate__(self) -> dict[str, Any]: 1143 state = self.__dict__.copy() 1144 if not state.get("_config_dict"): 1145 raise ValueError("StorageClientConfig is not serializable") 1146 del state["credentials_provider"] 1147 del state["storage_provider"] 1148 del state["metadata_provider"] 1149 del state["cache_manager"] 1150 del state["replicas"] 1151 return state 1152 1153 def __setstate__(self, state: dict[str, Any]) -> None: 1154 # Presence checked by __getstate__. 1155 config_dict = state["_config_dict"] 1156 loader = StorageClientConfigLoader( 1157 config_dict=config_dict, 1158 profile=state["profile"], 1159 telemetry_provider=state["telemetry_provider"], 1160 ) 1161 new_config = loader.build_config() 1162 self.profile = new_config.profile 1163 self.storage_provider = new_config.storage_provider 1164 self.credentials_provider = new_config.credentials_provider 1165 self.metadata_provider = new_config.metadata_provider 1166 self.cache_config = new_config.cache_config 1167 self.cache_manager = new_config.cache_manager 1168 self.retry_config = new_config.retry_config 1169 self.telemetry_provider = new_config.telemetry_provider 1170 self._config_dict = config_dict 1171 self.replicas = new_config.replicas 1172 self.autocommit_config = new_config.autocommit_config