# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite

# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }
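

# For illustration, a sketch of the dictionary this helper returns for an
# implicit S3 profile (the profile and bucket names are hypothetical):
#
#   create_implicit_profile_config("_s3-bucket1", "s3", "bucket1")
#   # -> {"profiles": {"_s3-bucket1": {"storage_provider": {
#   #        "type": "s3", "options": {"base_path": "bucket1"}}}}}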


DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}


def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
    """
    Resolve include path (absolute or relative to parent config file).

    :param include_path: Path from include keyword (can be absolute or relative)
    :param parent_config_path: Absolute path of the config file containing the include
    :return: Absolute, normalized path to the included config file
    """
    if os.path.isabs(include_path):
        return os.path.abspath(include_path)
    else:
        parent_dir = os.path.dirname(parent_config_path)
        return os.path.abspath(os.path.join(parent_dir, include_path))
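

# A quick sketch of the resolution rule (paths are hypothetical):
#
#   _resolve_include_path("extra.yaml", "/etc/msc/config.yaml")
#   # -> "/etc/msc/extra.yaml"
#   _resolve_include_path("/opt/msc/extra.yaml", "/etc/msc/config.yaml")
#   # -> "/opt/msc/extra.yaml"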


def _merge_profiles(
    base_profiles: dict[str, Any],
    new_profiles: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge profiles from two config files with conflict detection.

    Profiles with the same name are allowed if their definitions are identical (idempotent).
    If a profile name exists in both configs with different definitions, an error is raised.

    NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
    We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
    profile fields, which could lead to unintended behavior. For example:

    base_config:
        profiles:
          my-profile:
            storage_provider:
              type: s3
              options:
                base_path: bucket

    new_config:
        profiles:
          my-profile:
            credentials_provider:
              type: S3Credentials
              options:
                access_key: foo
                secret_key: bar

    If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
    and credentials_provider, which might not be the intended configuration.

    :param base_profiles: Profiles from the base config
    :param new_profiles: Profiles from the new config to merge
    :param base_path: Path to the base config file (for error messages)
    :param new_path: Path to the new config file (for error messages)
    :return: Merged profiles dictionary
    :raises ValueError: If any profile name exists in both configs with different definitions
    """
    result = base_profiles.copy()
    conflicting_profiles = []

    for profile_name, new_profile_def in new_profiles.items():
        if profile_name in result:
            if result[profile_name] != new_profile_def:
                conflicting_profiles.append(profile_name)
        else:
            result[profile_name] = new_profile_def

    if conflicting_profiles:
        conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
        raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")

    return result
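

# Behavior sketch: identical duplicate definitions merge cleanly, while a
# differing duplicate raises (profile names below are hypothetical):
#
#   a = {"p1": {"storage_provider": {"type": "s3"}}}
#   _merge_profiles(a, dict(a), "base.yaml", "new.yaml")     # ok: idempotent
#   _merge_profiles(a, {"p1": {}}, "base.yaml", "new.yaml")  # raises ValueError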


def _merge_opentelemetry(
    base_otel: dict[str, Any],
    new_otel: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge opentelemetry configurations with special handling for metrics.attributes.

    Merge strategy:

    - metrics.attributes: Concatenate arrays (preserve order for fallback mechanism)
    - Other fields: Use idempotent check (must be identical)

    Attributes providers are concatenated to support fallback scenarios where multiple
    providers of the same type supply the same attribute key with different sources.
    The order matters - later providers override earlier ones.

    :param base_otel: Base opentelemetry config
    :param new_otel: New opentelemetry config to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged opentelemetry configuration
    :raises ValueError: If conflicts are detected
    """
    # Merge metrics.attributes
    base_attrs = base_otel.get("metrics", {}).get("attributes", [])
    new_attrs = new_otel.get("metrics", {}).get("attributes", [])
    merged_attrs = base_attrs + new_attrs

    # Merge other opentelemetry fields
    base_otel_without_attrs = copy.deepcopy(base_otel)
    if "metrics" in base_otel_without_attrs and "attributes" in base_otel_without_attrs["metrics"]:
        del base_otel_without_attrs["metrics"]["attributes"]

    new_otel_without_attrs = copy.deepcopy(new_otel)
    if "metrics" in new_otel_without_attrs and "attributes" in new_otel_without_attrs["metrics"]:
        del new_otel_without_attrs["metrics"]["attributes"]

    merged, conflicts = merge_dictionaries_no_overwrite(
        base_otel_without_attrs, new_otel_without_attrs, allow_idempotent=True
    )

    if conflicts:
        conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
        raise ValueError(
            f"opentelemetry config conflict: {conflicts_list} defined differently in {base_path} and {new_path}"
        )

    if "metrics" in merged:
        merged["metrics"]["attributes"] = merged_attrs
    elif merged_attrs:
        merged["metrics"] = {"attributes": merged_attrs}

    return merged
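

# Concatenation sketch for metrics.attributes (attribute entries are hypothetical):
#
#   base = {"metrics": {"attributes": [{"key": "host"}]}}
#   new = {"metrics": {"attributes": [{"key": "rank"}]}}
#   _merge_opentelemetry(base, new, "a.yaml", "b.yaml")
#   # -> {"metrics": {"attributes": [{"key": "host"}, {"key": "rank"}]}}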


def _merge_configs(
    base_config: dict[str, Any],
    new_config: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge two config dictionaries with field-specific strategies.

    Different config fields have different merge strategies:

    - profiles: Shallow merge with idempotent check (uses _merge_profiles)
    - path_mapping: Flat dict merge with idempotent check
    - experimental_features: Flat dict merge with idempotent check
    - opentelemetry: Hybrid merge (attributes concatenate, others idempotent)
    - cache, posix: Global configs, idempotent if identical, error if different

    :param base_config: Base configuration dictionary
    :param new_config: New configuration to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged configuration dictionary
    :raises ValueError: If conflicts are detected
    """
    result = {}

    all_keys = set(base_config.keys()) | set(new_config.keys())

    for key in all_keys:
        base_value = base_config.get(key)
        new_value = new_config.get(key)

        # Key only in base
        if key not in new_config:
            result[key] = base_value
            continue

        # Key only in new
        if key not in base_config:
            result[key] = new_value
            continue

        # Key in both - need to merge or detect conflict
        if key == "profiles":
            result["profiles"] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("path_mapping", "experimental_features"):
            merged, conflicts = merge_dictionaries_no_overwrite(
                (base_value or {}).copy(), new_value or {}, allow_idempotent=True
            )
            if conflicts:
                conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
                raise ValueError(
                    f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
                )
            result[key] = merged

        elif key == "opentelemetry":
            result["opentelemetry"] = _merge_opentelemetry(base_value or {}, new_value or {}, base_path, new_path)

        elif key in ("cache", "posix"):
            if base_value != new_value:
                raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
            result[key] = base_value

        elif key == "include":
            # 'include' is processed by _load_and_merge_includes, not part of final config
            pass

        else:
            # This should never happen; all top-level fields must have explicit handling above.
            raise ValueError(f"Unknown field '{key}' in config file.")

    return result


def _load_and_merge_includes(
    main_config_path: str,
    main_config_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Load and merge included config files.

    Processes the 'include' directive in the main config, loading and merging all
    specified config files. Only supports one level of includes - included files
    cannot themselves have 'include' directives.

    :param main_config_path: Absolute path to the main config file
    :param main_config_dict: Dictionary loaded from the main config file
    :return: Merged configuration dictionary (without 'include' field)
    :raises ValueError: If include file not found, malformed, or contains nested includes
    """
    include_paths = main_config_dict.get("include", [])

    if not include_paths:
        return {k: v for k, v in main_config_dict.items() if k != "include"}

    merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}

    for include_path in include_paths:
        resolved_path = _resolve_include_path(include_path, main_config_path)

        if not os.path.exists(resolved_path):
            raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")

        try:
            with open(resolved_path) as f:
                if resolved_path.endswith(".json"):
                    included_config = json.load(f)
                else:
                    included_config = yaml.safe_load(f)
        except Exception as e:
            raise ValueError(f"Failed to load included config {resolved_path}: {e}")

        validate_config(included_config)
        if "include" in included_config:
            raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")

        merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)

    return merged_config
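

# Include sketch: a main config pulls in shared definitions from other files.
# Only one level is supported; an included file may not itself contain
# 'include'. File names below are hypothetical:
#
#   # /etc/msc/config.yaml
#   include:
#     - profiles-team-a.yaml        # resolved relative to /etc/msc/
#     - /etc/msc/profiles-b.yaml    # absolute path
#   profiles:
#     local:
#       storage_provider:
#         type: file
#         options:
#           base_path: /data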


def _find_config_file_paths() -> tuple[str, ...]:
362 """
363 Get configuration file search paths.
364
365 Returns paths in order of precedence:
366
367 1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
368 2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
369 """
370 paths = []
371
372 # 1. User-specific configuration directory
373 xdg_config_home = os.getenv("XDG_CONFIG_HOME")
374
375 if xdg_config_home:
376 paths.extend(
377 [
378 os.path.join(xdg_config_home, "msc", "config.yaml"),
379 os.path.join(xdg_config_home, "msc", "config.json"),
380 ]
381 )
382
383 user_home = os.getenv("HOME")
384
385 if user_home:
386 paths.extend(
387 [
388 os.path.join(user_home, ".msc_config.yaml"),
389 os.path.join(user_home, ".msc_config.json"),
390 os.path.join(user_home, ".config", "msc", "config.yaml"),
391 os.path.join(user_home, ".config", "msc", "config.json"),
392 ]
393 )
394
395 # 2. System-wide configuration directories
396 xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
397 if not xdg_config_dirs:
398 xdg_config_dirs = "/etc/xdg"
399
400 for config_dir in xdg_config_dirs.split(":"):
401 config_dir = config_dir.strip()
402 if config_dir:
403 paths.extend(
404 [
405 os.path.join(config_dir, "msc", "config.yaml"),
406 os.path.join(config_dir, "msc", "config.json"),
407 ]
408 )
409
410 paths.extend(
411 [
412 "/etc/msc_config.yaml",
413 "/etc/msc_config.json",
414 ]
415 )
416
417 return tuple(paths)


PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)


class ImmutableDict(dict):
    """
    Immutable dictionary that raises an error when attempting to modify it.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Recursively freeze nested structures
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, self._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Convert list to tuple, freezing nested dicts recursively.
        """
        frozen = []
        for item in lst:
            if isinstance(item, dict):
                frozen.append(ImmutableDict(item))
            elif isinstance(item, list):
                frozen.append(ImmutableDict._freeze_list(item))
            else:
                frozen.append(item)
        return tuple(frozen)

    def __deepcopy__(self, memo):
        """
        Return a regular mutable dict when deepcopy is called.
        """
        return copy.deepcopy(dict(self), memo)

    def __reduce__(self):
        """
        Support for pickle serialization.
        """
        return (self.__class__, (dict(self),))

    def _copy_value(self, value):
        """
        Convert frozen structures back to mutable equivalents.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        elif isinstance(value, tuple):
            # Tuples here were produced by _freeze_list from lists; restore them as lists
            return [self._copy_value(item) for item in value]
        else:
            return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        value = super().__getitem__(key)
        return self._copy_value(value)

    def get(self, key, default=None):
        """
        Return a mutable copy of the value.
        """
        return self[key] if key in self else default

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")
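

# Behavior sketch (keys and values are hypothetical): reads hand back mutable
# copies, while writes raise.
#
#   d = ImmutableDict({"cache": {"size": "10G"}})
#   inner = d["cache"]        # a plain, mutable dict copy
#   inner["size"] = "20G"     # mutates only the copy
#   # d["cache"]["size"] is still "10G"; d["cache"] = {} raises TypeError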


class SimpleProviderBundle(ProviderBundle):
    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        if replicas is None:
            replicas = []

        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        self._replicas = replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        return self._replicas


class SimpleProviderBundleV2(ProviderBundleV2):
    def __init__(
        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
    ):
        self._storage_backends = storage_backends
        self._metadata_provider = metadata_provider

    @staticmethod
    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
        backend = StorageBackend(
            storage_provider_config=v1_bundle.storage_provider_config,
            credentials_provider=v1_bundle.credentials_provider,
            replicas=v1_bundle.replicas,
        )

        return SimpleProviderBundleV2(
            storage_backends={profile_name: backend},
            metadata_provider=v1_bundle.metadata_provider,
        )

    @property
    def storage_backends(self) -> dict[str, StorageBackend]:
        return self._storage_backends

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider


DEFAULT_CACHE_REFRESH_INTERVAL = 300


class StorageClientConfigLoader:
    _provider_bundle: ProviderBundleV2
    _resolved_config_dict: dict[str, Any]
    _profiles: dict[str, Any]
    _profile: str
    _profile_dict: dict[str, Any]
    _opentelemetry_dict: Optional[dict[str, Any]]
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that, in some cases, the credentials provider
        # will provide credentials scoped to a specific base_path, so we need
        # to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(
        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
    ) -> ProviderBundleV2:
        if provider_bundle:
            bundle = provider_bundle
        else:
            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
            if provider_bundle_dict:
                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
            else:
                bundle = self._build_provider_bundle_from_config(self._profile_dict)

        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)

        return bundle

    def _inject_profiles_from_bundle(self) -> None:
        """
        Inject child profiles and build child configs for multi-backend configurations.

        For ProviderBundleV2 with multiple backends, this method:

        1. Injects child profiles into the config dict (needed for replica initialization)
        2. Pre-builds child configs so CompositeStorageClient can use them directly

        Profile injection is required because SingleStorageClient._initialize_replicas()
        uses StorageClientConfig.from_dict() to create replica clients, which looks up
        profiles in the config dict.
        """
        self._child_configs: Optional[dict[str, StorageClientConfig]] = None

        backends = self._provider_bundle.storage_backends
        if len(backends) > 1:
            # First, inject all child profiles into the config dict (needed for replica lookup)
            profiles = copy.deepcopy(self._profiles)
            for child_name, backend in backends.items():
                child_profile_dict = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }

                if child_name in profiles:
                    # Profile already exists - check if it matches what we would inject
                    existing = profiles[child_name]
                    if existing.get("storage_provider") != child_profile_dict["storage_provider"]:
                        raise ValueError(
                            f"Profile '{child_name}' already exists in configuration with different settings."
                        )
                else:
                    profiles[child_name] = child_profile_dict

            # Update the config dict BEFORE building child configs
            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles
            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

            # Now build child configs (they will get the updated config dict)
            retry_config = self._build_retry_config()
            child_configs: dict[str, StorageClientConfig] = {}
            for child_name, backend in backends.items():
                storage_provider = self._build_storage_provider(
                    backend.storage_provider_config.type,
                    backend.storage_provider_config.options,
                    backend.credentials_provider,
                )

                child_config = StorageClientConfig(
                    profile=child_name,
                    storage_provider=storage_provider,
                    credentials_provider=backend.credentials_provider,
                    storage_provider_profiles=None,
                    child_configs=None,
                    metadata_provider=None,
                    cache_config=None,
                    cache_manager=None,
                    retry_config=retry_config,
                    telemetry_provider=self._telemetry_provider,
                    replicas=backend.replicas,
                    autocommit_config=None,
                )
                child_config._config_dict = self._resolved_config_dict
                child_configs[child_name] = child_config

            self._child_configs = child_configs

    def _build_retry_config(self) -> RetryConfig:
        """Build retry config from profile dict."""
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            return RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )
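
    # Profile-level retry sketch (values are hypothetical; the keys mirror
    # _build_retry_config above):
    #
    #   profiles:
    #     my-profile:
    #       retry:
    #         attempts: 5
    #         delay: 1.0
    #         backoff_multiplier: 2.0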

    def _build_autocommit_config(self) -> AutoCommitConfig:
        """Build autocommit config from profile dict."""
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
        return AutoCommitConfig()
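
    # Autocommit sketch (values are hypothetical; the keys mirror
    # _build_autocommit_config above):
    #
    #   profiles:
    #     my-profile:
    #       autocommit:
    #         interval_minutes: 5
    #         at_exit: true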

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G  # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true  # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache  # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:  # Optional: The eviction policy to use\n"
                "    policy: fifo  # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )
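
    # Replica sketch: reads on "primary" fall back to "backup" by read_priority
    # (profile names are hypothetical; replica profiles must exist and may not
    # declare replicas of their own):
    #
    #   profiles:
    #     primary:
    #       storage_provider: {type: s3, options: {base_path: bucket-a}}
    #       replicas:
    #         - replica_profile: backup
    #           read_priority: 1
    #     backup:
    #       storage_provider: {type: s3, options: {base_path: bucket-b}}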

    # todo: remove once experimental features are stable
    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
        """
        Validate that experimental features are enabled when used.

        :param eviction_policy: The eviction policy configuration to validate
        :raises ValueError: If experimental features are used without being enabled
        """
        # Validate MRU eviction policy
        if eviction_policy.policy.upper() == "MRU":
            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
                raise ValueError(
                    "MRU eviction policy is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_mru_eviction: true"
                )

        # Validate purge_factor
        if eviction_policy.purge_factor != 0:
            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
                raise ValueError("purge_factor must be between 0 and 100")

            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
                raise ValueError(
                    "purge_factor is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_purge_factor: true"
                )
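
    # Top-level opt-in sketch for the experimental checks above (the flags are
    # exactly those referenced in _validate_experimental_features):
    #
    #   experimental_features:
    #     cache_mru_eviction: true
    #     cache_purge_factor: true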

    def build_config(self) -> "StorageClientConfig":
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # config for Composite StorageClient
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                child_configs=self._child_configs,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            config._config_dict = self._resolved_config_dict
            return config

        # Single-backend (len == 1)
        _, backend = list(backends.items())[0]
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified - the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            child_configs=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        config._config_dict = self._resolved_config_dict
        return config


class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from a configuration dictionary.

        :param config_dict: Configuration dictionary; if None, the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        instance = cls()
        instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        # Get the path_mapping section
        path_mapping = config_dict.get("path_mapping", {})
        if path_mapping is None:
            return

        # Process each mapping
        for source_path, dest_path in path_mapping.items():
            # Validate format
            if not source_path.endswith("/"):
                continue
            if not dest_path.startswith(MSC_PROTOCOL):
                continue
            if not dest_path.endswith("/"):
                continue

            # Extract the destination profile
            pr_dest = urlparse(dest_path)
            dest_profile = pr_dest.netloc

            # Parse the source path
            pr = urlparse(source_path)
            protocol = pr.scheme.lower() if pr.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # For file or absolute paths, use the whole path as the prefix
                # and leave the bucket empty
                bucket = ""
                prefix = source_path if source_path.startswith("/") else pr.path
            else:
                # For object storage, extract bucket and prefix
                bucket = pr.netloc
                prefix = pr.path
                if prefix.startswith("/"):
                    prefix = prefix[1:]

            # Add the mapping to the nested dict
            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Sort each bucket's prefixes by length (longest first) for optimal matching
        for protocol, buckets in self._mapping.items():
            for bucket, prefixes in buckets.items():
                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find a matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        # Parse the URL
        pr = urlparse(url)
        protocol = pr.scheme.lower() if pr.scheme else "file"

        # For file paths or absolute paths
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else pr.path

            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []

            # Check each prefix (already sorted by length, longest first)
            for prefix, profile in possible_mapping:
                if path.startswith(prefix):
                    # Calculate the relative path
                    rel_path = path[len(prefix) :]
                    if not rel_path.startswith("/"):
                        rel_path = "/" + rel_path
                    return profile, rel_path

            return None

        # For object storage
        bucket = pr.netloc
        path = pr.path
        if path.startswith("/"):
            path = path[1:]

        # Check bucket-specific mapping
        possible_mapping = (
            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
        )

        # Check each prefix (already sorted by length, longest first)
        for prefix, profile in possible_mapping:
            # matching prefix
            if path.startswith(prefix):
                rel_path = path[len(prefix) :]
                # Remove leading slash if present
                if rel_path.startswith("/"):
                    rel_path = rel_path[1:]

                return profile, rel_path

        return None
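

# Path-mapping sketch (bucket and profile names are hypothetical; assumes
# MSC_PROTOCOL is the "msc://" URL scheme). Source paths must end with "/",
# destinations must be msc:// URLs ending with "/", and the longest matching
# prefix wins:
#
#   mapping = PathMapping.from_config(
#       {"path_mapping": {"s3://bucket/projects/": "msc://my-profile/"}}
#   )
#   mapping.find_mapping("s3://bucket/projects/data.bin")
#   # -> ("my-profile", "data.bin")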


class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: Optional[StorageProvider]
    credentials_provider: Optional[CredentialsProvider]
    storage_provider_profiles: Optional[list[str]]
    child_configs: Optional[dict[str, "StorageClientConfig"]]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    telemetry_provider: Optional[Callable[[], Telemetry]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: Optional[StorageProvider] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
        storage_provider_profiles: Optional[list[str]] = None,
        child_configs: Optional[dict[str, "StorageClientConfig"]] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        # Exactly one of storage_provider or storage_provider_profiles must be set
        if storage_provider and storage_provider_profiles:
            raise ValueError(
                "Cannot specify both storage_provider and storage_provider_profiles. "
                "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
            )
        if not storage_provider and not storage_provider_profiles:
            raise ValueError("Must specify either storage_provider or storage_provider_profiles.")

        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.storage_provider_profiles = storage_provider_profiles
        self.child_configs = child_configs
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.telemetry_provider = telemetry_provider
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a JSON string.

        :param config_json: Configuration JSON string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_yaml(
        config_yaml: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a YAML string.

        :param config_yaml: Configuration YAML string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

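    # Usage sketch (the profile and bucket names are hypothetical):
    #
    #   config = StorageClientConfig.from_yaml(
    #       """
    #       profiles:
    #         my-profile:
    #           storage_provider:
    #             type: s3
    #             options:
    #               base_path: my-bucket
    #       """,
    #       profile="my-profile",
    #   )
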
    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a Python dictionary.

        :param config_dict: Configuration Python dictionary.
        :param profile: Profile to use.
        :param skip_validation: Skip configuration schema validation.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Validate the config file with the predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()

        return config

    @staticmethod
    def from_file(
        config_file_paths: Optional[Iterable[str]] = None,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from the first file found.

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
            )
        else:
            # Check if profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
        # The config was already validated when read, so skip validation for implicit profiles (names starting with "_")
        return StorageClientConfig.from_dict(
            config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any],
        provider_bundle: Union[ProviderBundle, ProviderBundleV2],
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def read_msc_config(
        config_file_paths: Optional[Iterable[str]] = None,
    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the first file found.

        If no config paths are specified, configs are searched in the following order:

        1. ``MSC_CONFIG`` environment variable (highest precedence)
        2. Standard search paths (user-specific config and system-wide config)

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
            dictionary or empty dict if no config was found. ``config_file_path`` is the absolute
            path of the config file used, or ``None`` if no config file was found.
        """
        config_dict: dict[str, Any] = {}
        config_file_path: Optional[str] = None

        config_file_paths = list(config_file_paths or [])

        # Add default paths if none provided.
        if len(config_file_paths) == 0:
            # Environment variable.
            msc_config_env = os.getenv("MSC_CONFIG", None)
            if msc_config_env is not None:
                config_file_paths.append(msc_config_env)

            # Standard search paths.
            config_file_paths.extend(_find_config_file_paths())

        # Normalize + absolutize paths.
        config_file_paths = [os.path.abspath(path) for path in config_file_paths]

        # Log plan.
        logger.debug(f"Searching MSC config file paths: {config_file_paths}")

        # Load config.
        for path in config_file_paths:
            if os.path.exists(path):
                try:
                    with open(path) as f:
                        if path.endswith(".json"):
                            config_dict = json.load(f)
                        else:
                            config_dict = yaml.safe_load(f)
                    config_file_path = path
                    # Use the first config file.
                    break
                except Exception as e:
                    raise ValueError(f"malformed MSC config file: {path}, exception: {e}")

        # Log result.
        if config_file_path is None:
            logger.debug("No MSC config files found in any of the search locations.")
        else:
            logger.debug(f"Using MSC config file: {config_file_path}")

        if config_dict:
            validate_config(config_dict)

        if "include" in config_dict and config_file_path:
            config_dict = _load_and_merge_includes(config_file_path, config_dict)

        return config_dict, config_file_path

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        del state["child_configs"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            telemetry_provider=state["telemetry_provider"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.storage_provider_profiles = new_config.storage_provider_profiles
        self.child_configs = new_config.child_configs
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.telemetry_provider = new_config.telemetry_provider
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config