1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import copy
17import json
18import logging
19import os
20import tempfile
21from collections import defaultdict
22from collections.abc import Callable
23from pathlib import Path
24from typing import Any, Iterable, Optional, Union
25from urllib.parse import urlparse
26
27import yaml
28
29from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
30from .caching.cache_config import CacheConfig, EvictionPolicyConfig
31from .providers.manifest_metadata import ManifestMetadataProvider
32from .rclone import read_rclone_config
33from .schema import validate_config
34from .telemetry import Telemetry
35from .telemetry import init as telemetry_init
36from .types import (
37 DEFAULT_RETRY_ATTEMPTS,
38 DEFAULT_RETRY_BACKOFF_MULTIPLIER,
39 DEFAULT_RETRY_DELAY,
40 MSC_PROTOCOL,
41 AutoCommitConfig,
42 CredentialsProvider,
43 MetadataProvider,
44 ProviderBundle,
45 ProviderBundleV2,
46 Replica,
47 RetryConfig,
48 StorageBackend,
49 StorageProvider,
50 StorageProviderConfig,
51)
52from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
53
# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Build the configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    # Map the URL protocol to the storage provider type name used in configs.
    storage_provider = {
        "type": PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol],
        "options": {"base_path": base_path},
    }
    return {"profiles": {profile_name: {"storage_provider": storage_provider}}}
79
80
# Profile name historically accepted for the implicit POSIX profile; superseded
# by RESERVED_POSIX_PROFILE_NAME (see _normalize_profile_name for the mapping).
LEGACY_POSIX_PROFILE_NAME = "default"
# Reserved name of the built-in POSIX filesystem profile; user configs may not
# redefine it with different settings (enforced in StorageClientConfigLoader.__init__).
RESERVED_POSIX_PROFILE_NAME = "__filesystem__"
# Implicit profile granting access to the local filesystem rooted at "/".
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(RESERVED_POSIX_PROFILE_NAME, "file", "/")

# Maps the "type" value of a storage_provider config section to the provider
# class name imported from the ".providers" module.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

