Source code for multistorageclient.config

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
  16import json
  17import logging
  18import os
  19import tempfile
  20from collections import defaultdict
  21from collections.abc import Sequence
  22from pathlib import Path
  23from typing import Any, Optional
  24from urllib.parse import urlparse
  25
  26import opentelemetry.metrics as api_metrics
  27import yaml
  28
  29from .cache import DEFAULT_CACHE_SIZE, CacheManager
  30from .caching.cache_config import CacheConfig, EvictionPolicyConfig
  31from .instrumentation import setup_opentelemetry
  32from .providers.manifest_metadata import ManifestMetadataProvider
  33from .rclone import read_rclone_config
  34from .schema import validate_config
  35from .telemetry import Telemetry
  36from .telemetry.attributes.base import AttributesProvider
  37from .types import (
  38    DEFAULT_RETRY_ATTEMPTS,
  39    DEFAULT_RETRY_DELAY,
  40    MSC_PROTOCOL,
  41    AutoCommitConfig,
  42    CredentialsProvider,
  43    MetadataProvider,
  44    ProviderBundle,
  45    Replica,
  46    RetryConfig,
  47    StorageProvider,
  48    StorageProviderConfig,
  49)
  50from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
  51
# Constants related to implicit profiles
# URL schemes that may be resolved into auto-generated ("implicit") profiles.
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
# Maps a URL scheme to the storage_provider "type" value used in profile configs.
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}

# Maps shorthand telemetry attributes-provider names to fully qualified class paths;
# unknown names are treated as fully qualified paths themselves (see __init__ below).
_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
}
  69
  70
  71# Template for creating implicit profile configurations
  72def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
  73    """
  74    Create a configuration dictionary for an implicit profile.
  75
  76    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
  77    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
  78    :param base_path: The base path (e.g., bucket name) for the storage provider
  79
  80    :return: A configuration dictionary for the implicit profile
  81    """
  82    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
  83    return {
  84        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
  85    }
  86
  87
# Name of the built-in POSIX profile that is always available.
DEFAULT_POSIX_PROFILE_NAME = "default"
# Implicit profile exposing the local filesystem rooted at "/".
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps storage_provider "type" values to class names in the .providers package.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
}

# Maps credentials_provider shorthand "type" values to class names in the
# .providers package; other values are treated as fully qualified class paths.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
}
 108
 109
 110def _find_config_file_paths():
 111    """
 112    Get configuration file search paths.
 113
 114    Returns paths in order of precedence:
 115
 116    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
 117    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
 118    """
 119    paths = []
 120
 121    # 1. User-specific configuration directory
 122    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
 123
 124    if xdg_config_home:
 125        paths.extend(
 126            [
 127                os.path.join(xdg_config_home, "msc", "config.yaml"),
 128                os.path.join(xdg_config_home, "msc", "config.json"),
 129            ]
 130        )
 131
 132    user_home = os.getenv("HOME")
 133
 134    if user_home:
 135        paths.extend(
 136            [
 137                os.path.join(user_home, ".msc_config.yaml"),
 138                os.path.join(user_home, ".msc_config.json"),
 139                os.path.join(user_home, ".config", "msc", "config.yaml"),
 140                os.path.join(user_home, ".config", "msc", "config.json"),
 141            ]
 142        )
 143
 144    # 2. System-wide configuration directories
 145    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
 146    if not xdg_config_dirs:
 147        xdg_config_dirs = "/etc/xdg"
 148
 149    for config_dir in xdg_config_dirs.split(":"):
 150        config_dir = config_dir.strip()
 151        if config_dir:
 152            paths.extend(
 153                [
 154                    os.path.join(config_dir, "msc", "config.yaml"),
 155                    os.path.join(config_dir, "msc", "config.json"),
 156                ]
 157            )
 158
 159    paths.extend(
 160        [
 161            "/etc/msc_config.yaml",
 162            "/etc/msc_config.json",
 163        ]
 164    )
 165
 166    return tuple(paths)
 167
 168
# Search paths are computed once at import time from the environment.
DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS = _find_config_file_paths()

