1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import copy
17import json
18import logging
19import os
20import tempfile
21from collections import defaultdict
22from collections.abc import Callable
23from pathlib import Path
24from typing import Any, Iterable, Optional
25from urllib.parse import urlparse
26
27import yaml
28
29from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
30from .caching.cache_config import CacheConfig, EvictionPolicyConfig
31from .providers.manifest_metadata import ManifestMetadataProvider
32from .rclone import read_rclone_config
33from .schema import validate_config
34from .telemetry import Telemetry
35from .telemetry import init as telemetry_init
36from .types import (
37 DEFAULT_RETRY_ATTEMPTS,
38 DEFAULT_RETRY_BACKOFF_MULTIPLIER,
39 DEFAULT_RETRY_DELAY,
40 MSC_PROTOCOL,
41 AutoCommitConfig,
42 CredentialsProvider,
43 MetadataProvider,
44 ProviderBundle,
45 Replica,
46 RetryConfig,
47 StorageProvider,
48 StorageProviderConfig,
49)
50from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
51
# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
# Maps a URL protocol/scheme to the MSC storage provider type that serves it.
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais", "file")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    :raises ValueError: If ``protocol`` is not a supported implicit-profile protocol.
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING.get(protocol)
    if provider_type is None:
        # Fail with an actionable message instead of a bare KeyError from the mapping lookup.
        raise ValueError(
            f"Unsupported protocol '{protocol}' for implicit profile; "
            f"expected one of {sorted(PROTOCOL_TO_PROVIDER_TYPE_MAPPING)}"
        )
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }
77
78
# Name of the built-in POSIX profile that is always available.
DEFAULT_POSIX_PROFILE_NAME = "default"
# Implicit profile config mapping the default profile to the local filesystem root.
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps a storage provider "type" string from the config file to the provider
# class name in the ".providers" package (resolved lazily via import_class).
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

# Maps a credentials provider "type" string from the config file to the
# provider class name in the ".providers" package. Types not listed here are
# treated as fully qualified class paths.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
}
102
103
104def _find_config_file_paths() -> tuple[str]:
105 """
106 Get configuration file search paths.
107
108 Returns paths in order of precedence:
109
110 1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
111 2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
112 """
113 paths = []
114
115 # 1. User-specific configuration directory
116 xdg_config_home = os.getenv("XDG_CONFIG_HOME")
117
118 if xdg_config_home:
119 paths.extend(
120 [
121 os.path.join(xdg_config_home, "msc", "config.yaml"),
122 os.path.join(xdg_config_home, "msc", "config.json"),
123 ]
124 )
125
126 user_home = os.getenv("HOME")
127
128 if user_home:
129 paths.extend(
130 [
131 os.path.join(user_home, ".msc_config.yaml"),
132 os.path.join(user_home, ".msc_config.json"),
133 os.path.join(user_home, ".config", "msc", "config.yaml"),
134 os.path.join(user_home, ".config", "msc", "config.json"),
135 ]
136 )
137
138 # 2. System-wide configuration directories
139 xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
140 if not xdg_config_dirs:
141 xdg_config_dirs = "/etc/xdg"
142
143 for config_dir in xdg_config_dirs.split(":"):
144 config_dir = config_dir.strip()
145 if config_dir:
146 paths.extend(
147 [
148 os.path.join(config_dir, "msc", "config.yaml"),
149 os.path.join(config_dir, "msc", "config.json"),
150 ]
151 )
152
153 paths.extend(
154 [
155 "/etc/msc_config.yaml",
156 "/etc/msc_config.json",
157 ]
158 )
159
160 return tuple(paths)
161
162
# Distribution package name; used as the anchor for relative imports of
# provider classes via import_class(..., PACKAGE_NAME).
PACKAGE_NAME = "multistorageclient"