# Maps the "type" value of a credentials_provider config section to the provider
# class name imported from the ".providers" module. Types not listed here are
# treated as fully qualified class paths (see _build_credentials_provider).
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "DefaultAzureCredentials": "DefaultAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}
108
109
110def _resolve_include_path(include_path: str, parent_config_path: str) -> str:
111 """
112 Resolve include path (absolute or relative to parent config file).
113
114 :param include_path: Path from include keyword (can be absolute or relative)
115 :param parent_config_path: Absolute path of the config file containing the include
116 :return: Absolute, normalized path to the included config file
117 """
118 if os.path.isabs(include_path):
119 return os.path.abspath(include_path)
120 else:
121 parent_dir = os.path.dirname(parent_config_path)
122 return os.path.abspath(os.path.join(parent_dir, include_path))
123
124
125def _merge_profiles(
126 base_profiles: dict[str, Any],
127 new_profiles: dict[str, Any],
128 base_path: str,
129 new_path: str,
130) -> dict[str, Any]:
131 """
132 Merge profiles from two config files with conflict detection.
133
134 Profiles with the same name are allowed if their definitions are identical (idempotent).
135 If a profile name exists in both configs with different definitions, an error is raised.
136
137 NOTE: This function performs SHALLOW merging - each profile is treated as a complete unit.
138 We cannot use merge_dictionaries_no_overwrite() here because it would recursively merge
139 profile fields, which could lead to unintended behavior. For example:
140
141 base_config:
142 profiles:
143 my-profile:
144 storage_provider:
145 type: s3
146 options:
147 base_path: bucket
148
149 new_config:
150 profiles:
151 my-profile:
152 credentials_provider:
153 type: S3Credentials
154 options:
155 access_key: foo
156 secret_key: bar
157
158 If we used merge_dictionaries_no_overwrite(), the result would be a profile with BOTH storage_provider
159 and credentials_provider, which might not be the intended configuration.
160
161 :param base_profiles: Profiles from the base config
162 :param new_profiles: Profiles from the new config to merge
163 :param base_path: Path to the base config file (for error messages)
164 :param new_path: Path to the new config file (for error messages)
165 :return: Merged profiles dictionary
166 :raises ValueError: If any profile name exists in both configs with different definitions
167 """
168 result = base_profiles.copy()
169 conflicting_profiles = []
170
171 for profile_name, new_profile_def in new_profiles.items():
172 if profile_name in result:
173 if result[profile_name] != new_profile_def:
174 conflicting_profiles.append(profile_name)
175 else:
176 result[profile_name] = new_profile_def
177
178 if conflicting_profiles:
179 conflicts_list = ", ".join(f"'{p}'" for p in sorted(conflicting_profiles))
180 raise ValueError(f"Profile conflict: {conflicts_list} defined differently in {base_path} and {new_path}")
181
182 return result
183
184
def _merge_opentelemetry(
    base_otel: dict[str, Any],
    new_otel: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge opentelemetry configurations with special handling for metrics.attributes.

    Merge strategy:

    - ``metrics.attributes``: lists are concatenated, base first then new, because
      ordering matters for the fallback mechanism - later attributes providers
      override earlier ones when they supply the same attribute key.
    - All other fields: idempotent merge - both configs must agree exactly.

    :param base_otel: Base opentelemetry config
    :param new_otel: New opentelemetry config to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged opentelemetry configuration
    :raises ValueError: If conflicts are detected
    """

    def strip_attributes(otel: dict[str, Any]) -> dict[str, Any]:
        # Deep-copy and drop metrics.attributes so the idempotent merge below
        # only compares the remaining fields.
        stripped = copy.deepcopy(otel)
        if "metrics" in stripped and "attributes" in stripped["metrics"]:
            del stripped["metrics"]["attributes"]
        return stripped

    # Concatenate attributes providers, preserving base-then-new ordering.
    combined_attrs = base_otel.get("metrics", {}).get("attributes", []) + new_otel.get("metrics", {}).get(
        "attributes", []
    )

    merged, conflicts = merge_dictionaries_no_overwrite(
        strip_attributes(base_otel), strip_attributes(new_otel), allow_idempotent=True
    )

    if conflicts:
        conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
        raise ValueError(
            f"opentelemetry config conflict: {conflicts_list} defined differently in {base_path} and {new_path}"
        )

    # Reattach the concatenated attributes. An existing metrics section always
    # receives the attributes key (even when the combined list is empty); a new
    # metrics section is only created when there is something to put in it.
    if "metrics" in merged:
        merged["metrics"]["attributes"] = combined_attrs
    elif combined_attrs:
        merged["metrics"] = {"attributes": combined_attrs}

    return merged
239
240
def _merge_configs(
    base_config: dict[str, Any],
    new_config: dict[str, Any],
    base_path: str,
    new_path: str,
) -> dict[str, Any]:
    """
    Merge two config dictionaries with field-specific strategies.

    Per-field strategies:

    - profiles: Shallow merge with idempotent check (uses _merge_profiles)
    - path_mapping / experimental_features: Flat dict merge with idempotent check
    - opentelemetry: Hybrid merge (attributes concatenate, others idempotent)
    - cache / posix: Global configs - must be identical in both, error otherwise
    - include: Dropped (consumed by _load_and_merge_includes)

    :param base_config: Base configuration dictionary
    :param new_config: New configuration to merge
    :param base_path: Path to base config file (for error messages)
    :param new_path: Path to new config file (for error messages)
    :return: Merged configuration dictionary
    :raises ValueError: If conflicts are detected or an unknown field is seen
    """
    merged: dict[str, Any] = {}

    for key in set(base_config) | set(new_config):
        # Keys present in only one of the two configs are taken verbatim.
        if key not in new_config:
            merged[key] = base_config.get(key)
            continue
        if key not in base_config:
            merged[key] = new_config.get(key)
            continue

        base_value = base_config.get(key)
        new_value = new_config.get(key)

        if key == "profiles":
            merged[key] = _merge_profiles(base_value or {}, new_value or {}, base_path, new_path)
        elif key in ("path_mapping", "experimental_features"):
            combined, conflicts = merge_dictionaries_no_overwrite(
                (base_value or {}).copy(), new_value or {}, allow_idempotent=True
            )
            if conflicts:
                conflicts_list = ", ".join(f"'{k}'" for k in sorted(conflicts))
                raise ValueError(
                    f"Config merge conflict: {conflicts_list} have different values in {base_path} and {new_path}"
                )
            merged[key] = combined
        elif key == "opentelemetry":
            merged[key] = _merge_opentelemetry(base_value or {}, new_value or {}, base_path, new_path)
        elif key in ("cache", "posix"):
            # Global sections cannot be partially combined - require exact agreement.
            if base_value != new_value:
                raise ValueError(f"'{key}' defined differently in {base_path} and {new_path}")
            merged[key] = base_value
        elif key == "include":
            # 'include' is processed by _load_and_merge_includes, not part of final config
            continue
        else:
            # Every top-level field must have an explicit strategy above.
            raise ValueError(f"Unknown field '{key}' in config file.")

    return merged
314
315
def _load_and_merge_includes(
    main_config_path: str,
    main_config_dict: dict[str, Any],
) -> dict[str, Any]:
    """
    Load and merge included config files.

    Processes the 'include' directive in the main config, loading and merging all
    specified config files. Only supports one level of includes - included files
    cannot themselves have 'include' directives.

    :param main_config_path: Absolute path to the main config file
    :param main_config_dict: Dictionary loaded from the main config file
    :return: Merged configuration dictionary (without 'include' field)
    :raises ValueError: If include file not found, malformed, or contains nested includes
    """
    include_paths = main_config_dict.get("include", [])

    # The 'include' key itself is never part of the final merged config.
    merged_config = {k: v for k, v in main_config_dict.items() if k != "include"}

    for include_path in include_paths:
        resolved_path = _resolve_include_path(include_path, main_config_path)

        if not os.path.exists(resolved_path):
            raise ValueError(f"Included config file not found: {resolved_path} (from {main_config_path})")

        try:
            with open(resolved_path) as f:
                # Included files may be JSON or YAML; pick the parser by extension.
                if resolved_path.endswith(".json"):
                    included_config = json.load(f)
                else:
                    included_config = yaml.safe_load(f)
        except Exception as e:
            # Chain the original parse/IO error so the root cause stays in the traceback.
            raise ValueError(f"Failed to load included config {resolved_path}: {e}") from e

        validate_config(included_config)
        if "include" in included_config:
            raise ValueError(f"Nested includes not allowed: {resolved_path} contains 'include' directive")

        merged_config = _merge_configs(merged_config, included_config, main_config_path, resolved_path)

    return merged_config
361
362
363def _find_config_file_paths() -> tuple[str]:
364 """
365 Get configuration file search paths.
366
367 Returns paths in order of precedence:
368
369 1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
370 2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
371 """
372 paths = []
373
374 # 1. User-specific configuration directory
375 xdg_config_home = os.getenv("XDG_CONFIG_HOME")
376
377 if xdg_config_home:
378 paths.extend(
379 [
380 os.path.join(xdg_config_home, "msc", "config.yaml"),
381 os.path.join(xdg_config_home, "msc", "config.json"),
382 ]
383 )
384
385 user_home = os.getenv("HOME")
386
387 if user_home:
388 paths.extend(
389 [
390 os.path.join(user_home, ".msc_config.yaml"),
391 os.path.join(user_home, ".msc_config.json"),
392 os.path.join(user_home, ".config", "msc", "config.yaml"),
393 os.path.join(user_home, ".config", "msc", "config.json"),
394 ]
395 )
396
397 # 2. System-wide configuration directories
398 xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
399 if not xdg_config_dirs:
400 xdg_config_dirs = "/etc/xdg"
401
402 for config_dir in xdg_config_dirs.split(":"):
403 config_dir = config_dir.strip()
404 if config_dir:
405 paths.extend(
406 [
407 os.path.join(config_dir, "msc", "config.yaml"),
408 os.path.join(config_dir, "msc", "config.json"),
409 ]
410 )
411
412 paths.extend(
413 [
414 "/etc/msc_config.yaml",
415 "/etc/msc_config.json",
416 ]
417 )
418
419 return tuple(paths)
420
421
def _normalize_profile_name(profile: str, config_dict: dict[str, Any]) -> str:
    """
    Normalize the legacy "default" POSIX profile name to the reserved POSIX profile name.

    The substitution only happens when the config does not explicitly define a
    profile with the legacy name - an explicit user-defined profile always wins.

    :param profile: The profile name to normalize
    :param config_dict: The configuration dictionary
    :return: The normalized profile name
    """
    explicitly_defined = profile in config_dict.get("profiles", {})
    if profile != LEGACY_POSIX_PROFILE_NAME or explicitly_defined:
        return profile
    logger.warning(
        f"The profile name '{LEGACY_POSIX_PROFILE_NAME}' is deprecated and will be removed in a future version. Please use '{RESERVED_POSIX_PROFILE_NAME}' instead."
    )
    return RESERVED_POSIX_PROFILE_NAME
436
437
# Package name used as the anchor for relative imports of provider classes.
PACKAGE_NAME = "multistorageclient"

# Module-level logger. NOTE: it is referenced by _normalize_profile_name, which
# is defined earlier in the file but only called after module import completes.
logger = logging.getLogger(__name__)
441
442
class ImmutableDict(dict):
    """
    Read-only ``dict`` subclass.

    Values are frozen recursively on construction (nested dicts become
    :py:class:`ImmutableDict`, nested lists become tuples), and every mutating
    method raises :py:class:`TypeError`. Reads through ``__getitem__``/``get``
    hand back freshly thawed, mutable copies so callers can never alter the
    frozen state via a returned value.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Freeze nested containers in place; dict's own __setitem__ is used
        # because this class's __setitem__ raises.
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, ImmutableDict._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Freeze a list into a tuple, recursively freezing nested dicts and lists.
        """

        def freeze_item(item):
            if isinstance(item, dict):
                return ImmutableDict(item)
            if isinstance(item, list):
                return ImmutableDict._freeze_list(item)
            return item

        return tuple(freeze_item(item) for item in lst)

    def __deepcopy__(self, memo):
        """
        Deep-copy as a plain (mutable) dict rather than another ImmutableDict.
        """
        return copy.deepcopy(dict(self), memo)

    def __reduce__(self):
        """
        Pickle as (class, plain dict) so unpickling re-freezes the contents.
        """
        return (self.__class__, (dict(self),))

    def _copy_value(self, value):
        """
        Thaw a frozen value back into mutable containers.

        NOTE: every tuple is thawed into a list - including tuples that were
        tuples in the original input - because freezing does not record whether
        a tuple started life as a list.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        if isinstance(value, tuple):
            return [self._copy_value(item) for item in value]
        return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        return self._copy_value(super().__getitem__(key))

    def get(self, key, default=None):
        """
        Return a mutable copy of the value, or ``default`` when the key is absent.
        """
        if key in self:
            return self[key]
        return default

    # -- All mutating operations are uniformly rejected. --

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")
530
531
class SimpleProviderBundle(ProviderBundle):
    """ProviderBundle that simply returns the components it was constructed with."""

    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        # None (rather than a mutable default) signals "no replicas".
        self._replicas = [] if replicas is None else replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        return self._replicas
563
564
class SimpleProviderBundleV2(ProviderBundleV2):
    """ProviderBundleV2 backed by a pre-built mapping of named storage backends."""

    def __init__(
        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
    ):
        self._storage_backends = storage_backends
        self._metadata_provider = metadata_provider

    @staticmethod
    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
        """Adapt a legacy v1 bundle into a v2 bundle with a single named backend."""
        return SimpleProviderBundleV2(
            storage_backends={
                profile_name: StorageBackend(
                    storage_provider_config=v1_bundle.storage_provider_config,
                    credentials_provider=v1_bundle.credentials_provider,
                    replicas=v1_bundle.replicas,
                )
            },
            metadata_provider=v1_bundle.metadata_provider,
        )

    @property
    def storage_backends(self) -> dict[str, StorageBackend]:
        return self._storage_backends

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider
592
593
594DEFAULT_CACHE_REFRESH_INTERVAL = 300
595
596
class StorageClientConfigLoader:
    """
    Builds the components of a StorageClientConfig (provider bundle, cache,
    telemetry, retry settings) from a resolved configuration dictionary and a
    profile name.
    """

    # Resolved (v2) provider bundle; v1 bundles are adapted in _build_provider_bundle.
    _provider_bundle: ProviderBundleV2
    # Fully resolved, environment-expanded config, frozen against mutation.
    _resolved_config_dict: dict[str, Any]
    # All profiles from the config (the reserved POSIX profile is always present).
    _profiles: dict[str, Any]
    # Name of the active profile.
    _profile: str
    # Config section of the active profile (frozen).
    _profile_dict: dict[str, Any]
    # Optional "opentelemetry" section of the config.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Lazy telemetry factory (thunk), kept callable for pickling/multiprocessing.
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    # Optional "cache" section of the config.
    _cache_dict: Optional[dict[str, Any]]
606
    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = RESERVED_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.

        :raises ValueError: If the reserved POSIX profile is overridden with
            different settings, or if ``profile`` is not found in ``config_dict``.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        # Freeze the resolved config; ImmutableDict makes deep copies of nested
        # dicts, so the mutation of self._profiles below does not affect it.
        self._resolved_config_dict = ImmutableDict(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if RESERVED_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[RESERVED_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            if (
                self._profiles[RESERVED_POSIX_PROFILE_NAME]
                != DEFAULT_POSIX_PROFILE["profiles"][RESERVED_POSIX_PROFILE_NAME]
            ):
                raise ValueError(f'Cannot override "{RESERVED_POSIX_PROFILE_NAME}" profile with different settings.')

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        # Resolve the provider bundle, then expose any extra backends as child profiles.
        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()
661
662 def _build_storage_provider(
663 self,
664 storage_provider_name: str,
665 storage_options: Optional[dict[str, Any]] = None,
666 credentials_provider: Optional[CredentialsProvider] = None,
667 ) -> StorageProvider:
668 if storage_options is None:
669 storage_options = {}
670 if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
671 raise ValueError(
672 f"Storage provider {storage_provider_name} is not supported. "
673 f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
674 )
675 if credentials_provider:
676 storage_options["credentials_provider"] = credentials_provider
677 if self._resolved_config_dict is not None:
678 # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
679 storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
680 if self._telemetry_provider is not None:
681 storage_options["telemetry_provider"] = self._telemetry_provider
682 class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
683 module_name = ".providers"
684 cls = import_class(class_name, module_name, PACKAGE_NAME)
685 return cls(**storage_options)
686
687 def _build_storage_provider_from_profile(self, storage_provider_profile: str):
688 storage_profile_dict = self._profiles.get(storage_provider_profile)
689 if not storage_profile_dict:
690 raise ValueError(
691 f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
692 )
693
694 # Check if metadata provider is configured for this profile
695 # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
696 local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
697 if local_metadata_provider_dict:
698 raise ValueError(
699 f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
700 )
701
702 # Initialize CredentialsProvider
703 local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
704 local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)
705
706 # Initialize StorageProvider
707 local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
708 if local_storage_provider_dict:
709 local_name = local_storage_provider_dict["type"]
710 local_storage_options = local_storage_provider_dict.get("options", {})
711 else:
712 raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")
713
714 storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
715 return storage_provider
716
717 def _build_credentials_provider(
718 self,
719 credentials_provider_dict: Optional[dict[str, Any]],
720 storage_options: Optional[dict[str, Any]] = None,
721 ) -> Optional[CredentialsProvider]:
722 """
723 Initializes the CredentialsProvider based on the provided dictionary.
724
725 Args:
726 credentials_provider_dict: Dictionary containing credentials provider configuration
727 storage_options: Storage provider options required by some credentials providers to scope the credentials.
728 """
729 if not credentials_provider_dict:
730 return None
731
732 if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
733 # Fully qualified class path case
734 class_type = credentials_provider_dict["type"]
735 module_name, class_name = class_type.rsplit(".", 1)
736 cls = import_class(class_name, module_name)
737 else:
738 # Mapped class name case
739 class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
740 module_name = ".providers"
741 cls = import_class(class_name, module_name, PACKAGE_NAME)
742
743 # Propagate storage provider options to credentials provider since they may be
744 # required by some credentials providers to scope the credentials.
745 import inspect
746
747 init_params = list(inspect.signature(cls.__init__).parameters)[1:] # skip 'self'
748 options = credentials_provider_dict.get("options", {})
749 if storage_options:
750 for storage_provider_option in storage_options.keys():
751 if storage_provider_option in init_params and storage_provider_option not in options:
752 options[storage_provider_option] = storage_options[storage_provider_option]
753
754 return cls(**options)
755
756 def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
757 # Initialize StorageProvider
758 storage_provider_dict = profile_dict.get("storage_provider", None)
759 if storage_provider_dict:
760 storage_provider_name = storage_provider_dict["type"]
761 storage_options = storage_provider_dict.get("options", {})
762 else:
763 raise ValueError("Missing storage_provider in the config.")
764
765 # Initialize CredentialsProvider
766 # It is prudent to assume that in some cases, the credentials provider
767 # will provide credentials scoped to specific base_path.
768 # So we need to pass the storage_options to the credentials provider.
769 credentials_provider_dict = profile_dict.get("credentials_provider", None)
770 credentials_provider = self._build_credentials_provider(
771 credentials_provider_dict=credentials_provider_dict,
772 storage_options=storage_options,
773 )
774
775 # Initialize MetadataProvider
776 metadata_provider_dict = profile_dict.get("metadata_provider", None)
777 metadata_provider = None
778 if metadata_provider_dict:
779 if metadata_provider_dict["type"] == "manifest":
780 metadata_options = metadata_provider_dict.get("options", {})
781 # If MetadataProvider has a reference to a different storage provider profile
782 storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
783 if storage_provider_profile:
784 storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
785 else:
786 storage_provider = self._build_storage_provider(
787 storage_provider_name, storage_options, credentials_provider
788 )
789
790 metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
791 else:
792 class_type = metadata_provider_dict["type"]
793 if "." not in class_type:
794 raise ValueError(
795 f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
796 )
797 module_name, class_name = class_type.rsplit(".", 1)
798 cls = import_class(class_name, module_name)
799 options = metadata_provider_dict.get("options", {})
800 metadata_provider = cls(**options)
801
802 # Build replicas if configured
803 replicas_config = profile_dict.get("replicas", [])
804 replicas = []
805 if replicas_config:
806 for replica_dict in replicas_config:
807 replicas.append(
808 Replica(
809 replica_profile=replica_dict["replica_profile"],
810 read_priority=replica_dict["read_priority"],
811 )
812 )
813
814 # Sort replicas by read_priority
815 replicas.sort(key=lambda r: r.read_priority)
816
817 return SimpleProviderBundle(
818 storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
819 credentials_provider=credentials_provider,
820 metadata_provider=metadata_provider,
821 replicas=replicas,
822 )
823
824 def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
825 class_type = provider_bundle_dict["type"]
826 module_name, class_name = class_type.rsplit(".", 1)
827 cls = import_class(class_name, module_name)
828 options = provider_bundle_dict.get("options", {})
829 return cls(**options)
830
831 def _build_provider_bundle(
832 self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
833 ) -> ProviderBundleV2:
834 if provider_bundle:
835 bundle = provider_bundle
836 else:
837 provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
838 if provider_bundle_dict:
839 bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
840 else:
841 bundle = self._build_provider_bundle_from_config(self._profile_dict)
842
843 if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
844 bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)
845
846 return bundle
847
    def _inject_profiles_from_bundle(self) -> None:
        """
        Inject child profiles and build child configs for multi-backend configurations.

        For ProviderBundleV2 with multiple backends, this method:
        1. Injects child profiles into config dict (needed for replica initialization)
        2. Pre-builds child configs so CompositeStorageClient can use them directly

        Profile injection is required because SingleStorageClient._initialize_replicas()
        uses StorageClientConfig.from_dict() to create replica clients, which looks up
        profiles in the config dict.

        Single-backend bundles are a no-op: ``self._child_configs`` stays ``None``.
        """
        self._child_configs: Optional[dict[str, StorageClientConfig]] = None

        backends = self._provider_bundle.storage_backends
        if len(backends) > 1:
            # First, inject all child profiles into config dict (needed for replica lookup)
            profiles = copy.deepcopy(self._profiles)
            for child_name, backend in backends.items():
                # Minimal child profile: just the backend's storage provider definition.
                child_profile_dict = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }

                if child_name in profiles:
                    # Profile already exists - check if it matches what we would inject
                    existing = profiles[child_name]
                    if existing.get("storage_provider") != child_profile_dict["storage_provider"]:
                        raise ValueError(
                            f"Profile '{child_name}' already exists in configuration with different settings."
                        )
                else:
                    profiles[child_name] = child_profile_dict

            # Update config dict BEFORE building child configs
            # (re-freeze both views so later reads keep copy-on-read semantics).
            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles
            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

            # Now build child configs (they will get the updated config dict)
            retry_config = self._build_retry_config()
            child_configs: dict[str, StorageClientConfig] = {}
            for child_name, backend in backends.items():
                storage_provider = self._build_storage_provider(
                    backend.storage_provider_config.type,
                    backend.storage_provider_config.options,
                    backend.credentials_provider,
                )

                # Child configs deliberately carry no metadata provider or cache;
                # those remain owned by the parent (composite) config.
                child_config = StorageClientConfig(
                    profile=child_name,
                    storage_provider=storage_provider,
                    credentials_provider=backend.credentials_provider,
                    storage_provider_profiles=None,
                    child_configs=None,
                    metadata_provider=None,
                    cache_config=None,
                    cache_manager=None,
                    retry_config=retry_config,
                    telemetry_provider=self._telemetry_provider,
                    replicas=backend.replicas,
                    autocommit_config=None,
                )
                child_config._config_dict = self._resolved_config_dict
                child_configs[child_name] = child_config

            self._child_configs = child_configs
