1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import json
17import logging
18import os
19import tempfile
20from collections import defaultdict
21from collections.abc import Sequence
22from pathlib import Path
23from typing import Any, Optional
24from urllib.parse import urlparse
25
26import opentelemetry.metrics as api_metrics
27import yaml
28
29from .cache import DEFAULT_CACHE_SIZE, CacheManager
30from .caching.cache_config import CacheConfig, EvictionPolicyConfig
31from .instrumentation import setup_opentelemetry
32from .providers.manifest_metadata import ManifestMetadataProvider
33from .rclone import read_rclone_config
34from .schema import validate_config
35from .telemetry import Telemetry
36from .telemetry.attributes.base import AttributesProvider
37from .types import (
38 DEFAULT_RETRY_ATTEMPTS,
39 DEFAULT_RETRY_DELAY,
40 MSC_PROTOCOL,
41 AutoCommitConfig,
42 CredentialsProvider,
43 MetadataProvider,
44 ProviderBundle,
45 Replica,
46 RetryConfig,
47 StorageProvider,
48 StorageProviderConfig,
49)
50from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
51
# Constants related to implicit profiles
#
# URL schemes for which MSC can synthesize a profile on the fly (e.g. from
# "s3://bucket/...") without an explicit entry in the configuration file.
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
# Maps a URL scheme to the storage provider "type" value used in profile configs.
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}

# Short aliases for the built-in telemetry attributes providers. A config may
# reference either one of these aliases or a fully qualified class name.
_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
}
69
70
71# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Build the configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    # Translate the URL scheme into the provider "type" expected by profiles.
    storage_provider = {
        "type": PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol],
        "options": {"base_path": base_path},
    }
    return {"profiles": {profile_name: {"storage_provider": storage_provider}}}
86
87
# Name of the implicit POSIX profile that is always available.
DEFAULT_POSIX_PROFILE_NAME = "default"
# Built-in profile exposing the local filesystem root ("/") under the default name.
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps a storage provider "type" in the config to its implementation class
# (resolved from the ".providers" module of this package).
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
}

# Maps short credentials provider type names to implementation classes in
# ".providers"; unknown types are treated as fully qualified class paths.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
}
108
109
def _find_config_file_paths():
    """
    Get configuration file search paths.

    Returns paths in order of precedence:

    1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
    2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
    """

    def msc_config_pair(directory: str) -> list[str]:
        # Every config directory is probed for both the YAML and JSON variants.
        return [
            os.path.join(directory, "msc", "config.yaml"),
            os.path.join(directory, "msc", "config.json"),
        ]

    candidates: list[str] = []

    # 1. User-specific configuration directory
    xdg_home = os.getenv("XDG_CONFIG_HOME")
    if xdg_home:
        candidates += msc_config_pair(xdg_home)

    home = os.getenv("HOME")
    if home:
        candidates += [
            os.path.join(home, ".msc_config.yaml"),
            os.path.join(home, ".msc_config.json"),
        ]
        candidates += msc_config_pair(os.path.join(home, ".config"))

    # 2. System-wide configuration directories (colon-separated, XDG-style)
    for raw_dir in (os.getenv("XDG_CONFIG_DIRS") or "/etc/xdg").split(":"):
        system_dir = raw_dir.strip()
        if system_dir:
            candidates += msc_config_pair(system_dir)

    candidates += [
        "/etc/msc_config.yaml",
        "/etc/msc_config.json",
    ]

    return tuple(candidates)
167
168
# Default config search locations, resolved once at import time.
DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS = _find_config_file_paths()

# Root package name used when resolving relative module paths (e.g. ".providers").
PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)
174
175
class SimpleProviderBundle(ProviderBundle):
    """Concrete :py:class:`ProviderBundle` that simply wraps pre-built components."""

    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        # A mutable default argument would be shared across instances, so the
        # empty list is created per-instance here instead.
        self._replicas = [] if replicas is None else replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        """Configuration used to construct the storage provider."""
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        """Credentials provider, if any, for the storage provider."""
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """Metadata provider, if any, associated with this bundle."""
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        """Replica definitions for this bundle (possibly empty)."""
        return self._replicas