# Module-level logger for configuration-loading diagnostics.
logger = logging.getLogger(__name__)
166
167
class ImmutableDict(dict):
    """
    Immutable dictionary that raises an error when attempting to modify it.

    Nested dicts are frozen into ``ImmutableDict`` and nested lists into
    tuples at construction time. Read accessors (``__getitem__`` / ``get``)
    hand back *mutable* copies so callers can freely modify what they read
    without affecting the stored configuration.
    """

    def __init__(self, *args, **kwargs):
        # Build the underlying dict first, then walk it and freeze any nested
        # containers in place (using super() to bypass our own blocked setters).
        super().__init__(*args, **kwargs)

        # Recursively freeze nested structures
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, self._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Convert list to tuple, freezing nested dicts recursively.
        """
        frozen = []
        for item in lst:
            if isinstance(item, dict):
                frozen.append(ImmutableDict(item))
            elif isinstance(item, list):
                frozen.append(ImmutableDict._freeze_list(item))
            else:
                frozen.append(item)
        return tuple(frozen)

    def __deepcopy__(self, memo):
        """
        Return a regular mutable dict when deepcopy is called.

        NOTE(review): nested lists frozen to tuples by ``_freeze_list`` remain
        tuples in the deepcopy (unlike ``__getitem__``, which thaws them back
        to lists) — confirm callers do not rely on list types here.
        """
        return copy.deepcopy(dict(self), memo)

    def _copy_value(self, value):
        """
        Convert frozen structures back to mutable equivalents.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        elif isinstance(value, tuple):
            # Check if it was originally a list (frozen by _freeze_list)
            # NOTE: tuples that were tuples in the original input are also
            # thawed to lists here — the two cases are indistinguishable.
            return [self._copy_value(item) for item in value]
        else:
            return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        value = super().__getitem__(key)
        return self._copy_value(value)

    def get(self, key, default=None):
        """
        Return a mutable copy of the value.
        """
        return self[key] if key in self else default

    # All mutation entry points below are blocked to enforce immutability.

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")
249
250
class SimpleProviderBundle(ProviderBundle):
    """
    A minimal :py:class:`ProviderBundle` that simply wraps pre-constructed
    components and exposes them through read-only properties.
    """

    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        self._config = storage_provider_config
        self._creds = credentials_provider
        self._metadata = metadata_provider
        # Materialize the empty list here rather than using a mutable default.
        self._replica_list = [] if replicas is None else replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        """Configuration describing how to construct the storage provider."""
        return self._config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        """Credentials provider for the storage provider, if configured."""
        return self._creds

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """Metadata provider associated with this bundle, if configured."""
        return self._metadata

    @property
    def replicas(self) -> list[Replica]:
        """Replica definitions configured for this profile."""
        return self._replica_list
