1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import json
17import logging
18import os
19import tempfile
20from collections import defaultdict
21from collections.abc import Sequence
22from pathlib import Path
23from typing import Any, Optional
24from urllib.parse import urlparse
25
26import opentelemetry.metrics as api_metrics
27import yaml
28
29from .cache import DEFAULT_CACHE_SIZE, CacheBackendFactory, CacheManager
30from .caching.cache_config import CacheBackendConfig, CacheConfig, EvictionPolicyConfig
31from .instrumentation import setup_opentelemetry
32from .providers.manifest_metadata import ManifestMetadataProvider
33from .rclone import read_rclone_config
34from .schema import validate_config
35from .telemetry import Telemetry
36from .telemetry.attributes.base import AttributesProvider
37from .types import (
38 DEFAULT_RETRY_ATTEMPTS,
39 DEFAULT_RETRY_DELAY,
40 MSC_PROTOCOL,
41 CredentialsProvider,
42 MetadataProvider,
43 ProviderBundle,
44 RetryConfig,
45 StorageProvider,
46 StorageProviderConfig,
47)
48from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
49
# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}

# Short names accepted in the telemetry metrics "attributes" config, mapped to
# their fully qualified AttributesProvider class paths.
_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    storage_provider = {
        "type": PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol],
        "options": {"base_path": base_path},
    }
    return {"profiles": {profile_name: {"storage_provider": storage_provider}}}
84
85
# The always-available POSIX profile: "default" maps the local filesystem root.
DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps the "type" field of a storage_provider config section to the provider
# class name resolved from the .providers package.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
}

# Maps short credentials provider type names to class names in the .providers
# package; any other "type" value is treated as a fully qualified class path.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
}
106
107
def _find_config_file_paths():
    """
    Get configuration file search paths.

    Returns paths in order of precedence:

    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
    """
    candidates: list[str] = []

    def _msc_dir_candidates(directory: str) -> list[str]:
        # Both the YAML and JSON variants of <directory>/msc/config.* are accepted.
        return [
            os.path.join(directory, "msc", "config.yaml"),
            os.path.join(directory, "msc", "config.json"),
        ]

    # 1. User-specific configuration directory
    xdg_config_home = os.getenv("XDG_CONFIG_HOME")
    if xdg_config_home:
        candidates += _msc_dir_candidates(xdg_config_home)

    user_home = os.getenv("HOME")
    if user_home:
        candidates += [
            os.path.join(user_home, ".msc_config.yaml"),
            os.path.join(user_home, ".msc_config.json"),
        ]
        candidates += _msc_dir_candidates(os.path.join(user_home, ".config"))

    # 2. System-wide configuration directories (colon-separated, XDG-style)
    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS") or "/etc/xdg"
    for config_dir in (entry.strip() for entry in xdg_config_dirs.split(":")):
        if config_dir:
            candidates += _msc_dir_candidates(config_dir)

    candidates += [
        "/etc/msc_config.yaml",
        "/etc/msc_config.json",
    ]

    return tuple(candidates)
165
166
# Computed once at import time; config discovery reuses this tuple of candidate paths.
DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS = _find_config_file_paths()
168
# Package name used when resolving provider classes relative to this package.
PACKAGE_NAME = "multistorageclient"

# Use getLogger() so the logger is registered in the logging manager's
# hierarchy and honors application-level configuration (basicConfig,
# dictConfig, level/handler inheritance). Instantiating logging.Logger
# directly creates an orphan logger that such configuration never reaches.
logger = logging.getLogger(__name__)
172
173
class SimpleProviderBundle(ProviderBundle):
    """A :py:class:`ProviderBundle` that simply holds pre-built components."""

    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
    ):
        # The components are stored as-is and exposed read-only via properties.
        self._config = storage_provider_config
        self._credentials = credentials_provider
        self._metadata = metadata_provider

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        """Configuration used to build the storage provider."""
        return self._config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        """Optional credentials provider for the storage provider."""
        return self._credentials

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """Optional metadata provider associated with the profile."""
        return self._metadata
