# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import inspect
import json
import logging
import os
import tempfile
from collections import defaultdict
from collections.abc import Callable, Iterable
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse

import yaml

from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
from .caching.cache_config import CacheConfig, EvictionPolicyConfig
from .instrumentation import setup_opentelemetry
from .providers.manifest_metadata import ManifestMetadataProvider
from .rclone import read_rclone_config
from .schema import validate_config
from .telemetry import Telemetry
from .telemetry import init as telemetry_init
from .types import (
    DEFAULT_RETRY_ATTEMPTS,
    DEFAULT_RETRY_BACKOFF_MULTIPLIER,
    DEFAULT_RETRY_DELAY,
    MSC_PROTOCOL,
    AutoCommitConfig,
    CredentialsProvider,
    MetadataProvider,
    ProviderBundle,
    Replica,
    RetryConfig,
    StorageProvider,
    StorageProviderConfig,
)
from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite

# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol]
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }

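# Example (illustrative): for a hypothetical bucket "bucket1",
# create_implicit_profile_config("_s3-bucket1", "s3", "bucket1") returns:
#
#   {"profiles": {"_s3-bucket1": {"storage_provider":
#       {"type": "s3", "options": {"base_path": "bucket1"}}}}}
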
DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
}
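
# Example (illustrative): how the mapping keys above appear in an MSC config.
# The profile name, bucket, and credential option names are hypothetical.
#
#   profiles:
#     my-s3:
#       storage_provider:
#         type: s3              # key into STORAGE_PROVIDER_MAPPING
#         options:
#           base_path: my-bucket
#       credentials_provider:
#         type: S3Credentials   # key into CREDENTIALS_PROVIDER_MAPPING
#         options:
#           access_key: ${AWS_ACCESS_KEY_ID}
#           secret_key: ${AWS_SECRET_ACCESS_KEY}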


def _find_config_file_paths() -> tuple[str, ...]:
    """
    Get configuration file search paths.

    Returns paths in order of precedence:

    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg/, /etc/)
    """
    paths = []

    # 1. User-specific configuration directory
    xdg_config_home = os.getenv("XDG_CONFIG_HOME")

    if xdg_config_home:
        paths.extend(
            [
                os.path.join(xdg_config_home, "msc", "config.yaml"),
                os.path.join(xdg_config_home, "msc", "config.json"),
            ]
        )

    user_home = os.getenv("HOME")

    if user_home:
        paths.extend(
            [
                os.path.join(user_home, ".msc_config.yaml"),
                os.path.join(user_home, ".msc_config.json"),
                os.path.join(user_home, ".config", "msc", "config.yaml"),
                os.path.join(user_home, ".config", "msc", "config.json"),
            ]
        )

    # 2. System-wide configuration directories
    xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
    if not xdg_config_dirs:
        xdg_config_dirs = "/etc/xdg"

    for config_dir in xdg_config_dirs.split(":"):
        config_dir = config_dir.strip()
        if config_dir:
            paths.extend(
                [
                    os.path.join(config_dir, "msc", "config.yaml"),
                    os.path.join(config_dir, "msc", "config.json"),
                ]
            )

    paths.extend(
        [
            "/etc/msc_config.yaml",
            "/etc/msc_config.json",
        ]
    )

    return tuple(paths)

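# Example (illustrative): with HOME=/home/user and neither XDG_CONFIG_HOME nor
# XDG_CONFIG_DIRS set, _find_config_file_paths() returns, in order:
#
#   /home/user/.msc_config.yaml,        /home/user/.msc_config.json,
#   /home/user/.config/msc/config.yaml, /home/user/.config/msc/config.json,
#   /etc/xdg/msc/config.yaml,           /etc/xdg/msc/config.json,
#   /etc/msc_config.yaml,               /etc/msc_config.json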

PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)


class SimpleProviderBundle(ProviderBundle):
    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        if replicas is None:
            replicas = []

        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        self._replicas = replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        return self._replicas
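
# Example (illustrative sketch): assembling a SimpleProviderBundle in code
# instead of via a config file; the bucket name is hypothetical.
#
#   bundle = SimpleProviderBundle(
#       storage_provider_config=StorageProviderConfig("s3", {"base_path": "my-bucket"})
#   )
#   config = StorageClientConfig.from_provider_bundle({}, bundle)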


DEFAULT_CACHE_REFRESH_INTERVAL = 300


class StorageClientConfigLoader:
    _provider_bundle: Optional[ProviderBundle]
    _resolved_config_dict: dict[str, Any]
    _profiles: dict[str, Any]
    _profile: str
    _profile_dict: dict[str, Any]
    _opentelemetry_dict: Optional[dict[str, Any]]
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        :py:class:`StorageClientConfig`. Components are built using the ``config_dict`` and
        ``profile``, but a pre-built ``provider_bundle`` takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolate all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = config_dict

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override the default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str) -> StorageProvider:
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if a metadata provider is configured for this profile.
        # NOTE: The storage profile for manifests does not support a metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        :param credentials_provider_dict: Dictionary containing the credentials provider configuration.
        :param storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to the credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)
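
    # Example (illustrative): the two accepted forms of "type" handled above.
    # Names in the second form are hypothetical.
    #
    #   credentials_provider:
    #     type: S3Credentials                      # short name, resolved via CREDENTIALS_PROVIDER_MAPPING
    #
    #   credentials_provider:
    #     type: my_pkg.auth.MyCredentialsProvider  # fully qualified class path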

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to a specific base_path,
        # so we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # The MetadataProvider may reference a different storage provider profile for its manifests.
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load a third-party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported.\n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G  # Optional: If not specified, the default cache size (10GB) will be used\n"
                "  use_etag: true  # Optional: If not specified, the default (true) will be used\n"
                "  location: /tmp/msc_cache  # Optional: If not specified, the default location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:  # Optional: The eviction policy to use\n"
                "    policy: fifo  # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, the default refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that the replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )
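
    # Example (illustrative): a valid replicas configuration; profile and bucket
    # names are hypothetical. "backup" must exist and must not declare replicas
    # of its own, or _validate_replicas raises ValueError.
    #
    #   profiles:
    #     primary:
    #       storage_provider: {type: s3, options: {base_path: bucket-a}}
    #       replicas:
    #         - replica_profile: backup
    #           read_priority: 1
    #     backup:
    #       storage_provider: {type: s3, options: {base_path: bucket-b}}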

    def build_config(self) -> "StorageClientConfig":
        bundle = self._build_provider_bundle()

        # Validate replicas to prevent circular references
        self._validate_replicas(bundle.replicas)

        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving an explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified; the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            cache_dict = self._cache_dict

            # Verify the cache config
            self._verify_cache_config(cache_dict)

            # Initialize the eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # Create the cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=location,
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        # Retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            retry_config = RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            retry_config = RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )

        # Autocommit options
        autocommit_config = AutoCommitConfig()
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)

        # Set up OpenTelemetry providers once per process.
        #
        # TODO: Legacy, needs to be removed.
        if self._opentelemetry_dict:
            setup_opentelemetry(self._opentelemetry_dict)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=bundle.replicas,
            autocommit_config=autocommit_config,
        )

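# Example (illustrative): a cache section as consumed by build_config above.
# The cache section is global; each profile opts in via 'caching_enabled'.
# Values are hypothetical.
#
#   cache:
#     size: 500G
#     location: /tmp/msc_cache          # must be an absolute path
#     check_source_version: true        # preferred over the legacy 'use_etag'
#     eviction_policy:
#       policy: fifo
#       refresh_interval: 300
#   profiles:
#     my-profile:
#       caching_enabled: true
#       storage_provider: {type: s3, options: {base_path: my-bucket}}
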
class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from a configuration dictionary.

        :param config_dict: Configuration dictionary; if None, the config will be loaded.
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        instance = cls()
        instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        # Get the path_mapping section
        path_mapping = config_dict.get("path_mapping", {})
        if path_mapping is None:
            return

        # Process each mapping
        for source_path, dest_path in path_mapping.items():
            # Validate format
            if not source_path.endswith("/"):
                continue
            if not dest_path.startswith(MSC_PROTOCOL):
                continue
            if not dest_path.endswith("/"):
                continue

            # Extract the destination profile
            pr_dest = urlparse(dest_path)
            dest_profile = pr_dest.netloc

            # Parse the source path
            pr = urlparse(source_path)
            protocol = pr.scheme.lower() if pr.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # For file or absolute paths, use the whole path as the prefix
                # and leave the bucket empty
                bucket = ""
                prefix = source_path if source_path.startswith("/") else pr.path
            else:
                # For object storage, extract the bucket and prefix
                bucket = pr.netloc
                prefix = pr.path
                if prefix.startswith("/"):
                    prefix = prefix[1:]

            # Add the mapping to the nested dict
            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Sort each bucket's prefixes by length (longest first) for optimal matching
        for protocol, buckets in self._mapping.items():
            for bucket, prefixes in buckets.items():
                self._mapping[protocol][bucket] = sorted(prefixes, key=lambda x: len(x[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find a matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        # Parse the URL
        pr = urlparse(url)
        protocol = pr.scheme.lower() if pr.scheme else "file"

        # For file paths or absolute paths
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else pr.path

            possible_mapping = self._mapping[protocol][""] if protocol in self._mapping else []

            # Check each prefix (already sorted by length, longest first)
            for prefix, profile in possible_mapping:
                if path.startswith(prefix):
                    # Calculate the relative path
                    rel_path = path[len(prefix) :]
                    if not rel_path.startswith("/"):
                        rel_path = "/" + rel_path
                    return profile, rel_path

            return None

        # For object storage
        bucket = pr.netloc
        path = pr.path
        if path.startswith("/"):
            path = path[1:]

        # Check the bucket-specific mapping
        possible_mapping = (
            self._mapping[protocol][bucket] if (protocol in self._mapping and bucket in self._mapping[protocol]) else []
        )

        # Check each prefix (already sorted by length, longest first)
        for prefix, profile in possible_mapping:
            # Matching prefix
            if path.startswith(prefix):
                rel_path = path[len(prefix) :]
                # Remove the leading slash if present
                if rel_path.startswith("/"):
                    rel_path = rel_path[1:]

                return profile, rel_path

        return None
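
    # Example (illustrative): a path_mapping entry and the translation it yields.
    # Profile and bucket names are hypothetical.
    #
    #   path_mapping:
    #     s3://my-bucket/datasets/: msc://my-profile/
    #
    #   PathMapping.from_config(config).find_mapping("s3://my-bucket/datasets/train/0.tar")
    #   # -> ("my-profile", "train/0.tar")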


class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    profile: str
    storage_provider: StorageProvider
    credentials_provider: Optional[CredentialsProvider]
    metadata_provider: Optional[MetadataProvider]
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    retry_config: Optional[RetryConfig]
    telemetry_provider: Optional[Callable[[], Telemetry]]
    replicas: list[Replica]
    autocommit_config: Optional[AutoCommitConfig]

    _config_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        profile: str,
        storage_provider: StorageProvider,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_manager: Optional[CacheManager] = None,
        retry_config: Optional[RetryConfig] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        replicas: Optional[list[Replica]] = None,
        autocommit_config: Optional[AutoCommitConfig] = None,
    ):
        if replicas is None:
            replicas = []
        self.profile = profile
        self.storage_provider = storage_provider
        self.credentials_provider = credentials_provider
        self.metadata_provider = metadata_provider
        self.cache_config = cache_config
        self.cache_manager = cache_manager
        self.retry_config = retry_config
        self.telemetry_provider = telemetry_provider
        self.replicas = replicas
        self.autocommit_config = autocommit_config

    @staticmethod
    def from_json(
        config_json: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a JSON string.

        :param config_json: Configuration JSON string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = json.loads(config_json)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

    @staticmethod
    def from_yaml(
        config_yaml: str,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a YAML string.

        :param config_yaml: Configuration YAML string.
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        config_dict = yaml.safe_load(config_yaml)
        return StorageClientConfig.from_dict(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )

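    # Example (illustrative): loading from_yaml with an inline YAML string; the
    # profile and bucket names are hypothetical.
    #
    #   config = StorageClientConfig.from_yaml(
    #       """
    #       profiles:
    #         my-profile:
    #           storage_provider:
    #             type: s3
    #             options:
    #               base_path: my-bucket
    #       """,
    #       profile="my-profile",
    #   )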
    @staticmethod
    def from_dict(
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        skip_validation: bool = False,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from a Python dictionary.

        :param config_dict: Configuration Python dictionary.
        :param profile: Profile to use.
        :param skip_validation: Skip configuration schema validation.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Validate the config with the predefined JSON schema
        if not skip_validation:
            validate_config(config_dict)

        # Load the config
        loader = StorageClientConfigLoader(
            config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        config._config_dict = config_dict

        return config

    @staticmethod
    def from_file(
        config_file_paths: Optional[Iterable[str]] = None,
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        """
        Load a storage client configuration from the first file found.

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
        :param profile: Profile to use.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
        # Parse the rclone config file.
        rclone_config_dict, rclone_config_file = read_rclone_config()

        # Merge the config files.
        merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
        if conflicted_keys:
            raise ValueError(
                f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}": {conflicted_keys}'
            )
        merged_profiles = merged_config.get("profiles", {})

        # Check if the profile is in merged_profiles
        if profile in merged_profiles:
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
            )
        else:
            # Check if the profile is the default profile or an implicit profile
            if profile == DEFAULT_POSIX_PROFILE_NAME:
                implicit_profile_config = DEFAULT_POSIX_PROFILE
            elif profile.startswith("_"):
                # Handle implicit profiles
                parts = profile[1:].split("-", 1)
                if len(parts) == 2:
                    protocol, bucket = parts
                    # Verify it's a supported protocol
                    if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
                        raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
                    implicit_profile_config = create_implicit_profile_config(
                        profile_name=profile, protocol=protocol, base_path=bucket
                    )
                else:
                    raise ValueError(f'Invalid implicit profile format: "{profile}"')
            else:
                raise ValueError(
                    f'Profile "{profile}" not found in configuration files. Configuration was checked in '
                    f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
                    f"Please verify that the profile exists and that configuration files are correctly located."
                )
            # Merge the implicit profile config into the merged config so the cache & observability config can be inherited.
            if "profiles" not in merged_config:
                merged_config["profiles"] = implicit_profile_config["profiles"]
            else:
                merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
            # The config was already validated when read; skip validation for implicit profiles (names starting with "_").
            return StorageClientConfig.from_dict(
                config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
            )

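    # Example (illustrative): implicit profiles let from_file resolve a profile
    # that has no config file entry; the bucket name is hypothetical.
    #
    #   config = StorageClientConfig.from_file(profile="_s3-bucket1")
    #   # Behaves like a profile with storage_provider type "s3" and
    #   # base_path "bucket1", inheriting any cache/observability settings.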
    @staticmethod
    def from_provider_bundle(
        config_dict: dict[str, Any],
        provider_bundle: ProviderBundle,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> "StorageClientConfig":
        loader = StorageClientConfigLoader(
            config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
        )
        config = loader.build_config()
        config._config_dict = None  # Explicitly mark as None to avoid confusing pickling errors
        return config

    @staticmethod
    def read_msc_config(
        config_file_paths: Optional[Iterable[str]] = None,
    ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
        """Get the MSC configuration dictionary and the path of the first file found.

        If no config paths are specified, configs are searched in the following order:

        1. ``MSC_CONFIG`` environment variable (highest precedence)
        2. Standard search paths (user-specific config and system-wide config)

        :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
        :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
            dictionary or an empty dict if no config was found. ``config_file_path`` is the absolute
            path of the config file used, or ``None`` if no config file was found.
        """
        config_dict: dict[str, Any] = {}
        config_file_path: Optional[str] = None

        config_file_paths = list(config_file_paths or [])

        # Add the default paths if none are provided.
        if len(config_file_paths) == 0:
            # Environment variable.
            msc_config_env = os.getenv("MSC_CONFIG", None)
            if msc_config_env is not None:
                config_file_paths.append(msc_config_env)

            # Standard search paths.
            config_file_paths.extend(_find_config_file_paths())

        # Normalize + absolutize the paths.
        config_file_paths = [os.path.abspath(path) for path in config_file_paths]

        # Log the plan.
        logger.debug(f"Searching MSC config file paths: {config_file_paths}")

        # Load the config.
        for path in config_file_paths:
            if os.path.exists(path):
                try:
                    with open(path) as f:
                        if path.endswith(".json"):
                            config_dict = json.load(f)
                        else:
                            config_dict = yaml.safe_load(f)
                    config_file_path = path
                    # Use the first config file found.
                    break
                except Exception as e:
                    raise ValueError(f"Malformed MSC config file: {path}, exception: {e}") from e

        # Log the result.
        if config_file_path is None:
            logger.debug("No MSC config files found in any of the search locations.")
        else:
            logger.info(f"Using MSC config file: {config_file_path}")

        if config_dict:
            validate_config(config_dict)
        return config_dict, config_file_path

    @staticmethod
    def read_path_mapping() -> PathMapping:
        """
        Get the path mapping defined in the MSC configuration.

        Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
        where entries are sorted by prefix length (longest first) for optimal matching.
        Longer paths take precedence when matching.

        :return: A PathMapping instance with translation mappings
        """
        try:
            return PathMapping.from_config()
        except Exception:
            # Log the error but continue; this shouldn't stop the application from working.
            logger.error("Failed to load path_mapping from MSC config")
            return PathMapping()

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        if not state.get("_config_dict"):
            raise ValueError("StorageClientConfig is not serializable")
        del state["credentials_provider"]
        del state["storage_provider"]
        del state["metadata_provider"]
        del state["cache_manager"]
        del state["replicas"]
        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            telemetry_provider=state["telemetry_provider"],
        )
        new_config = loader.build_config()
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.telemetry_provider = new_config.telemetry_provider
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config