282
283
# Default interval, in seconds, between cache eviction-policy refresh scans.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
286
class StorageClientConfigLoader:
    """
    Builds a :py:class:`StorageClientConfig` for a single profile from a
    configuration dictionary (or from a pre-built provider bundle, which
    takes precedence over the dictionary).
    """

    # Pre-built bundle that bypasses config-based provider construction.
    _provider_bundle: Optional[ProviderBundle]
    # Env-var-expanded copy of the full configuration (frozen).
    _resolved_config_dict: dict[str, Any]
    # All profile definitions keyed by profile name.
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Configuration section of the profile being loaded (frozen).
    _profile_dict: dict[str, Any]
    # Optional "opentelemetry" section of the configuration.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Thunk returning a Telemetry instance (kept lazy for pickling).
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    # Optional "cache" section of the configuration.
    _cache_dict: Optional[dict[str, Any]]
    # Optional "experimental_features" section of the configuration.
    _experimental_features: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        :raises ValueError: If the profile is missing or the default POSIX
            profile is overridden with a non-file storage provider.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider by its mapped type name.

        :param storage_provider_name: Key into ``STORAGE_PROVIDER_MAPPING`` (e.g. "s3", "file").
        :param storage_options: Keyword options forwarded to the provider constructor.
        :param credentials_provider: Optional credentials provider injected into the options.
        :return: The constructed storage provider instance.
        :raises ValueError: If ``storage_provider_name`` is not a supported provider type.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        """
        Build a storage provider from another profile referenced by name
        (used when a manifest metadata provider keeps its manifests under a
        different profile than the one being loaded).

        :param storage_provider_profile: Name of the referenced profile.
        :return: The constructed storage provider for that profile.
        :raises ValueError: If the profile does not exist, configures a
            metadata provider, or lacks a storage provider definition.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.

        Returns:
            The constructed credentials provider, or ``None`` when no
            configuration is given.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            # Only forward options the provider's __init__ actually accepts,
            # and never overwrite explicitly configured options.
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Construct a provider bundle (storage, credentials, metadata, replicas)
        from a profile's configuration section.

        :param profile_dict: The profile's configuration section.
        :return: A :py:class:`SimpleProviderBundle` with the configured components.
        :raises ValueError: If the storage provider section is missing, or a
            non-manifest metadata provider type is not a fully qualified class name.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """
        Instantiate a third-party :py:class:`ProviderBundle` from its fully
        qualified class name and options.

        :param provider_bundle_dict: Dict with "type" ("module.ClassName") and optional "options".
        :return: The constructed provider bundle.
        """
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        """
        Resolve the provider bundle, preferring (in order): a pre-built bundle,
        a third-party extension declared in the profile, then the standard
        config-driven construction.

        :return: The provider bundle for the selected profile.
        """
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load 3rd party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Reject deprecated cache configuration keys.

        :param cache_dict: The "cache" configuration section.
        :raises ValueError: If the removed 'size_mb' property is present.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G  # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true  # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache  # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:  # Optional: The eviction policy to use\n"
                "    policy: fifo  # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    # todo: remove once experimental features are stable
    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
        """
        Validate that experimental features are enabled when used.

        :param eviction_policy: The eviction policy configuration to validate
        :raises ValueError: If experimental features are used without being enabled
        """
        # Validate MRU eviction policy
        if eviction_policy.policy.upper() == "MRU":
            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
                raise ValueError(
                    "MRU eviction policy is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_mru_eviction: true"
                )

        # Validate purge_factor
        if eviction_policy.purge_factor != 0:
            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
                raise ValueError("purge_factor must be between 0 and 100")

            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
                raise ValueError(
                    "purge_factor is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_purge_factor: true"
                )

    def build_config(self) -> "StorageClientConfig":
        """
        Assemble the final :py:class:`StorageClientConfig` for the selected
        profile: provider bundle, storage provider, optional cache manager,
        retry configuration, and autocommit configuration.

        :return: A fully initialized :py:class:`StorageClientConfig`.
        :raises ValueError: If the configuration is invalid (circular replicas,
            deprecated cache keys, relative cache location, or experimental
            features used without being enabled).
        """
        bundle = self._build_provider_bundle()

        # Validate replicas to prevent circular references
        self._validate_replicas(bundle.replicas)

        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified – the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        # retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            retry_config = RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            retry_config = RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )

        # autocommit options
        autocommit_config = AutoCommitConfig()
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=bundle.replicas,
            autocommit_config=autocommit_config,
        )
740
741
class PathMapping:
    """
    Resolves source paths to MSC profiles using the ``path_mapping`` section
    of the MSC configuration.

    Mappings are kept as a nested table of protocol -> bucket -> [(prefix, profile)],
    with each prefix list sorted longest-first so that the most specific
    (longest) prefix wins when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Build a PathMapping instance from a configuration dictionary.

        :param config_dict: Configuration dictionary; when ``None`` the MSC config is read from disk.
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        instance = cls()
        if config_dict:
            instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Populate the mapping table from the ``path_mapping`` section.

        :param config_dict: Configuration dictionary containing path mapping
        """
        entries = config_dict.get("path_mapping", {})
        if entries is None:
            return

        for source_path, dest_path in entries.items():
            # Both sides must be directory-style ("/"-terminated) and the
            # destination must be an MSC URL; anything else is skipped.
            well_formed = (
                source_path.endswith("/") and dest_path.startswith(MSC_PROTOCOL) and dest_path.endswith("/")
            )
            if not well_formed:
                continue

            # The destination profile is the netloc of the msc:// URL.
            dest_profile = urlparse(dest_path).netloc

            parsed = urlparse(source_path)
            protocol = parsed.scheme.lower() if parsed.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # Local/absolute paths: no bucket, the whole path is the prefix.
                bucket = ""
                prefix = source_path if source_path.startswith("/") else parsed.path
            else:
                # Object storage: netloc is the bucket; drop one leading slash
                # from the path to form the key prefix.
                bucket = parsed.netloc
                prefix = parsed.path[1:] if parsed.path.startswith("/") else parsed.path

            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Keep each bucket's prefixes sorted longest-first for best-match lookup.
        for buckets in self._mapping.values():
            for bucket_name in buckets:
                buckets[bucket_name].sort(key=lambda entry: len(entry[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        parsed = urlparse(url)
        protocol = parsed.scheme.lower() if parsed.scheme else "file"

        # Local or absolute paths: match against the bucket-less entries.
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else parsed.path

            candidates = self._mapping[protocol][""] if protocol in self._mapping else []

            for prefix, profile in candidates:
                if not path.startswith(prefix):
                    continue
                remainder = path[len(prefix) :]
                # Keep the translated local path absolute.
                if not remainder.startswith("/"):
                    remainder = "/" + remainder
                return profile, remainder

            return None

        # Object storage: split into bucket and key (without leading slash).
        bucket = parsed.netloc
        path = parsed.path
        if path.startswith("/"):
            path = path[1:]

        candidates = []
        if protocol in self._mapping and bucket in self._mapping[protocol]:
            candidates = self._mapping[protocol][bucket]

        for prefix, profile in candidates:
            if path.startswith(prefix):
                remainder = path[len(prefix) :]
                # Object keys are relative: drop a single leading slash.
                if remainder.startswith("/"):
                    remainder = remainder[1:]
                return profile, remainder

        return None
876
877
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    # Name of the profile this configuration was resolved from.
    profile: str
    # Provider performing the actual storage operations.
    storage_provider: StorageProvider
    # Optional credentials source for the storage provider.
    credentials_provider: Optional[CredentialsProvider]
    # Optional metadata provider (e.g. manifest-based — see ManifestMetadataProvider import).
    metadata_provider: Optional[MetadataProvider]
    # Optional local cache settings and its manager.
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    # Optional retry policy for storage operations.
    retry_config: Optional[RetryConfig]
    # Optional factory returning a Telemetry instance; must be a top-level function to pickle.
    telemetry_provider: Optional[Callable[[], Telemetry]]
    # Replica definitions; defaults to an empty list in __init__.
    replicas: list[Replica]
    # Optional auto-commit settings.
    autocommit_config: Optional[AutoCommitConfig]

    # Raw config dict used for pickling (__getstate__/__setstate__);
    # None when built from a provider bundle, which makes the config unpicklable.
    _config_dict: Optional[dict[str, Any]]
896 def __init__(
897 self,
898 profile: str,
899 storage_provider: StorageProvider,
900 credentials_provider: Optional[CredentialsProvider] = None,
901 metadata_provider: Optional[MetadataProvider] = None,
902 cache_config: Optional[CacheConfig] = None,
903 cache_manager: Optional[CacheManager] = None,
904 retry_config: Optional[RetryConfig] = None,
905 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
906 replicas: Optional[list[Replica]] = None,
907 autocommit_config: Optional[AutoCommitConfig] = None,
908 ):
909 if replicas is None:
910 replicas = []
911 self.profile = profile
912 self.storage_provider = storage_provider
913 self.credentials_provider = credentials_provider
914 self.metadata_provider = metadata_provider
915 self.cache_config = cache_config
916 self.cache_manager = cache_manager
917 self.retry_config = retry_config
918 self.telemetry_provider = telemetry_provider
919 self.replicas = replicas
920 self.autocommit_config = autocommit_config
921
[docs]
922 @staticmethod
923 def from_json(
924 config_json: str,
925 profile: str = DEFAULT_POSIX_PROFILE_NAME,
926 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
927 ) -> "StorageClientConfig":
928 """
929 Load a storage client configuration from a JSON string.
930
931 :param config_json: Configuration JSON string.
932 :param profile: Profile to use.
933 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
934 """
935 config_dict = json.loads(config_json)
936 return StorageClientConfig.from_dict(
937 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
938 )
939
[docs]
940 @staticmethod
941 def from_yaml(
942 config_yaml: str,
943 profile: str = DEFAULT_POSIX_PROFILE_NAME,
944 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
945 ) -> "StorageClientConfig":
946 """
947 Load a storage client configuration from a YAML string.
948
949 :param config_yaml: Configuration YAML string.
950 :param profile: Profile to use.
951 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
952 """
953 config_dict = yaml.safe_load(config_yaml)
954 return StorageClientConfig.from_dict(
955 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
956 )
957
[docs]
958 @staticmethod
959 def from_dict(
960 config_dict: dict[str, Any],
961 profile: str = DEFAULT_POSIX_PROFILE_NAME,
962 skip_validation: bool = False,
963 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
964 ) -> "StorageClientConfig":
965 """
966 Load a storage client configuration from a Python dictionary.
967
968 :param config_dict: Configuration Python dictionary.
969 :param profile: Profile to use.
970 :param skip_validation: Skip configuration schema validation.
971 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
972 """
973 # Validate the config file with predefined JSON schema
974 if not skip_validation:
975 validate_config(config_dict)
976
977 # Load config
978 loader = StorageClientConfigLoader(
979 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
980 )
981 config = loader.build_config()
982 config._config_dict = config_dict
983
984 return config
985
[docs]
986 @staticmethod
987 def from_file(
988 config_file_paths: Optional[Iterable[str]] = None,
989 profile: str = DEFAULT_POSIX_PROFILE_NAME,
990 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
991 ) -> "StorageClientConfig":
992 """
993 Load a storage client configuration from the first file found.
994
995 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
996 :param profile: Profile to use.
997 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
998 """
999 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
1000 # Parse rclone config file.
1001 rclone_config_dict, rclone_config_file = read_rclone_config()
1002
1003 # Merge config files.
1004 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
1005 if conflicted_keys:
1006 raise ValueError(
1007 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}'
1008 )
1009 merged_profiles = merged_config.get("profiles", {})
1010
1011 # Check if profile is in merged_profiles
1012 if profile in merged_profiles:
1013 return StorageClientConfig.from_dict(
1014 config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
1015 )
1016 else:
1017 # Check if profile is the default profile or an implicit profile
1018 if profile == DEFAULT_POSIX_PROFILE_NAME:
1019 implicit_profile_config = DEFAULT_POSIX_PROFILE
1020 elif profile.startswith("_"):
1021 # Handle implicit profiles
1022 parts = profile[1:].split("-", 1)
1023 if len(parts) == 2:
1024 protocol, bucket = parts
1025 # Verify it's a supported protocol
1026 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
1027 raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
1028 implicit_profile_config = create_implicit_profile_config(
1029 profile_name=profile, protocol=protocol, base_path=bucket
1030 )
1031 else:
1032 raise ValueError(f'Invalid implicit profile format: "{profile}"')
1033 else:
1034 raise ValueError(
1035 f'Profile "{profile}" not found in configuration files. Configuration was checked in '
1036 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
1037 f"Please verify that the profile exists and that configuration files are correctly located."
1038 )
1039 # merge the implicit profile config into the merged config so the cache & observability config can be inherited
1040 if "profiles" not in merged_config:
1041 merged_config["profiles"] = implicit_profile_config["profiles"]
1042 else:
1043 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
1044 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
1045 return StorageClientConfig.from_dict(
1046 config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
1047 )
1048
1049 @staticmethod
1050 def from_provider_bundle(
1051 config_dict: dict[str, Any],
1052 provider_bundle: ProviderBundle,
1053 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1054 ) -> "StorageClientConfig":
1055 loader = StorageClientConfigLoader(
1056 config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
1057 )
1058 config = loader.build_config()
1059 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors
1060 return config
1061
[docs]
1062 @staticmethod
1063 def read_msc_config(
1064 config_file_paths: Optional[Iterable[str]] = None,
1065 ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
1066 """Get the MSC configuration dictionary and the path of the first file found.
1067
1068 If no config paths are specified, configs are searched in the following order:
1069
1070 1. ``MSC_CONFIG`` environment variable (highest precedence)
1071 2. Standard search paths (user-specified config and system-wide config)
1072
1073 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
1074 :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
1075 dictionary or empty dict if no config was found. ``config_file_path`` is the absolute
1076 path of the config file used, or ``None`` if no config file was found.
1077 """
1078 config_dict: dict[str, Any] = {}
1079 config_file_path: Optional[str] = None
1080
1081 config_file_paths = list(config_file_paths or [])
1082
1083 # Add default paths if none provided.
1084 if len(config_file_paths) == 0:
1085 # Environment variable.
1086 msc_config_env = os.getenv("MSC_CONFIG", None)
1087 if msc_config_env is not None:
1088 config_file_paths.append(msc_config_env)
1089
1090 # Standard search paths.
1091 config_file_paths.extend(_find_config_file_paths())
1092
1093 # Normalize + absolutize paths.
1094 config_file_paths = [os.path.abspath(path) for path in config_file_paths]
1095
1096 # Log plan.
1097 logger.debug(f"Searching MSC config file paths: {config_file_paths}")
1098
1099 # Load config.
1100 for path in config_file_paths:
1101 if os.path.exists(path):
1102 try:
1103 with open(path) as f:
1104 if path.endswith(".json"):
1105 config_dict = json.load(f)
1106 else:
1107 config_dict = yaml.safe_load(f)
1108 config_file_path = path
1109 # Use the first config file.
1110 break
1111 except Exception as e:
1112 raise ValueError(f"malformed MSC config file: {path}, exception: {e}")
1113
1114 # Log result.
1115 if config_file_path is None:
1116 logger.debug("No MSC config files found in any of the search locations.")
1117 else:
1118 logger.debug(f"Using MSC config file: {config_file_path}")
1119
1120 if config_dict:
1121 validate_config(config_dict)
1122 return config_dict, config_file_path
1123
[docs]
1124 @staticmethod
1125 def read_path_mapping() -> PathMapping:
1126 """
1127 Get the path mapping defined in the MSC configuration.
1128
1129 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1130 where entries are sorted by prefix length (longest first) for optimal matching.
1131 Longer paths take precedence when matching.
1132
1133 :return: A PathMapping instance with translation mappings
1134 """
1135 try:
1136 return PathMapping.from_config()
1137 except Exception:
1138 # Log the error but continue - this shouldn't stop the application from working
1139 logger.error("Failed to load path_mapping from MSC config")
1140 return PathMapping()
1141
1142 def __getstate__(self) -> dict[str, Any]:
1143 state = self.__dict__.copy()
1144 if not state.get("_config_dict"):
1145 raise ValueError("StorageClientConfig is not serializable")
1146 del state["credentials_provider"]
1147 del state["storage_provider"]
1148 del state["metadata_provider"]
1149 del state["cache_manager"]
1150 del state["replicas"]
1151 return state
1152
1153 def __setstate__(self, state: dict[str, Any]) -> None:
1154 # Presence checked by __getstate__.
1155 config_dict = state["_config_dict"]
1156 loader = StorageClientConfigLoader(
1157 config_dict=config_dict,
1158 profile=state["profile"],
1159 telemetry_provider=state["telemetry_provider"],
1160 )
1161 new_config = loader.build_config()
1162 self.profile = new_config.profile
1163 self.storage_provider = new_config.storage_provider
1164 self.credentials_provider = new_config.credentials_provider
1165 self.metadata_provider = new_config.metadata_provider
1166 self.cache_config = new_config.cache_config
1167 self.cache_manager = new_config.cache_manager
1168 self.retry_config = new_config.retry_config
1169 self.telemetry_provider = new_config.telemetry_provider
1170 self._config_dict = config_dict
1171 self.replicas = new_config.replicas
1172 self.autocommit_config = new_config.autocommit_config