196
197
# Default cache eviction-policy refresh interval (seconds) when not set in the cache config.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
199
200
class StorageClientConfigLoader:
    """
    Builds a :py:class:`StorageClientConfig` from a parsed configuration dictionary.

    The loader resolves a single profile, instantiates its storage, credentials,
    and metadata providers, and wires optional caching, retry, and telemetry
    settings into the final config object.
    """

    _provider_bundle: Optional[ProviderBundle]
    _profiles: dict[str, Any]
    _profile: str
    _profile_dict: dict[str, Any]
    _opentelemetry_dict: Optional[dict[str, Any]]
    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry: Optional[Telemetry] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry: Telemetry instance to use.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)

        # Metric instruments are only created when both an "opentelemetry.metrics"
        # config section and a Telemetry instance are available.
        self._metric_gauges = None
        self._metric_counters = None
        self._metric_attributes_providers = None
        if self._opentelemetry_dict is not None:
            if "metrics" in self._opentelemetry_dict:
                if telemetry is not None:
                    self._metric_gauges = {}
                    for name in Telemetry.GaugeName:
                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
                        if gauge is not None:
                            self._metric_gauges[name] = gauge
                    self._metric_counters = {}
                    for name in Telemetry.CounterName:
                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
                        if counter is not None:
                            self._metric_counters[name] = counter

                    if "attributes" in self._opentelemetry_dict["metrics"]:
                        # Resolve each attributes provider: short names map through
                        # _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING, anything else is
                        # treated as a fully qualified class path.
                        attributes_providers: list[AttributesProvider] = []
                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
                            "attributes"
                        ]
                        for config in attributes_provider_configs:
                            attributes_provider_type: str = config["type"]
                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
                                attributes_provider_type, attributes_provider_type
                            )
                            attributes_provider_module_name, attributes_provider_class_name = (
                                attributes_provider_fully_qualified_name.rsplit(".", 1)
                            )
                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
                            attributes_provider_options = config.get("options", {})
                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
                            attributes_providers.append(attributes_provider)
                        self._metric_attributes_providers = tuple(attributes_providers)
                else:
                    # Use the module logger (not the root logger) for consistency
                    # with the rest of this module.
                    # TODO: Remove "beta" from the log once legacy metrics are removed.
                    logger.error("No telemetry instance! Disabling beta metrics.")

        self._cache_dict = config_dict.get("cache", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider class by its short type name.

        :param storage_provider_name: Key into ``STORAGE_PROVIDER_MAPPING``.
        :param storage_options: Keyword options forwarded to the provider constructor.
        :param credentials_provider: Optional credentials provider injected into the options.
        :raises ValueError: If the storage provider type is not supported.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        # Propagate telemetry instruments (when configured) to the provider.
        if self._metric_gauges:
            storage_options["metric_gauges"] = self._metric_gauges
        if self._metric_counters:
            storage_options["metric_counters"] = self._metric_counters
        if self._metric_attributes_providers:
            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        """
        Build a storage provider from another profile in the config (used for
        manifest storage and cache backends).

        :param storage_provider_profile: Name of the referenced profile.
        :raises ValueError: If the profile is missing, has a metadata provider,
            or lacks a storage_provider section.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        :param credentials_provider_dict: Dictionary containing credentials provider configuration.
        :param storage_options: Storage provider options required by some credentials providers to scope the credentials.
        :return: The instantiated credentials provider, or ``None`` when no config is given.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            # Only forward options the provider's constructor actually accepts,
            # and never overwrite explicitly configured options.
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Assemble a :py:class:`SimpleProviderBundle` from a profile's
        ``storage_provider``, ``credentials_provider``, and ``metadata_provider``
        config sections.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                # Any non-"manifest" type must be a fully qualified class path.
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """Instantiate a third-party :py:class:`ProviderBundle` from its fully qualified class path."""
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        """Resolve the provider bundle: pre-built > extension > config-defined."""
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load 3rd party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _build_cache_manager(self, cache_config: CacheConfig) -> CacheManager:
        """Create the cache manager, optionally backed by a storage provider from another profile."""
        cache_storage_provider = None

        # Use the storage provider profile from cache config if specified
        if cache_config.backend and cache_config.backend.storage_provider_profile:
            cache_profile = cache_config.backend.storage_provider_profile
            if cache_profile == self._profile:
                logger.warning(
                    f"Same profile used for cache backend and storage provider: {cache_profile}. "
                    "This is not recommended as it may lead to unintended modifications of the source data. "
                    "Consider using a separate read-only profile for the cache backend."
                )

            cache_storage_provider = self._build_storage_provider_from_profile(cache_profile)

        cache_manager = CacheBackendFactory.create(
            profile=self._profile, cache_config=cache_config, storage_provider=cache_storage_provider
        )
        return cache_manager

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """Reject legacy cache options; ``size_mb`` was replaced by ``size`` with a unit suffix."""
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G  # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true  # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache  # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:  # Optional: The eviction policy to use\n"
                "    policy: fifo  # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def build_config(self) -> "StorageClientConfig":
        """
        Build the final :py:class:`StorageClientConfig` from the resolved
        providers plus the cache, retry, and telemetry settings.
        """
        bundle = self._build_provider_bundle()
        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        if self._cache_dict is not None:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                use_etag=cache_dict.get("use_etag", True),
                eviction_policy=eviction_policy,
                backend=CacheBackendConfig(
                    cache_path=cache_dict.get("cache_backend", {}).get("cache_path", location),
                    storage_provider_profile=cache_dict.get("cache_backend", {}).get("storage_provider_profile"),
                ),
            )

            cache_manager = self._build_cache_manager(cache_config)

        # retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            retry_config = RetryConfig(attempts=attempts, delay=delay)
        else:
            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)

        # set up OpenTelemetry providers once per process
        #
        # TODO: Legacy, need to remove.
        if self._opentelemetry_dict:
            setup_opentelemetry(self._opentelemetry_dict)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
        )
576
577
class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary, if None the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        instance = cls()
        instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        # Get the path_mapping section
        path_mapping = config_dict.get("path_mapping", {})
        if path_mapping is None:
            return

        # Process each mapping
        for source_path, dest_path in path_mapping.items():
            # Validate format: source must be directory-style and the destination
            # must be an msc:// URL ending with "/".
            if not source_path.endswith("/"):
                continue
            if not dest_path.startswith(MSC_PROTOCOL):
                continue
            if not dest_path.endswith("/"):
                continue

            # Extract the destination profile
            pr_dest = urlparse(dest_path)
            dest_profile = pr_dest.netloc

            # Parse the source path
            pr = urlparse(source_path)
            protocol = pr.scheme.lower() if pr.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # For file or absolute paths, use the whole path as the prefix
                # and leave bucket empty
                bucket = ""
                prefix = source_path if source_path.startswith("/") else pr.path
            else:
                # For object storage, extract bucket and prefix
                bucket = pr.netloc
                prefix = pr.path
                if prefix.startswith("/"):
                    prefix = prefix[1:]

            # Add the mapping to the nested dict
            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Sort each bucket's prefixes by length (longest first) for optimal matching
        for protocol, buckets in self._mapping.items():
            for bucket, prefixes in buckets.items():
                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        # Parse the URL
        pr = urlparse(url)
        protocol = pr.scheme.lower() if pr.scheme else "file"

        # For file paths or absolute paths
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else pr.path
            # Use .get() so a read-only lookup never inserts empty entries into
            # the defaultdict-based mapping (plain indexing would mutate it on
            # every miss and slowly grow the structure).
            possible_mapping = self._mapping.get(protocol, {}).get("", [])

            # Check each prefix (already sorted by length, longest first)
            for prefix, profile in possible_mapping:
                if path.startswith(prefix):
                    # Calculate the relative path; keep it absolute ("/..." form)
                    rel_path = path[len(prefix) :]
                    if not rel_path.startswith("/"):
                        rel_path = "/" + rel_path
                    return profile, rel_path

            return None

        # For object storage
        bucket = pr.netloc
        path = pr.path
        if path.startswith("/"):
            path = path[1:]

        # Check bucket-specific mapping (non-mutating lookup, as above)
        possible_mapping = self._mapping.get(protocol, {}).get(bucket, [])

        # Check each prefix (already sorted by length, longest first)
        for prefix, profile in possible_mapping:
            # matching prefix
            if path.startswith(prefix):
                rel_path = path[len(prefix) :]
                # Remove leading slash if present
                if rel_path.startswith("/"):
                    rel_path = rel_path[1:]

                return profile, rel_path

        return None
709
710
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: StorageProvider
    credentials_provider: Optional[CredentialsProvider]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]

    # Original config dictionary; required for pickling (see __getstate__) and
    # None when the config was built from a pre-built provider bundle.
    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: StorageProvider,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
    ):
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.retry_config = retry_config
        self.cache_manager = cache_manager

    @staticmethod
    def from_json(
        config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config from a JSON string."""
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_yaml(
        config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config from a YAML string."""
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)

    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry: Optional[Telemetry] = None,
    ) -> "StorageClientConfig":
        """
        Build a config from a dictionary.

        :param config_dict: Parsed configuration dictionary.
        :param profile: Name of the profile to build.
        :param skip_validation: Skip JSON-schema validation (used for implicit
            profiles whose base config was already validated).
        :param telemetry: Telemetry instance to use.
        """
        # Validate the config file with predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
        config = loader.build_config()
        config._config_dict = config_dict

        return config

    @staticmethod
    def from_file(
        profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """
        Build a config by discovering configuration files on disk.

        Merges the MSC config with any rclone config, then resolves ``profile``
        against the merged profiles, falling back to implicit profiles (names
        starting with "_") and the default POSIX profile.

        :raises ValueError: On conflicting keys between config files, an unknown
            profile, or an invalid/unsupported implicit profile.
        """
        msc_config_file = os.getenv("MSC_CONFIG", None)

        # Search config paths
        if msc_config_file is None:
            for filename in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
                if os.path.exists(filename):
                    msc_config_file = filename
                    break

        msc_config_dict = StorageClientConfig.read_msc_config()
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": '
                f"{conflicted_keys}"
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
        else:
            # Check if profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles named "_<protocol>-<bucket>"
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
            )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
    ) -> "StorageClientConfig":
        """Build a config from a pre-built provider bundle; the result is not picklable."""
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def _load_config_file(path: str) -> Any:
        """
        Parse a single config file (JSON if the path ends with ".json", YAML otherwise).

        :raises ValueError: If the file cannot be read or parsed; the message
            names the offending file.
        """
        try:
            with open(path) as f:
                if path.endswith(".json"):
                    return json.load(f)
                return yaml.safe_load(f)
        except Exception as e:
            raise ValueError(f"malformed msc config file: {path}, exception: {e}")

    @staticmethod
    def read_msc_config() -> Optional[dict[str, Any]]:
        """Get the MSC configuration dictionary.

        :return: The MSC configuration dictionary or empty dict if no config was found
        """
        config_dict = {}

        # Check for environment variable first
        msc_config = os.getenv("MSC_CONFIG", None)
        if msc_config and os.path.exists(msc_config):
            config_dict = StorageClientConfig._load_config_file(msc_config)
        else:
            # If not found through environment variable, try standard search
            # paths, stopping at the first existing file.
            for path in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
                if os.path.exists(path):
                    config_dict = StorageClientConfig._load_config_file(path)
                    break

        if config_dict:
            validate_config(config_dict)
        return config_dict

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        # Only configs built from a config dict are picklable; the providers and
        # cache manager are dropped here and rebuilt from the dict on unpickling.
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Rebuild all providers from the preserved config dict.
        self.profile = state["profile"]
        loader = StorageClientConfigLoader(state["_config_dict"], self.profile)
        new_config = loader.build_config()
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.retry_config = new_config.retry_config
        self.cache_manager = new_config.cache_manager