# Package name used for relative imports of provider classes (see import_class calls).
PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)
 174
 175
 176class SimpleProviderBundle(ProviderBundle):
 177    def __init__(
 178        self,
 179        storage_provider_config: StorageProviderConfig,
 180        credentials_provider: Optional[CredentialsProvider] = None,
 181        metadata_provider: Optional[MetadataProvider] = None,
 182        replicas: Optional[list[Replica]] = None,
 183    ):
 184        if replicas is None:
 185            replicas = []
 186
 187        self._storage_provider_config = storage_provider_config
 188        self._credentials_provider = credentials_provider
 189        self._metadata_provider = metadata_provider
 190        self._replicas = replicas
 191
 192    @property
 193    def storage_provider_config(self) -> StorageProviderConfig:
 194        return self._storage_provider_config
 195
 196    @property
 197    def credentials_provider(self) -> Optional[CredentialsProvider]:
 198        return self._credentials_provider
 199
 200    @property
 201    def metadata_provider(self) -> Optional[MetadataProvider]:
 202        return self._metadata_provider
 203
 204    @property
 205    def replicas(self) -> list[Replica]:
 206        return self._replicas
 207
 208
# Default eviction-policy refresh interval, in seconds (see _verify_cache_config's example).
DEFAULT_CACHE_REFRESH_INTERVAL = 300
 210
 211
