1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import json
17import logging
18import os
19import tempfile
20from collections import defaultdict
21from collections.abc import Sequence
22from multiprocessing import current_process
23from pathlib import Path
24from typing import Any, Optional
25from urllib.parse import urlparse
26
27import opentelemetry.metrics as api_metrics
28import yaml
29
30from .cache import DEFAULT_CACHE_SIZE, CacheManager
31from .caching.cache_config import CacheConfig, EvictionPolicyConfig
32from .instrumentation import setup_opentelemetry
33from .providers.manifest_metadata import ManifestMetadataProvider
34from .rclone import read_rclone_config
35from .schema import validate_config
36from .telemetry import Telemetry, TelemetryMode
37from .telemetry import init as telemetry_init
38from .telemetry.attributes.base import AttributesProvider
39from .types import (
40 DEFAULT_RETRY_ATTEMPTS,
41 DEFAULT_RETRY_DELAY,
42 MSC_PROTOCOL,
43 AutoCommitConfig,
44 CredentialsProvider,
45 MetadataProvider,
46 ProviderBundle,
47 Replica,
48 RetryConfig,
49 StorageProvider,
50 StorageProviderConfig,
51)
52from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
53
# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
# Maps a URL scheme to the storage provider "type" used in profile configs.
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}

# Maps short telemetry attributes provider names to fully qualified class names.
_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    :raises ValueError: If ``protocol`` is not a supported implicit profile protocol.
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING.get(protocol)
    if provider_type is None:
        # Raise an actionable error instead of a bare KeyError, matching the
        # error style used by the storage provider builder.
        raise ValueError(
            f"Protocol '{protocol}' is not supported for implicit profiles. "
            f"Supported protocols are: {list(PROTOCOL_TO_PROVIDER_TYPE_MAPPING.keys())}"
        )
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }
88
89
# Name of the always-available POSIX profile rooted at "/".
DEFAULT_POSIX_PROFILE_NAME = "default"
# Implicit profile config for the default POSIX profile.
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps the storage provider "type" value in a profile config to the provider
# class name resolved from the ".providers" package.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
}

# Maps the credentials provider "type" value in a profile config to the
# provider class name resolved from the ".providers" package.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
}
110
111
112def _find_config_file_paths():
113 """
114 Get configuration file search paths.
115
116 Returns paths in order of precedence:
117
118 1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
119 2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
120 """
121 paths = []
122
123 # 1. User-specific configuration directory
124 xdg_config_home = os.getenv("XDG_CONFIG_HOME")
125
126 if xdg_config_home:
127 paths.extend(
128 [
129 os.path.join(xdg_config_home, "msc", "config.yaml"),
130 os.path.join(xdg_config_home, "msc", "config.json"),
131 ]
132 )
133
134 user_home = os.getenv("HOME")
135
136 if user_home:
137 paths.extend(
138 [
139 os.path.join(user_home, ".msc_config.yaml"),
140 os.path.join(user_home, ".msc_config.json"),
141 os.path.join(user_home, ".config", "msc", "config.yaml"),
142 os.path.join(user_home, ".config", "msc", "config.json"),
143 ]
144 )
145
146 # 2. System-wide configuration directories
147 xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
148 if not xdg_config_dirs:
149 xdg_config_dirs = "/etc/xdg"
150
151 for config_dir in xdg_config_dirs.split(":"):
152 config_dir = config_dir.strip()
153 if config_dir:
154 paths.extend(
155 [
156 os.path.join(config_dir, "msc", "config.yaml"),
157 os.path.join(config_dir, "msc", "config.json"),
158 ]
159 )
160
161 paths.extend(
162 [
163 "/etc/msc_config.yaml",
164 "/etc/msc_config.json",
165 ]
166 )
167
168 return tuple(paths)
169
170
# Root package name used when resolving provider classes from relative modules.
PACKAGE_NAME = "multistorageclient"

