1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import copy
17import json
18import logging
19import os
20import tempfile
21from collections import defaultdict
22from collections.abc import Callable
23from pathlib import Path
24from typing import Any, Iterable, Optional, Union
25from urllib.parse import urlparse
26
27import yaml
28
29from .cache import DEFAULT_CACHE_LINE_SIZE, DEFAULT_CACHE_SIZE, CacheManager
30from .caching.cache_config import CacheConfig, EvictionPolicyConfig
31from .providers.manifest_metadata import ManifestMetadataProvider
32from .rclone import read_rclone_config
33from .schema import validate_config
34from .telemetry import Telemetry
35from .telemetry import init as telemetry_init
36from .types import (
37 DEFAULT_RETRY_ATTEMPTS,
38 DEFAULT_RETRY_BACKOFF_MULTIPLIER,
39 DEFAULT_RETRY_DELAY,
40 MSC_PROTOCOL,
41 AutoCommitConfig,
42 CredentialsProvider,
43 MetadataProvider,
44 ProviderBundle,
45 ProviderBundleV2,
46 Replica,
47 RetryConfig,
48 StorageBackend,
49 StorageProvider,
50 StorageProviderConfig,
51)
52from .utils import expand_env_vars, import_class, merge_dictionaries_no_overwrite
53
# Constants related to implicit profiles
SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS = ("s3", "gs", "ais", "file")
PROTOCOL_TO_PROVIDER_TYPE_MAPPING = {
    "s3": "s3",
    "gs": "gcs",
    "ais": "ais",
    "file": "file",
}


# Template for creating implicit profile configurations
def create_implicit_profile_config(profile_name: str, protocol: str, base_path: str) -> dict:
    """
    Create a configuration dictionary for an implicit profile.

    :param profile_name: The name of the profile (e.g., "_s3-bucket1")
    :param protocol: The storage protocol (e.g., "s3", "gs", "ais")
    :param base_path: The base path (e.g., bucket name) for the storage provider

    :return: A configuration dictionary for the implicit profile
    :raises ValueError: If ``protocol`` is not a supported implicit profile protocol.
    """
    provider_type = PROTOCOL_TO_PROVIDER_TYPE_MAPPING.get(protocol)
    if provider_type is None:
        # Fail with an actionable message instead of a bare KeyError.
        raise ValueError(
            f"Unsupported protocol '{protocol}' for implicit profile; "
            f"supported protocols are: {list(PROTOCOL_TO_PROVIDER_TYPE_MAPPING.keys())}"
        )
    return {
        "profiles": {profile_name: {"storage_provider": {"type": provider_type, "options": {"base_path": base_path}}}}
    }
79
80
# Name of the built-in profile that maps to the local POSIX filesystem.
DEFAULT_POSIX_PROFILE_NAME = "default"
# Implicit profile config rooted at "/" using the "file" storage provider.
DEFAULT_POSIX_PROFILE = create_implicit_profile_config(DEFAULT_POSIX_PROFILE_NAME, "file", "/")

# Maps a storage provider "type" string from the config to the provider class
# name resolved from this package's ".providers" module.
STORAGE_PROVIDER_MAPPING = {
    "file": "PosixFileStorageProvider",
    "s3": "S3StorageProvider",
    "gcs": "GoogleStorageProvider",
    "oci": "OracleStorageProvider",
    "azure": "AzureBlobStorageProvider",
    "ais": "AIStoreStorageProvider",
    "ais_s3": "AIStoreS3StorageProvider",
    "s8k": "S8KStorageProvider",
    "gcs_s3": "GoogleS3StorageProvider",
    "huggingface": "HuggingFaceStorageProvider",
}

# Maps a credentials provider "type" string from the config to the provider
# class name resolved from this package's ".providers" module.
CREDENTIALS_PROVIDER_MAPPING = {
    "S3Credentials": "StaticS3CredentialsProvider",
    "AzureCredentials": "StaticAzureCredentialsProvider",
    "AISCredentials": "StaticAISCredentialProvider",
    "GoogleIdentityPoolCredentialsProvider": "GoogleIdentityPoolCredentialsProvider",
    "GoogleServiceAccountCredentialsProvider": "GoogleServiceAccountCredentialsProvider",
    "HuggingFaceCredentials": "HuggingFaceCredentialsProvider",
    "FileBasedCredentials": "FileBasedCredentialsProvider",
}
106
107
108def _find_config_file_paths() -> tuple[str]:
109 """
110 Get configuration file search paths.
111
112 Returns paths in order of precedence:
113
114 1. User-specific config (${XDG_CONFIG_HOME}/msc/, ${HOME}/, ${HOME}/.config/msc/)
115 2. System-wide configs (${XDG_CONFIG_DIRS}/msc/, /etc/xdg, /etc/)
116 """
117 paths = []
118
119 # 1. User-specific configuration directory
120 xdg_config_home = os.getenv("XDG_CONFIG_HOME")
121
122 if xdg_config_home:
123 paths.extend(
124 [
125 os.path.join(xdg_config_home, "msc", "config.yaml"),
126 os.path.join(xdg_config_home, "msc", "config.json"),
127 ]
128 )
129
130 user_home = os.getenv("HOME")
131
132 if user_home:
133 paths.extend(
134 [
135 os.path.join(user_home, ".msc_config.yaml"),
136 os.path.join(user_home, ".msc_config.json"),
137 os.path.join(user_home, ".config", "msc", "config.yaml"),
138 os.path.join(user_home, ".config", "msc", "config.json"),
139 ]
140 )
141
142 # 2. System-wide configuration directories
143 xdg_config_dirs = os.getenv("XDG_CONFIG_DIRS")
144 if not xdg_config_dirs:
145 xdg_config_dirs = "/etc/xdg"
146
147 for config_dir in xdg_config_dirs.split(":"):
148 config_dir = config_dir.strip()
149 if config_dir:
150 paths.extend(
151 [
152 os.path.join(config_dir, "msc", "config.yaml"),
153 os.path.join(config_dir, "msc", "config.json"),
154 ]
155 )
156
157 paths.extend(
158 [
159 "/etc/msc_config.yaml",
160 "/etc/msc_config.json",
161 ]
162 )
163
164 return tuple(paths)
165
166
# Package name used as the anchor for relative imports when resolving
# provider classes from the ".providers" module.
PACKAGE_NAME = "multistorageclient"