918
919 def _build_retry_config(self) -> RetryConfig:
920 """Build retry config from profile dict."""
921 retry_config_dict = self._profile_dict.get("retry", None)
922 if retry_config_dict:
923 attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
924 delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
925 backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
926 return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
927 else:
928 return RetryConfig(
929 attempts=DEFAULT_RETRY_ATTEMPTS,
930 delay=DEFAULT_RETRY_DELAY,
931 backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
932 )
933
934 def _build_autocommit_config(self) -> AutoCommitConfig:
935 """Build autocommit config from profile dict."""
936 autocommit_dict = self._profile_dict.get("autocommit", None)
937 if autocommit_dict:
938 interval_minutes = autocommit_dict.get("interval_minutes", None)
939 at_exit = autocommit_dict.get("at_exit", False)
940 return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
941 return AutoCommitConfig()
942
943 def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
944 if "size_mb" in cache_dict:
945 raise ValueError(
946 "The 'size_mb' property is no longer supported. \n"
947 "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
948 "Example configuration:\n"
949 "cache:\n"
950 " size: 500G # Optional: Maximum cache size (default: 10G)\n"
951 " cache_line_size: 64M # Optional: Chunk size for partial file caching (default: 64M)\n"
952 " check_source_version: true # Optional: Use ETag for cache validation (default: true)\n"
953 " location: /tmp/msc_cache # Optional: Cache directory path (default: system tempdir + '/msc_cache')\n"
954 " eviction_policy: # Optional: Cache eviction policy\n"
955 " policy: fifo # Optional: Policy type: lru, mru, fifo, random, no_eviction (default: fifo)\n"
956 " refresh_interval: 300 # Optional: Cache refresh interval in seconds (default: 300)\n"
957 )
958
959 # Validate that cache_line_size doesn't exceed cache size
960 cache_size_str = cache_dict.get("size", DEFAULT_CACHE_SIZE)
961 cache_line_size_str = cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE)
962
963 # Use CacheConfig to convert sizes to bytes for comparison
964 temp_cache_config = CacheConfig(
965 size=cache_size_str,
966 cache_line_size=cache_line_size_str,
967 )
968 cache_size_bytes = temp_cache_config.size_bytes()
969 cache_line_size_bytes = temp_cache_config.cache_line_size_bytes()
970
971 if cache_line_size_bytes > cache_size_bytes:
972 raise ValueError(
973 f"cache_line_size ({cache_line_size_str}) exceeds cache size ({cache_size_str}). "
974 f"cache_line_size must be less than or equal to cache size. "
975 f"Consider increasing cache size or decreasing cache_line_size."
976 )
977
978 def _validate_replicas(self, replicas: list[Replica]) -> None:
979 """
980 Validates that replica profiles do not have their own replicas configuration.
981
982 This prevents circular references where a replica profile could reference
983 another profile that also has replicas, creating an infinite loop.
984
985 :param replicas: The list of Replica objects to validate
986 :raises ValueError: If any replica profile has its own replicas configuration
987 """
988 for replica in replicas:
989 replica_profile_name = replica.replica_profile
990
991 # Check that replica profile is not the same as the current profile
992 if replica_profile_name == self._profile:
993 raise ValueError(
994 f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
995 )
996
997 # Check if the replica profile exists in the configuration
998 if replica_profile_name not in self._profiles:
999 raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")
1000
1001 # Get the replica profile configuration
1002 replica_profile_dict = self._profiles[replica_profile_name]
1003
1004 # Check if the replica profile has its own replicas configuration
1005 if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
1006 raise ValueError(
1007 f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
1008 f"This creates a circular reference which is not allowed."
1009 )
1010
1011 # todo: remove once experimental features are stable
1012 def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
1013 """
1014 Validate that experimental features are enabled when used.
1015
1016 :param eviction_policy: The eviction policy configuration to validate
1017 :raises ValueError: If experimental features are used without being enabled
1018 """
1019 # Validate MRU eviction policy
1020 if eviction_policy.policy.upper() == "MRU":
1021 if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
1022 raise ValueError(
1023 "MRU eviction policy is experimental and not enabled.\n"
1024 "Enable it by adding to config:\n"
1025 " experimental_features:\n"
1026 " cache_mru_eviction: true"
1027 )
1028
1029 # Validate purge_factor
1030 if eviction_policy.purge_factor != 0:
1031 if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
1032 raise ValueError("purge_factor must be between 0 and 100")
1033
1034 if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
1035 raise ValueError(
1036 "purge_factor is experimental and not enabled.\n"
1037 "Enable it by adding to config:\n"
1038 " experimental_features:\n"
1039 " cache_purge_factor: true"
1040 )
1041
    def build_config(self) -> "StorageClientConfig":
        """
        Assemble the final :py:class:`StorageClientConfig` for this profile.

        Multi-backend bundles produce a composite config (routing handled by the
        bundle's metadata provider); single-backend bundles produce a fully wired
        config with optional cache, retry, replica and autocommit settings.

        :raises ValueError: If a multi-backend bundle lacks a metadata provider,
            or the cache configuration is invalid.
        """
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            # Composite clients can only route between backends via metadata lookups.
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # config for Composite StorageClient
            # (child configs were pre-built by _inject_profiles_from_bundle)
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                child_configs=self._child_configs,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            config._config_dict = self._resolved_config_dict
            return config

        # Single-backend (len == 1)
        _, backend = list(backends.items())[0]
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            # (legacy alternative to the top-level "location" key)
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            # "use_etag" is the legacy spelling of "check_source_version".
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified – the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            child_configs=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        # Keep the resolved dict so the config can be pickled and rebuilt.
        config._config_dict = self._resolved_config_dict
        return config