# Module-level logger for config loading diagnostics.
logger = logging.getLogger(__name__)
174
175
class SimpleProviderBundle(ProviderBundle):
    """A :py:class:`ProviderBundle` that simply hands back pre-constructed components."""

    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        # Normalize the mutable default inside the body rather than the signature.
        self._config = storage_provider_config
        self._creds = credentials_provider
        self._metadata = metadata_provider
        self._replica_list = [] if replicas is None else replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return self._creds

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return self._metadata

    @property
    def replicas(self) -> list[Replica]:
        return self._replica_list
207
208
# Default cache eviction refresh interval, in seconds.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
210
211
class StorageClientConfigLoader:
    """
    Builds a :py:class:`StorageClientConfig` for one profile of an MSC
    configuration dictionary, resolving the storage provider, credentials
    provider, metadata provider, cache, retry, autocommit, telemetry, and
    replica settings.
    """

    # Pre-built provider bundle; when set it short-circuits config-based construction.
    _provider_bundle: Optional[ProviderBundle]
    # All profiles from the config (always includes the default POSIX profile).
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Raw config dict of the profile being loaded.
    _profile_dict: dict[str, Any]
    # "opentelemetry" section of the config, if present.
    _opentelemetry_dict: Optional[dict[str, Any]]
    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # "cache" section of the config, if present.
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry: Optional[Telemetry] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry: Optional telemetry instance to use, takes precedence over ``metric_gauges``, ``metric_counters``, and ``metric_attributes_providers``.
        :param metric_gauges: Optional metric gauges to use.
        :param metric_counters: Optional metric counters to use.
        :param metric_attributes_providers: Optional metric attributes providers to use.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)

        # Caller-supplied metric objects are the defaults; they are overridden
        # below when a telemetry instance and a "metrics" config are available.
        self._metric_gauges = metric_gauges
        self._metric_counters = metric_counters
        self._metric_attributes_providers = metric_attributes_providers
        if self._opentelemetry_dict is not None:
            if telemetry is None:
                try:
                    # Try to create a telemetry instance with the default mode heuristic.
                    telemetry = telemetry_init()
                except (AssertionError, OSError, RuntimeError, ValueError):
                    try:
                        if current_process().daemon:
                            # Daemons can't have child processes.
                            #
                            # Try to create a telemetry instance in local mode.
                            #
                            # ⚠️ This may cause CPU contention if the current process is compute-intensive
                            # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
                            telemetry = telemetry_init(mode=TelemetryMode.LOCAL)
                        else:
                            # Try to create a telemetry instance in server mode.
                            telemetry = telemetry_init(mode=TelemetryMode.SERVER)
                    except (AssertionError, OSError, RuntimeError, ValueError):
                        # Don't throw on telemetry init failures.
                        logger.error("Failed to automatically create telemetry instance!", exc_info=True)

            if "metrics" in self._opentelemetry_dict:
                if telemetry is not None:
                    # Only keep gauges/counters the telemetry backend actually created.
                    self._metric_gauges = {}
                    for name in Telemetry.GaugeName:
                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
                        if gauge is not None:
                            self._metric_gauges[name] = gauge
                    self._metric_counters = {}
                    for name in Telemetry.CounterName:
                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
                        if counter is not None:
                            self._metric_counters[name] = counter

                    if "attributes" in self._opentelemetry_dict["metrics"]:
                        attributes_providers: list[AttributesProvider] = []
                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
                            "attributes"
                        ]
                        for config in attributes_provider_configs:
                            # "type" is either a short name from the mapping or a
                            # fully qualified class path.
                            attributes_provider_type: str = config["type"]
                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
                                attributes_provider_type, attributes_provider_type
                            )
                            attributes_provider_module_name, attributes_provider_class_name = (
                                attributes_provider_fully_qualified_name.rsplit(".", 1)
                            )
                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
                            attributes_provider_options = config.get("options", {})
                            if (
                                attributes_provider_fully_qualified_name
                                == _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING["msc_config"]
                            ):
                                # The msc_config provider needs the full config dict to derive attributes.
                                attributes_provider_options["config_dict"] = config_dict
                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
                            attributes_providers.append(attributes_provider)
                        self._metric_attributes_providers = tuple(attributes_providers)
                elif not any([metric_gauges, metric_counters, metric_attributes_providers]):
                    # TODO: Remove "beta" from the log once legacy metrics are removed.
                    logger.error("No telemetry instance! Disabling beta metrics.")

        self._cache_dict = config_dict.get("cache", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider class by its short type name.

        :param storage_provider_name: Key into ``STORAGE_PROVIDER_MAPPING`` (e.g. "s3", "file").
        :param storage_options: Keyword options passed through to the provider constructor.
        :param credentials_provider: Optional credentials provider injected into the options.
        :raises ValueError: If the storage provider type is not supported.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        # Propagate metrics plumbing to the provider when configured.
        if self._metric_gauges is not None:
            storage_options["metric_gauges"] = self._metric_gauges
        if self._metric_counters is not None:
            storage_options["metric_counters"] = self._metric_counters
        if self._metric_attributes_providers is not None:
            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str) -> StorageProvider:
        """
        Build a storage provider from another profile referenced by name
        (used to read manifests from a separate storage location).

        :param storage_provider_profile: Name of the profile supplying the storage settings.
        :raises ValueError: If the profile is missing, has a metadata provider,
            or lacks a storage provider section.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Returns ``None`` when no credentials provider is configured.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        # Only forward storage options that the provider's constructor actually
        # accepts, and never overwrite explicitly configured options.
        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Assemble a :py:class:`SimpleProviderBundle` (storage provider config,
        credentials provider, metadata provider, replicas) from a profile dict.

        :param profile_dict: The raw profile configuration.
        :raises ValueError: If the storage provider section is missing or a
            non-manifest metadata provider type is not fully qualified.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """
        Instantiate a third-party :py:class:`ProviderBundle` implementation from
        its fully qualified class name and constructor options.

        :param provider_bundle_dict: The "provider_bundle" section of a profile.
        """
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        """
        Resolve the provider bundle, preferring (in order): the pre-built bundle
        passed to the constructor, a "provider_bundle" extension in the profile,
        and finally the standard config-driven construction.
        """
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load 3rd party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Reject deprecated cache config keys.

        :param cache_dict: The "cache" section of the config.
        :raises ValueError: If the removed 'size_mb' property is present.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G  # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true  # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache  # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy: # Optional: The eviction policy to use\n"
                "    policy: fifo  # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300  # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    def build_config(self) -> "StorageClientConfig":
        """
        Build and return the :py:class:`StorageClientConfig` for the selected
        profile, wiring together providers, cache, retry, autocommit, and
        telemetry settings.

        :raises ValueError: On invalid replica, cache, or provider configuration.
        """
        bundle = self._build_provider_bundle()

        # Validate replicas to prevent circular references
        self._validate_replicas(bundle.replicas)

        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        # Cache is built only when both a cache config exists AND the profile opts in.
        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified – the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        # retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            retry_config = RetryConfig(attempts=attempts, delay=delay)
        else:
            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)

        # autocommit options
        autocommit_config = AutoCommitConfig()
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)

        # set up OpenTelemetry providers once per process
        #
        # TODO: Legacy, need to remove.
        if self._opentelemetry_dict:
            setup_opentelemetry(self._opentelemetry_dict)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            metric_gauges=self._metric_gauges,
            metric_counters=self._metric_counters,
            metric_attributes_providers=self._metric_attributes_providers,
            replicas=bundle.replicas,
            autocommit_config=autocommit_config,
        )
696
697
class PathMapping:
    """
    Resolves path mappings defined in the MSC configuration.

    Mappings form a nested structure of protocol -> bucket -> [(prefix, profile)],
    with each prefix list kept sorted longest-first so that the most specific
    (longest) prefix wins when matching.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary, if None the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        mapping = cls()
        mapping._load_mapping(config_dict)
        return mapping

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        entries = config_dict.get("path_mapping", {})
        if entries is None:
            return

        for src, dest in entries.items():
            # Only well-formed entries are accepted: both sides must be
            # directory-style (trailing slash) and the destination must be an
            # MSC URL.
            if not (src.endswith("/") and dest.startswith(MSC_PROTOCOL) and dest.endswith("/")):
                continue

            # The destination profile is the netloc of the msc:// URL.
            dest_profile = urlparse(dest).netloc

            parsed = urlparse(src)
            protocol = parsed.scheme.lower() if parsed.scheme else "file"

            if protocol == "file" or src.startswith("/"):
                # POSIX-style source: no bucket, the whole path is the prefix.
                bucket = ""
                prefix = src if src.startswith("/") else parsed.path
            else:
                # Object storage source: netloc is the bucket; the path, without
                # its leading slash, is the prefix.
                bucket = parsed.netloc
                prefix = parsed.path.removeprefix("/")

            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Keep each bucket's prefix list sorted longest-first for matching.
        for buckets in self._mapping.values():
            for prefix_entries in buckets.values():
                prefix_entries.sort(key=lambda pair: len(pair[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        parsed = urlparse(url)
        protocol = parsed.scheme.lower() if parsed.scheme else "file"

        # POSIX-style lookup: file scheme or a bare absolute path.
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else parsed.path

            candidates = self._mapping[protocol][""] if protocol in self._mapping else []

            # Candidates are pre-sorted longest prefix first.
            for prefix, profile in candidates:
                if not path.startswith(prefix):
                    continue
                remainder = path[len(prefix) :]
                # POSIX translations always carry a leading slash.
                if not remainder.startswith("/"):
                    remainder = "/" + remainder
                return profile, remainder

            return None

        # Object-storage lookup: match within the URL's bucket.
        bucket = parsed.netloc
        path = parsed.path.removeprefix("/")

        candidates = []
        if protocol in self._mapping and bucket in self._mapping[protocol]:
            candidates = self._mapping[protocol][bucket]

        # Candidates are pre-sorted longest prefix first.
        for prefix, profile in candidates:
            if not path.startswith(prefix):
                continue
            remainder = path[len(prefix) :]
            # Object-storage translations never carry a leading slash.
            remainder = remainder.removeprefix("/")
            return profile, remainder

        return None
832
833
[docs]
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.

    Instances are normally produced by the ``from_*`` factory methods below,
    which delegate construction to ``StorageClientConfigLoader``.
    """

    # Name of the profile this configuration was built for.
    profile: str
    # Provider performing the actual storage operations.
    storage_provider: StorageProvider
    # Optional provider supplying credentials to the storage provider.
    credentials_provider: Optional[CredentialsProvider]
    # Optional provider for object metadata (e.g. manifest-based listings).
    metadata_provider: Optional[MetadataProvider]
    # Cache settings; None when caching is disabled.
    cache_config: Optional[CacheConfig]
    # Manager backing the cache described by cache_config.
    cache_manager: Optional[CacheManager]
    # Retry behavior for storage operations.
    retry_config: Optional[RetryConfig]
    # OpenTelemetry gauge instruments keyed by gauge name.
    metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    # OpenTelemetry counter instruments keyed by counter name.
    metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    # Providers contributing attributes attached to emitted metrics.
    metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # Replica targets for this profile (empty list when replication is unused).
    replicas: list[Replica]
    # Auto-commit behavior; None when disabled.
    autocommit_config: Optional[AutoCommitConfig]

    # Raw config dict this instance was built from; required for pickling
    # (None when built from a provider bundle, which is not serializable).
    _config_dict: Optional[dict[str, Any]]
853
854 def __init__(
855 self,
856 profile: str,
857 storage_provider: StorageProvider,
858 credentials_provider: Optional[CredentialsProvider] = None,
859 metadata_provider: Optional[MetadataProvider] = None,
860 cache_config: Optional[CacheConfig] = None,
861 cache_manager: Optional[CacheManager] = None,
862 retry_config: Optional[RetryConfig] = None,
863 metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
864 metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
865 metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
866 replicas: Optional[list[Replica]] = None,
867 autocommit_config: Optional[AutoCommitConfig] = None,
868 ):
869 if replicas is None:
870 replicas = []
871 self.profile = profile
872 self.storage_provider = storage_provider
873 self.credentials_provider = credentials_provider
874 self.metadata_provider = metadata_provider
875 self.cache_config = cache_config
876 self.cache_manager = cache_manager
877 self.retry_config = retry_config
878 self.metric_gauges = metric_gauges
879 self.metric_counters = metric_counters
880 self.metric_attributes_providers = metric_attributes_providers
881 self.replicas = replicas
882 self.autocommit_config = autocommit_config
883
884 @staticmethod
885 def from_json(
886 config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
887 ) -> "StorageClientConfig":
888 config_dict = json.loads(config_json)
889 return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
890
891 @staticmethod
892 def from_yaml(
893 config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
894 ) -> "StorageClientConfig":
895 config_dict = yaml.safe_load(config_yaml)
896 return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
897
898 @staticmethod
899 def from_dict(
900 config_dict: dict[str, Any],
901 profile: str = DEFAULT_POSIX_PROFILE_NAME,
902 skip_validation: bool = False,
903 telemetry: Optional[Telemetry] = None,
904 ) -> "StorageClientConfig":
905 # Validate the config file with predefined JSON schema
906 if not skip_validation:
907 validate_config(config_dict)
908
909 # Load config
910 loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
911 config = loader.build_config()
912 config._config_dict = config_dict
913
914 return config
915
916 @staticmethod
917 def from_file(
918 profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
919 ) -> "StorageClientConfig":
920 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config()
921 # Parse rclone config file.
922 rclone_config_dict, rclone_config_file = read_rclone_config()
923
924 # Merge config files.
925 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
926 if conflicted_keys:
927 raise ValueError(
928 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}'
929 )
930 merged_profiles = merged_config.get("profiles", {})
931
932 # Check if profile is in merged_profiles
933 if profile in merged_profiles:
934 return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
935 else:
936 # Check if profile is the default profile or an implicit profile
937 if profile == DEFAULT_POSIX_PROFILE_NAME:
938 implicit_profile_config = DEFAULT_POSIX_PROFILE
939 elif profile.startswith("_"):
940 # Handle implicit profiles
941 parts = profile[1:].split("-", 1)
942 if len(parts) == 2:
943 protocol, bucket = parts
944 # Verify it's a supported protocol
945 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
946 raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
947 implicit_profile_config = create_implicit_profile_config(
948 profile_name=profile, protocol=protocol, base_path=bucket
949 )
950 else:
951 raise ValueError(f'Invalid implicit profile format: "{profile}"')
952 else:
953 raise ValueError(
954 f'Profile "{profile}" not found in configuration files. Configuration was checked in '
955 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
956 f"Please verify that the profile exists and that configuration files are correctly located."
957 )
958 # merge the implicit profile config into the merged config so the cache & observability config can be inherited
959 if "profiles" not in merged_config:
960 merged_config["profiles"] = implicit_profile_config["profiles"]
961 else:
962 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
963 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
964 return StorageClientConfig.from_dict(
965 config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
966 )
967
968 @staticmethod
969 def from_provider_bundle(
970 config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
971 ) -> "StorageClientConfig":
972 loader = StorageClientConfigLoader(
973 config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
974 )
975 config = loader.build_config()
976 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors
977 return config
978
[docs]
979 @staticmethod
980 def read_msc_config() -> tuple[Optional[dict[str, Any]], Optional[str]]:
981 """Get the MSC configuration dictionary and the path of the config file used.
982
983 Configs are searched in the following order:
984 1. MSC_CONFIG environment variable (highest precedence)
985 2. Standard search paths (user-specified config and system-wide config)
986
987
988 :return: Tuple of (config_dict, config_file_path). config_dict is the MSC configuration
989 dictionary or empty dict if no config was found. config_file_path is the absolute
990 path of the config file used, or None if no config file was found.
991 """
992 config_dict = {}
993 found_config_files = []
994 used_config_file = None
995
996 # Check for environment variable first
997 msc_config = os.getenv("MSC_CONFIG", None)
998 if msc_config and os.path.exists(msc_config):
999 try:
1000 with open(msc_config) as f:
1001 if msc_config.endswith(".json"):
1002 config_dict = json.load(f)
1003 used_config_file = msc_config
1004 else:
1005 config_dict = yaml.safe_load(f)
1006 used_config_file = msc_config
1007 found_config_files.append(msc_config)
1008 except Exception as e:
1009 raise ValueError(f"malformed msc config file: {msc_config}, exception: {e}")
1010
1011 # Check all standard search paths
1012 for path in _find_config_file_paths():
1013 if os.path.exists(path):
1014 found_config_files.append(path)
1015 # Only load the first found config file (environment variable takes precedence)
1016 if used_config_file is None:
1017 try:
1018 with open(path) as f:
1019 if path.endswith(".json"):
1020 config_dict = json.load(f)
1021 used_config_file = path
1022 else:
1023 config_dict = yaml.safe_load(f)
1024 used_config_file = path
1025 except Exception as e:
1026 raise ValueError(f"malformed msc config file: {path}, exception: {e}")
1027
1028 # Log warnings and info messages
1029 if len(found_config_files) > 1:
1030 found_config_files_str = ", ".join(found_config_files)
1031 logger.warning(f"Multiple MSC config files found: {found_config_files_str}. ")
1032
1033 if len(found_config_files) == 0:
1034 logger.warning("No MSC config files found in any of the search locations.")
1035 else:
1036 logger.info(f"Using MSC config file: {used_config_file}")
1037
1038 if config_dict:
1039 validate_config(config_dict)
1040 return config_dict, used_config_file
1041
[docs]
1042 @staticmethod
1043 def read_path_mapping() -> PathMapping:
1044 """
1045 Get the path mapping defined in the MSC configuration.
1046
1047 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1048 where entries are sorted by prefix length (longest first) for optimal matching.
1049 Longer paths take precedence when matching.
1050
1051 :return: A PathMapping instance with translation mappings
1052 """
1053 try:
1054 return PathMapping.from_config()
1055 except Exception:
1056 # Log the error but continue - this shouldn't stop the application from working
1057 logger.error("Failed to load path_mapping from MSC config")
1058 return PathMapping()
1059
1060 def __getstate__(self) -> dict[str, Any]:
1061 state = self.__dict__.copy()
1062 if not state.get("_config_dict"):
1063 raise ValueError("StorageClientConfig is not serializable")
1064 del state["credentials_provider"]
1065 del state["storage_provider"]
1066 del state["metadata_provider"]
1067 del state["cache_manager"]
1068 del state["replicas"]
1069 return state
1070
1071 def __setstate__(self, state: dict[str, Any]) -> None:
1072 # Presence checked by __getstate__.
1073 config_dict = state["_config_dict"]
1074 loader = StorageClientConfigLoader(
1075 config_dict=config_dict,
1076 profile=state["profile"],
1077 metric_gauges=state["metric_gauges"],
1078 metric_counters=state["metric_counters"],
1079 metric_attributes_providers=state["metric_attributes_providers"],
1080 )
1081 new_config = loader.build_config()
1082 self.profile = new_config.profile
1083 self.storage_provider = new_config.storage_provider
1084 self.credentials_provider = new_config.credentials_provider
1085 self.metadata_provider = new_config.metadata_provider
1086 self.cache_config = new_config.cache_config
1087 self.cache_manager = new_config.cache_manager
1088 self.retry_config = new_config.retry_config
1089 self.metric_gauges = new_config.metric_gauges
1090 self.metric_counters = new_config.metric_counters
1091 self.metric_attributes_providers = new_config.metric_attributes_providers
1092 self._config_dict = config_dict
1093 self.replicas = new_config.replicas
1094 self.autocommit_config = new_config.autocommit_config