# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable
from pathlib import Path
from typing import Any, Iterable, Optional, Union
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    ProviderBundleV2,
    Replica,
    RetryConfig,
    StorageBackend,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite

# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
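
    Example (illustrative values)::

        create_implicit_profile_config("_s3-bucket1", "s3", "bucket1")
        # -> {"profiles": {"_s3-bucket1": {"storage_provider":
        #        {"type": "s3", "options": {"base_path": "bucket1"}}}}}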
74 """
75 provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
76 return {
77 "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
78 }
79
80
81LEGACY_POSIX_PROFILE_NAME = "default"
82RESERVED_POSIX_PROFILE_NAME = "__filesystem__"
83DEFAULT_POSIX_PROFILE = create_implicit_profile_config(RESERVED_POSIX_PROFILE_NAME, "file", "/")
84
85STORAGE_PROVIDER_MAPPING = {
86 "file": "PosixFileStorageProvider",
87 "s3": "S3StorageProvider",
88 "gcs": "GoogleStorageProvider",
89 "oci": "OracleStorageProvider",
90 "azure": "AzureBlobStorageProvider",
91 "ais": "AIStoreStorageProvider",
92 "ais_s3": "AIStoreS3StorageProvider",
93 "s8k": "S8KStorageProvider",
94 "gcs_s3": "GoogleS3StorageProvider",
95 "huggingface": "HuggingFaceStorageProvider",
96}
97
98CREDENTIALS_PROVIDER_MAPPING = {
99 "S3Credentials": "StaticS3CredentialsProvider",
100 "AzureCredentials": "StaticAzureCredentialsProvider",
101 "AISCredentials": "StaticAISCredentialProvider",
102 "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
103 "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
104 "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
105 "FileBasedCredentials": "FileBasedCredentialsProvider",
106}
107
108
109def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
110 """
111 Resolve include path (absolute or relative to parent config file).
112
113 :param include_path: Path from include keyword (can be absolute or relative)
114 :param parent_config_path: Absolute path of the config file containing the include
115 :return: Absolute, normalized path to the included config file
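
    Example (illustrative)::

        _resolve_include_path("extra.yaml", "/etc/msc/config.yaml")
        # -> "/etc/msc/extra.yaml"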
116 """
117 if os.path.isabs(include_path):
118 return os.path.abspath(include_path)
119 else:
120 parent_dir = os.path.dirname(parent_config_path)
121 return os.path.abspath(os.path.join(parent_dir, include_path))
122
123
124def _merge_profiles(
125 base_profiles: dict[str, Any],
126 new_profiles: dict[str, Any],
127 base_path: str,
128 new_path: str,
129) -> dict[str, Any]:
130 """
131 Merge profiles from two config files with conflict detection.
132
133 Profiles with the same name are allowed if their definitions are identical (idempotent).
134 If a profile name exists in both configs with different definitions, an error is raised.
135
136 NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
137 We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
138 profile fields, which could lead to unintended behavior. For example:
139
140 base_config:
141 profiles:
142 my-profile:
143 storage_provider:
144 type: s3
145 options:
146 base_path: bucket
147
148 new_config:
149 profiles:
150 my-profile:
151 credentials_provider:
152 type: S3Credentials
153 options:
154 access_key: foo
155 secret_key: bar
156
157 If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
158 and credentials_provider, which might not be the intended configuration.
159
160 :param base_profiles: Profiles from the base config
161 :param new_profiles: Profiles from the new config to merge
162 :param base_path: Path to the base config file (for error messages)
163 :param new_path: Path to the new config file (for error messages)
164 :return: Merged profiles dictionary
165 :raises ValueError: If any profile name exists in both configs with different definitions
166 """
167 result = base_profiles.copy()
168 conflicting_profiles = []
169
170 for profile_name, new_profile_def in new_profiles.items():
171 if profile_name in result:
172 if result[profile_name] != new_profile_def:
173 conflicting_profiles.append(profile_name)
174 else:
175 result[profile_name] = new_profile_def
176
177 if conflicting_profiles:
178 conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
179 raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")
180
181 return result
182
183
184def _merge_opentelemetry(
185 base_otel: dict[str, Any],
186 new_otel: dict[str, Any],
187 base_path: str,
188 new_path: str,
189) -> dict[str, Any]:
190 """
191 Merge opentelemetry configurations with special handling for metrics.attributes.
192
193 Merge strategy:
194 - metrics.attributes: Concatenate arrays (preserve order for fallback mechanism)
195 - Other fields: Use idempotent check (must be identical)
196
197 Attributes providers are concatenated to support fallback scenarios where multiple
198 providers of the same type supply the same attribute key with different sources.
199 The order matters - later providers override earlier ones.
200
201 :param base_otel: Base opentelemetry config
202 :param new_otel: New opentelemetry config to merge
203 :param base_path: Path to base config file (for error messages)
204 :param new_path: Path to new config file (for error messages)
205 :return: Merged opentelemetry configuration
206 :raises ValueError: If conflicts are detected
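
    Example (illustrative; attribute entries are shown with an assumed shape)::

        base:   {"metrics": {"attributes": [{"type": "static"}]}}
        new:    {"metrics": {"attributes": [{"type": "environment_variables"}]}}
        merged: {"metrics": {"attributes": [{"type": "static"}, {"type": "environment_variables"}]}}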
207 """
208 # Merge metrics.attributes
209 base_attrs = base_otel.get("metrics", {}).get("attributes", [])
210 new_attrs = new_otel.get("metrics", {}).get("attributes", [])
211 merged_attrs = base_attrs + new_attrs
212
213 # Merge other opentelemetry fields
214 base_otel_without_attrs = copy.deepcopy(base_otel)
215 if "metrics" in base_otel_without_attrs and "attributes" in base_otel_without_attrs["metrics"]:
216 del base_otel_without_attrs["metrics"]["attributes"]
217
218 new_otel_without_attrs = copy.deepcopy(new_otel)
219 if "metrics" in new_otel_without_attrs and "attributes" in new_otel_without_attrs["metrics"]:
220 del new_otel_without_attrs["metrics"]["attributes"]
221
222 merged, conflicts = merge_dictionaries_no_overwrite(
223 base_otel_without_attrs, new_otel_without_attrs, allow_idempotent=True
224 )
225
226 if conflicts:
227 conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
228 raise ValueError(
229 f"opentelemetry config conflict: {conflicts_list} defined differently in {base_path} and {new_path}"
230 )
231
232 if "metrics" in merged:
233 merged["metrics"]["attributes"] = merged_attrs
234 elif merged_attrs:
235 merged["metrics"] = {"attributes": merged_attrs}
236
237 return merged
238
239
240def _merge_configs(
241 base_config: dict[str, Any],
242 new_config: dict[str, Any],
243 base_path: str,
244 new_path: str,
245) -> dict[str, Any]:
246 """
247 Merge two config dictionaries with field-specific strategies.
248
249 Different config fields have different merge strategies:
250 - profiles: Shallow merge with idempotent check (uses _merge_profiles)
251 - path_mapping: Flat dict merge with idempotent check
252 - experimental_features: Flat dict merge with idempotent check
253 - opentelemetry: Hybrid merge (attributes concatenate, others idempotent)
254 - cache, posix: Global configs, idempotent if identical, error if different
255
256 :param base_config: Base configuration dictionary
257 :param new_config: New configuration to merge
258 :param base_path: Path to base config file (for error messages)
259 :param new_path: Path to new config file (for error messages)
260 :return: Merged configuration dictionary
261 :raises ValueError: If conflicts are detected
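
    Example (illustrative): flat-dict fields such as ``path_mapping`` merge cleanly
    when overlapping keys are identical::

        base:   {"path_mapping": {"/data/": "msc://a/"}}
        new:    {"path_mapping": {"/logs/": "msc://b/"}}
        merged: {"path_mapping": {"/data/": "msc://a/", "/logs/": "msc://b/"}}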
262 """
263 result = {}
264
265 all_keys = set(base_config.keys()) | set(new_config.keys())
266
267 for key in all_keys:
268 base_value = base_config.get(key)
269 new_value = new_config.get(key)
270
271 # Key only in base
272 if key not in new_config:
273 result[key] = base_value
274 continue
275
276 # Key only in new
277 if key not in base_config:
278 result[key] = new_value
279 continue
280
281 # Key in both - need to merge or detect conflict
282 if key == "profiles":
283 result["profiles"] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)
284
285 elif key in ("path_mapping", "experimental_features"):
286 merged, conflicts = merge_dictionaries_no_overwrite(
287 (base_value or {}).copy(), new_value or {}, allow_idempotent=True
288 )
289 if conflicts:
290 conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
291 raise ValueError(
292 f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
293 )
294 result[key] = merged
295
296 elif key == "opentelemetry":
297 result["opentelemetry"] = _merge_opentelemetry(base_value or {}, new_value or {}, base_path, new_path)
298
299 elif key in ("cache", "posix"):
300 if base_value != new_value:
301 raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
302 result[key] = base_value
303
304 elif key == "include":
305 # 'include' is processed by _load_and_merge_includes, not part of final config
306 pass
307
308 else:
309 # This should never happen and all top level fields must have explicit handling above
310 raise ValueError(f"Unknown field '{key}' in config file.")
311
312 return result
313
314
315def _load_and_merge_includes(
316 main_config_path: str,
317 main_config_dict: dict[str, Any],
318) -> dict[str, Any]:
319 """
320 Load and merge included config files.
321
322 Processes the 'include' directive in the main config, loading and merging all
323 specified config files. Only supports one level of includes - included files
324 cannot themselves have 'include' directives.
325
326 :param main_config_path: Absolute path to the main config file
327 :param main_config_dict: Dictionary loaded from the main config file
328 :return: Merged configuration dictionary (without 'include' field)
329 :raises ValueError: If include file not found, malformed, or contains nested includes
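
    Example main config with an include directive (illustrative)::

        include:
          - common.yaml          # resolved relative to this file's directory
          - /etc/msc/extra.yaml  # absolute paths are used as-is
        profiles:
          my-profile:
            storage_provider:
              type: s3
              options:
                base_path: my-bucket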
330 """
331 include_paths = main_config_dict.get("include", [])
332
333 if not include_paths:
334 return {k: v for k, v in main_config_dict.items() if k != "include"}
335
336 merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}
337
338 for include_path in include_paths:
339 resolved_path = _resolve_include_path(include_path, main_config_path)
340
341 if not os.path.exists(resolved_path):
342 raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")
343
344 try:
345 with open(resolved_path) as f:
346 if resolved_path.endswith(".json"):
347 included_config = json.load(f)
348 else:
349 included_config = yaml.safe_load(f)
350 except Exception as e:
351 raise ValueError(f"Failed to load included config {resolved_path}: {e}")
352
353 validate_config(included_config)
354 if "include" in included_config:
355 raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")
356
357 merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)
358
359 return merged_config
360
361
362def _find_config_file_paths() -> tuple[str]:
363 """
364 Get configuration file search paths.
365
366 Returns paths in order of precedence:
367
368 1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
369 2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
370 """
371 paths = []
372
373 # 1. User-specific configuration directory
374 xdg_config_home = os.getenv("XDG_CONFIG_HOME")
375
376 if xdg_config_home:
377 paths.extend(
378 [
379 os.path.join(xdg_config_home, "msc", "config.yaml"),
380 os.path.join(xdg_config_home, "msc", "config.json"),
381 ]
382 )
383
384 user_home = os.getenv("HOME")
385
386 if user_home:
387 paths.extend(
388 [
389 os.path.join(user_home, ".msc_config.yaml"),
390 os.path.join(user_home, ".msc_config.json"),
391 os.path.join(user_home, ".config", "msc", "config.yaml"),
392 os.path.join(user_home, ".config", "msc", "config.json"),
393 ]
394 )
395
396 # 2. System-wide configuration directories
397 xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
398 if not xdg_config_dirs:
399 xdg_config_dirs = "/etc/xdg"
400
401 for config_dir in xdg_config_dirs.split(":"):
402 config_dir = config_dir.strip()
403 if config_dir:
404 paths.extend(
405 [
406 os.path.join(config_dir, "msc", "config.yaml"),
407 os.path.join(config_dir, "msc", "config.json"),
408 ]
409 )
410
411 paths.extend(
412 [
413 "/etc/msc_config.yaml",
414 "/etc/msc_config.json",
415 ]
416 )
417
418 return tuple(paths)
419
420
421def _normalize_profile_name(profile: str, config_dict: dict[str, Any]) -> str:
422 """
423 Normalize the profile name to the reserved POSIX profile name if the legacy "default" POSIX profile is used.
424
425 :param profile: The profile name to normalize
426 :param config_dict: The configuration dictionary
427 :return: The normalized profile name
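
    Example (illustrative)::

        _normalize_profile_name("default", {"profiles": {}})
        # -> "__filesystem__" (with a deprecation warning)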
428 """
429 if profile == LEGACY_POSIX_PROFILE_NAME and profile not in config_dict.get("profiles", {}):
430 logger.warning(
431 f"The profile name '{LEGACY_POSIX_PROFILE_NAME}' is deprecated and will be removed in a future version. Please use '{RESERVED_POSIX_PROFILE_NAME}' instead."
432 )
433 return RESERVED_POSIX_PROFILE_NAME
434 return profile
435
436
437PACKAGE_NAME = "multistorageclient"
438
439logger = logging.getLogger(__name__)
440
441
442class ImmutableDict(dict):
443 """
444 Immutable dictionary that raises an error when attempting to modify it.
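
    A minimal usage sketch (illustrative)::

        d = ImmutableDict({"a": {"b": 1}})
        nested = d["a"]      # returns a mutable copy of the nested dict
        d["a"] = {}          # raises TypeError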
445 """
446
447 def __init__(self, *args, **kwargs):
448 super().__init__(*args, **kwargs)
449
450 # Recursively freeze nested structures
451 for key, value in list(super().items()):
452 if isinstance(value, dict) and not isinstance(value, ImmutableDict):
453 super().__setitem__(key, ImmutableDict(value))
454 elif isinstance(value, list):
455 super().__setitem__(key, self._freeze_list(value))
456
457 @staticmethod
458 def _freeze_list(lst):
459 """
460 Convert list to tuple, freezing nested dicts recursively.
461 """
462 frozen = []
463 for item in lst:
464 if isinstance(item, dict):
465 frozen.append(ImmutableDict(item))
466 elif isinstance(item, list):
467 frozen.append(ImmutableDict._freeze_list(item))
468 else:
469 frozen.append(item)
470 return tuple(frozen)
471
472 def __deepcopy__(self, memo):
473 """
474 Return a regular mutable dict when deepcopy is called.
475 """
476 return copy.deepcopy(dict(self), memo)
477
478 def __reduce__(self):
479 """
480 Support for pickle serialization.
481 """
482 return (self.__class__, (dict(self),))
483
484 def _copy_value(self, value):
485 """
486 Convert frozen structures back to mutable equivalents.
487 """
488 if isinstance(value, ImmutableDict):
489 return {k: self._copy_value(v) for k, v in value.items()}
490 elif isinstance(value, tuple):
491 # Check if it was originally a list (frozen by _freeze_list)
492 return [self._copy_value(item) for item in value]
493 else:
494 return value
495
496 def __getitem__(self, key):
497 """
498 Return a mutable copy of the value.
499 """
500 value = super().__getitem__(key)
501 return self._copy_value(value)
502
503 def get(self, key, default=None):
504 """
505 Return a mutable copy of the value.
506 """
507 return self[key] if key in self else default
508
509 def __setitem__(self, key, value):
510 raise TypeError("ImmutableDict is immutable")
511
512 def __delitem__(self, key):
513 raise TypeError("ImmutableDict is immutable")
514
515 def clear(self):
516 raise TypeError("ImmutableDict is immutable")
517
518 def pop(self, *args):
519 raise TypeError("ImmutableDict is immutable")
520
521 def popitem(self):
522 raise TypeError("ImmutableDict is immutable")
523
524 def setdefault(self, key, default=None):
525 raise TypeError("ImmutableDict is immutable")
526
527 def update(self, *args, **kwargs):
528 raise TypeError("ImmutableDict is immutable")
529
530
531class SimpleProviderBundle(ProviderBundle):
532 def __init__(
533 self,
534 storage_provider_config: StorageProviderConfig,
535 credentials_provider: Optional[CredentialsProvider] = None,
536 metadata_provider: Optional[MetadataProvider] = None,
537 replicas: Optional[list[Replica]] = None,
538 ):
539 if replicas is None:
540 replicas = []
541
542 self._storage_provider_config = storage_provider_config
543 self._credentials_provider = credentials_provider
544 self._metadata_provider = metadata_provider
545 self._replicas = replicas
546
547 @property
548 def storage_provider_config(self) -> StorageProviderConfig:
549 return self._storage_provider_config
550
551 @property
552 def credentials_provider(self) -> Optional[CredentialsProvider]:
553 return self._credentials_provider
554
555 @property
556 def metadata_provider(self) -> Optional[MetadataProvider]:
557 return self._metadata_provider
558
559 @property
560 def replicas(self) -> list[Replica]:
561 return self._replicas
562
563
564class SimpleProviderBundleV2(ProviderBundleV2):
565 def __init__(
566 self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
567 ):
568 self._storage_backends = storage_backends
569 self._metadata_provider = metadata_provider
570
571 @staticmethod
572 def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
573 backend = StorageBackend(
574 storage_provider_config=v1_bundle.storage_provider_config,
575 credentials_provider=v1_bundle.credentials_provider,
576 replicas=v1_bundle.replicas,
577 )
578
579 return SimpleProviderBundleV2(
580 storage_backends={profile_name: backend},
581 metadata_provider=v1_bundle.metadata_provider,
582 )
583
584 @property
585 def storage_backends(self) -> dict[str, StorageBackend]:
586 return self._storage_backends
587
588 @property
589 def metadata_provider(self) -> Optional[MetadataProvider]:
590 return self._metadata_provider
591
592
593DEFAULT_CACHE_REFRESH_INTERVAL = 300
594
595
596class StorageClientConfigLoader:
597 _provider_bundle: ProviderBundleV2
598 _resolved_config_dict: dict[str, Any]
599 _profiles: dict[str, Any]
600 _profile: str
601 _profile_dict: dict[str, Any]
602 _opentelemetry_dict: Optional[dict[str, Any]]
603 _telemetry_provider: Optional[Callable[[], Telemetry]]
604 _cache_dict: Optional[dict[str, Any]]
605
606 def __init__(
607 self,
608 config_dict: dict[str, Any],
609 profile: str = RESERVED_POSIX_PROFILE_NAME,
610 provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
611 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
612 ) -> None:
613 """
614 Initializes a :py:class:`StorageClientConfigLoader` to create a
615 StorageClientConfig. Components are built using the ``config_dict`` and
616 profile, but a pre-built provider_bundle takes precedence.
617
618 :param config_dict: Dictionary of configuration options.
619 :param profile: Name of profile in ``config_dict`` to use to build configuration.
620 :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
621 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
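
        A minimal sketch (assumes a hypothetical profile named ``my-profile``)::

            loader = StorageClientConfigLoader(
                config_dict={
                    "profiles": {
                        "my-profile": {
                            "storage_provider": {"type": "file", "options": {"base_path": "/"}}
                        }
                    }
                },
                profile="my-profile",
            )
            config = loader.build_config()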
622 """
623 # Interpolates all environment variables into actual values.
624 config_dict = expand_env_vars(config_dict)
625 self._resolved_config_dict = ImmutableDict(config_dict)
626
627 self._profiles = config_dict.get("profiles", {})
628
629 if RESERVED_POSIX_PROFILE_NAME not in self._profiles:
630 # Assign the default POSIX profile
631 self._profiles[RESERVED_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
632 else:
633 # Cannot override default POSIX profile
634 if (
635 self._profiles[RESERVED_POSIX_PROFILE_NAME]
636 != DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
637 ):
638 raise ValueError(f'Cannot override "{RESERVED_POSIX_PROFILE_NAME}" profile with different settings.')
639
640 profile_dict = self._profiles.get(profile)
641
642 if not profile_dict:
643 raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")
644
645 self._profile = profile
646 self._profile_dict = ImmutableDict(profile_dict)
647
648 self._opentelemetry_dict = config_dict.get("opentelemetry", None)
649 # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
650 # New processes (e.g. multiprocessing manager server) can't be created during this phase.
651 #
652 # Pass thunks everywhere instead for lazy telemetry initialization.
653 self._telemetry_provider = telemetry_provider or telemetry_init
654
655 self._cache_dict = config_dict.get("cache", None)
656 self._experimental_features = config_dict.get("experimental_features", None)
657
658 self._provider_bundle = self._build_provider_bundle(provider_bundle)
659 self._inject_profiles_from_bundle()
660
661 def _build_storage_provider(
662 self,
663 storage_provider_name: str,
664 storage_options: Optional[dict[str, Any]] = None,
665 credentials_provider: Optional[CredentialsProvider] = None,
666 ) -> StorageProvider:
667 if storage_options is None:
668 storage_options = {}
669 if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
670 raise ValueError(
671 f"Storage provider {storage_provider_name} is not supported. "
672 f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
673 )
674 if credentials_provider:
675 storage_options["credentials_provider"] = credentials_provider
676 if self._resolved_config_dict is not None:
677 # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
678 storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
679 if self._telemetry_provider is not None:
680 storage_options["telemetry_provider"] = self._telemetry_provider
681 class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
682 module_name = ".providers"
683 cls = import_class(class_name, module_name, PACKAGE_NAME)
684 return cls(**storage_options)
685
686 def _build_storage_provider_from_profile(self, storage_provider_profile: str):
687 storage_profile_dict = self._profiles.get(storage_provider_profile)
688 if not storage_profile_dict:
689 raise ValueError(
690 f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
691 )
692
693 # Check if metadata provider is configured for this profile
694 # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
695 local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
696 if local_metadata_provider_dict:
697 raise ValueError(
698 f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
699 )
700
701 # Initialize CredentialsProvider
702 local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
703 local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
704
705 # Initialize StorageProvider
706 local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
707 if local_storage_provider_dict:
708 local_name = local_storage_provider_dict["type"]
709 local_storage_options = local_storage_provider_dict.get("options", {})
710 else:
711 raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
712
713 storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
714 return storage_provider
715
716 def _build_credentials_provider(
717 self,
718 credentials_provider_dict: Optional[dict[str, Any]],
719 storage_options: Optional[dict[str, Any]] = None,
720 ) -> Optional[CredentialsProvider]:
721 """
722 Initializes the CredentialsProvider based on the provided dictionary.
723
724 Args:
725 credentials_provider_dict: Dictionary containing credentials provider configuration
726 storage_options: Storage provider options required by some credentials providers to scope the credentials.
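
        Example configurations (illustrative values; the second shows a hypothetical
        fully qualified class path)::

            {"type": "S3Credentials", "options": {"access_key": "...", "secret_key": "..."}}
            {"type": "my_module.MyCredentialsProvider", "options": {}}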
727 """
728 if not credentials_provider_dict:
729 return None
730
731 if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
732 # Fully qualified class path case
733 class_type = credentials_provider_dict["type"]
734 module_name, class_name = class_type.rsplit(".", 1)
735 cls = import_class(class_name, module_name)
736 else:
737 # Mapped class name case
738 class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
739 module_name = ".providers"
740 cls = import_class(class_name, module_name, PACKAGE_NAME)
741
742 # Propagate storage provider options to credentials provider since they may be
743 # required by some credentials providers to scope the credentials.
744 import inspect
745
746 init_params = list(inspect.signature(cls.__init__).parameters)[1:] # skip 'self'
747 options = credentials_provider_dict.get("options", {})
748 if storage_options:
749 for storage_provider_option in storage_options.keys():
750 if storage_provider_option in init_params and storage_provider_option not in options:
751 options[storage_provider_option] = storage_options[storage_provider_option]
752
753 return cls(**options)
754
755 def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
756 # Initialize StorageProvider
757 storage_provider_dict = profile_dict.get("storage_provider", None)
758 if storage_provider_dict:
759 storage_provider_name = storage_provider_dict["type"]
760 storage_options = storage_provider_dict.get("options", {})
761 else:
762 raise ValueError("Missing storage_provider in the config.")
763
764 # Initialize CredentialsProvider
765 # It is prudent to assume that in some cases, the credentials provider
766 # will provide credentials scoped to specific base_path.
767 # So we need to pass the storage_options to the credentials provider.
768 credentials_provider_dict = profile_dict.get("credentials_provider", None)
769 credentials_provider = self._build_credentials_provider(
770 credentials_provider_dict=credentials_provider_dict,
771 storage_options=storage_options,
772 )
773
774 # Initialize MetadataProvider
775 metadata_provider_dict = profile_dict.get("metadata_provider", None)
776 metadata_provider = None
777 if metadata_provider_dict:
778 if metadata_provider_dict["type"] == "manifest":
779 metadata_options = metadata_provider_dict.get("options", {})
780 # If MetadataProvider has a reference to a different storage provider profile
781 storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
782 if storage_provider_profile:
783 storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
784 else:
785 storage_provider = self._build_storage_provider(
786 storage_provider_name, storage_options, credentials_provider
787 )
788
789 metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
790 else:
791 class_type = metadata_provider_dict["type"]
792 if "." not in class_type:
793 raise ValueError(
794 f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
795 )
796 module_name, class_name = class_type.rsplit(".", 1)
797 cls = import_class(class_name, module_name)
798 options = metadata_provider_dict.get("options", {})
799 metadata_provider = cls(**options)
800
801 # Build replicas if configured
802 replicas_config = profile_dict.get("replicas", [])
803 replicas = []
804 if replicas_config:
805 for replica_dict in replicas_config:
806 replicas.append(
807 Replica(
808 replica_profile=replica_dict["replica_profile"],
809 read_priority=replica_dict["read_priority"],
810 )
811 )
812
813 # Sort replicas by read_priority
814 replicas.sort(key=lambda r: r.read_priority)
815
816 return SimpleProviderBundle(
817 storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
818 credentials_provider=credentials_provider,
819 metadata_provider=metadata_provider,
820 replicas=replicas,
821 )
822
823 def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
824 class_type = provider_bundle_dict["type"]
825 module_name, class_name = class_type.rsplit(".", 1)
826 cls = import_class(class_name, module_name)
827 options = provider_bundle_dict.get("options", {})
828 return cls(**options)
829
830 def _build_provider_bundle(
831 self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
832 ) -> ProviderBundleV2:
833 if provider_bundle:
834 bundle = provider_bundle
835 else:
836 provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
837 if provider_bundle_dict:
838 bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
839 else:
840 bundle = self._build_provider_bundle_from_config(self._profile_dict)
841
842 if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
843 bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)
844
845 return bundle
846
847 def _inject_profiles_from_bundle(self) -> None:
848 """
849 Inject child profiles and build child configs for multi-backend configurations.
850
851 For ProviderBundleV2 with multiple backends, this method:
852 1. Injects child profiles into config dict (needed for replica initialization)
853 2. Pre-builds child configs so CompositeStorageClient can use them directly
854
855 Profile injection is required because SingleStorageClient._initialize_replicas()
856 uses StorageClientConfig.from_dict() to create replica clients, which looks up
857 profiles in the config dict.
858 """
859 self._child_configs: Optional[dict[str, StorageClientConfig]] = None
860
861 backends = self._provider_bundle.storage_backends
862 if len(backends) > 1:
863 # First, inject all child profiles into config dict (needed for replica lookup)
864 profiles = copy.deepcopy(self._profiles)
865 for child_name, backend in backends.items():
866 child_profile_dict = {
867 "storage_provider": {
868 "type": backend.storage_provider_config.type,
869 "options": backend.storage_provider_config.options,
870 }
871 }
872
873 if child_name in profiles:
874 # Profile already exists - check if it matches what we would inject
875 existing = profiles[child_name]
876 if existing.get("storage_provider") != child_profile_dict["storage_provider"]:
877 raise ValueError(
878 f"Profile '{child_name}' already exists in configuration with different settings."
879 )
880 else:
881 profiles[child_name] = child_profile_dict
882
883 # Update config dict BEFORE building child configs
884 resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
885 resolved_config_dict["profiles"] = profiles
886 self._profiles = ImmutableDict(profiles)
887 self._resolved_config_dict = ImmutableDict(resolved_config_dict)
888
889 # Now build child configs (they will get the updated config dict)
890 retry_config = self._build_retry_config()
891 child_configs: dict[str, StorageClientConfig] = {}
892 for child_name, backend in backends.items():
893 storage_provider = self._build_storage_provider(
894 backend.storage_provider_config.type,
895 backend.storage_provider_config.options,
896 backend.credentials_provider,
897 )
898
899 child_config = StorageClientConfig(
900 profile=child_name,
901 storage_provider=storage_provider,
902 credentials_provider=backend.credentials_provider,
903 storage_provider_profiles=None,
904 child_configs=None,
905 metadata_provider=None,
906 cache_config=None,
907 cache_manager=None,
908 retry_config=retry_config,
909 telemetry_provider=self._telemetry_provider,
910 replicas=backend.replicas,
911 autocommit_config=None,
912 )
913 child_config._config_dict = self._resolved_config_dict
914 child_configs[child_name] = child_config
915
916 self._child_configs = child_configs
917
918 def _build_retry_config(self) -> RetryConfig:
919 """Build retry config from profile dict."""
920 retry_config_dict = self._profile_dict.get("retry", None)
921 if retry_config_dict:
922 attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
923 delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
924 backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
925 return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
926 else:
927 return RetryConfig(
928 attempts=DEFAULT_RETRY_ATTEMPTS,
929 delay=DEFAULT_RETRY_DELAY,
930 backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
931 )
932
933 def _build_autocommit_config(self) -> AutoCommitConfig:
934 """Build autocommit config from profile dict."""
935 autocommit_dict = self._profile_dict.get("autocommit", None)
936 if autocommit_dict:
937 interval_minutes = autocommit_dict.get("interval_minutes", None)
938 at_exit = autocommit_dict.get("at_exit", False)
939 return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
940 return AutoCommitConfig()
941
942 def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
943 if "size_mb" in cache_dict:
944 raise ValueError(
945 "The 'size_mb' property is no longer supported. \n"
946 "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
947 "Example configuration:\n"
948 "cache:\n"
949 " size: 500G # Optional: Maximum cache size (default: 10G)\n"
950 " cache_line_size: 64M # Optional: Chunk size for partial file caching (default: 64M)\n"
951 " check_source_version: true # Optional: Use ETag for cache validation (default: true)\n"
952 " location: /tmp/msc_cache # Optional: Cache directory path (default: system tempdir + '/msc_cache')\n"
953 " eviction_policy: # Optional: Cache eviction policy\n"
954 " policy: fifo # Optional: Policy type: lru, mru, fifo, random, no_eviction (default: fifo)\n"
955 " refresh_interval: 300 # Optional: Cache refresh interval in seconds (default: 300)\n"
956 )
957
958 # Validate that cache_line_size doesn't exceed cache size
959 cache_size_str = cache_dict.get("size", DEFAULT_CACHE_SIZE)
960 cache_line_size_str = cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE)
961
962 # Use CacheConfig to convert sizes to bytes for comparison
963 temp_cache_config = CacheConfig(
964 size=cache_size_str,
965 cache_line_size=cache_line_size_str,
966 )
967 cache_size_bytes = temp_cache_config.size_bytes()
968 cache_line_size_bytes = temp_cache_config.cache_line_size_bytes()
969
970 if cache_line_size_bytes > cache_size_bytes:
971 raise ValueError(
972 f"cache_line_size ({cache_line_size_str}) exceeds cache size ({cache_size_str}). "
973 f"cache_line_size must be less than or equal to cache size. "
974 f"Consider increasing cache size or decreasing cache_line_size."
975 )
976
977 def _validate_replicas(self, replicas: list[Replica]) -> None:
978 """
979 Validates that replica profiles do not have their own replicas configuration.
980
981 This prevents circular references where a replica profile could reference
982 another profile that also has replicas, creating an infinite loop.
983
984 :param replicas: The list of Replica objects to validate
985 :raises ValueError: If any replica profile has its own replicas configuration
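
        Example of a valid replica setup (illustrative)::

            profiles:
              primary:
                storage_provider: {type: s3, options: {base_path: bucket-a}}
                replicas:
                  - replica_profile: backup
                    read_priority: 1
              backup:
                storage_provider: {type: s3, options: {base_path: bucket-b}}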
986 """
987 for replica in replicas:
988 replica_profile_name = replica.replica_profile
989
990 # Check that replica profile is not the same as the current profile
991 if replica_profile_name == self._profile:
992 raise ValueError(
993 f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
994 )
995
996 # Check if the replica profile exists in the configuration
997 if replica_profile_name not in self._profiles:
998 raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
999
1000 # Get the replica profile configuration
1001 replica_profile_dict = self._profiles[replica_profile_name]
1002
1003 # Check if the replica profile has its own replicas configuration
1004 if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
1005 raise ValueError(
1006 f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
1007 f"This creates a circular reference which is not allowed."
1008 )
1009
1010 # todo: remove once experimental features are stable
1011 def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
1012 """
1013 Validate that experimental features are enabled when used.
1014
1015 :param eviction_policy: The eviction policy configuration to validate
1016 :raises ValueError: If experimental features are used without being enabled
1017 """
1018 # Validate MRU eviction policy
1019 if eviction_policy.policy.upper() == "MRU":
1020 if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
1021 raise ValueError(
1022 "MRU eviction policy is experimental and not enabled.\n"
1023 "Enable it by adding to config:\n"
1024 " experimental_features:\n"
1025 " cache_mru_eviction: true"
1026 )
1027
1028 # Validate purge_factor
1029 if eviction_policy.purge_factor != 0:
1030 if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
1031 raise ValueError("purge_factor must be between 0 and 100")
1032
1033 if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
1034 raise ValueError(
1035 "purge_factor is experimental and not enabled.\n"
1036 "Enable it by adding to config:\n"
1037 " experimental_features:\n"
1038 " cache_purge_factor: true"
1039 )
1040
1041 def build_config(self) -> "StorageClientConfig":
1042 bundle = self._provider_bundle
1043 backends = bundle.storage_backends
1044
1045 if len(backends) > 1:
1046 if not bundle.metadata_provider:
1047 raise ValueError(
1048 f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
1049 "for routing between storage locations."
1050 )
1051
1052 child_profile_names = list(backends.keys())
1053 retry_config = self._build_retry_config()
1054 autocommit_config = self._build_autocommit_config()
1055
1056 # config for Composite StorageClient
1057 config = StorageClientConfig(
1058 profile=self._profile,
1059 storage_provider=None,
1060 credentials_provider=None,
1061 storage_provider_profiles=child_profile_names,
1062 child_configs=self._child_configs,
1063 metadata_provider=bundle.metadata_provider,
1064 cache_config=None,
1065 cache_manager=None,
1066 retry_config=retry_config,
1067 telemetry_provider=self._telemetry_provider,
1068 replicas=[],
1069 autocommit_config=autocommit_config,
1070 )
1071
1072 config._config_dict = self._resolved_config_dict
1073 return config
1074
1075 # Single-backend (len == 1)
1076 _, backend = list(backends.items())[0]
1077 storage_provider = self._build_storage_provider(
1078 backend.storage_provider_config.type,
1079 backend.storage_provider_config.options,
1080 backend.credentials_provider,
1081 )
1082 credentials_provider = backend.credentials_provider
1083 metadata_provider = bundle.metadata_provider
1084 replicas = backend.replicas
1085
1086 # Validate replicas to prevent circular references
1087 if replicas:
1088 self._validate_replicas(replicas)
1089
1090 cache_config: Optional[CacheConfig] = None
1091 cache_manager: Optional[CacheManager] = None
1092
1093 # Check if caching is enabled for this profile
1094 caching_enabled = self._profile_dict.get("caching_enabled", False)
1095
1096 if self._cache_dict is not None and caching_enabled:
1097 tempdir = tempfile.gettempdir()
1098 default_location = os.path.join(tempdir, "msc_cache")
1099 location = self._cache_dict.get("location", default_location)
1100
1101 # Check if cache_backend.cache_path is defined
1102 cache_backend = self._cache_dict.get("cache_backend", {})
1103 cache_backend_path = cache_backend.get("cache_path") if cache_backend else None
1104
1105 # Warn if both location and cache_backend.cache_path are defined
1106 if cache_backend_path and self._cache_dict.get("location") is not None:
1107 logger.warning(
1108 f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
1109 f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
1110 )
1111 elif cache_backend_path:
1112 # Use cache_backend.cache_path only if location is not explicitly defined
1113 location = cache_backend_path
1114
1115 # Resolve the effective flag while preserving explicit ``False``.
1116 if "check_source_version" in self._cache_dict:
1117 check_source_version = self._cache_dict["check_source_version"]
1118 else:
1119 check_source_version = self._cache_dict.get("use_etag", True)
1120
1121 # Warn if both keys are specified – the new one wins.
1122 if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
1123 logger.warning(
1124 "Both 'check_source_version' and 'use_etag' are defined in cache config. "
1125 "Using 'check_source_version' and ignoring 'use_etag'."
1126 )
1127
1128 if not Path(location).is_absolute():
1129 raise ValueError(f"Cache location must be an absolute path: {location}")
1130
1131 # Initialize cache_dict with default values
1132 cache_dict = self._cache_dict
1133
1134 # Verify cache config
1135 self._verify_cache_config(cache_dict)
1136
1137 # Initialize eviction policy
1138 if "eviction_policy" in cache_dict:
1139 policy = cache_dict["eviction_policy"]["policy"].lower()
1140 purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
1141 eviction_policy = EvictionPolicyConfig(
1142 policy=policy,
1143 refresh_interval=cache_dict["eviction_policy"].get(
1144 "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
1145 ),
1146 purge_factor=purge_factor,
1147 )
1148 else:
1149 eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)
1150
1151 # todo: remove once experimental features are stable
1152 # Validate experimental features before creating cache_config
1153 self._validate_experimental_features(eviction_policy)
1154
1155 # Create cache config from the standardized format
1156 cache_config = CacheConfig(
1157 size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
1158 location=cache_dict.get("location", location),
1159 check_source_version=check_source_version,
1160 eviction_policy=eviction_policy,
1161 cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
1162 )
1163
1164 cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
1165 elif self._cache_dict is not None and not caching_enabled:
1166 logger.debug(f"Caching is disabled for profile '{self._profile}'")
1167 elif self._cache_dict is None and caching_enabled:
1168 logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")
1169
1170 retry_config = self._build_retry_config()
1171 autocommit_config = self._build_autocommit_config()
1172
1173 config = StorageClientConfig(
1174 profile=self._profile,
1175 storage_provider=storage_provider,
1176 credentials_provider=credentials_provider,
1177 storage_provider_profiles=None,
1178 child_configs=None,
1179 metadata_provider=metadata_provider,
1180 cache_config=cache_config,
1181 cache_manager=cache_manager,
1182 retry_config=retry_config,
1183 telemetry_provider=self._telemetry_provider,
1184 replicas=replicas,
1185 autocommit_config=autocommit_config,
1186 )
1187
1188 config._config_dict = self._resolved_config_dict
1189 return config
1190
1191
1192class PathMapping:
1193 """
1194 Class to handle path mappings defined in the MSC configuration.
1195
1196 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1197 where entries are sorted by prefix length (longest first) for optimal matching.
1198 Longer paths take precedence when matching.
1199 """
1200
1201 def __init__(self):
1202 """Initialize an empty PathMapping."""
1203 self._mapping = defaultdict(lambda: defaultdict(list))
1204
1205 @classmethod
1206 def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
1207 """
1208 Create a PathMapping instance from configuration dictionary.
1209
1210 :param config_dict: Configuration dictionary, if None the config will be loaded
1211 :return: A PathMapping instance with processed mappings
1212 """
1213 if config_dict is None:
1214 # Import locally to avoid circular imports
1215 from multistorageclient.config import StorageClientConfig
1216
1217 config_dict, _ = StorageClientConfig.read_msc_config()
1218
1219 if not config_dict:
1220 return cls()
1221
1222 instance = cls()
1223 instance._load_mapping(config_dict)
1224 return instance
1225
1226 def _load_mapping(self, config_dict: dict[str, Any]) -> None:
1227 """
1228 Load path mapping from a configuration dictionary.
1229
1230 :param config_dict: Configuration dictionary containing path mapping
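
        Example path_mapping section (illustrative)::

            path_mapping:
              /data/: msc://posix-profile/
              s3://bucket/prefix/: msc://s3-profile/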
1231 """
1232 # Get the path_mapping section
1233 path_mapping = config_dict.get("path_mapping", {})
1234 if path_mapping is None:
1235 return
1236
1237 # Process each mapping
1238 for source_path, dest_path in path_mapping.items():
1239 # Validate format
1240 if not source_path.endswith("/"):
1241 continue
1242 if not dest_path.startswith(MSC_PROTOCOL):
1243 continue
1244 if not dest_path.endswith("/"):
1245 continue
1246
1247 # Extract the destination profile
1248 pr_dest = urlparse(dest_path)
1249 dest_profile = pr_dest.netloc
1250
1251 # Parse the source path
1252 pr = urlparse(source_path)
1253 protocol = pr.scheme.lower() if pr.scheme else "file"
1254
1255 if protocol == "file" or source_path.startswith("/"):
1256 # For file or absolute paths, use the whole path as the prefix
1257 # and leave bucket empty
1258 bucket = ""
1259 prefix = source_path if source_path.startswith("/") else pr.path
1260 else:
1261 # For object storage, extract bucket and prefix
1262 bucket = pr.netloc
1263 prefix = pr.path
1264 if prefix.startswith("/"):
1265 prefix = prefix[1:]
1266
1267 # Add the mapping to the nested dict
1268 self._mapping[protocol][bucket].append((prefix, dest_profile))
1269
1270 # Sort each bucket's prefixes by length (longest first) for optimal matching
1271 for protocol, buckets in self._mapping.items():
1272 for bucket, prefixes in buckets.items():
1273 self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)
1274
1275 def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
1276 """
1277 Find the best matching mapping for the given URL.
1278
1279 :param url: URL to find matching mapping for
1280 :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
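
        Example (assumes an illustrative mapping ``s3://bucket/data/ -> msc://my-profile/``)::

            mapping.find_mapping("s3://bucket/data/file.bin")
            # -> ("my-profile", "file.bin")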
1281 """
1282 # Parse the URL
1283 pr = urlparse(url)
1284 protocol = pr.scheme.lower() if pr.scheme else "file"
1285
1286 # For file paths or absolute paths
1287 if protocol == "file" or url.startswith("/"):
1288 path = url if url.startswith("/") else pr.path
1289
1290 possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []
1291
1292 # Check each prefix (already sorted by length, longest first)
1293 for prefix, profile in possible_mapping:
1294 if path.startswith(prefix):
1295 # Calculate the relative path
1296 rel_path = path[len(prefix) :]
1297 if not rel_path.startswith("/"):
1298 rel_path = "/" + rel_path
1299 return profile, rel_path
1300
1301 return None
1302
1303 # For object storage
1304 bucket = pr.netloc
1305 path = pr.path
1306 if path.startswith("/"):
1307 path = path[1:]
1308
1309 # Check bucket-specific mapping
1310 possible_mapping = (
1311 self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
1312 )
1313
1314 # Check each prefix (already sorted by length, longest first)
1315 for prefix, profile in possible_mapping:
1316 # matching prefix
1317 if path.startswith(prefix):
1318 rel_path = path[len(prefix) :]
1319 # Remove leading slash if present
1320 if rel_path.startswith("/"):
1321 rel_path = rel_path[1:]
1322
1323 return profile, rel_path
1324
1325 return None
1326
1327
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: Optional[StorageProvider]
    credentials_provider: Optional[CredentialsProvider]
    storage_provider_profiles: Optional[list[str]]
    child_configs: Optional[dict[str, "StorageClientConfig"]]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    telemetry_provider: Optional[Callable[[], Telemetry]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: Optional[StorageProvider] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
        storage_provider_profiles: Optional[list[str]] = None,
        child_configs: Optional[dict[str, "StorageClientConfig"]] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        # Exactly one of storage_provider or storage_provider_profiles must be set
        if storage_provider and storage_provider_profiles:
            raise ValueError(
                "Cannot specify both storage_provider and storage_provider_profiles. "
                "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
            )
        if not storage_provider and not storage_provider_profiles:
            raise ValueError("Must specify either storage_provider or storage_provider_profiles.")

        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.storage_provider_profiles = storage_provider_profiles
        self.child_configs = child_configs
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.telemetry_provider = telemetry_provider
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str,
        profile: str = RESERVED_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a JSON string.

        :param config_json: Configuration JSON string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_yaml(
        config_yaml: str,
        profile: str = RESERVED_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a YAML string.

        :param config_yaml: Configuration YAML string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
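
        A minimal sketch (assumes a hypothetical profile and bucket name)::

            config_yaml = '''
            profiles:
              my-profile:
                storage_provider:
                  type: s3
                  options:
                    base_path: my-bucket
            '''
            config = StorageClientConfig.from_yaml(config_yaml, profile="my-profile")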
1417 """
1418 config_dict = yaml.safe_load(config_yaml) or {}
1419 return StorageClientConfig.from_dict(
1420 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1421 )
1422
    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = RESERVED_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a Python dictionary.

        :param config_dict: Configuration Python dictionary.
        :param profile: Profile to use.
        :param skip_validation: Skip configuration schema validation.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Validate the config file with predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load config
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=_normalize_profile_name(profile, config_dict),
            telemetry_provider=telemetry_provider,
        )
        config = loader.build_config()

        return config

    @staticmethod
    def from_file(
        config_file_paths: Optional[Iterable[str]] = None,
        profile: str = RESERVED_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from the first file found.

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
        # Parse rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
            )
        else:
            # Check if profile is the default POSIX profile or an implicit profile
            if profile == RESERVED_POSIX_PROFILE_NAME or profile == LEGACY_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # The config was already validated when read, so skip validation for implicit profiles (names starting with "_")
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
            )

    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any],
        provider_bundle: Union[ProviderBundle, ProviderBundleV2],
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def read_msc_config(
        config_file_paths: Optional[Iterable[str]] = None,
    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the first file found.

        If no config paths are specified, configs are searched in the following order:

        1. ``MSC_CONFIG`` environment variable (highest precedence)
        2. Standard search paths (user-specific config and system-wide config)

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
            dictionary or an empty dict if no config was found. ``config_file_path`` is the absolute
            path of the config file used, or ``None`` if no config file was found.
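
        Example (illustrative)::

            config_dict, config_file_path = StorageClientConfig.read_msc_config()
            # config_file_path might be e.g. "/home/user/.msc_config.yaml",
            # or None if no config file was found.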
1543 """
1544 config_dict: dict[str, Any] = {}
1545 config_file_path: Optional[str] = None
1546
1547 config_file_paths = list(config_file_paths or [])
1548
1549 # Add default paths if none provided.
1550 if len(config_file_paths) == 0:
1551 # Environment variable.
1552 msc_config_env = os.getenv("MSC_CONFIG", None)
1553 if msc_config_env is not None:
1554 config_file_paths.append(msc_config_env)
1555
1556 # Standard search paths.
1557 config_file_paths.extend(_find_config_file_paths())
1558
1559 # Normalize + absolutize paths.
1560 config_file_paths = [os.path.abspath(path) for path in config_file_paths]
1561
1562 # Log plan.
1563 logger.debug(f"Searching MSC config file paths: {config_file_paths}")
1564
1565 # Load config.
1566 for path in config_file_paths:
1567 if os.path.exists(path):
1568 try:
1569 with open(path) as f:
1570 if path.endswith(".json"):
1571 config_dict = json.load(f)
1572 else:
1573 config_dict = yaml.safe_load(f)
1574 config_file_path = path
1575 # Use the first config file.
1576 break
1577 except Exception as e:
1578 raise ValueError(f"malformed MSC config file: {path}, exception: {e}")
1579
1580 # Log result.
1581 if config_file_path is None:
1582 logger.debug("No MSC config files found in any of the search locations.")
1583 else:
1584 logger.debug(f"Using MSC config file: {config_file_path}")
1585
1586 if config_dict:
1587 validate_config(config_dict)
1588
1589 if "include" in config_dict and config_file_path:
1590 config_dict = _load_and_merge_includes(config_file_path, config_dict)
1591
1592 return config_dict, config_file_path
1593
    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue - this shouldn't stop the application from working
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        del state["child_configs"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            telemetry_provider=state["telemetry_provider"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.storage_provider_profiles = new_config.storage_provider_profiles
        self.child_configs = new_config.child_configs
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.telemetry_provider = new_config.telemetry_provider
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config