logger = logging.getLogger(__name__)
170
171
class ImmutableDict(dict):
    """
    Immutable dictionary that raises an error when attempting to modify it.

    On construction, nested dicts are recursively wrapped in :py:class:`ImmutableDict`
    and nested lists are converted to tuples, so the whole structure is frozen.
    Read accessors (``__getitem__``/``get``) hand back *mutable copies* of the
    stored values, so callers can freely modify what they read without
    affecting the frozen original.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Recursively freeze nested structures
        # (use super().__setitem__ because our own __setitem__ raises).
        for key, value in list(super().items()):
            if isinstance(value, dict) and not isinstance(value, ImmutableDict):
                super().__setitem__(key, ImmutableDict(value))
            elif isinstance(value, list):
                super().__setitem__(key, self._freeze_list(value))

    @staticmethod
    def _freeze_list(lst):
        """
        Convert list to tuple, freezing nested dicts recursively.
        """
        frozen = []
        for item in lst:
            if isinstance(item, dict):
                frozen.append(ImmutableDict(item))
            elif isinstance(item, list):
                frozen.append(ImmutableDict._freeze_list(item))
            else:
                frozen.append(item)
        return tuple(frozen)

    def __deepcopy__(self, memo):
        """
        Return a regular mutable dict when deepcopy is called.
        """
        return copy.deepcopy(dict(self), memo)

    def __reduce__(self):
        """
        Support for pickle serialization.
        """
        # Re-freezing from a plain dict on unpickle reproduces the same state.
        return (self.__class__, (dict(self),))

    def _copy_value(self, value):
        """
        Convert frozen structures back to mutable equivalents.
        """
        if isinstance(value, ImmutableDict):
            return {k: self._copy_value(v) for k, v in value.items()}
        elif isinstance(value, tuple):
            # Check if it was originally a list (frozen by _freeze_list)
            # NOTE(review): tuples that were tuples in the original input are
            # also thawed to lists here — the original container kind is not
            # tracked, so list is assumed.
            return [self._copy_value(item) for item in value]
        else:
            return value

    def __getitem__(self, key):
        """
        Return a mutable copy of the value.
        """
        value = super().__getitem__(key)
        return self._copy_value(value)

    def get(self, key, default=None):
        """
        Return a mutable copy of the value.
        """
        return self[key] if key in self else default

    def __setitem__(self, key, value):
        raise TypeError("ImmutableDict is immutable")

    def __delitem__(self, key):
        raise TypeError("ImmutableDict is immutable")

    def clear(self):
        raise TypeError("ImmutableDict is immutable")

    def pop(self, *args):
        raise TypeError("ImmutableDict is immutable")

    def popitem(self):
        raise TypeError("ImmutableDict is immutable")

    def setdefault(self, key, default=None):
        raise TypeError("ImmutableDict is immutable")

    def update(self, *args, **kwargs):
        raise TypeError("ImmutableDict is immutable")
259
260
class SimpleProviderBundle(ProviderBundle):
    """
    Minimal :py:class:`ProviderBundle` implementation that simply holds
    pre-built provider components and exposes them via properties.
    """

    def __init__(
        self,
        storage_provider_config: StorageProviderConfig,
        credentials_provider: Optional[CredentialsProvider] = None,
        metadata_provider: Optional[MetadataProvider] = None,
        replicas: Optional[list[Replica]] = None,
    ):
        self._storage_provider_config = storage_provider_config
        self._credentials_provider = credentials_provider
        self._metadata_provider = metadata_provider
        # Normalize the default without sharing a mutable default argument.
        self._replicas = [] if replicas is None else replicas

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        """Configuration for the storage provider."""
        return self._storage_provider_config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        """Credentials provider, if any."""
        return self._credentials_provider

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """Metadata provider, if any."""
        return self._metadata_provider

    @property
    def replicas(self) -> list[Replica]:
        """Replica definitions; empty when none were configured."""
        return self._replicas
292
293
class SimpleProviderBundleV2(ProviderBundleV2):
    """
    Minimal :py:class:`ProviderBundleV2` implementation that simply holds a
    mapping of storage backends and an optional metadata provider.
    """

    def __init__(
        self, storage_backends: dict[str, StorageBackend], metadata_provider: Optional[MetadataProvider] = None
    ):
        self._storage_backends = storage_backends
        self._metadata_provider = metadata_provider

    @property
    def storage_backends(self) -> dict[str, StorageBackend]:
        """Mapping of profile name to its storage backend."""
        return self._storage_backends

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """Metadata provider shared by the backends, if any."""
        return self._metadata_provider

    @staticmethod
    def from_v1_bundle(profile_name: str, v1_bundle: ProviderBundle) -> "SimpleProviderBundleV2":
        """Wrap a v1 bundle as a v2 bundle with a single backend keyed by ``profile_name``."""
        single_backend = StorageBackend(
            storage_provider_config=v1_bundle.storage_provider_config,
            credentials_provider=v1_bundle.credentials_provider,
            replicas=v1_bundle.replicas,
        )
        return SimpleProviderBundleV2({profile_name: single_backend}, v1_bundle.metadata_provider)
321
322
# Default interval (in seconds) between cache eviction-policy refresh scans.
DEFAULT_CACHE_REFRESH_INTERVAL = 300
325
class StorageClientConfigLoader:
    """
    Builds a ``StorageClientConfig`` for a single profile from a resolved MSC
    configuration dictionary (or from a pre-built provider bundle).
    """

    # Provider bundle, always normalized to the V2 interface.
    _provider_bundle: ProviderBundleV2
    # Full config dict with environment variables expanded (frozen).
    _resolved_config_dict: dict[str, Any]
    # Mapping of profile name -> profile configuration.
    _profiles: dict[str, Any]
    # Name of the profile being loaded.
    _profile: str
    # Configuration of the profile being loaded (frozen).
    _profile_dict: dict[str, Any]
    # Optional "opentelemetry" section of the config.
    _opentelemetry_dict: Optional[dict[str, Any]]
    # Thunk that lazily provides a telemetry instance (pickle-friendly).
    _telemetry_provider: Optional[Callable[[], Telemetry]]
    # Optional "cache" section of the config.
    _cache_dict: Optional[dict[str, Any]]
    # Optional "experimental_features" section of the config.
    _experimental_features: Optional[dict[str, Any]]

    def __init__(
        self,
        config_dict: dict[str, Any],
        profile: str = DEFAULT_POSIX_PROFILE_NAME,
        provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
    ) -> None:
        """
        Initializes a :py:class:`StorageClientConfigLoader` to create a
        StorageClientConfig. Components are built using the ``config_dict`` and
        profile, but a pre-built provider_bundle takes precedence.

        :param config_dict: Dictionary of configuration options.
        :param profile: Name of profile in ``config_dict`` to use to build configuration.
        :param provider_bundle: Optional pre-built :py:class:`multistorageclient.types.ProviderBundle` or :py:class:`multistorageclient.types.ProviderBundleV2`, takes precedence over ``config_dict``.
        :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
        """
        # Interpolates all environment variables into actual values.
        config_dict = expand_env_vars(config_dict)
        self._resolved_config_dict = ImmutableDict(config_dict)

        # Taken from the mutable expanded dict (not the frozen copy) so the
        # default POSIX profile can be injected below.
        self._profiles = config_dict.get("profiles", {})

        if DEFAULT_POSIX_PROFILE_NAME not in self._profiles:
            # Assign the default POSIX profile
            self._profiles[DEFAULT_POSIX_PROFILE_NAME] = DEFAULT_POSIX_PROFILE["profiles"][DEFAULT_POSIX_PROFILE_NAME]
        else:
            # Cannot override default POSIX profile
            storage_provider_type = (
                self._profiles[DEFAULT_POSIX_PROFILE_NAME].get("storage_provider", {}).get("type", None)
            )
            if storage_provider_type != "file":
                raise ValueError(
                    f'Cannot override "{DEFAULT_POSIX_PROFILE_NAME}" profile with storage provider type '
                    f'"{storage_provider_type}"; expected "file".'
                )

        profile_dict = self._profiles.get(profile)

        if not profile_dict:
            raise ValueError(f"Profile {profile} not found; available profiles: {list(self._profiles.keys())}")

        self._profile = profile
        self._profile_dict = ImmutableDict(profile_dict)

        self._opentelemetry_dict = config_dict.get("opentelemetry", None)
        # Multiprocessing unpickles during the Python interpreter's bootstrap phase for new processes.
        # New processes (e.g. multiprocessing manager server) can't be created during this phase.
        #
        # Pass thunks everywhere instead for lazy telemetry initialization.
        self._telemetry_provider = telemetry_provider or telemetry_init

        self._cache_dict = config_dict.get("cache", None)
        self._experimental_features = config_dict.get("experimental_features", None)

        # Resolve the bundle first; multi-backend bundles then register their
        # backends as child profiles.
        self._provider_bundle = self._build_provider_bundle(provider_bundle)
        self._inject_profiles_from_bundle()

    def _build_storage_provider(
        self,
        storage_provider_name: str,
        storage_options: Optional[dict[str, Any]] = None,
        credentials_provider: Optional[CredentialsProvider] = None,
    ) -> StorageProvider:
        """
        Instantiate a storage provider class by its mapped type name.

        :param storage_provider_name: Provider type key from :py:data:`STORAGE_PROVIDER_MAPPING`.
        :param storage_options: Keyword options forwarded to the provider constructor.
        :param credentials_provider: Optional credentials provider injected into the options.

        :return: The instantiated storage provider.
        :raises ValueError: If the provider type is not supported.
        """
        if storage_options is None:
            storage_options = {}
        if storage_provider_name not in STORAGE_PROVIDER_MAPPING:
            raise ValueError(
                f"Storage provider {storage_provider_name} is not supported. "
                f"Supported providers are: {list(STORAGE_PROVIDER_MAPPING.keys())}"
            )
        if credentials_provider:
            storage_options["credentials_provider"] = credentials_provider
        if self._resolved_config_dict is not None:
            # Make a deep copy to drop any external references which may be mutated or cause infinite recursion.
            storage_options["config_dict"] = copy.deepcopy(self._resolved_config_dict)
        if self._telemetry_provider is not None:
            storage_options["telemetry_provider"] = self._telemetry_provider
        class_name = STORAGE_PROVIDER_MAPPING[storage_provider_name]
        module_name = ".providers"
        cls = import_class(class_name, module_name, PACKAGE_NAME)
        return cls(**storage_options)

    def _build_storage_provider_from_profile(self, storage_provider_profile: str) -> StorageProvider:
        """
        Build a storage provider from another profile's configuration (used for
        manifest storage referenced by a metadata provider).

        :param storage_provider_profile: Name of the profile to build from.
        :raises ValueError: If the profile does not exist, has a metadata
            provider, or lacks a storage provider.
        """
        storage_profile_dict = self._profiles.get(storage_provider_profile)
        if not storage_profile_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' referenced by storage_provider_profile does not exist."
            )

        # Check if metadata provider is configured for this profile
        # NOTE: The storage profile for manifests does not support metadata provider (at the moment).
        local_metadata_provider_dict = storage_profile_dict.get("metadata_provider", None)
        if local_metadata_provider_dict:
            raise ValueError(
                f"Profile '{storage_provider_profile}' cannot have a metadata provider when used for manifests"
            )

        # Initialize CredentialsProvider
        local_creds_provider_dict = storage_profile_dict.get("credentials_provider", None)
        local_creds_provider = self._build_credentials_provider(credentials_provider_dict=local_creds_provider_dict)

        # Initialize StorageProvider
        local_storage_provider_dict = storage_profile_dict.get("storage_provider", None)
        if local_storage_provider_dict:
            local_name = local_storage_provider_dict["type"]
            local_storage_options = local_storage_provider_dict.get("options", {})
        else:
            raise ValueError(f"Missing storage_provider in the config for profile {storage_provider_profile}.")

        storage_provider = self._build_storage_provider(local_name, local_storage_options, local_creds_provider)
        return storage_provider

    def _build_credentials_provider(
        self,
        credentials_provider_dict: Optional[dict[str, Any]],
        storage_options: Optional[dict[str, Any]] = None,
    ) -> Optional[CredentialsProvider]:
        """
        Initializes the CredentialsProvider based on the provided dictionary.

        Args:
            credentials_provider_dict: Dictionary containing credentials provider configuration
            storage_options: Storage provider options required by some credentials providers to scope the credentials.

        Returns:
            The instantiated credentials provider, or ``None`` when no
            configuration was given.
        """
        if not credentials_provider_dict:
            return None

        if credentials_provider_dict["type"] not in CREDENTIALS_PROVIDER_MAPPING:
            # Fully qualified class path case
            class_type = credentials_provider_dict["type"]
            module_name, class_name = class_type.rsplit(".", 1)
            cls = import_class(class_name, module_name)
        else:
            # Mapped class name case
            class_name = CREDENTIALS_PROVIDER_MAPPING[credentials_provider_dict["type"]]
            module_name = ".providers"
            cls = import_class(class_name, module_name, PACKAGE_NAME)

        # Propagate storage provider options to credentials provider since they may be
        # required by some credentials providers to scope the credentials.
        import inspect

        # Inspect the constructor so only options it actually accepts are
        # forwarded; explicit credentials options always win.
        init_params = list(inspect.signature(cls.__init__).parameters)[1:]  # skip 'self'
        options = credentials_provider_dict.get("options", {})
        if storage_options:
            for storage_provider_option in storage_options.keys():
                if storage_provider_option in init_params and storage_provider_option not in options:
                    options[storage_provider_option] = storage_options[storage_provider_option]

        return cls(**options)

    def _build_provider_bundle_from_config(self, profile_dict: dict[str, Any]) -> ProviderBundle:
        """
        Build a provider bundle (storage provider config, credentials provider,
        metadata provider, replicas) from a profile's configuration dictionary.

        :param profile_dict: The profile's configuration.
        :raises ValueError: If ``storage_provider`` is missing or the metadata
            provider type is not a fully qualified class name.
        """
        # Initialize StorageProvider
        storage_provider_dict = profile_dict.get("storage_provider", None)
        if storage_provider_dict:
            storage_provider_name = storage_provider_dict["type"]
            storage_options = storage_provider_dict.get("options", {})
        else:
            raise ValueError("Missing storage_provider in the config.")

        # Initialize CredentialsProvider
        # It is prudent to assume that in some cases, the credentials provider
        # will provide credentials scoped to specific base_path.
        # So we need to pass the storage_options to the credentials provider.
        credentials_provider_dict = profile_dict.get("credentials_provider", None)
        credentials_provider = self._build_credentials_provider(
            credentials_provider_dict=credentials_provider_dict,
            storage_options=storage_options,
        )

        # Initialize MetadataProvider
        metadata_provider_dict = profile_dict.get("metadata_provider", None)
        metadata_provider = None
        if metadata_provider_dict:
            if metadata_provider_dict["type"] == "manifest":
                metadata_options = metadata_provider_dict.get("options", {})
                # If MetadataProvider has a reference to a different storage provider profile
                storage_provider_profile = metadata_options.pop("storage_provider_profile", None)
                if storage_provider_profile:
                    storage_provider = self._build_storage_provider_from_profile(storage_provider_profile)
                else:
                    storage_provider = self._build_storage_provider(
                        storage_provider_name, storage_options, credentials_provider
                    )

                metadata_provider = ManifestMetadataProvider(storage_provider, **metadata_options)
            else:
                class_type = metadata_provider_dict["type"]
                if "." not in class_type:
                    raise ValueError(
                        f"Expected a fully qualified class name (e.g., 'module.ClassName'); got '{class_type}'."
                    )
                module_name, class_name = class_type.rsplit(".", 1)
                cls = import_class(class_name, module_name)
                options = metadata_provider_dict.get("options", {})
                metadata_provider = cls(**options)

        # Build replicas if configured
        replicas_config = profile_dict.get("replicas", [])
        replicas = []
        if replicas_config:
            for replica_dict in replicas_config:
                replicas.append(
                    Replica(
                        replica_profile=replica_dict["replica_profile"],
                        read_priority=replica_dict["read_priority"],
                    )
                )

            # Sort replicas by read_priority
            replicas.sort(key=lambda r: r.read_priority)

        return SimpleProviderBundle(
            storage_provider_config=StorageProviderConfig(storage_provider_name, storage_options),
            credentials_provider=credentials_provider,
            metadata_provider=metadata_provider,
            replicas=replicas,
        )

    def _build_provider_bundle_from_extension(self, provider_bundle_dict: dict[str, Any]) -> ProviderBundle:
        """
        Instantiate a user-supplied provider bundle from a fully qualified
        class name given in the profile's ``provider_bundle`` section.
        """
        class_type = provider_bundle_dict["type"]
        module_name, class_name = class_type.rsplit(".", 1)
        cls = import_class(class_name, module_name)
        options = provider_bundle_dict.get("options", {})
        return cls(**options)

    def _build_provider_bundle(
        self, provider_bundle: Optional[Union[ProviderBundle, ProviderBundleV2]]
    ) -> ProviderBundleV2:
        """
        Resolve the provider bundle: an explicitly passed bundle takes
        precedence, then a ``provider_bundle`` extension in the profile, then
        the profile's own configuration. V1 bundles are wrapped into the V2
        interface.
        """
        if provider_bundle:
            bundle = provider_bundle
        else:
            provider_bundle_dict = self._profile_dict.get("provider_bundle", None)
            if provider_bundle_dict:
                bundle = self._build_provider_bundle_from_extension(provider_bundle_dict)
            else:
                bundle = self._build_provider_bundle_from_config(self._profile_dict)

        if isinstance(bundle, ProviderBundle) and not isinstance(bundle, ProviderBundleV2):
            bundle = SimpleProviderBundleV2.from_v1_bundle(self._profile, bundle)

        return bundle

    def _inject_profiles_from_bundle(self) -> None:
        """
        For multi-backend bundles, register each backend as a child profile so
        child storage clients can later be built by name. No-op for
        single-backend bundles.

        :raises ValueError: If a backend's name collides with an existing profile.
        """
        if len(self._provider_bundle.storage_backends) > 1:
            profiles = copy.deepcopy(self._profiles)

            for child_name, backend in self._provider_bundle.storage_backends.items():
                if child_name in self._profiles:
                    raise ValueError(f"Profile '{child_name}' already exists in configuration.")
                child_config = {
                    "storage_provider": {
                        "type": backend.storage_provider_config.type,
                        "options": backend.storage_provider_config.options,
                    }
                }
                profiles[child_name] = child_config

            resolved_config_dict = copy.deepcopy(self._resolved_config_dict)
            resolved_config_dict["profiles"] = profiles

            # Re-freeze both views so later reads stay consistent.
            self._profiles = ImmutableDict(profiles)
            self._resolved_config_dict = ImmutableDict(resolved_config_dict)

    def _build_retry_config(self) -> RetryConfig:
        """Build retry config from profile dict."""
        retry_config_dict = self._profile_dict.get("retry", None)
        if retry_config_dict:
            attempts = retry_config_dict.get("attempts", DEFAULT_RETRY_ATTEMPTS)
            delay = retry_config_dict.get("delay", DEFAULT_RETRY_DELAY)
            backoff_multiplier = retry_config_dict.get("backoff_multiplier", DEFAULT_RETRY_BACKOFF_MULTIPLIER)
            return RetryConfig(attempts=attempts, delay=delay, backoff_multiplier=backoff_multiplier)
        else:
            return RetryConfig(
                attempts=DEFAULT_RETRY_ATTEMPTS,
                delay=DEFAULT_RETRY_DELAY,
                backoff_multiplier=DEFAULT_RETRY_BACKOFF_MULTIPLIER,
            )

    def _build_autocommit_config(self) -> AutoCommitConfig:
        """Build autocommit config from profile dict."""
        autocommit_dict = self._profile_dict.get("autocommit", None)
        if autocommit_dict:
            interval_minutes = autocommit_dict.get("interval_minutes", None)
            at_exit = autocommit_dict.get("at_exit", False)
            return AutoCommitConfig(interval_minutes=interval_minutes, at_exit=at_exit)
        return AutoCommitConfig()

    def _verify_cache_config(self, cache_dict: dict[str, Any]) -> None:
        """
        Reject the legacy ``size_mb`` cache option with a migration hint.

        :raises ValueError: If ``size_mb`` is present in the cache config.
        """
        if "size_mb" in cache_dict:
            raise ValueError(
                "The 'size_mb' property is no longer supported. \n"
                "Please use 'size' with a unit suffix (M, G, T) instead of size_mb.\n"
                "Example configuration:\n"
                "cache:\n"
                "  size: 500G                  # Optional: If not specified, default cache size (10GB) will be used\n"
                "  use_etag: true              # Optional: If not specified, default cache use_etag (true) will be used\n"
                "  location: /tmp/msc_cache    # Optional: If not specified, default cache location (system temporary directory + '/msc_cache') will be used\n"
                "  eviction_policy:            # Optional: The eviction policy to use\n"
                "    policy: fifo              # Optional: The eviction policy to use, default is 'fifo'\n"
                "    refresh_interval: 300     # Optional: If not specified, default cache refresh interval (300 seconds) will be used\n"
            )

    def _validate_replicas(self, replicas: list[Replica]) -> None:
        """
        Validates that replica profiles do not have their own replicas configuration.

        This prevents circular references where a replica profile could reference
        another profile that also has replicas, creating an infinite loop.

        :param replicas: The list of Replica objects to validate
        :raises ValueError: If any replica profile has its own replicas configuration
        """
        for replica in replicas:
            replica_profile_name = replica.replica_profile

            # Check that replica profile is not the same as the current profile
            if replica_profile_name == self._profile:
                raise ValueError(
                    f"Replica profile {replica_profile_name} cannot be the same as the profile {self._profile}."
                )

            # Check if the replica profile exists in the configuration
            if replica_profile_name not in self._profiles:
                raise ValueError(f"Replica profile '{replica_profile_name}' not found in configuration")

            # Get the replica profile configuration
            replica_profile_dict = self._profiles[replica_profile_name]

            # Check if the replica profile has its own replicas configuration
            if "replicas" in replica_profile_dict and replica_profile_dict["replicas"]:
                raise ValueError(
                    f"Invalid replica configuration: profile '{replica_profile_name}' has its own replicas. "
                    f"This creates a circular reference which is not allowed."
                )

    # todo: remove once experimental features are stable
    def _validate_experimental_features(self, eviction_policy: EvictionPolicyConfig) -> None:
        """
        Validate that experimental features are enabled when used.

        :param eviction_policy: The eviction policy configuration to validate
        :raises ValueError: If experimental features are used without being enabled
        """
        # Validate MRU eviction policy
        if eviction_policy.policy.upper() == "MRU":
            if not (self._experimental_features and self._experimental_features.get("cache_mru_eviction")):
                raise ValueError(
                    "MRU eviction policy is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_mru_eviction: true"
                )

        # Validate purge_factor
        if eviction_policy.purge_factor != 0:
            if eviction_policy.purge_factor < 0 or eviction_policy.purge_factor > 100:
                raise ValueError("purge_factor must be between 0 and 100")

            if not (self._experimental_features and self._experimental_features.get("cache_purge_factor")):
                raise ValueError(
                    "purge_factor is experimental and not enabled.\n"
                    "Enable it by adding to config:\n"
                    "  experimental_features:\n"
                    "    cache_purge_factor: true"
                )

    def build_config(self) -> "StorageClientConfig":
        """
        Assemble the final ``StorageClientConfig`` for the profile.

        Multi-backend bundles produce a composite config that routes between
        child profiles via the metadata provider; single-backend bundles
        produce a fully-instantiated storage provider plus an optional cache
        manager.

        :raises ValueError: If a multi-backend bundle lacks a metadata
            provider, or the cache/replica configuration is invalid.
        """
        bundle = self._provider_bundle
        backends = bundle.storage_backends

        if len(backends) > 1:
            if not bundle.metadata_provider:
                raise ValueError(
                    f"Multi-backend configuration for profile '{self._profile}' requires metadata_provider "
                    "for routing between storage locations."
                )

            child_profile_names = list(backends.keys())
            retry_config = self._build_retry_config()
            autocommit_config = self._build_autocommit_config()

            # config for Composite StorageClient
            config = StorageClientConfig(
                profile=self._profile,
                storage_provider=None,
                credentials_provider=None,
                storage_provider_profiles=child_profile_names,
                metadata_provider=bundle.metadata_provider,
                cache_config=None,
                cache_manager=None,
                retry_config=retry_config,
                telemetry_provider=self._telemetry_provider,
                replicas=[],
                autocommit_config=autocommit_config,
            )

            config._config_dict = self._resolved_config_dict
            return config

        # Single-backend (len == 1)
        _, backend = list(backends.items())[0]
        storage_provider = self._build_storage_provider(
            backend.storage_provider_config.type,
            backend.storage_provider_config.options,
            backend.credentials_provider,
        )
        credentials_provider = backend.credentials_provider
        metadata_provider = bundle.metadata_provider
        replicas = backend.replicas

        # Validate replicas to prevent circular references
        if replicas:
            self._validate_replicas(replicas)

        cache_config: Optional[CacheConfig] = None
        cache_manager: Optional[CacheManager] = None

        # Check if caching is enabled for this profile
        caching_enabled = self._profile_dict.get("caching_enabled", False)

        if self._cache_dict is not None and caching_enabled:
            tempdir = tempfile.gettempdir()
            default_location = os.path.join(tempdir, "msc_cache")
            location = self._cache_dict.get("location", default_location)

            # Check if cache_backend.cache_path is defined
            cache_backend = self._cache_dict.get("cache_backend", {})
            cache_backend_path = cache_backend.get("cache_path") if cache_backend else None

            # Warn if both location and cache_backend.cache_path are defined
            if cache_backend_path and self._cache_dict.get("location") is not None:
                logger.warning(
                    f"Both 'location' and 'cache_backend.cache_path' are defined in cache config. "
                    f"Using 'location' ({location}) and ignoring 'cache_backend.cache_path' ({cache_backend_path})."
                )
            elif cache_backend_path:
                # Use cache_backend.cache_path only if location is not explicitly defined
                location = cache_backend_path

            # Resolve the effective flag while preserving explicit ``False``.
            if "check_source_version" in self._cache_dict:
                check_source_version = self._cache_dict["check_source_version"]
            else:
                check_source_version = self._cache_dict.get("use_etag", True)

            # Warn if both keys are specified - the new one wins.
            if "check_source_version" in self._cache_dict and "use_etag" in self._cache_dict:
                logger.warning(
                    "Both 'check_source_version' and 'use_etag' are defined in cache config. "
                    "Using 'check_source_version' and ignoring 'use_etag'."
                )

            if not Path(location).is_absolute():
                raise ValueError(f"Cache location must be an absolute path: {location}")

            # Initialize cache_dict with default values
            cache_dict = self._cache_dict

            # Verify cache config
            self._verify_cache_config(cache_dict)

            # Initialize eviction policy
            if "eviction_policy" in cache_dict:
                policy = cache_dict["eviction_policy"]["policy"].lower()
                purge_factor = cache_dict["eviction_policy"].get("purge_factor", 0)
                eviction_policy = EvictionPolicyConfig(
                    policy=policy,
                    refresh_interval=cache_dict["eviction_policy"].get(
                        "refresh_interval", DEFAULT_CACHE_REFRESH_INTERVAL
                    ),
                    purge_factor=purge_factor,
                )
            else:
                eviction_policy = EvictionPolicyConfig(policy="fifo", refresh_interval=DEFAULT_CACHE_REFRESH_INTERVAL)

            # todo: remove once experimental features are stable
            # Validate experimental features before creating cache_config
            self._validate_experimental_features(eviction_policy)

            # Create cache config from the standardized format
            cache_config = CacheConfig(
                size=cache_dict.get("size", DEFAULT_CACHE_SIZE),
                location=cache_dict.get("location", location),
                check_source_version=check_source_version,
                eviction_policy=eviction_policy,
                cache_line_size=cache_dict.get("cache_line_size", DEFAULT_CACHE_LINE_SIZE),
            )

            cache_manager = CacheManager(profile=self._profile, cache_config=cache_config)
        elif self._cache_dict is not None and not caching_enabled:
            logger.debug(f"Caching is disabled for profile '{self._profile}'")
        elif self._cache_dict is None and caching_enabled:
            logger.warning(f"Caching is enabled for profile '{self._profile}' but no cache configuration is provided")

        retry_config = self._build_retry_config()
        autocommit_config = self._build_autocommit_config()

        config = StorageClientConfig(
            profile=self._profile,
            storage_provider=storage_provider,
            credentials_provider=credentials_provider,
            storage_provider_profiles=None,
            metadata_provider=metadata_provider,
            cache_config=cache_config,
            cache_manager=cache_manager,
            retry_config=retry_config,
            telemetry_provider=self._telemetry_provider,
            replicas=replicas,
            autocommit_config=autocommit_config,
        )

        config._config_dict = self._resolved_config_dict
        return config
851
852
class PathMapping:
    """
    Handle path mappings defined in the MSC configuration.

    Mappings are stored as a nested structure of protocol -> bucket -> [(prefix, profile)],
    with each bucket's entries kept sorted by prefix length (longest first) so the most
    specific mapping wins during lookup.
    """

    def __init__(self):
        """Create a PathMapping with no entries."""
        self._mapping = defaultdict(lambda: defaultdict(list))

    @classmethod
    def from_config(cls, config_dict: Optional[dict[str, Any]] = None) -> "PathMapping":
        """
        Build a PathMapping from a configuration dictionary.

        :param config_dict: Configuration dictionary; when None, the MSC config is loaded.
        :return: A PathMapping populated with the processed mappings.
        """
        if config_dict is None:
            # Import locally to avoid circular imports
            from multistorageclient.config import StorageClientConfig

            config_dict, _ = StorageClientConfig.read_msc_config()

        if not config_dict:
            return cls()

        mapping = cls()
        mapping._load_mapping(config_dict)
        return mapping

    def _load_mapping(self, config_dict: dict[str, Any]) -> None:
        """
        Populate the internal mapping from a configuration dictionary.

        :param config_dict: Configuration dictionary containing a ``path_mapping`` section.
        """
        path_mapping = config_dict.get("path_mapping", {})
        if path_mapping is None:
            return

        for source_path, dest_path in path_mapping.items():
            # Skip malformed entries: both sides must end with "/" and the
            # destination must be an MSC URL.
            well_formed = (
                source_path.endswith("/") and dest_path.startswith(MSC_PROTOCOL) and dest_path.endswith("/")
            )
            if not well_formed:
                continue

            # The destination profile is the netloc portion of the MSC URL.
            dest_profile = urlparse(dest_path).netloc

            parsed_source = urlparse(source_path)
            protocol = parsed_source.scheme.lower() if parsed_source.scheme else "file"

            if protocol == "file" or source_path.startswith("/"):
                # File or absolute paths: the whole path is the prefix, bucket stays empty.
                bucket = ""
                prefix = source_path if source_path.startswith("/") else parsed_source.path
            else:
                # Object storage: split into bucket and prefix (drop one leading slash).
                bucket = parsed_source.netloc
                prefix = parsed_source.path.removeprefix("/")

            self._mapping[protocol][bucket].append((prefix, dest_profile))

        # Keep each bucket's prefixes sorted longest-first so the most specific match wins.
        for buckets in self._mapping.values():
            for bucket_name in buckets:
                buckets[bucket_name] = sorted(buckets[bucket_name], key=lambda entry: -len(entry[0]))

    def find_mapping(self, url: str) -> Optional[tuple[str, str]]:
        """
        Find the most specific mapping matching the given URL.

        :param url: URL to find a matching mapping for.
        :return: Tuple of (profile_name, translated_path) on a match, otherwise None.
        """
        parsed = urlparse(url)
        protocol = parsed.scheme.lower() if parsed.scheme else "file"

        # File or absolute paths match against the "" bucket.
        if protocol == "file" or url.startswith("/"):
            path = url if url.startswith("/") else parsed.path
            candidates = self._mapping[protocol][""] if protocol in self._mapping else []

            # Candidates are pre-sorted longest prefix first.
            for prefix, profile in candidates:
                if not path.startswith(prefix):
                    continue
                remainder = path[len(prefix) :]
                # Translated file paths always carry a leading slash.
                if not remainder.startswith("/"):
                    remainder = "/" + remainder
                return profile, remainder

            return None

        # Object storage: match against the URL's bucket.
        bucket = parsed.netloc
        path = parsed.path.removeprefix("/")

        if protocol in self._mapping and bucket in self._mapping[protocol]:
            candidates = self._mapping[protocol][bucket]
        else:
            candidates = []

        # Candidates are pre-sorted longest prefix first.
        for prefix, profile in candidates:
            if not path.startswith(prefix):
                continue
            # Strip the matched prefix and at most one leading slash.
            remainder = path[len(prefix) :].removeprefix("/")
            return profile, remainder

        return None
987
988
[docs]
class StorageClientConfig:
    """
    Configuration class for the :py:class:`multistorageclient.StorageClient`.
    """

    # Name of the profile this configuration was built for.
    profile: str
    # Storage provider for a single-backend client; mutually exclusive with
    # storage_provider_profiles (enforced in __init__).
    storage_provider: Optional[StorageProvider]
    # Optional credentials provider used by the storage provider.
    credentials_provider: Optional[CredentialsProvider]
    # Profile names for a composite (multi-backend) client; mutually exclusive
    # with storage_provider.
    storage_provider_profiles: Optional[list[str]]
    # Optional metadata provider (e.g. manifest-based metadata).
    metadata_provider: Optional[MetadataProvider]
    # Cache settings and the manager enforcing them; both None when caching is disabled.
    cache_config: Optional[CacheConfig]
    cache_manager: Optional[CacheManager]
    # Retry behavior for storage operations.
    retry_config: Optional[RetryConfig]
    # Factory returning a Telemetry instance; must be a module top-level function to pickle.
    telemetry_provider: Optional[Callable[[], Telemetry]]
    # Replication targets; empty list when replication is not configured.
    replicas: list[Replica]
    # Auto-commit settings, if configured.
    autocommit_config: Optional[AutoCommitConfig]

    # Raw resolved config dict; must be set for the config to be picklable (see __getstate__).
    _config_dict: Optional[dict[str, Any]]
1007
1008 def __init__(
1009 self,
1010 profile: str,
1011 storage_provider: Optional[StorageProvider] = None,
1012 credentials_provider: Optional[CredentialsProvider] = None,
1013 storage_provider_profiles: Optional[list[str]] = None,
1014 metadata_provider: Optional[MetadataProvider] = None,
1015 cache_config: Optional[CacheConfig] = None,
1016 cache_manager: Optional[CacheManager] = None,
1017 retry_config: Optional[RetryConfig] = None,
1018 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1019 replicas: Optional[list[Replica]] = None,
1020 autocommit_config: Optional[AutoCommitConfig] = None,
1021 ):
1022 # exactly one of storage_provider or storage_provider_profiles must be set
1023 if storage_provider and storage_provider_profiles:
1024 raise ValueError(
1025 "Cannot specify both storage_provider and storage_provider_profiles. "
1026 "Use storage_provider for SingleStorageClient or storage_provider_profiles for CompositeStorageClient."
1027 )
1028 if not storage_provider and not storage_provider_profiles:
1029 raise ValueError("Must specify either storage_provider or storage_provider_profiles.")
1030
1031 if replicas is None:
1032 replicas = []
1033 self.profile = profile
1034 self.storage_provider = storage_provider
1035 self.credentials_provider = credentials_provider
1036 self.storage_provider_profiles = storage_provider_profiles
1037 self.metadata_provider = metadata_provider
1038 self.cache_config = cache_config
1039 self.cache_manager = cache_manager
1040 self.retry_config = retry_config
1041 self.telemetry_provider = telemetry_provider
1042 self.replicas = replicas
1043 self.autocommit_config = autocommit_config
1044
[docs]
1045 @staticmethod
1046 def from_json(
1047 config_json: str,
1048 profile: str = DEFAULT_POSIX_PROFILE_NAME,
1049 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1050 ) -> "StorageClientConfig":
1051 """
1052 Load a storage client configuration from a JSON string.
1053
1054 :param config_json: Configuration JSON string.
1055 :param profile: Profile to use.
1056 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1057 """
1058 config_dict = json.loads(config_json)
1059 return StorageClientConfig.from_dict(
1060 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1061 )
1062
[docs]
1063 @staticmethod
1064 def from_yaml(
1065 config_yaml: str,
1066 profile: str = DEFAULT_POSIX_PROFILE_NAME,
1067 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1068 ) -> "StorageClientConfig":
1069 """
1070 Load a storage client configuration from a YAML string.
1071
1072 :param config_yaml: Configuration YAML string.
1073 :param profile: Profile to use.
1074 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1075 """
1076 config_dict = yaml.safe_load(config_yaml)
1077 return StorageClientConfig.from_dict(
1078 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1079 )
1080
[docs]
1081 @staticmethod
1082 def from_dict(
1083 config_dict: dict[str, Any],
1084 profile: str = DEFAULT_POSIX_PROFILE_NAME,
1085 skip_validation: bool = False,
1086 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1087 ) -> "StorageClientConfig":
1088 """
1089 Load a storage client configuration from a Python dictionary.
1090
1091 :param config_dict: Configuration Python dictionary.
1092 :param profile: Profile to use.
1093 :param skip_validation: Skip configuration schema validation.
1094 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1095 """
1096 # Validate the config file with predefined JSON schema
1097 if not skip_validation:
1098 validate_config(config_dict)
1099
1100 # Load config
1101 loader = StorageClientConfigLoader(
1102 config_dict=config_dict, profile=profile, telemetry_provider=telemetry_provider
1103 )
1104 config = loader.build_config()
1105
1106 return config
1107
[docs]
1108 @staticmethod
1109 def from_file(
1110 config_file_paths: Optional[Iterable[str]] = None,
1111 profile: str = DEFAULT_POSIX_PROFILE_NAME,
1112 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1113 ) -> "StorageClientConfig":
1114 """
1115 Load a storage client configuration from the first file found.
1116
1117 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used (see :py:meth:`StorageClientConfig.read_msc_config`).
1118 :param profile: Profile to use.
1119 :param telemetry_provider: A function that provides a telemetry instance. The function must be defined at the top level of a module to work with pickling.
1120 """
1121 msc_config_dict, msc_config_file = StorageClientConfig.read_msc_config(config_file_paths=config_file_paths)
1122 # Parse rclone config file.
1123 rclone_config_dict, rclone_config_file = read_rclone_config()
1124
1125 # Merge config files.
1126 merged_config, conflicted_keys = merge_dictionaries_no_overwrite(msc_config_dict, rclone_config_dict)
1127 if conflicted_keys:
1128 raise ValueError(
1129 f'Conflicting keys found in configuration files "{msc_config_file}" and "{rclone_config_file}: {conflicted_keys}'
1130 )
1131 merged_profiles = merged_config.get("profiles", {})
1132
1133 # Check if profile is in merged_profiles
1134 if profile in merged_profiles:
1135 return StorageClientConfig.from_dict(
1136 config_dict=merged_config, profile=profile, telemetry_provider=telemetry_provider
1137 )
1138 else:
1139 # Check if profile is the default profile or an implicit profile
1140 if profile == DEFAULT_POSIX_PROFILE_NAME:
1141 implicit_profile_config = DEFAULT_POSIX_PROFILE
1142 elif profile.startswith("_"):
1143 # Handle implicit profiles
1144 parts = profile[1:].split("-", 1)
1145 if len(parts) == 2:
1146 protocol, bucket = parts
1147 # Verify it's a supported protocol
1148 if protocol not in SUPPORTED_IMPLICIT_PROFILE_PROTOCOLS:
1149 raise ValueError(f'Unsupported protocol in implicit profile: "{protocol}"')
1150 implicit_profile_config = create_implicit_profile_config(
1151 profile_name=profile, protocol=protocol, base_path=bucket
1152 )
1153 else:
1154 raise ValueError(f'Invalid implicit profile format: "{profile}"')
1155 else:
1156 raise ValueError(
1157 f'Profile "{profile}" not found in configuration files. Configuration was checked in '
1158 f"{msc_config_file or 'MSC config (not found)'} and {rclone_config_file or 'Rclone config (not found)'}. "
1159 f"Please verify that the profile exists and that configuration files are correctly located."
1160 )
1161 # merge the implicit profile config into the merged config so the cache & observability config can be inherited
1162 if "profiles" not in merged_config:
1163 merged_config["profiles"] = implicit_profile_config["profiles"]
1164 else:
1165 merged_config["profiles"][profile] = implicit_profile_config["profiles"][profile]
1166 # the config is already validated while reading, skip the validation for implicit profiles which start profile with "_"
1167 return StorageClientConfig.from_dict(
1168 config_dict=merged_config, profile=profile, skip_validation=True, telemetry_provider=telemetry_provider
1169 )
1170
1171 @staticmethod
1172 def from_provider_bundle(
1173 config_dict: dict[str, Any],
1174 provider_bundle: Union[ProviderBundle, ProviderBundleV2],
1175 telemetry_provider: Optional[Callable[[], Telemetry]] = None,
1176 ) -> "StorageClientConfig":
1177 loader = StorageClientConfigLoader(
1178 config_dict=config_dict, provider_bundle=provider_bundle, telemetry_provider=telemetry_provider
1179 )
1180 config = loader.build_config()
1181 config._config_dict = None # Explicitly mark as None to avoid confusing pickling errors
1182 return config
1183
[docs]
1184 @staticmethod
1185 def read_msc_config(
1186 config_file_paths: Optional[Iterable[str]] = None,
1187 ) -> tuple[Optional[dict[str, Any]], Optional[str]]:
1188 """Get the MSC configuration dictionary and the path of the first file found.
1189
1190 If no config paths are specified, configs are searched in the following order:
1191
1192 1. ``MSC_CONFIG`` environment variable (highest precedence)
1193 2. Standard search paths (user-specified config and system-wide config)
1194
1195 :param config_file_paths: Configuration file search paths. If omitted, the default search paths are used.
1196 :return: Tuple of ``(config_dict, config_file_path)``. ``config_dict`` is the MSC configuration
1197 dictionary or empty dict if no config was found. ``config_file_path`` is the absolute
1198 path of the config file used, or ``None`` if no config file was found.
1199 """
1200 config_dict: dict[str, Any] = {}
1201 config_file_path: Optional[str] = None
1202
1203 config_file_paths = list(config_file_paths or [])
1204
1205 # Add default paths if none provided.
1206 if len(config_file_paths) == 0:
1207 # Environment variable.
1208 msc_config_env = os.getenv("MSC_CONFIG", None)
1209 if msc_config_env is not None:
1210 config_file_paths.append(msc_config_env)
1211
1212 # Standard search paths.
1213 config_file_paths.extend(_find_config_file_paths())
1214
1215 # Normalize + absolutize paths.
1216 config_file_paths = [os.path.abspath(path) for path in config_file_paths]
1217
1218 # Log plan.
1219 logger.debug(f"Searching MSC config file paths: {config_file_paths}")
1220
1221 # Load config.
1222 for path in config_file_paths:
1223 if os.path.exists(path):
1224 try:
1225 with open(path) as f:
1226 if path.endswith(".json"):
1227 config_dict = json.load(f)
1228 else:
1229 config_dict = yaml.safe_load(f)
1230 config_file_path = path
1231 # Use the first config file.
1232 break
1233 except Exception as e:
1234 raise ValueError(f"malformed MSC config file: {path}, exception: {e}")
1235
1236 # Log result.
1237 if config_file_path is None:
1238 logger.debug("No MSC config files found in any of the search locations.")
1239 else:
1240 logger.debug(f"Using MSC config file: {config_file_path}")
1241
1242 if config_dict:
1243 validate_config(config_dict)
1244 return config_dict, config_file_path
1245
[docs]
1246 @staticmethod
1247 def read_path_mapping() -> PathMapping:
1248 """
1249 Get the path mapping defined in the MSC configuration.
1250
1251 Path mappings create a nested structure of protocol -> bucket -> [(prefix, profile)]
1252 where entries are sorted by prefix length (longest first) for optimal matching.
1253 Longer paths take precedence when matching.
1254
1255 :return: A PathMapping instance with translation mappings
1256 """
1257 try:
1258 return PathMapping.from_config()
1259 except Exception:
1260 # Log the error but continue - this shouldn't stop the application from working
1261 logger.error("Failed to load path_mapping from MSC config")
1262 return PathMapping()
1263
1264 def __getstate__(self) -> dict[str, Any]:
1265 state = self.__dict__.copy()
1266 if not state.get("_config_dict"):
1267 raise ValueError("StorageClientConfig is not serializable")
1268 del state["credentials_provider"]
1269 del state["storage_provider"]
1270 del state["metadata_provider"]
1271 del state["cache_manager"]
1272 del state["replicas"]
1273 return state
1274
1275 def __setstate__(self, state: dict[str, Any]) -> None:
1276 # Presence checked by __getstate__.
1277 config_dict = state["_config_dict"]
1278 loader = StorageClientConfigLoader(
1279 config_dict=config_dict,
1280 profile=state["profile"],
1281 telemetry_provider=state["telemetry_provider"],
1282 )
1283 new_config = loader.build_config()
1284 self.profile = new_config.profile
1285 self.storage_provider = new_config.storage_provider
1286 self.credentials_provider = new_config.credentials_provider
1287 self.storage_provider_profiles = new_config.storage_provider_profiles
1288 self.metadata_provider = new_config.metadata_provider
1289 self.cache_config = new_config.cache_config
1290 self.cache_manager = new_config.cache_manager
1291 self.retry_config = new_config.retry_config
1292 self.telemetry_provider = new_config.telemetry_provider
1293 self._config_dict = config_dict
1294 self.replicas = new_config.replicas
1295 self.autocommit_config = new_config.autocommit_config