# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite

# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
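
    Example (illustrative)::

        create_implicit_profile_config("_s3-bucket1", "s3", "bucket1")
        # -> {"profiles": {"_s3-bucket1": {"storage_provider":
        #        {"type": "s3", "options": {"base_path": "bucket1"}}}}}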
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }


DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}

def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
    """
    Resolve include path (absolute or relative to parent config file).

    :param include_path: Path from include keyword (can be absolute or relative)
    :param parent_config_path: Absolute path of the config file containing the include
    :return: Absolute, normalized path to the included config file
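
    Example (illustrative)::

        _resolve_include_path("creds.yaml", "/etc/msc_config.yaml")
        # -> "/etc/creds.yaml"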
    """
    if os.path.isabs(include_path):
        return os.path.abspath(include_path)
    else:
        parent_dir = os.path.dirname(parent_config_path)
        return os.path.abspath(os.path.join(parent_dir, include_path))


def _merge_profiles(
    base_profiles: dict[str, Any],
    new_profiles: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge profiles from two config files with conflict detection.

    Profiles with the same name are allowed if their definitions are identical (idempotent).
    If a profile name exists in both configs with different definitions, an error is raised.

    NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
    We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
    profile fields, which could lead to unintended behavior. For example:

    base_config:
        profiles:
            my-profile:
                storage_provider:
                    type: s3
                    options:
                        base_path: bucket

    new_config:
        profiles:
            my-profile:
                credentials_provider:
                    type: S3Credentials
                    options:
                        access_key: foo
                        secret_key: bar

    If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
    and credentials_provider, which might not be the intended configuration.

    :param base_profiles: Profiles from the base config
    :param new_profiles: Profiles from the new config to merge
    :param base_path: Path to the base config file (for error messages)
    :param new_path: Path to the new config file (for error messages)
    :return: Merged profiles dictionary
    :raises ValueError: If any profile name exists in both configs with different definitions
    """
    result = base_profiles.copy()
    conflicting_profiles = []

    for profile_name, new_profile_def in new_profiles.items():
        if profile_name in result:
            if result[profile_name] != new_profile_def:
                conflicting_profiles.append(profile_name)
        else:
            result[profile_name] = new_profile_def

    if conflicting_profiles:
        conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
        raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")

    return result


def _merge_configs(
    base_config: dict[str, Any],
    new_config: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge two config dictionaries with field-specific strategies.

    Different config fields have different merge strategies:
    - profiles: Shallow merge with idempotent check (uses _merge_profiles)
    - path_mapping: Flat dict merge with idempotent check
    - experimental_features: Flat dict merge with idempotent check
    - cache, opentelemetry, posix: Global configs, idempotent if identical, error if different

    :param base_config: Base configuration dictionary
    :param new_config: New configuration to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged configuration dictionary
    :raises ValueError: If conflicts are detected
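
    Example (illustrative)::

        base = {"profiles": {"a": {...}}, "cache": {"size": "100G"}}
        new = {"profiles": {"b": {...}}, "cache": {"size": "100G"}}
        _merge_configs(base, new, "base.yaml", "new.yaml")
        # -> {"profiles": {"a": {...}, "b": {...}}, "cache": {"size": "100G"}}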
    """
    result = {}

    all_keys = set(base_config.keys()) | set(new_config.keys())

    for key in all_keys:
        base_value = base_config.get(key)
        new_value = new_config.get(key)

        # Key only in base
        if key not in new_config:
            result[key] = base_value
            continue

        # Key only in new
        if key not in base_config:
            result[key] = new_value
            continue

        # Key in both - need to merge or detect conflict
        if key == "profiles":
            result["profiles"] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("path_mapping", "experimental_features"):
            merged, conflicts = merge_dictionaries_no_overwrite(
                (base_value or {}).copy(), new_value or {}, allow_idempotent=True
            )
            if conflicts:
                conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
                raise ValueError(
                    f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
                )
            result[key] = merged

        elif key in ("cache", "opentelemetry", "posix"):
            if base_value != new_value:
                raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
            result[key] = base_value

        elif key == "include":
            # 'include' is processed by _load_and_merge_includes, not part of final config
            pass

        else:
            # Should never happen: every top-level field must have explicit handling above.
            raise ValueError(f"Unknown field '{key}' in config file.")

    return result


def _load_and_merge_includes(
    main_config_path: str,
    main_config_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Load and merge included config files.

    Processes the 'include' directive in the main config, loading and merging all
    specified config files. Only supports one level of includes - included files
    cannot themselves have 'include' directives.

    :param main_config_path: Absolute path to the main config file
    :param main_config_dict: Dictionary loaded from the main config file
    :return: Merged configuration dictionary (without 'include' field)
    :raises ValueError: If include file not found, malformed, or contains nested includes
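
    Example main config (illustrative)::

        include:
          - /etc/msc/shared_profiles.yaml   # absolute path
          - team_overrides.yaml             # relative to this config file
        profiles:
          my-profile:
            storage_provider:
              type: s3
              options:
                base_path: my-bucket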
    """
    include_paths = main_config_dict.get("include", [])

    if not include_paths:
        return {k: v for k, v in main_config_dict.items() if k != "include"}

    merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}

    for include_path in include_paths:
        resolved_path = _resolve_include_path(include_path, main_config_path)

        if not os.path.exists(resolved_path):
            raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")

        try:
            with open(resolved_path) as f:
                if resolved_path.endswith(".json"):
                    included_config = json.load(f)
                else:
                    included_config = yaml.safe_load(f)
        except Exception as e:
            raise ValueError(f"Failed to load included config {resolved_path}: {e}") from e

        validate_config(included_config)
        if "include" in included_config:
            raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")

        merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)

    return merged_config


def _find_config_file_paths() -> tuple[str, ...]:
    """
    Get configuration file search paths.

    Returns paths in order of precedence:

    1. User-specific configs (``${XDG_CONFIG_HOME}/msc/``, ``${HOME}/.msc_config.*``, ``${HOME}/.config/msc/``)
    2. System-wide configs (``${XDG_CONFIG_DIRS}/msc/`` (defaults to ``/etc/xdg``), ``/etc/``)
    """
    paths = []

    # 1. User-specific configuration directory
    xdg_config_home = os.getenv("XDG_CONFIG_HOME")

    if xdg_config_home:
        paths.extend(
            [
                os.path.join(xdg_config_home, "msc", "config.yaml"),
                os.path.join(xdg_config_home, "msc", "config.json"),
            ]
        )

    user_home = os.getenv("HOME")

    if user_home:
        paths.extend(
            [
                os.path.join(user_home, ".msc_config.yaml"),
                os.path.join(user_home, ".msc_config.json"),
                os.path.join(user_home, ".config", "msc", "config.yaml"),
                os.path.join(user_home, ".config", "msc", "config.json"),
            ]
        )

    # 2. System-wide configuration directories
    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
    if not xdg_config_dirs:
        xdg_config_dirs = "/etc/xdg"

    for config_dir in xdg_config_dirs.split(":"):
        config_dir = config_dir.strip()
        if config_dir:
            paths.extend(
                [
                    os.path.join(config_dir, "msc", "config.yaml"),
                    os.path.join(config_dir, "msc", "config.json"),
                ]
            )

    paths.extend(
        [
            "/etc/msc_config.yaml",
            "/etc/msc_config.json",
        ]
    )

    return tuple(paths)


PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)


class ImmutableDict(dict):
    """
    Immutable dictionary that raises an error when attempting to modify it.
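
    A minimal illustration (hypothetical usage)::

        d = ImmutableDict({"a": {"b": 1}})
        d["a"]      # returns a mutable copy: {"b": 1}
        d["a"] = 2  # raises TypeError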
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Recursively freeze nested structures
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, self._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Convert list to tuple, freezing nested dicts recursively.
        """
        frozen = []
        for item in lst:
            if isinstance(item, dict):
                frozen.append(ImmutableDict(item))
            elif isinstance(item, list):
                frozen.append(ImmutableDict._freeze_list(item))
            else:
                frozen.append(item)
        return tuple(frozen)

    def __deepcopy__(self, memo):
        """
        Return a regular mutable dict when deepcopy is called.
        """
        return copy.deepcopy(dict(self), memo)

    def __reduce__(self):
        """
        Support for pickle serialization.
        """
        return (self.__class__, (dict(self),))

    def _copy_value(self, value):
        """
        Convert frozen structures back to mutable equivalents.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        elif isinstance(value, tuple):
            # Check if it was originally a list (frozen by _freeze_list)
            return [self._copy_value(item) for item in value]
        else:
            return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        value = super().__getitem__(key)
        return self._copy_value(value)

    def get(self, key, default=None):
        """
        Return a mutable copy of the value.
        """
        return self[key] if key in self else default

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")


class SimpleProviderBundle(ProviderBundle):
    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        if replicas is None:
            replicas = []

        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        self._replicas = replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        return self._replicas


class SimpleProviderBundleV2(ProviderBundleV2):
    def __init__(
        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
    ):
        self._storage_backends = storage_backends
        self._metadata_provider = metadata_provider

    @staticmethod
    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
        backend = StorageBackend(
            storage_provider_config=v1_bundle.storage_provider_config,
            credentials_provider=v1_bundle.credentials_provider,
            replicas=v1_bundle.replicas,
        )

        return SimpleProviderBundleV2(
            storage_backends={profile_name: backend},
            metadata_provider=v1_bundle.metadata_provider,
        )

    @property
    def storage_backends(self) -> dict[str, StorageBackend]:
        return self._storage_backends

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider


DEFAULT_CACHE_REFRESH_INTERVAL = 300


class StorageClientConfigLoader:
    _provider_bundle: ProviderBundleV2
    _resolved_config_dict: dict[str, Any]
    _profiles: dict[str, Any]
    _profile: str
    _profile_dict: dict[str, Any]
    _opentelemetry_dict: Optional[dict[str, Any]]
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
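
        Example (illustrative)::

            config_dict = {
                "profiles": {
                    "my-profile": {
                        "storage_provider": {"type": "s3", "options": {"base_path": "my-bucket"}},
                    }
                }
            }
            loader = StorageClientConfigLoader(config_dict, profile="my-profile")
            config = loader.build_config()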
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # The default POSIX profile may only be redefined with another "file" storage provider.
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile '{profile}' not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        :param credentials_provider_dict: Dictionary containing credentials provider configuration.
        :param storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to a specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(
        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
    ) -> ProviderBundleV2:
        if provider_bundle:
            bundle = provider_bundle
        else:
            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
            if provider_bundle_dict:
                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
            else:
                bundle = self._build_provider_bundle_from_config(self._profile_dict)

        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)

        return bundle

    def _inject_profiles_from_bundle(self) -> None:
        if len(self._provider_bundle.storage_backends) > 1:
            profiles = copy.deepcopy(self._profiles)

            for child_name, backend in self._provider_bundle.storage_backends.items():
                child_config = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }

                if child_name in self._profiles:
                    # Profile already exists - check if it matches what we would inject
                    existing = self._profiles[child_name]
                    if existing.get("storage_provider") == child_config["storage_provider"]:
                        continue
                    else:
                        raise ValueError(
                            f"Profile '{child_name}' already exists in configuration with different settings."
                        )

                profiles[child_name] = child_config

            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles

            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

    def _build_retry_config(self) -> RetryConfig:
        """Build retry config from profile dict."""
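        # Example profile snippet (illustrative; all keys optional):
        #
        #   retry:
        #     attempts: 3
        #     delay: 1.0
        #     backoff_multiplier: 2.0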
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            return RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )

    def _build_autocommit_config(self) -> AutoCommitConfig:
        """Build autocommit config from profile dict."""
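        # Example profile snippet (illustrative):
        #
        #   autocommit:
        #     interval_minutes: 5
        #     at_exit: true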
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
        return AutoCommitConfig()

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported.\n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G  # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true  # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache  # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:  # Optional: The eviction policy to use\n"
                "    policy: fifo  # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    # todo: remove once experimental features are stable
    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
        """
        Validate that experimental features are enabled when used.

        :param eviction_policy: The eviction policy configuration to validate
        :raises ValueError: If experimental features are used without being enabled
        """
        # Validate MRU eviction policy
        if eviction_policy.policy.upper() == "MRU":
            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
                raise ValueError(
                    "MRU eviction policy is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_mru_eviction: true"
                )

        # Validate purge_factor
        if eviction_policy.purge_factor != 0:
            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
                raise ValueError("purge_factor must be between 0 and 100")

            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
                raise ValueError(
                    "purge_factor is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_purge_factor: true"
                )

    def build_config(self) -> "StorageClientConfig":
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # Config for the composite StorageClient
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            config._config_dict = self._resolved_config_dict
            return config

        # Single backend (len == 1)
        backend = next(iter(backends.values()))
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified; the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        config._config_dict = self._resolved_config_dict
        return config


class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
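
    Example ``path_mapping`` section (illustrative)::

        path_mapping:
          /data/datasets/: msc://my-posix-profile/
          s3://my-bucket/prefix/: msc://my-s3-profile/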
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary; if None, the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        instance = cls()
        instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        # Get the path_mapping section
        path_mapping = config_dict.get("path_mapping", {})
        if path_mapping is None:
            return

        # Process each mapping
        for source_path, dest_path in path_mapping.items():
            # Validate format
            if not source_path.endswith("/"):
                continue
            if not dest_path.startswith(MSC_PROTOCOL):
                continue
            if not dest_path.endswith("/"):
                continue

            # Extract the destination profile
            pr_dest = urlparse(dest_path)
            dest_profile = pr_dest.netloc

            # Parse the source path
            pr = urlparse(source_path)
            protocol = pr.scheme.lower() if pr.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # For file or absolute paths, use the whole path as the prefix
                # and leave bucket empty
                bucket = ""
                prefix = source_path if source_path.startswith("/") else pr.path
            else:
                # For object storage, extract bucket and prefix
                bucket = pr.netloc
                prefix = pr.path
                if prefix.startswith("/"):
                    prefix = prefix[1:]

            # Add the mapping to the nested dict
            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Sort each bucket's prefixes by length (longest first) for optimal matching
        for protocol, buckets in self._mapping.items():
            for bucket, prefixes in buckets.items():
                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
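
        Example (illustrative, given a mapping ``s3://my-bucket/prefix/ -> msc://my-s3-profile/``)::

            find_mapping("s3://my-bucket/prefix/data.bin")
            # -> ("my-s3-profile", "data.bin")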
        """
        # Parse the URL
        pr = urlparse(url)
        protocol = pr.scheme.lower() if pr.scheme else "file"

        # For file paths or absolute paths
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else pr.path

            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []

            # Check each prefix (already sorted by length, longest first)
            for prefix, profile in possible_mapping:
                if path.startswith(prefix):
                    # Calculate the relative path
                    rel_path = path[len(prefix) :]
                    if not rel_path.startswith("/"):
                        rel_path = "/" + rel_path
                    return profile, rel_path

            return None

        # For object storage
        bucket = pr.netloc
        path = pr.path
        if path.startswith("/"):
            path = path[1:]

        # Check bucket-specific mapping
        possible_mapping = (
            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
        )

        # Check each prefix (already sorted by length, longest first)
        for prefix, profile in possible_mapping:
            # matching prefix
            if path.startswith(prefix):
                rel_path = path[len(prefix) :]
                # Remove leading slash if present
                if rel_path.startswith("/"):
                    rel_path = rel_path[1:]

                return profile, rel_path

        return None


class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: Optional[StorageProvider]
    credentials_provider: Optional[CredentialsProvider]
    storage_provider_profiles: Optional[list[str]]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    telemetry_provider: Optional[Callable[[], Telemetry]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: Optional[StorageProvider] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
        storage_provider_profiles: Optional[list[str]] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        # Exactly one of storage_provider or storage_provider_profiles must be set
        if storage_provider and storage_provider_profiles:
            raise ValueError(
                "Cannot specify both storage_provider and storage_provider_profiles. "
                "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
            )
        if not storage_provider and not storage_provider_profiles:
            raise ValueError("Must specify either storage_provider or storage_provider_profiles.")

        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.storage_provider_profiles = storage_provider_profiles
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.telemetry_provider = telemetry_provider
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a JSON string.

        :param config_json: Configuration JSON string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_yaml(
        config_yaml: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a YAML string.

        :param config_yaml: Configuration YAML string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
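
        Example (illustrative)::

            config = StorageClientConfig.from_yaml(
                '''
                profiles:
                  my-profile:
                    storage_provider:
                      type: file
                      options:
                        base_path: /
                ''',
                profile="my-profile",
            )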
        """
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a Python dictionary.

        :param config_dict: Configuration Python dictionary.
        :param profile: Profile to use.
        :param skip_validation: Skip configuration schema validation.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Validate the config file with predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()

        return config

    @staticmethod
    def from_file(
        config_file_paths: Optional[Iterable[str]] = None,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from the first file found.

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
            )
        else:
            # Check if profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles
                parts = profile[1:].split("-", 1)
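                # e.g. "_s3-bucket1" -> parts == ["s3", "bucket1"]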
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # The config was already validated when read; skip validation for implicit profiles (names starting with "_")
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
            )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any],
        provider_bundle: Union[ProviderBundle, ProviderBundleV2],
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def read_msc_config(
        config_file_paths: Optional[Iterable[str]] = None,
    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the first file found.

        If no config paths are specified, configs are searched in the following order:

        1. ``MSC_CONFIG`` environment variable (highest precedence)
        2. Standard search paths (user-specific config and system-wide config)

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
            dictionary or empty dict if no config was found. ``config_file_path`` is the absolute
            path of the config file used, or ``None`` if no config file was found.
        """
        config_dict: dict[str, Any] = {}
        config_file_path: Optional[str] = None

        config_file_paths = list(config_file_paths or [])

        # Add default paths if none provided.
        if len(config_file_paths) == 0:
            # Environment variable.
            msc_config_env = os.getenv("MSC_CONFIG", None)
            if msc_config_env is not None:
                config_file_paths.append(msc_config_env)

            # Standard search paths.
            config_file_paths.extend(_find_config_file_paths())

        # Normalize + absolutize paths.
        config_file_paths = [os.path.abspath(path) for path in config_file_paths]

        # Log plan.
        logger.debug(f"Searching MSC config file paths: {config_file_paths}")

        # Load config.
        for path in config_file_paths:
            if os.path.exists(path):
                try:
                    with open(path) as f:
                        if path.endswith(".json"):
                            config_dict = json.load(f)
                        else:
                            config_dict = yaml.safe_load(f)
                    config_file_path = path
                    # Use the first config file.
                    break
                except Exception as e:
                    raise ValueError(f"Malformed MSC config file: {path}, exception: {e}") from e

        # Log result.
        if config_file_path is None:
            logger.debug("No MSC config files found in any of the search locations.")
        else:
            logger.debug(f"Using MSC config file: {config_file_path}")

        if config_dict:
            validate_config(config_dict)

        if "include" in config_dict and config_file_path:
            config_dict = _load_and_merge_includes(config_file_path, config_dict)

        return config_dict, config_file_path

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            telemetry_provider=state["telemetry_provider"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.storage_provider_profiles = new_config.storage_provider_profiles
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.telemetry_provider = new_config.telemetry_provider
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config