1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import json
17import logging
18import os
19import tempfile
20from collections import defaultdict
21from collections.abc import Sequence
22from multiprocessing import current_process
23from pathlib import Path
24from typing import Any, Optional
25from urllib.parse import urlparse
26
27import opentelemetry.metrics as api_metrics
28import yaml
29
30from .cache import DEFAULT_CACHE_SIZE, CacheManager
31from .caching.cache_config import CacheConfig, EvictionPolicyConfig
32from .instrumentation import setup_opentelemetry
33from .providers.manifest_metadata import ManifestMetadataProvider
34from .rclone import read_rclone_config
35from .schema import validate_config
36from .telemetry import Telemetry, TelemetryMode
37from .telemetry import init as telemetry_init
38from .telemetry.attributes.base import AttributesProvider
39from .types import (
40 DEFAULT_RETRY_ATTEMPTS,
41 DEFAULT_RETRY_DELAY,
42 MSC_PROTOCOL,
43 AutoCommitConfig,
44 CredentialsProvider,
45 MetadataProvider,
46 ProviderBundle,
47 Replica,
48 RetryConfig,
49 StorageProvider,
50 StorageProviderConfig,
51)
52from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
53
# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}

_TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING = {
    "environment_variables": "multistorageclient.telemetry.attributes.environment_variables.EnvironmentVariablesAttributesProvider",
    "host": "multistorageclient.telemetry.attributes.host.HostAttributesProvider",
    "msc_config": "multistorageclient.telemetry.attributes.msc_config.MSCConfigAttributesProvider",
    "process": "multistorageclient.telemetry.attributes.process.ProcessAttributesProvider",
    "static": "multistorageclient.telemetry.attributes.static.StaticAttributesProvider",
    "thread": "multistorageclient.telemetry.attributes.thread.ThreadAttributesProvider",
}


