Source code for multistorageclient.config

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import json
 17import logging
 18import os
 19import tempfile
 20from collections import defaultdict
 21from collections.abc import Sequence
 22from pathlib import Path
 23from typing import Any, Optional
 24from urllib.parse import urlparse
 25
 26import opentelemetry.metrics as api_metrics
 27import yaml
 28
 29from .cache import DEFAULT_CACHE_SIZE, CacheBackendFactory, CacheManager
 30from .caching.cache_config import CacheBackendConfig, CacheConfig, EvictionPolicyConfig
 31from .instrumentation import setup_opentelemetry
 32from .providers.manifest_metadata import ManifestMetadataProvider
 33from .rclone import read_rclone_config
 34from .schema import validate_config
 35from .telemetry import Telemetry
 36from .telemetry.attributes.base import AttributesProvider
 37from .types import (
 38    DEFAULT_RETRY_ATTEMPTS,
 39    DEFAULT_RETRY_DELAY,
 40    MSC_PROTOCOL,
 41    CredentialsProvider,
 42    MetadataProvider,
 43    ProviderBundle,
 44    RetryConfig,
 45    StorageProvider,
 46    StorageProviderConfig,
 47)
 48from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
 49
 50# Constants related to implicit profiles
 51SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
 52PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
 53    "s3": "s3",
 54    "gs": "gcs",
 55    "ais": "ais",
 56    "file": "file",
 57}
 58
 59_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
 60    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
 61    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
 62    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
 63    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
 64    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
 65    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
 66}
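For reference, the opentelemetry.metrics.attributes section consumed by StorageClientConfigLoader below is a list of entries whose type is either a short name from the mapping above or a fully qualified class path, with provider-specific options passed as keyword arguments. A minimal sketch of that shape only; the class path and option keys in the second entry are hypothetical:

example_metrics_attributes = [
    {"type": "process"},  # short name resolved via _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING
    {"type": "my_package.attributes.MyAttributesProvider", "options": {"key": "value"}},  # hypothetical class
]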
 67
 68
 69# Template for creating implicit profile configurations
 70def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
 71    """
 72    Create a configuration dictionary for an implicit profile.
 73
 74    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
 75    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
 76    :param base_path: The base path (e.g., bucket name) for the storage provider
 77
 78    :return: A configuration dictionary for the implicit profile
 79    """
 80    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
 81    return {
 82        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
 83    }
 84
 85
 86DEFAULT_POSIX_PROFILE_NAME = "default"
 87DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")
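For illustration, an implicit profile name such as _s3-bucket1 expands into the same nested shape as DEFAULT_POSIX_PROFILE above (the bucket name is made up):

assert create_implicit_profile_config("_s3-bucket1", "s3", "bucket1") == {
    "profiles": {
        "_s3-bucket1": {
            "storage_provider": {"type": "s3", "options": {"base_path": "bucket1"}}
        }
    }
}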
 88
 89STORAGE_PROVIDER_MAPPING = {
 90    "file": "PosixFileStorageProvider",
 91    "s3": "S3StorageProvider",
 92    "gcs": "GoogleStorageProvider",
 93    "oci": "OracleStorageProvider",
 94    "azure": "AzureBlobStorageProvider",
 95    "ais": "AIStoreStorageProvider",
 96    "s8k": "S8KStorageProvider",
 97    "gcs_s3": "GoogleS3StorageProvider",
 98}
 99
100CREDENTIALS_PROVIDER_MAPPING = {
101    "S3Credentials": "StaticS3CredentialsProvider",
102    "AzureCredentials": "StaticAzureCredentialsProvider",
103    "AISCredentials": "StaticAISCredentialProvider",
104    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
105}
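A profile entry ties these two mappings together: storage_provider.type selects a class from STORAGE_PROVIDER_MAPPING, while credentials_provider.type selects one from CREDENTIALS_PROVIDER_MAPPING or names a fully qualified class. A minimal sketch of that shape; the bucket name and the credential option keys are assumptions rather than a documented schema:

example_profile = {
    "profiles": {
        "my-s3": {
            "storage_provider": {
                "type": "s3",  # resolved to S3StorageProvider
                "options": {"base_path": "my-bucket"},
            },
            "credentials_provider": {
                "type": "S3Credentials",  # resolved to StaticS3CredentialsProvider
                "options": {"access_key": "...", "secret_key": "..."},  # hypothetical option names
            },
        }
    }
}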
106
107
108def _find_config_file_paths():
109    """
110    Get configuration file search paths.
111
112    Returns paths in order of precedence:
113
 114    1. User-specific configs (${XDG_CONFIG_HOME}/msc/, ${HOME}/.msc_config.*, ${HOME}/.config/msc/)
 115    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/ or /etc/xdg/msc/, /etc/msc_config.*)
116    """
117    paths = []
118
119    # 1. User-specific configuration directory
120    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
121
122    if xdg_config_home:
123        paths.extend(
124            [
125                os.path.join(xdg_config_home, "msc", "config.yaml"),
126                os.path.join(xdg_config_home, "msc", "config.json"),
127            ]
128        )
129
130    user_home = os.getenv("HOME")
131
132    if user_home:
133        paths.extend(
134            [
135                os.path.join(user_home, ".msc_config.yaml"),
136                os.path.join(user_home, ".msc_config.json"),
137                os.path.join(user_home, ".config", "msc", "config.yaml"),
138                os.path.join(user_home, ".config", "msc", "config.json"),
139            ]
140        )
141
142    # 2. System-wide configuration directories
143    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
144    if not xdg_config_dirs:
145        xdg_config_dirs = "/etc/xdg"
146
147    for config_dir in xdg_config_dirs.split(":"):
148        config_dir = config_dir.strip()
149        if config_dir:
150            paths.extend(
151                [
152                    os.path.join(config_dir, "msc", "config.yaml"),
153                    os.path.join(config_dir, "msc", "config.json"),
154                ]
155            )
156
157    paths.extend(
158        [
159            "/etc/msc_config.yaml",
160            "/etc/msc_config.json",
161        ]
162    )
163
164    return tuple(paths)
165
166
167DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS = _find_config_file_paths()
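As a concrete example, with XDG_CONFIG_HOME unset, HOME=/home/user, and XDG_CONFIG_DIRS left at its default of /etc/xdg, the search order resolves to the tuple below; earlier entries win because callers stop at the first existing file (the home directory is illustrative):

# Resolved value of DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS for HOME=/home/user:
(
    "/home/user/.msc_config.yaml",
    "/home/user/.msc_config.json",
    "/home/user/.config/msc/config.yaml",
    "/home/user/.config/msc/config.json",
    "/etc/xdg/msc/config.yaml",
    "/etc/xdg/msc/config.json",
    "/etc/msc_config.yaml",
    "/etc/msc_config.json",
)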
168
169PACKAGE_NAME = "multistorageclient"
170
 171logger = logging.getLogger(__name__)
172
173
174class SimpleProviderBundle(ProviderBundle):
175    def __init__(
176        self,
177        storage_provider_config: StorageProviderConfig,
178        credentials_provider: Optional[CredentialsProvider] = None,
179        metadata_provider: Optional[MetadataProvider] = None,
180    ):
181        self._storage_provider_config = storage_provider_config
182        self._credentials_provider = credentials_provider
183        self._metadata_provider = metadata_provider
184
185    @property
186    def storage_provider_config(self) -> StorageProviderConfig:
187        return self._storage_provider_config
188
189    @property
190    def credentials_provider(self) -> Optional[CredentialsProvider]:
191        return self._credentials_provider
192
193    @property
194    def metadata_provider(self) -> Optional[MetadataProvider]:
195        return self._metadata_provider
196
197
198DEFAULT_CACHE_REFRESH_INTERVAL = 300
199
200
201class StorageClientConfigLoader:
202    _provider_bundle: Optional[ProviderBundle]
203    _profiles: dict[str, Any]
204    _profile: str
205    _profile_dict: dict[str, Any]
206    _opentelemetry_dict: Optional[dict[str, Any]]
207    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
208    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
209    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
210    _cache_dict: Optional[dict[str, Any]]
211
212    def __init__(
213        self,
214        config_dict: dict[str, Any],
215        profile: str = DEFAULT_POSIX_PROFILE_NAME,
216        provider_bundle: Optional[ProviderBundle] = None,
217        telemetry: Optional[Telemetry] = None,
218    ) -> None:
219        """
220        Initializes a :py:class:`StorageClientConfigLoader` to create a
221        StorageClientConfig. Components are built using the ``config_dict`` and
222        profile, but a pre-built provider_bundle takes precedence.
223
224        :param config_dict: Dictionary of configuration options.
225        :param profile: Name of profile in ``config_dict`` to use to build configuration.
226        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
227        :param telemetry: Telemetry instance to use.
228        """
229        # ProviderBundle takes precedence
230        self._provider_bundle = provider_bundle
231
232        # Interpolates all environment variables into actual values.
233        config_dict = expand_env_vars(config_dict)
234
235        self._profiles = config_dict.get("profiles", {})
236
237        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
238            # Assign the default POSIX profile
239            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
240        else:
241            # Cannot override default POSIX profile
242            storage_provider_type = (
243                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
244            )
245            if storage_provider_type != "file":
246                raise ValueError(
247                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
248                    f'"{storage_provider_type}"; expected "file".'
249                )
250
251        profile_dict = self._profiles.get(profile)
252
253        if not profile_dict:
254            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")
255
256        self._profile = profile
257        self._profile_dict = profile_dict
258
259        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
260
261        self._metric_gauges = None
262        self._metric_counters = None
263        self._metric_attributes_providers = None
264        if self._opentelemetry_dict is not None:
265            if "metrics" in self._opentelemetry_dict:
266                if telemetry is not None:
267                    self._metric_gauges = {}
268                    for name in Telemetry.GaugeName:
269                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
270                        if gauge is not None:
271                            self._metric_gauges[name] = gauge
272                    self._metric_counters = {}
273                    for name in Telemetry.CounterName:
274                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
275                        if counter is not None:
276                            self._metric_counters[name] = counter
277
278                    if "attributes" in self._opentelemetry_dict["metrics"]:
279                        attributes_providers: list[AttributesProvider] = []
280                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
281                            "attributes"
282                        ]
283                        for config in attributes_provider_configs:
284                            attributes_provider_type: str = config["type"]
285                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
286                                attributes_provider_type, attributes_provider_type
287                            )
288                            attributes_provider_module_name, attributes_provider_class_name = (
289                                attributes_provider_fully_qualified_name.rsplit(".", 1)
290                            )
291                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
292                            attributes_provider_options = config.get("options", {})
293                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
294                            attributes_providers.append(attributes_provider)
295                        self._metric_attributes_providers = tuple(attributes_providers)
296                else:
297                    # TODO: Remove "beta" from the log once legacy metrics are removed.
298                    logging.error("No telemetry instance! Disabling beta metrics.")
299
300        self._cache_dict = config_dict.get("cache", None)
301
302    def _build_storage_provider(
303        self,
304        storage_provider_name: str,
305        storage_options: Optional[dict[str, Any]] = None,
306        credentials_provider: Optional[CredentialsProvider] = None,
307    ) -> StorageProvider:
308        if storage_options is None:
309            storage_options = {}
310        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
311            raise ValueError(
312                f"Storage provider {storage_provider_name} is not supported. "
313                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
314            )
315        if credentials_provider:
316            storage_options["credentials_provider"] = credentials_provider
317        if self._metric_gauges:
318            storage_options["metric_gauges"] = self._metric_gauges
319        if self._metric_counters:
320            storage_options["metric_counters"] = self._metric_counters
321        if self._metric_attributes_providers:
322            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
323        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
324        module_name = ".providers"
325        cls = import_class(class_name, module_name, PACKAGE_NAME)
326        return cls(**storage_options)
327
328    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
329        storage_profile_dict = self._profiles.get(storage_provider_profile)
330        if not storage_profile_dict:
331            raise ValueError(
332                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
333            )
334
335        # Check if metadata provider is configured for this profile
336        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
337        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
338        if local_metadata_provider_dict:
339            raise ValueError(
340                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
341            )
342
343        # Initialize CredentialsProvider
344        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
345        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
346
347        # Initialize StorageProvider
348        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
349        if local_storage_provider_dict:
350            local_name = local_storage_provider_dict["type"]
351            local_storage_options = local_storage_provider_dict.get("options", {})
352        else:
353            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
354
355        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
356        return storage_provider
357
358    def _build_credentials_provider(
359        self,
360        credentials_provider_dict: Optional[dict[str, Any]],
361        storage_options: Optional[dict[str, Any]] = None,
362    ) -> Optional[CredentialsProvider]:
363        """
364        Initializes the CredentialsProvider based on the provided dictionary.
365
366        Args:
367            credentials_provider_dict: Dictionary containing credentials provider configuration
368            storage_options: Storage provider options required by some credentials providers to scope the credentials.
369        """
370        if not credentials_provider_dict:
371            return None
372
373        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
374            # Fully qualified class path case
375            class_type = credentials_provider_dict["type"]
376            module_name, class_name = class_type.rsplit(".", 1)
377            cls = import_class(class_name, module_name)
378        else:
379            # Mapped class name case
380            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
381            module_name = ".providers"
382            cls = import_class(class_name, module_name, PACKAGE_NAME)
383
384        # Propagate storage provider options to credentials provider since they may be
385        # required by some credentials providers to scope the credentials.
386        import inspect
387
388        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
389        options = credentials_provider_dict.get("options", {})
390        if storage_options:
391            for storage_provider_option in storage_options.keys():
392                if storage_provider_option in init_params and storage_provider_option not in options:
393                    options[storage_provider_option] = storage_options[storage_provider_option]
394
395        return cls(**options)
396
397    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
398        # Initialize StorageProvider
399        storage_provider_dict = profile_dict.get("storage_provider", None)
400        if storage_provider_dict:
401            storage_provider_name = storage_provider_dict["type"]
402            storage_options = storage_provider_dict.get("options", {})
403        else:
404            raise ValueError("Missing storage_provider in the config.")
405
 406        # Initialize CredentialsProvider
 407        # In some cases the credentials provider issues credentials scoped to a specific
 408        # base_path, so the storage provider options are passed through to the
 409        # credentials provider as well.
410        credentials_provider_dict = profile_dict.get("credentials_provider", None)
411        credentials_provider = self._build_credentials_provider(
412            credentials_provider_dict=credentials_provider_dict,
413            storage_options=storage_options,
414        )
415
416        # Initialize MetadataProvider
417        metadata_provider_dict = profile_dict.get("metadata_provider", None)
418        metadata_provider = None
419        if metadata_provider_dict:
420            if metadata_provider_dict["type"] == "manifest":
421                metadata_options = metadata_provider_dict.get("options", {})
422                # If MetadataProvider has a reference to a different storage provider profile
423                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
424                if storage_provider_profile:
425                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
426                else:
427                    storage_provider = self._build_storage_provider(
428                        storage_provider_name, storage_options, credentials_provider
429                    )
430
431                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
432            else:
433                class_type = metadata_provider_dict["type"]
434                if "." not in class_type:
435                    raise ValueError(
436                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
437                    )
438                module_name, class_name = class_type.rsplit(".", 1)
439                cls = import_class(class_name, module_name)
440                options = metadata_provider_dict.get("options", {})
441                metadata_provider = cls(**options)
442
443        return SimpleProviderBundle(
444            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
445            credentials_provider=credentials_provider,
446            metadata_provider=metadata_provider,
447        )
448
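For reference, the manifest branch above reads a metadata_provider entry of roughly this shape; storage_provider_profile is the key popped above, and any remaining option keys are forwarded to ManifestMetadataProvider (none are shown here, and the profile name is illustrative):

example_metadata_provider = {
    "type": "manifest",
    "options": {
        "storage_provider_profile": "manifest-store",  # optional reference to another profile
    },
}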
449    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
450        class_type = provider_bundle_dict["type"]
451        module_name, class_name = class_type.rsplit(".", 1)
452        cls = import_class(class_name, module_name)
453        options = provider_bundle_dict.get("options", {})
454        return cls(**options)
455
456    def _build_provider_bundle(self) -> ProviderBundle:
457        if self._provider_bundle:
458            return self._provider_bundle  # Return if previously provided.
459
460        # Load 3rd party extension
461        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
462        if provider_bundle_dict:
463            return self._build_provider_bundle_from_extension(provider_bundle_dict)
464
465        return self._build_provider_bundle_from_config(self._profile_dict)
466
467    def _build_cache_manager(self, cache_config: CacheConfig) -> CacheManager:
468        cache_storage_provider = None
469
470        # Use the storage provider profile from cache config if specified
471        if cache_config.backend and cache_config.backend.storage_provider_profile:
472            cache_profile = cache_config.backend.storage_provider_profile
473            if cache_profile == self._profile:
474                logger.warning(
475                    f"Same profile used for cache backend and storage provider: {cache_profile}. "
476                    "This is not recommended as it may lead to unintended modifications of the source data. "
477                    "Consider using a separate read-only profile for the cache backend."
478                )
479
480            cache_storage_provider = self._build_storage_provider_from_profile(cache_profile)
481
482        cache_manager = CacheBackendFactory.create(
483            profile=self._profile, cache_config=cache_config, storage_provider=cache_storage_provider
484        )
485        return cache_manager
486
487    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
488        if "size_mb" in cache_dict:
489            raise ValueError(
490                "The 'size_mb' property is no longer supported. \n"
491                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
492                "Example configuration:\n"
493                "cache:\n"
494                "  size: 500G               # Optional: If not specified, default cache size (10GB) will be used\n"
495                "  use_etag: true           # Optional: If not specified, default cache use_etag (true) will be used\n"
496                "  location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
497                "  eviction_policy:         # Optional: The eviction policy to use\n"
498                "    policy: fifo           # Optional: The eviction policy to use, default is 'fifo'\n"
499                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
500            )
501
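Expressed as the dictionary build_config consumes below, the cache section from the error message above looks roughly like this; every key is optional and the values are illustrative:

example_cache_section = {
    "size": "500G",                # defaults to DEFAULT_CACHE_SIZE
    "use_etag": True,              # defaults to True
    "location": "/tmp/msc_cache",  # must be an absolute path
    "eviction_policy": {
        "policy": "fifo",
        "refresh_interval": 300,   # seconds; defaults to DEFAULT_CACHE_REFRESH_INTERVAL
    },
    "cache_backend": {
        "storage_provider_profile": "cache-profile",  # hypothetical read-only profile
    },
}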
502    def build_config(self) -> "StorageClientConfig":
503        bundle = self._build_provider_bundle()
504        storage_provider = self._build_storage_provider(
505            bundle.storage_provider_config.type,
506            bundle.storage_provider_config.options,
507            bundle.credentials_provider,
508        )
509
510        cache_config: Optional[CacheConfig] = None
511        cache_manager: Optional[CacheManager] = None
512
513        if self._cache_dict is not None:
514            tempdir = tempfile.gettempdir()
515            default_location = os.path.join(tempdir, "msc_cache")
516            location = self._cache_dict.get("location", default_location)
517
518            if not Path(location).is_absolute():
519                raise ValueError(f"Cache location must be an absolute path: {location}")
520
521            # Initialize cache_dict with default values
522            cache_dict = self._cache_dict
523
524            # Verify cache config
525            self._verify_cache_config(cache_dict)
526
527            # Initialize eviction policy
528            if "eviction_policy" in cache_dict:
529                policy = cache_dict["eviction_policy"]["policy"].lower()
530                eviction_policy = EvictionPolicyConfig(
531                    policy=policy,
532                    refresh_interval=cache_dict["eviction_policy"].get(
533                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
534                    ),
535                )
536            else:
537                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)
538
539            # Create cache config from the standardized format
540            cache_config = CacheConfig(
541                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
542                use_etag=cache_dict.get("use_etag", True),
543                eviction_policy=eviction_policy,
544                backend=CacheBackendConfig(
545                    cache_path=cache_dict.get("cache_backend", {}).get("cache_path", location),
546                    storage_provider_profile=cache_dict.get("cache_backend", {}).get("storage_provider_profile"),
547                ),
548            )
549
550            cache_manager = self._build_cache_manager(cache_config)
551
552        # retry options
553        retry_config_dict = self._profile_dict.get("retry", None)
554        if retry_config_dict:
555            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
556            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
557            retry_config = RetryConfig(attempts=attempts, delay=delay)
558        else:
559            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)
560
561        # set up OpenTelemetry providers once per process
562        #
563        # TODO: Legacy, need to remove.
564        if self._opentelemetry_dict:
565            setup_opentelemetry(self._opentelemetry_dict)
566
567        return StorageClientConfig(
568            profile=self._profile,
569            storage_provider=storage_provider,
570            credentials_provider=bundle.credentials_provider,
571            metadata_provider=bundle.metadata_provider,
572            cache_config=cache_config,
573            cache_manager=cache_manager,
574            retry_config=retry_config,
575        )
576
577
578class PathMapping:
579    """
580    Class to handle path mappings defined in the MSC configuration.
581
582    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
583    where entries are sorted by prefix length (longest first) for optimal matching.
584    Longer paths take precedence when matching.
585    """
586
587    def __init__(self):
588        """Initialize an empty PathMapping."""
589        self._mapping = defaultdict(lambda: defaultdict(list))
590
591    @classmethod
592    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
593        """
594        Create a PathMapping instance from configuration dictionary.
595
 596        :param config_dict: Configuration dictionary; if None, the MSC configuration is loaded via StorageClientConfig.read_msc_config()
597        :return: A PathMapping instance with processed mappings
598        """
599        if config_dict is None:
600            # Import locally to avoid circular imports
601            from multistorageclient.config import StorageClientConfig
602
603            config_dict = StorageClientConfig.read_msc_config()
604
605        if not config_dict:
606            return cls()
607
608        instance = cls()
609        instance._load_mapping(config_dict)
610        return instance
611
612    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
613        """
614        Load path mapping from a configuration dictionary.
615
616        :param config_dict: Configuration dictionary containing path mapping
617        """
618        # Get the path_mapping section
619        path_mapping = config_dict.get("path_mapping", {})
620        if path_mapping is None:
621            return
622
623        # Process each mapping
624        for source_path, dest_path in path_mapping.items():
625            # Validate format
626            if not source_path.endswith("/"):
627                continue
628            if not dest_path.startswith(MSC_PROTOCOL):
629                continue
630            if not dest_path.endswith("/"):
631                continue
632
633            # Extract the destination profile
634            pr_dest = urlparse(dest_path)
635            dest_profile = pr_dest.netloc
636
637            # Parse the source path
638            pr = urlparse(source_path)
639            protocol = pr.scheme.lower() if pr.scheme else "file"
640
641            if protocol == "file" or source_path.startswith("/"):
642                # For file or absolute paths, use the whole path as the prefix
643                # and leave bucket empty
644                bucket = ""
645                prefix = source_path if source_path.startswith("/") else pr.path
646            else:
647                # For object storage, extract bucket and prefix
648                bucket = pr.netloc
649                prefix = pr.path
650                if prefix.startswith("/"):
651                    prefix = prefix[1:]
652
653            # Add the mapping to the nested dict
654            self._mapping[protocol][bucket].append((prefix, dest_profile))
655
656        # Sort each bucket's prefixes by length (longest first) for optimal matching
657        for protocol, buckets in self._mapping.items():
658            for bucket, prefixes in buckets.items():
659                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
660
661    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
662        """
663        Find the best matching mapping for the given URL.
664
665        :param url: URL to find matching mapping for
666        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
667        """
668        # Parse the URL
669        pr = urlparse(url)
670        protocol = pr.scheme.lower() if pr.scheme else "file"
671
672        # For file paths or absolute paths
673        if protocol == "file" or url.startswith("/"):
674            path = url if url.startswith("/") else pr.path
675            possible_mapping = self._mapping[protocol][""]
676
677            # Check each prefix (already sorted by length, longest first)
678            for prefix, profile in possible_mapping:
679                if path.startswith(prefix):
680                    # Calculate the relative path
681                    rel_path = path[len(prefix) :]
682                    if not rel_path.startswith("/"):
683                        rel_path = "/" + rel_path
684                    return profile, rel_path
685
686            return None
687
688        # For object storage
689        bucket = pr.netloc
690        path = pr.path
691        if path.startswith("/"):
692            path = path[1:]
693
694        # Check bucket-specific mapping
695        possible_mapping = self._mapping[protocol][bucket]
696
697        # Check each prefix (already sorted by length, longest first)
698        for prefix, profile in possible_mapping:
699            # matching prefix
700            if path.startswith(prefix):
701                rel_path = path[len(prefix) :]
702                # Remove leading slash if present
703                if rel_path.startswith("/"):
704                    rel_path = rel_path[1:]
705
706                return profile, rel_path
707
708        return None
709
710
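As a worked example of the matching logic above (bucket, prefix, and profile names are illustrative, and msc:// is assumed to be the MSC_PROTOCOL prefix):

mapping = PathMapping.from_config({"path_mapping": {"s3://bucket1/datasets/": "msc://my-profile/"}})

# Longest-prefix match translates a URL into (profile, relative path).
assert mapping.find_mapping("s3://bucket1/datasets/train/part-0.bin") == ("my-profile", "train/part-0.bin")

# URLs that do not match any configured prefix return None.
assert mapping.find_mapping("s3://bucket2/other/key") is None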
 711class StorageClientConfig:
 712    """
 713    Configuration class for the :py:class:`multistorageclient.StorageClient`.
 714    """
 715
 716    profile: str
 717    storage_provider: StorageProvider
 718    credentials_provider: Optional[CredentialsProvider]
 719    metadata_provider: Optional[MetadataProvider]
 720    cache_config: Optional[CacheConfig]
 721    cache_manager: Optional[CacheManager]
 722    retry_config: Optional[RetryConfig]
 723
 724    _config_dict: Optional[dict[str, Any]]
 725
 726    def __init__(
 727        self,
 728        profile: str,
 729        storage_provider: StorageProvider,
 730        credentials_provider: Optional[CredentialsProvider] = None,
 731        metadata_provider: Optional[MetadataProvider] = None,
 732        cache_config: Optional[CacheConfig] = None,
 733        cache_manager: Optional[CacheManager] = None,
 734        retry_config: Optional[RetryConfig] = None,
 735    ):
 736        self.profile = profile
 737        self.storage_provider = storage_provider
 738        self.credentials_provider = credentials_provider
 739        self.metadata_provider = metadata_provider
 740        self.cache_config = cache_config
 741        self.retry_config = retry_config
 742        self.cache_manager = cache_manager
 743
 744    @staticmethod
 745    def from_json(
 746        config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
 747    ) -> "StorageClientConfig":
 748        config_dict = json.loads(config_json)
 749        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
 750
 751    @staticmethod
 752    def from_yaml(
 753        config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
 754    ) -> "StorageClientConfig":
 755        config_dict = yaml.safe_load(config_yaml)
 756        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
 757
 758    @staticmethod
 759    def from_dict(
 760        config_dict: dict[str, Any],
 761        profile: str = DEFAULT_POSIX_PROFILE_NAME,
 762        skip_validation: bool = False,
 763        telemetry: Optional[Telemetry] = None,
 764    ) -> "StorageClientConfig":
 765        # Validate the config file with predefined JSON schema
 766        if not skip_validation:
 767            validate_config(config_dict)
 768
 769        # Load config
 770        loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
 771        config = loader.build_config()
 772        config._config_dict = config_dict
 773
 774        return config
 775
 776    @staticmethod
 777    def from_file(
 778        profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
 779    ) -> "StorageClientConfig":
 780        msc_config_file = os.getenv("MSC_CONFIG", None)
 781
 782        # Search config paths
 783        if msc_config_file is None:
 784            for filename in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
 785                if os.path.exists(filename):
 786                    msc_config_file = filename
 787                    break
 788
 789        msc_config_dict = StorageClientConfig.read_msc_config()
 790        # Parse rclone config file.
 791        rclone_config_dict, rclone_config_file = read_rclone_config()
 792
 793        # Merge config files.
 794        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
 795        if conflicted_keys:
 796            raise ValueError(
 797                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
 798            )
 799        merged_profiles = merged_config.get("profiles", {})
 800
 801        # Check if profile is in merged_profiles
 802        if profile in merged_profiles:
 803            return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
 804        else:
 805            # Check if profile is the default profile or an implicit profile
 806            if profile == DEFAULT_POSIX_PROFILE_NAME:
 807                implicit_profile_config = DEFAULT_POSIX_PROFILE
 808            elif profile.startswith("_"):
 809                # Handle implicit profiles
 810                parts = profile[1:].split("-", 1)
 811                if len(parts) == 2:
 812                    protocol, bucket = parts
 813                    # Verify it's a supported protocol
 814                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
 815                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
 816                    implicit_profile_config = create_implicit_profile_config(
 817                        profile_name=profile, protocol=protocol, base_path=bucket
 818                    )
 819                else:
 820                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
 821            else:
 822                raise ValueError(
 823                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
 824                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
 825                    f"Please verify that the profile exists and that configuration files are correctly located."
 826                )
 827            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited
 828            if "profiles" not in merged_config:
 829                merged_config["profiles"] = implicit_profile_config["profiles"]
 830            else:
 831                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
 832            # The config was already validated when read; skip validation for implicit profiles (whose names start with "_")
 833            return StorageClientConfig.from_dict(
 834                config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
 835            )
 836
 837    @staticmethod
 838    def from_provider_bundle(
 839        config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
 840    ) -> "StorageClientConfig":
 841        loader = StorageClientConfigLoader(
 842            config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
 843        )
 844        config = loader.build_config()
 845        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
 846        return config
 847
 848    @staticmethod
 849    def read_msc_config() -> Optional[dict[str, Any]]:
 850        """Get the MSC configuration dictionary.
 851
 852        :return: The MSC configuration dictionary or empty dict if no config was found
 853        """
 854        config_dict = {}
 855        config_found = False
 856
 857        # Check for environment variable first
 858        msc_config = os.getenv("MSC_CONFIG", None)
 859        if msc_config and os.path.exists(msc_config):
 860            try:
 861                with open(msc_config) as f:
 862                    if msc_config.endswith(".json"):
 863                        config_dict = json.load(f)
 864                        config_found = True
 865                    else:
 866                        config_dict = yaml.safe_load(f)
 867                        config_found = True
 868            except Exception as e:
 869                raise ValueError(f"malformed msc config file: {msc_config}, exception: {e}")
 870
 871        # If not found through environment variable, try standard search paths
 872        if not config_found:
 873            for path in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
 874                if not os.path.exists(path):
 875                    continue
 876                try:
 877                    with open(path) as f:
 878                        if path.endswith(".json"):
 879                            config_dict = json.load(f)
 880                            config_found = True
 881                            break
 882                        else:
 883                            config_dict = yaml.safe_load(f)
 884                            config_found = True
 885                            break
 886                except Exception as e:
 887                    raise ValueError(f"malformed msc config file: {path}, exception: {e}")
 888
 889        if config_dict:
 890            validate_config(config_dict)
 891        return config_dict
892
 893    @staticmethod
 894    def read_path_mapping() -> PathMapping:
 895        """
 896        Get the path mapping defined in the MSC configuration.
 897
 898        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
 899        where entries are sorted by prefix length (longest first) for optimal matching.
 900        Longer paths take precedence when matching.
 901
 902        :return: A PathMapping instance with translation mappings
 903        """
 904        try:
 905            return PathMapping.from_config()
 906        except Exception:
 907            # Log the error but continue - this shouldn't stop the application from working
 908            logger.error("Failed to load path_mapping from MSC config")
 909            return PathMapping()
 910
 911    def __getstate__(self) -> dict[str, Any]:
 912        state = self.__dict__.copy()
 913        if not state.get("_config_dict"):
 914            raise ValueError("StorageClientConfig is not serializable")
 915        del state["credentials_provider"]
 916        del state["storage_provider"]
 917        del state["metadata_provider"]
 918        del state["cache_manager"]
 919        return state
 920
 921    def __setstate__(self, state: dict[str, Any]) -> None:
 922        self.profile = state["profile"]
 923        loader = StorageClientConfigLoader(state["_config_dict"], self.profile)
 924        new_config = loader.build_config()
 925        self.storage_provider = new_config.storage_provider
 926        self.credentials_provider = new_config.credentials_provider
 927        self.metadata_provider = new_config.metadata_provider
 928        self.cache_config = new_config.cache_config
 929        self.retry_config = new_config.retry_config
 930        self.cache_manager = new_config.cache_manager
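Taken together, the usual entry points into this module are the from_* constructors; a minimal sketch using from_dict and from_file (profile and bucket names are illustrative, and the bundled JSON schema may require additional provider options not shown here):

from multistorageclient.config import StorageClientConfig

# Build a config for an explicit profile from an in-memory dictionary.
config = StorageClientConfig.from_dict(
    config_dict={
        "profiles": {
            "my-s3": {
                "storage_provider": {
                    "type": "s3",
                    "options": {"base_path": "my-bucket"},
                }
            }
        }
    },
    profile="my-s3",
)

# The default POSIX profile is always available, even without any config file.
posix_config = StorageClientConfig.from_file()
assert posix_config.profile == "default"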