1191
1192
class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary, if None the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        mapping = cls()
        mapping._load_mapping(config_dict)
        return mapping

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        entries = config_dict.get("path_mapping", {})
        if entries is None:
            return

        for source_path, dest_path in entries.items():
            # Skip malformed entries: both sides must be directory-style paths
            # (trailing slash) and the destination must be an MSC URL.
            if not (source_path.endswith("/") and dest_path.endswith("/")):
                continue
            if not dest_path.startswith(MSC_PROTOCOL):
                continue

            # The destination profile is the authority component of the MSC URL.
            dest_profile = urlparse(dest_path).netloc

            parsed_source = urlparse(source_path)
            protocol = parsed_source.scheme.lower() if parsed_source.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # File/absolute paths: the whole path is the prefix, bucket is empty.
                bucket = ""
                prefix = source_path if source_path.startswith("/") else parsed_source.path
            else:
                # Object storage: split into bucket and a slash-less prefix.
                bucket = parsed_source.netloc
                prefix = parsed_source.path.removeprefix("/")

            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Longest prefix first so the most specific mapping wins during lookup.
        for buckets in self._mapping.values():
            for bucket_name in buckets:
                buckets[bucket_name] = sorted(buckets[bucket_name], key=lambda entry: len(entry[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        parsed = urlparse(url)
        protocol = parsed.scheme.lower() if parsed.scheme else "file"

        # File / absolute paths: match against the bucket-less entries.
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else parsed.path
            candidates = self._mapping[protocol][""] if protocol in self._mapping else []

            # Candidates are pre-sorted longest-prefix-first.
            for prefix, profile in candidates:
                if path.startswith(prefix):
                    remainder = path[len(prefix) :]
                    # Keep the translated path absolute.
                    if not remainder.startswith("/"):
                        remainder = "/" + remainder
                    return profile, remainder

            return None

        # Object storage: match within the URL's bucket.
        bucket = parsed.netloc
        path = parsed.path.removeprefix("/")

        if protocol in self._mapping and bucket in self._mapping[protocol]:
            candidates = self._mapping[protocol][bucket]
        else:
            candidates = []

        # Candidates are pre-sorted longest-prefix-first.
        for prefix, profile in candidates:
            if path.startswith(prefix):
                remainder = path[len(prefix) :].removeprefix("/")
                return profile, remainder

        return None
1327
1328
[docs]
1329class StorageClientConfig:
1330 """
1331 Configuration class for the :py:class:`multistorageclient.StorageClient`.
1332 """
1333
    # Profile name this configuration was built for.
    profile: str
    # Backend provider for SingleStorageClient; None for composite (multi-backend) configs.
    storage_provider: Optional[StorageProvider]
    # Optional credentials source for the storage provider.
    credentials_provider: Optional[CredentialsProvider]
    # Child profile names for CompositeStorageClient; None for single-backend configs.
    storage_provider_profiles: Optional[list[str]]
    # Pre-built child configs keyed by profile name (composite configs only).
    child_configs: Optional[dict[str, "StorageClientConfig"]]
    # Optional metadata provider (required for multi-backend routing).
    metadata_provider: Optional[MetadataProvider]
    # Cache settings and the manager instantiated from them (None when caching is off).
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    # Retry behavior for storage operations.
    retry_config: Optional[RetryConfig]
    # Factory returning a Telemetry instance; must be a top-level function for pickling.
    telemetry_provider: Optional[Callable[[], Telemetry]]
    # Replica targets for this profile (empty list when none are configured).
    replicas: list[Replica]
    # Autocommit behavior for write batching.
    autocommit_config: Optional[AutoCommitConfig]

    # Resolved raw config dict; required for pickling (see __getstate__/__setstate__).
    _config_dict: Optional[dict[str, Any]]
1348
1349 def __init__(
1350 self,
1351 profile: str,
1352 storage_provider: Optional[StorageProvider] = None,
1353 credentials_provider: Optional[CredentialsProvider] = None,
1354 storage_provider_profiles: Optional[list[str]] = None,
1355 child_configs: Optional[dict[str, "StorageClientConfig"]] = None,
1356 metadata_provider: Optional[MetadataProvider] = None,
1357 cache_config: Optional[CacheConfig] = None,
1358 cache_manager: Optional[CacheManager] = None,
1359 retry_config: Optional[RetryConfig] = None,
1360 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1361 replicas: Optional[list[Replica]] = None,
1362 autocommit_config: Optional[AutoCommitConfig] = None,
1363 ):
1364 # exactly one of storage_provider or storage_provider_profiles must be set
1365 if storage_provider and storage_provider_profiles:
1366 raise ValueError(
1367 "Cannot specify both storage_provider and storage_provider_profiles. "
1368 "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
1369 )
1370 if not storage_provider and not storage_provider_profiles:
1371 raise ValueError("Must specify either storage_provider or storage_provider_profiles.")
1372
1373 if replicas is None:
1374 replicas = []
1375 self.profile = profile
1376 self.storage_provider = storage_provider
1377 self.credentials_provider = credentials_provider
1378 self.storage_provider_profiles = storage_provider_profiles
1379 self.child_configs = child_configs
1380 self.metadata_provider = metadata_provider
1381 self.cache_config = cache_config
1382 self.cache_manager = cache_manager
1383 self.retry_config = retry_config
1384 self.telemetry_provider = telemetry_provider
1385 self.replicas = replicas
1386 self.autocommit_config = autocommit_config
1387
[docs]
1388 @staticmethod
1389 def from_json(
1390 config_json: str,
1391 profile: str = RESERVED_POSIX_PROFILE_NAME,
1392 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1393 ) -> "StorageClientConfig":
1394 """
1395 Load a storage client configuration from a JSON string.
1396
1397 :param config_json: Configuration JSON string.
1398 :param profile: Profile to use.
1399 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1400 """
1401 config_dict = json.loads(config_json)
1402 return StorageClientConfig.from_dict(
1403 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1404 )
1405
[docs]
1406 @staticmethod
1407 def from_yaml(
1408 config_yaml: str,
1409 profile: str = RESERVED_POSIX_PROFILE_NAME,
1410 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1411 ) -> "StorageClientConfig":
1412 """
1413 Load a storage client configuration from a YAML string.
1414
1415 :param config_yaml: Configuration YAML string.
1416 :param profile: Profile to use.
1417 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1418 """
1419 config_dict = yaml.safe_load(config_yaml) or {}
1420 return StorageClientConfig.from_dict(
1421 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1422 )
1423
[docs]
1424 @staticmethod
1425 def from_dict(
1426 config_dict: dict[str, Any],
1427 profile: str = RESERVED_POSIX_PROFILE_NAME,
1428 skip_validation: bool = False,
1429 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1430 ) -> "StorageClientConfig":
1431 """
1432 Load a storage client configuration from a Python dictionary.
1433
1434 :param config_dict: Configuration Python dictionary.
1435 :param profile: Profile to use.
1436 :param skip_validation: Skip configuration schema validation.
1437 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1438 """
1439 # Validate the config file with predefined JSON schema
1440 if not skip_validation:
1441 validate_config(config_dict)
1442
1443 # Load config
1444 loader = StorageClientConfigLoader(
1445 config_dict=config_dict,
1446 profile=_normalize_profile_name(profile, config_dict),
1447 telemetry_provider=telemetry_provider,
1448 )
1449 config = loader.build_config()
1450
1451 return config
1452
[docs]
1453 @staticmethod
1454 def from_file(
1455 config_file_paths: Optional[Iterable[str]] = None,
1456 profile: str = RESERVED_POSIX_PROFILE_NAME,
1457 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1458 ) -> "StorageClientConfig":
1459 """
1460 Load a storage client configuration from the first file found.
1461
1462 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
1463 :param profile: Profile to use.
1464 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1465 """
1466 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
1467 # Parse rclone config file.
1468 rclone_config_dict, rclone_config_file = read_rclone_config()
1469
1470 # Merge config files.
1471 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
1472 if conflicted_keys:
1473 raise ValueError(
1474 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}'
1475 )
1476 merged_profiles = merged_config.get("profiles", {})
1477
1478 # Check if profile is in merged_profiles
1479 if profile in merged_profiles:
1480 return StorageClientConfig.from_dict(
1481 config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
1482 )
1483 else:
1484 # Check if profile is the default POSIX profile or an implicit profile
1485 if profile == RESERVED_POSIX_PROFILE_NAME or profile == LEGACY_POSIX_PROFILE_NAME:
1486 implicit_profile_config = DEFAULT_POSIX_PROFILE
1487 elif profile.startswith("_"):
1488 # Handle implicit profiles
1489 parts = profile[1:].split("-", 1)
1490 if len(parts) == 2:
1491 protocol, bucket = parts
1492 # Verify it's a supported protocol
1493 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
1494 raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
1495 implicit_profile_config = create_implicit_profile_config(
1496 profile_name=profile, protocol=protocol, base_path=bucket
1497 )
1498 else:
1499 raise ValueError(f'Invalid implicit profile format: "{profile}"')
1500 else:
1501 raise ValueError(
1502 f'Profile "{profile}" not found in configuration files. Configuration was checked in '
1503 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
1504 f"Please verify that the profile exists and that configuration files are correctly located."
1505 )
1506 # merge the implicit profile config into the merged config so the cache & observability config can be inherited
1507 if "profiles" not in merged_config:
1508 merged_config["profiles"] = implicit_profile_config["profiles"]
1509 else:
1510 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
1511 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
1512 return StorageClientConfig.from_dict(
1513 config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
1514 )
1515
1516 @staticmethod
1517 def from_provider_bundle(
1518 config_dict: dict[str, Any],
1519 provider_bundle: Union[ProviderBundle, ProviderBundleV2],
1520 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1521 ) -> "StorageClientConfig":
1522 loader = StorageClientConfigLoader(
1523 config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
1524 )
1525 config = loader.build_config()
1526 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors
1527 return config
1528
[docs]
1529 @staticmethod
1530 def read_msc_config(
1531 config_file_paths: Optional[Iterable[str]] = None,
1532 ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
1533 """Get the MSC configuration dictionary and the path of the first file found.
1534
1535 If no config paths are specified, configs are searched in the following order:
1536
1537 1. ``MSC_CONFIG`` environment variable (highest precedence)
1538 2. Standard search paths (user-specified config and system-wide config)
1539
1540 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
1541 :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
1542 dictionary or empty dict if no config was found. ``config_file_path`` is the absolute
1543 path of the config file used, or ``None`` if no config file was found.
1544 """
1545 config_dict: dict[str, Any] = {}
1546 config_file_path: Optional[str] = None
1547
1548 config_file_paths = list(config_file_paths or [])
1549
1550 # Add default paths if none provided.
1551 if len(config_file_paths) == 0:
1552 # Environment variable.
1553 msc_config_env = os.getenv("MSC_CONFIG", None)
1554 if msc_config_env is not None:
1555 config_file_paths.append(msc_config_env)
1556
1557 # Standard search paths.
1558 config_file_paths.extend(_find_config_file_paths())
1559
1560 # Normalize + absolutize paths.
1561 config_file_paths = [os.path.abspath(path) for path in config_file_paths]
1562
1563 # Log plan.
1564 logger.debug(f"Searching MSC config file paths: {config_file_paths}")
1565
1566 # Load config.
1567 for path in config_file_paths:
1568 if os.path.exists(path):
1569 try:
1570 with open(path) as f:
1571 if path.endswith(".json"):
1572 config_dict = json.load(f)
1573 else:
1574 config_dict = yaml.safe_load(f)
1575 config_file_path = path
1576 # Use the first config file.
1577 break
1578 except Exception as e:
1579 raise ValueError(f"malformed MSC config file: {path}, exception: {e}")
1580
1581 # Log result.
1582 if config_file_path is None:
1583 logger.debug("No MSC config files found in any of the search locations.")
1584 else:
1585 logger.debug(f"Using MSC config file: {config_file_path}")
1586
1587 if config_dict:
1588 validate_config(config_dict)
1589
1590 if "include" in config_dict and config_file_path:
1591 config_dict = _load_and_merge_includes(config_file_path, config_dict)
1592
1593 return config_dict, config_file_path
1594
[docs]
1595 @staticmethod
1596 def read_path_mapping() -> PathMapping:
1597 """
1598 Get the path mapping defined in the MSC configuration.
1599
1600 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1601 where entries are sorted by prefix length (longest first) for optimal matching.
1602 Longer paths take precedence when matching.
1603
1604 :return: A PathMapping instance with translation mappings
1605 """
1606 try:
1607 return PathMapping.from_config()
1608 except Exception:
1609 # Log the error but continue - this shouldn't stop the application from working
1610 logger.error("Failed to load path_mapping from MSC config")
1611 return PathMapping()
1612
1613 def __getstate__(self) -> dict[str, Any]:
1614 state = self.__dict__.copy()
1615 if not state.get("_config_dict"):
1616 raise ValueError("StorageClientConfig is not serializable")
1617 del state["credentials_provider"]
1618 del state["storage_provider"]
1619 del state["metadata_provider"]
1620 del state["cache_manager"]
1621 del state["replicas"]
1622 del state["child_configs"]
1623 return state
1624
1625 def __setstate__(self, state: dict[str, Any]) -> None:
1626 # Presence checked by __getstate__.
1627 config_dict = state["_config_dict"]
1628 loader = StorageClientConfigLoader(
1629 config_dict=config_dict,
1630 profile=state["profile"],
1631 telemetry_provider=state["telemetry_provider"],
1632 )
1633 new_config = loader.build_config()
1634 self.profile = new_config.profile
1635 self.storage_provider = new_config.storage_provider
1636 self.credentials_provider = new_config.credentials_provider
1637 self.storage_provider_profiles = new_config.storage_provider_profiles
1638 self.child_configs = new_config.child_configs
1639 self.metadata_provider = new_config.metadata_provider
1640 self.cache_config = new_config.cache_config
1641 self.cache_manager = new_config.cache_manager
1642 self.retry_config = new_config.retry_config
1643 self.telemetry_provider = new_config.telemetry_provider
1644 self._config_dict = config_dict
1645 self.replicas = new_config.replicas
1646 self.autocommit_config = new_config.autocommit_config