def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    """
    # Translate the URL protocol into the storage provider "type" key used by
    # the profile schema, then wrap it in the standard nested profile layout.
    storage_provider = {
        "type": PROTOCOL_TO_PROVIDER_TYPE_MAPPING[protocol],
        "options": {"base_path": base_path},
    }
    return {"profiles": {profile_name: {"storage_provider": storage_provider}}}


DEFAULT_POSIX_PROFILE_NAME = "default"
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")
92
# Maps the "type" value of a profile's storage_provider section to the
# provider class name resolved from the ".providers" module at build time.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
}

# Maps shorthand credentials_provider "type" values to class names in the
# ".providers" module; any other type is treated as a fully qualified class path.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
}
110
111
112def _find_config_file_paths():
113 """
114 Get configuration file search paths.
115
116 Returns paths in order of precedence:
117
118 1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
119 2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
120 """
121 paths = []
122
123 # 1. User-specific configuration directory
124 xdg_config_home = os.getenv("XDG_CONFIG_HOME")
125
126 if xdg_config_home:
127 paths.extend(
128 [
129 os.path.join(xdg_config_home, "msc", "config.yaml"),
130 os.path.join(xdg_config_home, "msc", "config.json"),
131 ]
132 )
133
134 user_home = os.getenv("HOME")
135
136 if user_home:
137 paths.extend(
138 [
139 os.path.join(user_home, ".msc_config.yaml"),
140 os.path.join(user_home, ".msc_config.json"),
141 os.path.join(user_home, ".config", "msc", "config.yaml"),
142 os.path.join(user_home, ".config", "msc", "config.json"),
143 ]
144 )
145
146 # 2. System-wide configuration directories
147 xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
148 if not xdg_config_dirs:
149 xdg_config_dirs = "/etc/xdg"
150
151 for config_dir in xdg_config_dirs.split(":"):
152 config_dir = config_dir.strip()
153 if config_dir:
154 paths.extend(
155 [
156 os.path.join(config_dir, "msc", "config.yaml"),
157 os.path.join(config_dir, "msc", "config.json"),
158 ]
159 )
160
161 paths.extend(
162 [
163 "/etc/msc_config.yaml",
164 "/etc/msc_config.json",
165 ]
166 )
167
168 return tuple(paths)
169
170
# Package name passed to import_class() when resolving relative provider modules.
PACKAGE_NAME = "multistorageclient"

# Module-level logger for configuration loading diagnostics.
logger = logging.getLogger(__name__)
174
175
class SimpleProviderBundle(ProviderBundle):
    """Immutable :py:class:`ProviderBundle` that returns pre-built components as-is."""

    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        # A fresh list per instance avoids the shared-mutable-default pitfall.
        self._replicas = [] if replicas is None else replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        # Configuration (type + options) for constructing the storage provider.
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        # Optional credentials source scoped to the storage provider.
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        # Optional metadata provider (e.g. manifest-backed listings).
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        # Replica profiles associated with this bundle; empty when unreplicated.
        return self._replicas
207
208
# Default cache eviction refresh interval in seconds when not configured.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
210
211
class StorageClientConfigLoader:
    """
    Builds a :py:class:`StorageClientConfig` for one profile from a raw
    configuration dictionary, wiring up the storage provider, credentials,
    metadata provider, cache, retry, autocommit, and telemetry components.
    """

    # Pre-built bundle (if supplied) that short-circuits config-based construction.
    _provider_bundle: Optional[ProviderBundle]
    # All profile sections from the config, keyed by profile name.
    _profiles: dict[str, Any]
    # Name of the profile being built.
    _profile: str
    # The selected profile's config section.
    _profile_dict: dict[str, Any]
    # Optional "opentelemetry" config section.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Resolved metric instruments and attribute providers (None when metrics disabled).
    _metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    _metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    _metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # Optional top-level "cache" config section.
    _cache_dict: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[ProviderBundle] = None,
        telemetry: Optional[Telemetry] = None,
        metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
        metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
        metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle`, takes precedence over ``config_dict``.
        :param telemetry: Optional telemetry instance to use, takes precedence over ``metric_gauges``, ``metric_counters``, and ``metric_attributes_providers``.
        :param metric_gauges: Optional metric gauges to use.
        :param metric_counters: Optional metric counters to use.
        :param metric_attributes_providers: Optional metric attributes providers to use.
        """
        # ProviderBundle takes precedence
        self._provider_bundle = provider_bundle

        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)

        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = profile_dict

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)

        self._metric_gauges = metric_gauges
        self._metric_counters = metric_counters
        self._metric_attributes_providers = metric_attributes_providers
        if self._opentelemetry_dict is not None:
            # Try to create a telemetry instance only if no telemetry instance or OpenTelemetry instruments are provided (e.g. when unpickling).
            #
            # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
            # New processes (e.g. multiprocessing manager server) can't be created during this phase.
            if telemetry is None and not any(
                (self._metric_gauges, self._metric_counters, self._metric_attributes_providers)
            ):
                try:
                    # Try to create a telemetry instance with the default mode heuristic.
                    telemetry = telemetry_init()
                except (AssertionError, OSError, RuntimeError, ValueError):
                    try:
                        if current_process().daemon:
                            # Daemons can't have child processes.
                            #
                            # Try to create a telemetry instance in local mode.
                            #
                            # ⚠️ This may cause CPU contention if the current process is compute-intensive
                            # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
                            telemetry = telemetry_init(mode=TelemetryMode.LOCAL)
                        else:
                            # Try to create a telemetry instance in server mode.
                            telemetry = telemetry_init(mode=TelemetryMode.SERVER)
                    except (AssertionError, OSError, RuntimeError, ValueError):
                        # Don't throw on telemetry init failures.
                        logger.error("Failed to automatically create telemetry instance!", exc_info=True)

            if "metrics" in self._opentelemetry_dict:
                if telemetry is not None:
                    # Materialize one gauge/counter per known instrument name;
                    # telemetry may return None for instruments disabled by config.
                    self._metric_gauges = {}
                    for name in Telemetry.GaugeName:
                        gauge = telemetry.gauge(config=self._opentelemetry_dict["metrics"], name=name)
                        if gauge is not None:
                            self._metric_gauges[name] = gauge
                    self._metric_counters = {}
                    for name in Telemetry.CounterName:
                        counter = telemetry.counter(config=self._opentelemetry_dict["metrics"], name=name)
                        if counter is not None:
                            self._metric_counters[name] = counter

                    if "attributes" in self._opentelemetry_dict["metrics"]:
                        # Resolve each configured attributes provider, allowing either a
                        # shorthand type (mapped below) or a fully qualified class path.
                        attributes_providers: list[AttributesProvider] = []
                        attributes_provider_configs: list[dict[str, Any]] = self._opentelemetry_dict["metrics"][
                            "attributes"
                        ]
                        for config in attributes_provider_configs:
                            attributes_provider_type: str = config["type"]
                            attributes_provider_fully_qualified_name = _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING.get(
                                attributes_provider_type, attributes_provider_type
                            )
                            attributes_provider_module_name, attributes_provider_class_name = (
                                attributes_provider_fully_qualified_name.rsplit(".", 1)
                            )
                            cls = import_class(attributes_provider_class_name, attributes_provider_module_name)
                            attributes_provider_options = config.get("options", {})
                            if (
                                attributes_provider_fully_qualified_name
                                == _TELEMETRY_ATTRIBUTES_PROVIDER_MAPPING["msc_config"]
                            ):
                                # The msc_config provider needs the full config dict to derive attributes.
                                attributes_provider_options["config_dict"] = config_dict
                            attributes_provider: AttributesProvider = cls(**attributes_provider_options)
                            attributes_providers.append(attributes_provider)
                        self._metric_attributes_providers = tuple(attributes_providers)
                elif not any([metric_gauges, metric_counters, metric_attributes_providers]):
                    # TODO: Remove "beta" from the log once legacy metrics are removed.
                    logger.error("No telemetry instance! Disabling beta metrics.")

        self._cache_dict = config_dict.get("cache", None)

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider class by its mapped type name.

        :param storage_provider_name: A key of ``STORAGE_PROVIDER_MAPPING``.
        :param storage_options: Keyword options forwarded to the provider constructor.
        :param credentials_provider: Optional credentials provider injected into options.
        :raises ValueError: If the provider type is not supported.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        # Thread resolved metric instruments through to the provider so it can emit metrics.
        if self._metric_gauges is not None:
            storage_options["metric_gauges"] = self._metric_gauges
        if self._metric_counters is not None:
            storage_options["metric_counters"] = self._metric_counters
        if self._metric_attributes_providers is not None:
            storage_options["metric_attributes_providers"] = self._metric_attributes_providers
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str) -> StorageProvider:
        """
        Build the storage provider of another profile, used to read manifests.

        :param storage_provider_profile: Name of the profile to build from.
        :raises ValueError: If the profile is missing, has a metadata provider,
            or lacks a storage_provider section.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.

        Returns:
            The constructed credentials provider, or ``None`` when no config is given.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        # Only forward storage options the provider's constructor actually accepts,
        # and never overwrite options explicitly set in the credentials config.
        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Assemble a provider bundle (storage/credentials/metadata/replicas) from a profile section.

        :param profile_dict: The profile's configuration dictionary.
        :raises ValueError: If the storage_provider section is missing or a
            metadata provider type is not a fully qualified class name.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """
        Instantiate a third-party ProviderBundle from its fully qualified class name.

        :param provider_bundle_dict: Config with "type" (class path) and optional "options".
        """
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(self) -> ProviderBundle:
        """
        Return the provider bundle: pre-built > extension config > profile config.
        """
        if self._provider_bundle:
            return self._provider_bundle  # Return if previously provided.

        # Load 3rd party extension
        provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
        if provider_bundle_dict:
            return self._build_provider_bundle_from_extension(provider_bundle_dict)

        return self._build_provider_bundle_from_config(self._profile_dict)

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Reject legacy cache configuration keys.

        :param cache_dict: The cache configuration section.
        :raises ValueError: If the removed 'size_mb' property is present.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                " size: 500G # Optional: If not specified, default cache size (10GB) will be used\n"
                " use_etag: true # Optional: If not specified, default cache use_etag (true) will be used\n"
                " location: /tmp/msc_cache # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                " eviction_policy: # Optional: The eviction policy to use\n"
                " policy: fifo # Optional: The eviction policy to use, default is 'fifo'\n"
                " refresh_interval: 300 # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    def build_config(self) -> "StorageClientConfig":
        """
        Build the final :py:class:`StorageClientConfig` for the selected profile.

        Resolves the provider bundle, storage provider, cache, retry, and
        autocommit settings, and performs legacy OpenTelemetry setup.

        :raises ValueError: On invalid replicas, cache, or provider configuration.
        """
        bundle = self._build_provider_bundle()

        # Validate replicas to prevent circular references
        self._validate_replicas(bundle.replicas)

        storage_provider = self._build_storage_provider(
            bundle.storage_provider_config.type,
            bundle.storage_provider_config.options,
            bundle.credentials_provider,
        )

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified – the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        # retry options
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            retry_config = RetryConfig(attempts=attempts, delay=delay)
        else:
            retry_config = RetryConfig(attempts=DEFAULT_RETRY_ATTEMPTS, delay=DEFAULT_RETRY_DELAY)

        # autocommit options
        autocommit_config = AutoCommitConfig()
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            autocommit_config = AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)

        # set up OpenTelemetry providers once per process
        #
        # TODO: Legacy, need to remove.
        if self._opentelemetry_dict:
            setup_opentelemetry(self._opentelemetry_dict)

        return StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=bundle.credentials_provider,
            metadata_provider=bundle.metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            metric_gauges=self._metric_gauges,
            metric_counters=self._metric_counters,
            metric_attributes_providers=self._metric_attributes_providers,
            replicas=bundle.replicas,
            autocommit_config=autocommit_config,
        )
702
703
class PathMapping:
    """
    Handles path mappings defined in the MSC configuration.

    Mappings are kept as a nested structure of protocol -> bucket ->
    [(prefix, profile)], with each bucket's entries sorted by prefix length
    (longest first) so the most specific mapping wins a lookup.
    """

    def __init__(self):
        """Initialize an empty PathMapping."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Create a PathMapping instance from configuration dictionary.

        :param config_dict: Configuration dictionary, if None the config will be loaded
        :return: A PathMapping instance with processed mappings
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        instance = cls()
        instance._load_mapping(config_dict)
        return instance

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Load path mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing path mapping
        """
        entries = config_dict.get("path_mapping", {})
        if entries is None:
            return

        for source_path, dest_path in entries.items():
            # Skip malformed entries: both sides must end with "/" and the
            # destination must be an MSC URL.
            if not source_path.endswith("/"):
                continue
            if not (dest_path.startswith(MSC_PROTOCOL) and dest_path.endswith("/")):
                continue

            # The destination profile is the netloc of the msc:// URL.
            dest_profile = urlparse(dest_path).netloc

            parsed = urlparse(source_path)
            protocol = parsed.scheme.lower() if parsed.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # Filesystem or absolute paths: whole path is the prefix, no bucket.
                bucket = ""
                prefix = source_path if source_path.startswith("/") else parsed.path
            else:
                # Object storage: netloc is the bucket, path (minus one leading
                # slash) is the prefix.
                bucket = parsed.netloc
                prefix = parsed.path.removeprefix("/")

            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Longest prefix first within each bucket so lookups hit the most
        # specific mapping.
        for buckets in self._mapping.values():
            for bucket_name, bucket_entries in buckets.items():
                buckets[bucket_name] = sorted(bucket_entries, key=lambda entry: len(entry[0]), reverse=True)

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the best matching mapping for the given URL.

        :param url: URL to find matching mapping for
        :return: Tuple of (profile_name, translated_path) if a match is found, None otherwise
        """
        parsed = urlparse(url)
        protocol = parsed.scheme.lower() if parsed.scheme else "file"

        # Filesystem or absolute paths: match against the bucket-less entries.
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else parsed.path
            candidates = self._mapping[protocol][""] if protocol in self._mapping else []

            # Entries are pre-sorted longest-first; the first hit is the best match.
            for prefix, profile in candidates:
                if path.startswith(prefix):
                    remainder = path[len(prefix) :]
                    # Translated filesystem paths always carry a leading slash.
                    if not remainder.startswith("/"):
                        remainder = "/" + remainder
                    return profile, remainder

            return None

        # Object storage: match within the URL's bucket.
        bucket = parsed.netloc
        path = parsed.path.removeprefix("/")

        if protocol in self._mapping and bucket in self._mapping[protocol]:
            candidates = self._mapping[protocol][bucket]
        else:
            candidates = []

        # Entries are pre-sorted longest-first; the first hit is the best match.
        for prefix, profile in candidates:
            if path.startswith(prefix):
                # Object keys are relative: drop at most one leading slash.
                remainder = path[len(prefix) :].removeprefix("/")
                return profile, remainder

        return None