class StorageClientConfigLoader:
    """
    Builds a :py:class:`StorageClientConfig` from a parsed MSC configuration
    dictionary and a profile name.

    The loader resolves the requested profile, constructs the storage,
    credentials, and metadata providers (or uses a pre-built provider bundle),
    and assembles cache, retry, and autocommit settings.
    """

    # Pre-built bundle supplied by the caller; takes precedence over config_dict.
    _provider_bundle: Optional[ProviderBundle]
    # All profiles from the config; always includes the default POSIX profile.
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Configuration dictionary of the profile being loaded.
    _profile_dict: dict[str, Any]
    # Raw "opentelemetry" section of the config, if present.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Metric instruments resolved from the telemetry instance (or passed in directly).
    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # Raw "cache" section of the config, if present.
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry: Optional[Telemetry] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry: Optional telemetry instance to use, takes precedence over ``metric_gauges``, ``metric_counters``, and ``metric_attributes_providers``.
        :param metric_gauges: Optional metric gauges to use.
        :param metric_counters: Optional metric counters to use.
        :param metric_attributes_providers: Optional metric attributes providers to use.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)

        # Explicitly passed instruments are the fallback; a telemetry instance
        # (when metrics are configured) overrides them below.
        self._metric_gauges = metric_gauges
        self._metric_counters = metric_counters
        self._metric_attributes_providers = metric_attributes_providers
        if self._opentelemetry_dict is not None:
            if "metrics" in self._opentelemetry_dict:
                if telemetry is not None:
                    # Resolve gauges/counters from the telemetry instance, keeping
                    # only the instruments it actually provides (non-None).
                    self._metric_gauges = {}
                    for name in Telemetry.GaugeName:
                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
                        if gauge is not None:
                            self._metric_gauges[name] = gauge
                    self._metric_counters = {}
                    for name in Telemetry.CounterName:
                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
                        if counter is not None:
                            self._metric_counters[name] = counter

                    if "attributes" in self._opentelemetry_dict["metrics"]:
                        attributes_providers: list[AttributesProvider] = []
                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
                            "attributes"
                        ]
                        for config in attributes_provider_configs:
                            # Shorthand names map to fully qualified class paths;
                            # unknown types are used as fully qualified paths directly.
                            attributes_provider_type: str = config["type"]
                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
                                attributes_provider_type, attributes_provider_type
                            )
                            attributes_provider_module_name, attributes_provider_class_name = (
                                attributes_provider_fully_qualified_name.rsplit(".", 1)
                            )
                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
                            attributes_provider_options = config.get("options", {})
                            # The msc_config provider needs the full config dict injected.
                            if (
                                attributes_provider_fully_qualified_name
                                == _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING["msc_config"]
                            ):
                                attributes_provider_options["config_dict"] = config_dict
                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
                            attributes_providers.append(attributes_provider)
                        self._metric_attributes_providers = tuple(attributes_providers)
                elif not any([metric_gauges, metric_counters, metric_attributes_providers]):
                    # TODO: Remove "beta" from the log once legacy metrics are removed.
                    logger.error("No telemetry instance! Disabling beta metrics.")

        self._cache_dict = config_dict.get("cache", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider class by its mapped type name.

        :param storage_provider_name: Key into ``STORAGE_PROVIDER_MAPPING`` (e.g. "s3", "file").
        :param storage_options: Keyword options forwarded to the provider's constructor.
        :param credentials_provider: Optional credentials provider injected into ``storage_options``.
        :raises ValueError: If ``storage_provider_name`` is not a supported provider type.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        # Metric instruments, when resolved, are passed through to the provider.
        if self._metric_gauges is not None:
            storage_options["metric_gauges"] = self._metric_gauges
        if self._metric_counters is not None:
            storage_options["metric_counters"] = self._metric_counters
        if self._metric_attributes_providers is not None:
            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        """
        Build a storage provider from another profile referenced by name.

        Used when a manifest metadata provider stores its manifests under a
        different profile than the data itself.

        :param storage_provider_profile: Name of the referenced profile.
        :raises ValueError: If the profile does not exist, has a metadata provider,
            or lacks a storage_provider section.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        :param credentials_provider_dict: Dictionary containing credentials provider configuration.
        :param storage_options: Storage provider options required by some credentials providers to scope the credentials.
        :return: The constructed credentials provider, or ``None`` when no config is given.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        # Local import: inspect is only needed for this signature introspection.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            # Only forward options the constructor accepts, and never overwrite
            # options the user set explicitly.
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Assemble a :py:class:`SimpleProviderBundle` from a profile's configuration.

        :param profile_dict: The profile's configuration dictionary.
        :raises ValueError: If required sections are missing or malformed.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                # Any non-"manifest" type must be a fully qualified extension class.
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """
        Instantiate a third-party :py:class:`ProviderBundle` from its fully
        qualified class name and options.
        """
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        """
        Return the provider bundle, in order of precedence: pre-built bundle,
        then a "provider_bundle" extension section, then the profile config.
        """
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load 3rd party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Reject deprecated cache options.

        :raises ValueError: If the removed 'size_mb' option is present.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:         # Optional: The eviction policy to use\n"
                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    def build_config(self) -> "StorageClientConfig":
        """
        Build and return the :py:class:`StorageClientConfig` for the selected profile.

        :raises ValueError: If the replica configuration is circular, the cache
            location is not an absolute path, or deprecated cache options are used.
        """
        bundle = self._build_provider_bundle()

        # Validate replicas to prevent circular references
        self._validate_replicas(bundle.replicas)

        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        if self._cache_dict is not None:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                # Falls back to the location resolved above (which may come from
                # cache_backend.cache_path) when no explicit 'location' is set.
                location=cache_dict.get("location", location),
                use_etag=cache_dict.get("use_etag", True),
                eviction_policy=eviction_policy,
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)

        # retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            retry_config = RetryConfig(attempts=attempts, delay=delay)
        else:
            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)

        # autocommit options
        autocommit_config = AutoCommitConfig()
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)

        # set up OpenTelemetry providers once per process
        #
        # TODO: Legacy, need to remove.
        if self._opentelemetry_dict:
            setup_opentelemetry(self._opentelemetry_dict)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            metric_gauges=self._metric_gauges,
            metric_counters=self._metric_counters,
            metric_attributes_providers=self._metric_attributes_providers,
            replicas=bundle.replicas,
            autocommit_config=autocommit_config,
        )
 655
 656
 657class PathMapping:
 658    """
 659    Class to handle path mappings defined in the MSC configuration.
 660
 661    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
 662    where entries are sorted by prefix length (longest first) for optimal matching.
 663    Longer paths take precedence when matching.
 664    """
 665
 666    def __init__(self):
 667        """Initialize an empty PathMapping."""
 668        self._mapping = defaultdict(lambda: defaultdict(list))
 669
 670    @classmethod
 671    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
 672        """
 673        Create a PathMapping instance from configuration dictionary.
 674
 675        :param config_dict: Configuration dictionary, if None the config will be loaded
 676        :return: A PathMapping instance with processed mappings
 677        """
 678        if config_dict is None:
 679            # Import locally to avoid circular imports
 680            from multistorageclient.config import StorageClientConfig
 681
 682            config_dict = StorageClientConfig.read_msc_config()
 683
 684        if not config_dict:
 685            return cls()
 686
 687        instance = cls()
 688        instance._load_mapping(config_dict)
 689        return instance
 690
 691    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
 692        """
 693        Load path mapping from a configuration dictionary.
 694
 695        :param config_dict: Configuration dictionary containing path mapping
 696        """
 697        # Get the path_mapping section
 698        path_mapping = config_dict.get("path_mapping", {})
 699        if path_mapping is None:
 700            return
 701
 702        # Process each mapping
 703        for source_path, dest_path in path_mapping.items():
 704            # Validate format
 705            if not source_path.endswith("/"):
 706                continue
 707            if not dest_path.startswith(MSC_PROTOCOL):
 708                continue
 709            if not dest_path.endswith("/"):
 710                continue
 711
 712            # Extract the destination profile
 713            pr_dest = urlparse(dest_path)
 714            dest_profile = pr_dest.netloc
 715
 716            # Parse the source path
 717            pr = urlparse(source_path)
 718            protocol = pr.scheme.lower() if pr.scheme else "file"
 719
 720            if protocol == "file" or source_path.startswith("/"):
 721                # For file or absolute paths, use the whole path as the prefix
 722                # and leave bucket empty
 723                bucket = ""
 724                prefix = source_path if source_path.startswith("/") else pr.path
 725            else:
 726                # For object storage, extract bucket and prefix
 727                bucket = pr.netloc
 728                prefix = pr.path
 729                if prefix.startswith("/"):
 730                    prefix = prefix[1:]
 731
 732            # Add the mapping to the nested dict
 733            self._mapping[protocol][bucket].append((prefix, dest_profile))
 734
 735        # Sort each bucket's prefixes by length (longest first) for optimal matching
 736        for protocol, buckets in self._mapping.items():
 737            for bucket, prefixes in buckets.items():
 738                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
 739
 740    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
 741        """
 742        Find the best matching mapping for the given URL.
 743
 744        :param url: URL to find matching mapping for
 745        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
 746        """
 747        # Parse the URL
 748        pr = urlparse(url)
 749        protocol = pr.scheme.lower() if pr.scheme else "file"
 750
 751        # For file paths or absolute paths
 752        if protocol == "file" or url.startswith("/"):
 753            path = url if url.startswith("/") else pr.path
 754
 755            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
 756
 757            # Check each prefix (already sorted by length, longest first)
 758            for prefix, profile in possible_mapping:
 759                if path.startswith(prefix):
 760                    # Calculate the relative path
 761                    rel_path = path[len(prefix) :]
 762                    if not rel_path.startswith("/"):
 763                        rel_path = "/" + rel_path
 764                    return profile, rel_path
 765
 766            return None
 767
 768        # For object storage
 769        bucket = pr.netloc
 770        path = pr.path
 771        if path.startswith("/"):
 772            path = path[1:]
 773
 774        # Check bucket-specific mapping
 775        possible_mapping = (
 776            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
 777        )
 778
 779        # Check each prefix (already sorted by length, longest first)
 780        for prefix, profile in possible_mapping:
 781            # matching prefix
 782            if path.startswith(prefix):
 783                rel_path = path[len(prefix) :]
 784                # Remove leading slash if present
 785                if rel_path.startswith("/"):
 786                    rel_path = rel_path[1:]
 787
 788                return profile, rel_path
 789
 790        return None
 791
 792
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: StorageProvider
    credentials_provider: Optional[CredentialsProvider]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    # Original config dict; required for pickling (see __getstate__).
    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: StorageProvider,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        """
        Initialize the configuration with pre-built providers.

        Callers normally use one of the ``from_*`` factory methods instead of
        constructing this directly.
        """
        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.metric_gauges = metric_gauges
        self.metric_counters = metric_counters
        self.metric_attributes_providers = metric_attributes_providers
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config from a JSON configuration string."""
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_yaml(
        config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config from a YAML configuration string."""
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry: Optional[Telemetry] = None,
    ) -> "StorageClientConfig":
        """
        Build a config from a configuration dictionary.

        :param config_dict: Full MSC configuration dictionary.
        :param profile: Profile name to load from ``config_dict``.
        :param skip_validation: Skip JSON-schema validation (used for
            already-validated or implicit-profile configs).
        :param telemetry: Optional telemetry instance for metrics.
        :raises: Validation errors from :py:func:`validate_config` when the
            config does not match the schema.
        """
        # Validate the config file with predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
        config = loader.build_config()
        # Keep the raw dict so the config can be pickled (see __getstate__).
        config._config_dict = config_dict

        return config

    @staticmethod
    def from_file(
        profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """
        Build a config by discovering MSC and rclone configuration files.

        Resolution order: the ``MSC_CONFIG`` environment variable, then the
        default search paths. The MSC and rclone configs are merged (keys must
        not conflict); if the profile is not found, the default POSIX profile
        and implicit ``_<protocol>-<bucket>`` profiles are synthesized.

        :raises ValueError: On conflicting keys between the two config files,
            an unsupported/malformed implicit profile, or an unknown profile.
        """
        msc_config_file = os.getenv("MSC_CONFIG", None)

        # Search config paths
        if msc_config_file is None:
            for filename in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
                if os.path.exists(filename):
                    msc_config_file = filename
                    break

        msc_config_dict = StorageClientConfig.read_msc_config()
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            # NOTE: closing quote after the rclone file name was previously missing.
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
        else:
            # Check if profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles named "_<protocol>-<bucket>"
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
            )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """
        Build a config from a user-supplied :py:class:`ProviderBundle`.

        Configs built this way are NOT picklable, since the bundle's providers
        cannot be reconstructed from a config dict.
        """
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def read_msc_config() -> Optional[dict[str, Any]]:
        """Get the MSC configuration dictionary.

        The ``MSC_CONFIG`` environment variable takes precedence; otherwise the
        default search paths are tried in order and the first readable file
        wins. Files ending in ``.json`` are parsed as JSON, everything else as
        YAML.

        :return: The MSC configuration dictionary or empty dict if no config was found
        :raises ValueError: If a discovered config file cannot be parsed.
        """
        config_dict = {}
        config_found = False

        # Check for environment variable first
        msc_config = os.getenv("MSC_CONFIG", None)
        if msc_config and os.path.exists(msc_config):
            try:
                with open(msc_config) as f:
                    if msc_config.endswith(".json"):
                        config_dict = json.load(f)
                        config_found = True
                    else:
                        config_dict = yaml.safe_load(f)
                        config_found = True
            except Exception as e:
                raise ValueError(f"malformed msc config file: {msc_config}, exception: {e}")

        # If not found through environment variable, try standard search paths
        if not config_found:
            for path in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
                if not os.path.exists(path):
                    continue
                try:
                    with open(path) as f:
                        if path.endswith(".json"):
                            config_dict = json.load(f)
                            config_found = True
                            break
                        else:
                            config_dict = yaml.safe_load(f)
                            config_found = True
                            break
                except Exception as e:
                    # Report the file that actually failed (was wrongly
                    # reporting the MSC_CONFIG env value here).
                    raise ValueError(f"malformed msc config file: {path}, exception: {e}")

        if config_dict:
            validate_config(config_dict)
        return config_dict

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        """
        Pickle support: drop the live provider objects and keep only the raw
        config dict, from which __setstate__ rebuilds them.

        :raises ValueError: If the config was not built from a config dict
            (e.g. :py:meth:`from_provider_bundle`) and so cannot be pickled.
        """
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild providers from the pickled config dict."""
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            metric_gauges=state["metric_gauges"],
            metric_counters=state["metric_counters"],
            metric_attributes_providers=state["metric_attributes_providers"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.metric_gauges = new_config.metric_gauges
        self.metric_counters = new_config.metric_counters
        self.metric_attributes_providers = new_config.metric_attributes_providers
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config