207
208
# Default interval (in seconds) between cache eviction-policy refresh scans.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
210
211
class StorageClientConfigLoader:
    """
    Builds a :py:class:`StorageClientConfig` for one profile out of a parsed MSC
    configuration dictionary, optionally reusing pre-built components
    (provider bundle, telemetry instruments) passed by the caller.
    """

    # Pre-built bundle that bypasses config-driven construction when provided.
    _provider_bundle: Optional[ProviderBundle]
    # All profile sections from the config, keyed by profile name.
    _profiles: dict[str, Any]
    # Name of the profile this loader builds.
    _profile: str
    # Config section of the selected profile.
    _profile_dict: dict[str, Any]
    # Optional "opentelemetry" config section (None when absent).
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Metric instruments / attributes providers resolved during __init__.
    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # Optional "cache" config section (None when absent).
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry: Optional[Telemetry] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry: Optional telemetry instance to use, takes precedence over ``metric_gauges``, ``metric_counters``, and ``metric_attributes_providers``.
        :param metric_gauges: Optional metric gauges to use.
        :param metric_counters: Optional metric counters to use.
        :param metric_attributes_providers: Optional metric attributes providers to use.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)

        # Explicitly passed instruments act as the fallback; a telemetry
        # instance combined with a "metrics" config section overrides them below.
        self._metric_gauges = metric_gauges
        self._metric_counters = metric_counters
        self._metric_attributes_providers = metric_attributes_providers
        if self._opentelemetry_dict is not None:
            if "metrics" in self._opentelemetry_dict:
                if telemetry is not None:
                    # Resolve each known gauge/counter; instruments the
                    # telemetry instance declines to create are skipped.
                    self._metric_gauges = {}
                    for name in Telemetry.GaugeName:
                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
                        if gauge is not None:
                            self._metric_gauges[name] = gauge
                    self._metric_counters = {}
                    for name in Telemetry.CounterName:
                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
                        if counter is not None:
                            self._metric_counters[name] = counter

                    if "attributes" in self._opentelemetry_dict["metrics"]:
                        # Instantiate each configured attributes provider; short
                        # aliases resolve via _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING,
                        # anything else is treated as a fully qualified class name.
                        attributes_providers: list[AttributesProvider] = []
                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
                            "attributes"
                        ]
                        for config in attributes_provider_configs:
                            attributes_provider_type: str = config["type"]
                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
                                attributes_provider_type, attributes_provider_type
                            )
                            attributes_provider_module_name, attributes_provider_class_name = (
                                attributes_provider_fully_qualified_name.rsplit(".", 1)
                            )
                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
                            attributes_provider_options = config.get("options", {})
                            if (
                                attributes_provider_fully_qualified_name
                                == _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING["msc_config"]
                            ):
                                # The msc_config provider needs the full config to derive its attributes.
                                attributes_provider_options["config_dict"] = config_dict
                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
                            attributes_providers.append(attributes_provider)
                        self._metric_attributes_providers = tuple(attributes_providers)
                elif not any([metric_gauges, metric_counters, metric_attributes_providers]):
                    # TODO: Remove "beta" from the log once legacy metrics are removed.
                    logger.error("No telemetry instance! Disabling beta metrics.")

        self._cache_dict = config_dict.get("cache", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiates a storage provider class by its mapped type name.

        :param storage_provider_name: Key into ``STORAGE_PROVIDER_MAPPING`` (e.g. "s3").
        :param storage_options: Keyword options forwarded to the provider constructor.
        :param credentials_provider: Optional credentials provider injected into the options.
        :raises ValueError: If the storage provider type is not supported.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        # Pass resolved metric instruments through so the provider can emit telemetry.
        if self._metric_gauges is not None:
            storage_options["metric_gauges"] = self._metric_gauges
        if self._metric_counters is not None:
            storage_options["metric_counters"] = self._metric_counters
        if self._metric_attributes_providers is not None:
            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str):
        """
        Builds the storage provider of another profile, used to read manifests
        stored under a different profile than the one being loaded.

        :param storage_provider_profile: Name of the profile to build the provider from.
        :raises ValueError: If the profile is missing, has a metadata provider,
            or lacks a storage_provider section.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        # Only forward storage options the provider's constructor actually
        # accepts, and never overwrite an explicitly configured option.
        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Builds a :py:class:`SimpleProviderBundle` (storage, credentials, metadata
        providers, and replicas) from a profile's config section.

        :param profile_dict: Config section of the profile to build from.
        :raises ValueError: If the storage_provider section is missing or a
            custom metadata provider type is not fully qualified.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                # Custom metadata provider supplied as a fully qualified class path.
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """
        Instantiates a third-party :py:class:`ProviderBundle` from a fully
        qualified class path given in the profile's ``provider_bundle`` section.
        """
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        """
        Resolves the provider bundle: a caller-supplied bundle wins, then a
        ``provider_bundle`` extension in the profile, then plain config.
        """
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load 3rd party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Rejects deprecated cache config keys.

        :param cache_dict: The "cache" config section to verify.
        :raises ValueError: If the removed 'size_mb' property is present.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G  # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true  # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache  # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:  # Optional: The eviction policy to use\n"
                "    policy: fifo  # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    def build_config(self) -> "StorageClientConfig":
        """
        Assembles the final :py:class:`StorageClientConfig`: provider bundle,
        storage provider, cache, retry, and autocommit settings.

        :raises ValueError: On invalid replica, cache, or provider configuration.
        """
        bundle = self._build_provider_bundle()

        # Validate replicas to prevent circular references
        self._validate_replicas(bundle.replicas)

        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        if self._cache_dict is not None:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # Create cache config from the standardized format
            # NOTE(review): the `location` fallback below resolves to the same
            # value as the `location` variable computed above in all cases.
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                use_etag=cache_dict.get("use_etag", True),
                eviction_policy=eviction_policy,
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)

        # retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            retry_config = RetryConfig(attempts=attempts, delay=delay)
        else:
            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)

        # autocommit options
        autocommit_config = AutoCommitConfig()
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)

        # set up OpenTelemetry providers once per process
        #
        # TODO: Legacy, need to remove.
        if self._opentelemetry_dict:
            setup_opentelemetry(self._opentelemetry_dict)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            metric_gauges=self._metric_gauges,
            metric_counters=self._metric_counters,
            metric_attributes_providers=self._metric_attributes_providers,
            replicas=bundle.replicas,
            autocommit_config=autocommit_config,
        )
655
656
class PathMapping:
    """
    Class to handle path mappings defined in the MSC configuration.

    Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
    where entries are sorted by prefix length (longest first) for optimal matching.
    Longer paths take precedence when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary, if None the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict = StorageClientConfig.read_msc_config()

        mapping = cls()
        if config_dict:
            mapping._load_mapping(config_dict)
        return mapping

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        entries = config_dict.get("path_mapping", {})
        if entries is None:
            return

        for src, dest in entries.items():
            # Only well-formed entries are accepted: both sides must be
            # directory-style (trailing slash) and the destination must be
            # an MSC URL naming the target profile.
            well_formed = src.endswith("/") and dest.startswith(MSC_PROTOCOL) and dest.endswith("/")
            if not well_formed:
                continue

            # The destination profile is the netloc of the MSC URL.
            dest_profile = urlparse(dest).netloc

            parsed_src = urlparse(src)
            protocol = parsed_src.scheme.lower() if parsed_src.scheme else "file"

            if protocol == "file" or src.startswith("/"):
                # Local/absolute paths carry no bucket; the whole path is the prefix.
                bucket = ""
                prefix = src if src.startswith("/") else parsed_src.path
            else:
                # Object storage: netloc is the bucket, path (minus one leading
                # slash) is the prefix.
                bucket = parsed_src.netloc
                prefix = parsed_src.path.removeprefix("/")

            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Keep longest prefixes first so find_mapping returns the most specific match.
        for buckets in self._mapping.values():
            for pairs in buckets.values():
                pairs.sort(key=lambda pair: len(pair[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        parsed = urlparse(url)
        protocol = parsed.scheme.lower() if parsed.scheme else "file"

        if protocol == "file" or url.startswith("/"):
            # Local/absolute path: probe the bucket-less entry list.
            path = url if url.startswith("/") else parsed.path
            candidates = self._mapping[protocol][""] if protocol in self._mapping else []

            # Entries are pre-sorted longest-prefix-first.
            for prefix, profile in candidates:
                if not path.startswith(prefix):
                    continue
                remainder = path[len(prefix) :]
                # Translated local paths always carry a leading slash.
                if not remainder.startswith("/"):
                    remainder = "/" + remainder
                return profile, remainder

            return None

        # Object storage: look up by bucket, matching against the key.
        bucket = parsed.netloc
        path = parsed.path.removeprefix("/")

        if protocol in self._mapping and bucket in self._mapping[protocol]:
            candidates = self._mapping[protocol][bucket]
        else:
            candidates = []

        # Entries are pre-sorted longest-prefix-first.
        for prefix, profile in candidates:
            if path.startswith(prefix):
                # Translated object keys never carry a leading slash.
                remainder = path[len(prefix) :].removeprefix("/")
                return profile, remainder

        return None
791
792
# (stray "[docs]" Sphinx-rendering artifact removed; it was not valid code)
793class StorageClientConfig:
794 """
795 Configuration class for the :py:class:`multistorageclient.StorageClient`.
796 """
797
798 profile: str
799 storage_provider: StorageProvider
800 credentials_provider: Optional[CredentialsProvider]
801 metadata_provider: Optional[MetadataProvider]
802 cache_config: Optional[CacheConfig]
803 cache_manager: Optional[CacheManager]
804 retry_config: Optional[RetryConfig]
805 metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
806 metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
807 metric_attributes_providers: Optional[Sequence[AttributesProvider]]
808 replicas: list[Replica]
809 autocommit_config: Optional[AutoCommitConfig]
810
811 _config_dict: Optional[dict[str, Any]]
812
813 def __init__(
814 self,
815 profile: str,
816 storage_provider: StorageProvider,
817 credentials_provider: Optional[CredentialsProvider] = None,
818 metadata_provider: Optional[MetadataProvider] = None,
819 cache_config: Optional[CacheConfig] = None,
820 cache_manager: Optional[CacheManager] = None,
821 retry_config: Optional[RetryConfig] = None,
822 metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
823 metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
824 metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
825 replicas: Optional[list[Replica]] = None,
826 autocommit_config: Optional[AutoCommitConfig] = None,
827 ):
828 if replicas is None:
829 replicas = []
830 self.profile = profile
831 self.storage_provider = storage_provider
832 self.credentials_provider = credentials_provider
833 self.metadata_provider = metadata_provider
834 self.cache_config = cache_config
835 self.cache_manager = cache_manager
836 self.retry_config = retry_config
837 self.metric_gauges = metric_gauges
838 self.metric_counters = metric_counters
839 self.metric_attributes_providers = metric_attributes_providers
840 self.replicas = replicas
841 self.autocommit_config = autocommit_config
842
843 @staticmethod
844 def from_json(
845 config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
846 ) -> "StorageClientConfig":
847 config_dict = json.loads(config_json)
848 return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
849
850 @staticmethod
851 def from_yaml(
852 config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
853 ) -> "StorageClientConfig":
854 config_dict = yaml.safe_load(config_yaml)
855 return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
856
857 @staticmethod
858 def from_dict(
859 config_dict: dict[str, Any],
860 profile: str = DEFAULT_POSIX_PROFILE_NAME,
861 skip_validation: bool = False,
862 telemetry: Optional[Telemetry] = None,
863 ) -> "StorageClientConfig":
864 # Validate the config file with predefined JSON schema
865 if not skip_validation:
866 validate_config(config_dict)
867
868 # Load config
869 loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
870 config = loader.build_config()
871 config._config_dict = config_dict
872
873 return config
874
875 @staticmethod
876 def from_file(
877 profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
878 ) -> "StorageClientConfig":
879 msc_config_file = os.getenv("MSC_CONFIG", None)
880
881 # Search config paths
882 if msc_config_file is None:
883 for filename in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
884 if os.path.exists(filename):
885 msc_config_file = filename
886 break
887
888 msc_config_dict = StorageClientConfig.read_msc_config()
889 # Parse rclone config file.
890 rclone_config_dict, rclone_config_file = read_rclone_config()
891
892 # Merge config files.
893 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
894 if conflicted_keys:
895 raise ValueError(
896 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}'
897 )
898 merged_profiles = merged_config.get("profiles", {})
899
900 # Check if profile is in merged_profiles
901 if profile in merged_profiles:
902 return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
903 else:
904 # Check if profile is the default profile or an implicit profile
905 if profile == DEFAULT_POSIX_PROFILE_NAME:
906 implicit_profile_config = DEFAULT_POSIX_PROFILE
907 elif profile.startswith("_"):
908 # Handle implicit profiles
909 parts = profile[1:].split("-", 1)
910 if len(parts) == 2:
911 protocol, bucket = parts
912 # Verify it's a supported protocol
913 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
914 raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
915 implicit_profile_config = create_implicit_profile_config(
916 profile_name=profile, protocol=protocol, base_path=bucket
917 )
918 else:
919 raise ValueError(f'Invalid implicit profile format: "{profile}"')
920 else:
921 raise ValueError(
922 f'Profile "{profile}" not found in configuration files. Configuration was checked in '
923 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
924 f"Please verify that the profile exists and that configuration files are correctly located."
925 )
926 # merge the implicit profile config into the merged config so the cache & observability config can be inherited
927 if "profiles" not in merged_config:
928 merged_config["profiles"] = implicit_profile_config["profiles"]
929 else:
930 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
931 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
932 return StorageClientConfig.from_dict(
933 config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
934 )
935
936 @staticmethod
937 def from_provider_bundle(
938 config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
939 ) -> "StorageClientConfig":
940 loader = StorageClientConfigLoader(
941 config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
942 )
943 config = loader.build_config()
944 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors
945 return config
946
[docs]
947 @staticmethod
948 def read_msc_config() -> Optional[dict[str, Any]]:
949 """Get the MSC configuration dictionary.
950
951 :return: The MSC configuration dictionary or empty dict if no config was found
952 """
953 config_dict = {}
954 config_found = False
955
956 # Check for environment variable first
957 msc_config = os.getenv("MSC_CONFIG", None)
958 if msc_config and os.path.exists(msc_config):
959 try:
960 with open(msc_config) as f:
961 if msc_config.endswith(".json"):
962 config_dict = json.load(f)
963 config_found = True
964 else:
965 config_dict = yaml.safe_load(f)
966 config_found = True
967 except Exception as e:
968 raise ValueError(f"malformed msc config file: {msc_config}, exception: {e}")
969
970 # If not found through environment variable, try standard search paths
971 if not config_found:
972 for path in DEFAULT_MSC_CONFIG_FILE_SEARCH_PATHS:
973 if not os.path.exists(path):
974 continue
975 try:
976 with open(path) as f:
977 if path.endswith(".json"):
978 config_dict = json.load(f)
979 config_found = True
980 break
981 else:
982 config_dict = yaml.safe_load(f)
983 config_found = True
984 break
985 except Exception as e:
986 raise ValueError(f"malformed msc config file: {msc_config}, exception: {e}")
987
988 if config_dict:
989 validate_config(config_dict)
990 return config_dict
991
[docs]
992 @staticmethod
993 def read_path_mapping() -> PathMapping:
994 """
995 Get the path mapping defined in the MSC configuration.
996
997 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
998 where entries are sorted by prefix length (longest first) for optimal matching.
999 Longer paths take precedence when matching.
1000
1001 :return: A PathMapping instance with translation mappings
1002 """
1003 try:
1004 return PathMapping.from_config()
1005 except Exception:
1006 # Log the error but continue - this shouldn't stop the application from working
1007 logger.error("Failed to load path_mapping from MSC config")
1008 return PathMapping()
1009
1010 def __getstate__(self) -> dict[str, Any]:
1011 state = self.__dict__.copy()
1012 if not state.get("_config_dict"):
1013 raise ValueError("StorageClientConfig is not serializable")
1014 del state["credentials_provider"]
1015 del state["storage_provider"]
1016 del state["metadata_provider"]
1017 del state["cache_manager"]
1018 del state["replicas"]
1019 return state
1020
1021 def __setstate__(self, state: dict[str, Any]) -> None:
1022 # Presence checked by __getstate__.
1023 config_dict = state["_config_dict"]
1024 loader = StorageClientConfigLoader(
1025 config_dict=config_dict,
1026 profile=state["profile"],
1027 metric_gauges=state["metric_gauges"],
1028 metric_counters=state["metric_counters"],
1029 metric_attributes_providers=state["metric_attributes_providers"],
1030 )
1031 new_config = loader.build_config()
1032 self.profile = new_config.profile
1033 self.storage_provider = new_config.storage_provider
1034 self.credentials_provider = new_config.credentials_provider
1035 self.metadata_provider = new_config.metadata_provider
1036 self.cache_config = new_config.cache_config
1037 self.cache_manager = new_config.cache_manager
1038 self.retry_config = new_config.retry_config
1039 self.metric_gauges = new_config.metric_gauges
1040 self.metric_counters = new_config.metric_counters
1041 self.metric_attributes_providers = new_config.metric_attributes_providers
1042 self._config_dict = config_dict
1043 self.replicas = new_config.replicas
1044 self.autocommit_config = new_config.autocommit_config