838
839
[docs]
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.

    Bundles the providers, cache, retry, telemetry, and replica settings
    resolved for a single profile.
    """

    # Name of the profile this configuration was resolved for.
    profile: str
    # Provider handling the actual storage operations.
    storage_provider: StorageProvider
    # Optional provider supplying credentials to the storage provider.
    credentials_provider: Optional[CredentialsProvider]
    # Optional provider for object metadata (e.g. manifest-backed listings).
    metadata_provider: Optional[MetadataProvider]
    # Cache settings for this profile, if any.
    cache_config: Optional[CacheConfig]
    # Cache manager backing cache_config, if caching is enabled.
    cache_manager: Optional[CacheManager]
    # Retry behavior for storage operations, if configured.
    retry_config: Optional[RetryConfig]
    # Telemetry gauges/counters and attribute providers (None when telemetry is off).
    metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]]
    metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]]
    metric_attributes_providers: Optional[Sequence[AttributesProvider]]
    # Replica targets for this profile (empty list when replication is unused).
    replicas: list[Replica]
    # Auto-commit settings, if configured.
    autocommit_config: Optional[AutoCommitConfig]

    # Raw config dict kept so the object can be rebuilt on unpickling;
    # None for provider-bundle configs, which are not serializable.
    _config_dict: Optional[dict[str, Any]]
859
860 def __init__(
861 self,
862 profile: str,
863 storage_provider: StorageProvider,
864 credentials_provider: Optional[CredentialsProvider] = None,
865 metadata_provider: Optional[MetadataProvider] = None,
866 cache_config: Optional[CacheConfig] = None,
867 cache_manager: Optional[CacheManager] = None,
868 retry_config: Optional[RetryConfig] = None,
869 metric_gauges: Optional[dict[Telemetry.GaugeName, api_metrics._Gauge]] = None,
870 metric_counters: Optional[dict[Telemetry.CounterName, api_metrics.Counter]] = None,
871 metric_attributes_providers: Optional[Sequence[AttributesProvider]] = None,
872 replicas: Optional[list[Replica]] = None,
873 autocommit_config: Optional[AutoCommitConfig] = None,
874 ):
875 if replicas is None:
876 replicas = []
877 self.profile = profile
878 self.storage_provider = storage_provider
879 self.credentials_provider = credentials_provider
880 self.metadata_provider = metadata_provider
881 self.cache_config = cache_config
882 self.cache_manager = cache_manager
883 self.retry_config = retry_config
884 self.metric_gauges = metric_gauges
885 self.metric_counters = metric_counters
886 self.metric_attributes_providers = metric_attributes_providers
887 self.replicas = replicas
888 self.autocommit_config = autocommit_config
889
890 @staticmethod
891 def from_json(
892 config_json: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
893 ) -> "StorageClientConfig":
894 config_dict = json.loads(config_json)
895 return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
896
897 @staticmethod
898 def from_yaml(
899 config_yaml: str, profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
900 ) -> "StorageClientConfig":
901 config_dict = yaml.safe_load(config_yaml)
902 return StorageClientConfig.from_dict(config_dict=config_dict, profile=profile, telemetry=telemetry)
903
904 @staticmethod
905 def from_dict(
906 config_dict: dict[str, Any],
907 profile: str = DEFAULT_POSIX_PROFILE_NAME,
908 skip_validation: bool = False,
909 telemetry: Optional[Telemetry] = None,
910 ) -> "StorageClientConfig":
911 # Validate the config file with predefined JSON schema
912 if not skip_validation:
913 validate_config(config_dict)
914
915 # Load config
916 loader = StorageClientConfigLoader(config_dict=config_dict, profile=profile, telemetry=telemetry)
917 config = loader.build_config()
918 config._config_dict = config_dict
919
920 return config
921
922 @staticmethod
923 def from_file(
924 profile: str = DEFAULT_POSIX_PROFILE_NAME, telemetry: Optional[Telemetry] = None
925 ) -> "StorageClientConfig":
926 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config()
927 # Parse rclone config file.
928 rclone_config_dict, rclone_config_file = read_rclone_config()
929
930 # Merge config files.
931 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
932 if conflicted_keys:
933 raise ValueError(
934 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}'
935 )
936 merged_profiles = merged_config.get("profiles", {})
937
938 # Check if profile is in merged_profiles
939 if profile in merged_profiles:
940 return StorageClientConfig.from_dict(config_dict=merged_config, profile=profile, telemetry=telemetry)
941 else:
942 # Check if profile is the default profile or an implicit profile
943 if profile == DEFAULT_POSIX_PROFILE_NAME:
944 implicit_profile_config = DEFAULT_POSIX_PROFILE
945 elif profile.startswith("_"):
946 # Handle implicit profiles
947 parts = profile[1:].split("-", 1)
948 if len(parts) == 2:
949 protocol, bucket = parts
950 # Verify it's a supported protocol
951 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
952 raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
953 implicit_profile_config = create_implicit_profile_config(
954 profile_name=profile, protocol=protocol, base_path=bucket
955 )
956 else:
957 raise ValueError(f'Invalid implicit profile format: "{profile}"')
958 else:
959 raise ValueError(
960 f'Profile "{profile}" not found in configuration files. Configuration was checked in '
961 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
962 f"Please verify that the profile exists and that configuration files are correctly located."
963 )
964 # merge the implicit profile config into the merged config so the cache & observability config can be inherited
965 if "profiles" not in merged_config:
966 merged_config["profiles"] = implicit_profile_config["profiles"]
967 else:
968 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
969 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
970 return StorageClientConfig.from_dict(
971 config_dict=merged_config, profile=profile, skip_validation=True, telemetry=telemetry
972 )
973
974 @staticmethod
975 def from_provider_bundle(
976 config_dict: dict[str, Any], provider_bundle: ProviderBundle, telemetry: Optional[Telemetry] = None
977 ) -> "StorageClientConfig":
978 loader = StorageClientConfigLoader(
979 config_dict=config_dict, provider_bundle=provider_bundle, telemetry=telemetry
980 )
981 config = loader.build_config()
982 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors
983 return config
984
[docs]
985 @staticmethod
986 def read_msc_config() -> tuple[Optional[dict[str, Any]], Optional[str]]:
987 """Get the MSC configuration dictionary and the path of the config file used.
988
989 Configs are searched in the following order:
990 1. MSC_CONFIG environment variable (highest precedence)
991 2. Standard search paths (user-specified config and system-wide config)
992
993
994 :return: Tuple of (config_dict, config_file_path). config_dict is the MSC configuration
995 dictionary or empty dict if no config was found. config_file_path is the absolute
996 path of the config file used, or None if no config file was found.
997 """
998 config_dict = {}
999 found_config_files = []
1000 used_config_file = None
1001
1002 # Check for environment variable first
1003 msc_config = os.getenv("MSC_CONFIG", None)
1004 if msc_config and os.path.exists(msc_config):
1005 try:
1006 with open(msc_config) as f:
1007 if msc_config.endswith(".json"):
1008 config_dict = json.load(f)
1009 used_config_file = msc_config
1010 else:
1011 config_dict = yaml.safe_load(f)
1012 used_config_file = msc_config
1013 found_config_files.append(msc_config)
1014 except Exception as e:
1015 raise ValueError(f"malformed msc config file: {msc_config}, exception: {e}")
1016
1017 # Check all standard search paths
1018 for path in _find_config_file_paths():
1019 if os.path.exists(path):
1020 found_config_files.append(path)
1021 # Only load the first found config file (environment variable takes precedence)
1022 if used_config_file is None:
1023 try:
1024 with open(path) as f:
1025 if path.endswith(".json"):
1026 config_dict = json.load(f)
1027 used_config_file = path
1028 else:
1029 config_dict = yaml.safe_load(f)
1030 used_config_file = path
1031 except Exception as e:
1032 raise ValueError(f"malformed msc config file: {path}, exception: {e}")
1033
1034 # Log debug and info messages
1035 if len(found_config_files) > 1:
1036 found_config_files_str = ", ".join(found_config_files)
1037 logger.debug(f"Multiple MSC config files found: {found_config_files_str}. ")
1038
1039 if len(found_config_files) == 0:
1040 logger.debug("No MSC config files found in any of the search locations.")
1041 else:
1042 logger.info(f"Using MSC config file: {used_config_file}")
1043
1044 if config_dict:
1045 validate_config(config_dict)
1046 return config_dict, used_config_file
1047
[docs]
1048 @staticmethod
1049 def read_path_mapping() -> PathMapping:
1050 """
1051 Get the path mapping defined in the MSC configuration.
1052
1053 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1054 where entries are sorted by prefix length (longest first) for optimal matching.
1055 Longer paths take precedence when matching.
1056
1057 :return: A PathMapping instance with translation mappings
1058 """
1059 try:
1060 return PathMapping.from_config()
1061 except Exception:
1062 # Log the error but continue - this shouldn't stop the application from working
1063 logger.error("Failed to load path_mapping from MSC config")
1064 return PathMapping()
1065
1066 def __getstate__(self) -> dict[str, Any]:
1067 state = self.__dict__.copy()
1068 if not state.get("_config_dict"):
1069 raise ValueError("StorageClientConfig is not serializable")
1070 del state["credentials_provider"]
1071 del state["storage_provider"]
1072 del state["metadata_provider"]
1073 del state["cache_manager"]
1074 del state["replicas"]
1075 return state
1076
    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild the config from pickled state.

        Providers, cache manager, and replicas were dropped by ``__getstate__``;
        they are reconstructed here by re-running the loader on the stored
        config dict, while metric objects are carried through from ``state``.
        """
        # Presence checked by __getstate__.
        config_dict = state["_config_dict"]
        loader = StorageClientConfigLoader(
            config_dict=config_dict,
            profile=state["profile"],
            metric_gauges=state["metric_gauges"],
            metric_counters=state["metric_counters"],
            metric_attributes_providers=state["metric_attributes_providers"],
        )
        new_config = loader.build_config()
        # Copy every rebuilt field onto this instance.
        self.profile = new_config.profile
        self.storage_provider = new_config.storage_provider
        self.credentials_provider = new_config.credentials_provider
        self.metadata_provider = new_config.metadata_provider
        self.cache_config = new_config.cache_config
        self.cache_manager = new_config.cache_manager
        self.retry_config = new_config.retry_config
        self.metric_gauges = new_config.metric_gauges
        self.metric_counters = new_config.metric_counters
        self.metric_attributes_providers = new_config.metric_attributes_providers
        self._config_dict = config_dict
        self.replicas = new_config.replicas
        self.autocommit_config = new_config.autocommit_config