# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from datetime import datetime, timezone
23from io import BytesIO
24from pathlib import PurePosixPath
25from typing import IO, Any, List, Optional, Union, cast
26
27from .config import StorageClientConfig
28from .constants import MEMORY_LOAD_LIMIT
29from .file import ObjectFile, PosixFile
30from .instrumentation.utils import instrumented
31from .providers.posix_file import PosixFileStorageProvider
32from .replica_manager import ReplicaManager
33from .retry import retry
34from .sync import SyncManager
35from .types import (
36 AWARE_DATETIME_MIN,
37 MSC_PROTOCOL,
38 ExecutionMode,
39 ObjectMetadata,
40 PatternList,
41 Range,
42 Replica,
43 SourceVersionCheckMode,
44)
45from .utils import NullStorageClient, PatternMatcher, join_paths
46
47logger = logging.getLogger(__name__)
48
49
[docs]
50@instrumented
51class StorageClient:
52 """
53 A client for interacting with different storage providers.
54 """
55
56 _config: StorageClientConfig
57 _metadata_provider_lock: Optional[threading.Lock] = None
58 _stop_event: Optional[threading.Event] = None
59 _replica_manager: Optional[ReplicaManager] = None
60
61 def __init__(self, config: StorageClientConfig):
62 """
63 Initializes the :py:class:`StorageClient` with the given configuration.
64
65 :param config: The configuration object for the storage client.
66 """
67 self._initialize_providers(config)
68 self._initialize_replicas(config.replicas)
69
70 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
71 if not stop_event:
72 raise RuntimeError("Stop event not set")
73
74 while not stop_event.is_set():
75 # Wait with the ability to exit early
76 if stop_event.wait(timeout=commit_interval_minutes * 60):
77 break
78 logger.debug("Auto-committing to metadata provider")
79 self.commit_metadata()
80
81 def _commit_on_exit(self):
82 logger.debug("Shutting down, committing metadata one last time...")
83 self.commit_metadata()
84
85 def _initialize_providers(self, config: StorageClientConfig) -> None:
86 self._config = config
87 self._credentials_provider = self._config.credentials_provider
88 self._storage_provider = self._config.storage_provider
89 self._metadata_provider = self._config.metadata_provider
90 self._cache_config = self._config.cache_config
91 self._retry_config = self._config.retry_config
92 self._cache_manager = self._config.cache_manager
93 self._autocommit_config = self._config.autocommit_config
94
95 if self._autocommit_config:
96 if self._metadata_provider:
97 logger.debug("Creating auto-commiter thread")
98
99 if self._autocommit_config.interval_minutes:
100 self._stop_event = threading.Event()
101 self._commit_thread = threading.Thread(
102 target=self._committer_thread,
103 daemon=True,
104 args=(self._autocommit_config.interval_minutes, self._stop_event),
105 )
106 self._commit_thread.start()
107
108 if self._autocommit_config.at_exit:
109 atexit.register(self._commit_on_exit)
110
111 self._metadata_provider_lock = threading.Lock()
112 else:
113 logger.debug("No metadata provider configured, auto-commit will not be enabled")
114
115 def __del__(self):
116 if self._stop_event:
117 self._stop_event.set()
118 if self._commit_thread.is_alive():
119 self._commit_thread.join(timeout=5.0)
120
121 def _get_source_version(self, path: str) -> Optional[str]:
122 """
123 Get etag from metadata provider or storage provider.
124 """
125 if self._metadata_provider:
126 metadata = self._metadata_provider.get_object_metadata(path)
127 else:
128 metadata = self._storage_provider.get_object_metadata(path)
129 return metadata.etag
130
131 def _is_cache_enabled(self) -> bool:
132 enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
133 return enabled
134
135 def _is_posix_file_storage_provider(self) -> bool:
136 return isinstance(self._storage_provider, PosixFileStorageProvider)
137
[docs]
138 def is_default_profile(self) -> bool:
139 """
140 Return True if the storage client is using the default profile.
141 """
142 return self._config.profile == "default"
143
144 def _is_rust_client_enabled(self) -> bool:
145 """
146 Return True if the storage provider is using the Rust client.
147 """
148 return hasattr(self._storage_provider, "_rust_client")
149
150 @property
151 def profile(self) -> str:
152 return self._config.profile
153
154 def _initialize_replicas(self, replicas: list[Replica]) -> None:
155 """Initialize replica StorageClient instances."""
156 # Sort replicas by read_priority, the first one is the primary replica
157 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)
158
159 replica_clients = []
160 for replica in sorted_replicas:
161 if self._config._config_dict is None:
162 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
163 replica_config = StorageClientConfig.from_dict(
164 config_dict=self._config._config_dict, profile=replica.replica_profile
165 )
166
167 storage_client = StorageClient(config=replica_config)
168 replica_clients.append(storage_client)
169
170 self._replicas = replica_clients
171 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None
172
173 @property
174 def replicas(self) -> list["StorageClient"]:
175 """Return StorageClient instances for all replicas, sorted by read priority."""
176 return self._replicas
177
178 @retry
179 def read(
180 self,
181 path: str,
182 byte_range: Optional[Range] = None,
183 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
184 ) -> bytes:
185 """
186 Reads an object from the specified logical path.
187
188 :param path: The logical path of the object to read.
189 :param byte_range: Optional byte range to read.
190 :param check_source_version: Whether to check the source version of cached objects.
191 :param size: Optional file size for range reads.
192 :return: The content of the object.
193 """
194 if self._metadata_provider:
195 path, exists = self._metadata_provider.realpath(path)
196 if not exists:
197 raise FileNotFoundError(f"The file at path '{path}' was not found.")
198
199 # Handle caching logic
200 if self._is_cache_enabled() and self._cache_manager:
201 if byte_range:
202 # Range request with cache
203 try:
204 if check_source_version == SourceVersionCheckMode.ENABLE:
205 metadata = self._storage_provider.get_object_metadata(path)
206 source_version = metadata.etag
207 elif check_source_version == SourceVersionCheckMode.INHERIT:
208 if self._cache_manager.check_source_version():
209 metadata = self._storage_provider.get_object_metadata(path)
210 source_version = metadata.etag
211 else:
212 metadata = None
213 source_version = None
214 else:
215 metadata = None
216 source_version = None
217
218 data = self._cache_manager.read(
219 key=path,
220 source_version=source_version,
221 byte_range=byte_range,
222 storage_provider=self._storage_provider,
223 )
224 if data is not None:
225 return data
226 # Fallback (should not normally happen)
227 return self._storage_provider.get_object(path, byte_range=byte_range)
228 except (FileNotFoundError, Exception):
229 # Fall back to direct read if metadata fetching fails
230 return self._storage_provider.get_object(path, byte_range=byte_range)
231 else:
232 # Full file read with cache
233 source_version = self._get_source_version(path)
234 data = self._cache_manager.read(path, source_version)
235
236 # Read from cache if the file exists
237 if self._is_cache_enabled():
238 if self._cache_manager is None:
239 raise RuntimeError("Cache manager is not initialized")
240 source_version = self._get_source_version(path)
241 data = self._cache_manager.read(path, source_version)
242
243 if data is None:
244 if self._replica_manager:
245 data = self._read_from_replica_or_primary(path)
246 else:
247 data = self._storage_provider.get_object(path)
248 self._cache_manager.set(path, data, source_version)
249 return data
250 elif self._replica_manager:
251 # No cache, but replicas available
252 return self._read_from_replica_or_primary(path)
253 else:
254 # No cache, no replicas - direct storage provider read
255 return self._storage_provider.get_object(path, byte_range=byte_range)
256
257 def _read_from_replica_or_primary(self, path: str) -> bytes:
258 """
259 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
260 """
261 if self._replica_manager is None:
262 raise RuntimeError("Replica manager is not initialized")
263 file_obj = BytesIO()
264 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
265 return file_obj.getvalue()
266
[docs]
267 def info(self, path: str, strict: bool = True) -> ObjectMetadata:
268 """
269 Retrieves metadata or information about an object stored at the specified path.
270
271 :param path: The logical path to the object for which metadata or information is being retrieved.
272 :param strict: If True, performs additional validation to determine whether the path refers to a directory.
273
274 :return: A dictionary containing metadata about the object.
275 """
276
277 if not path or path == ".": # for the empty path provided by the user
278 if self._is_posix_file_storage_provider():
279 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
280 else:
281 last_modified = AWARE_DATETIME_MIN
282 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)
283
284 if not self._metadata_provider:
285 return self._storage_provider.get_object_metadata(path, strict=strict)
286
287 # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
288 try:
289 return self._metadata_provider.get_object_metadata(path)
290 except FileNotFoundError:
291 # Try listing from the parent to determine if path is a valid directory
292 parent = os.path.dirname(path.rstrip("/")) + "/"
293 parent = "" if parent == "/" else parent
294 target = path.rstrip("/") + "/"
295
296 try:
297 entries = self._metadata_provider.list_objects(parent, include_directories=True)
298 for entry in entries:
299 if entry.key == target and entry.type == "directory":
300 return entry
301 except Exception:
302 pass
303 raise # Raise original FileNotFoundError
304
305 @retry
306 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
307 """
308 Downloads a file to the local file system.
309
310 :param remote_path: The logical path of the file in the storage provider.
311 :param local_path: The local path where the file should be downloaded.
312 """
313
314 if self._metadata_provider:
315 real_path, exists = self._metadata_provider.realpath(remote_path)
316 if not exists:
317 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
318 metadata = self._metadata_provider.get_object_metadata(remote_path)
319 self._storage_provider.download_file(real_path, local_path, metadata)
320 elif self._replica_manager:
321 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
322 else:
323 self._storage_provider.download_file(remote_path, local_path)
324
325 @retry
326 def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
327 """
328 Uploads a file from the local file system.
329
330 :param remote_path: The logical path where the file should be stored.
331 :param local_path: The local path of the file to upload.
332 :param attributes: The attributes to add to the file.
333 """
334 virtual_path = remote_path
335 if self._metadata_provider:
336 remote_path, exists = self._metadata_provider.realpath(remote_path)
337 if exists:
338 raise FileExistsError(
339 f"The file at path '{virtual_path}' already exists; "
340 f"overwriting is not yet allowed when using a metadata provider."
341 )
342 if self._metadata_provider:
343 # if metdata provider is present, we only write attributes to the metadata provider
344 self._storage_provider.upload_file(remote_path, local_path, attributes=None)
345 obj_metadata = self._storage_provider.get_object_metadata(remote_path)
346 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
347 with self._metadata_provider_lock or contextlib.nullcontext():
348 self._metadata_provider.add_file(virtual_path, obj_metadata)
349 else:
350 self._storage_provider.upload_file(remote_path, local_path, attributes)
351
352 @retry
353 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
354 """
355 Writes an object at the specified path.
356
357 :param path: The logical path where the object should be written.
358 :param body: The content to write to the object.
359 :param attributes: The attributes to add to the file.
360 """
361 virtual_path = path
362 if self._metadata_provider:
363 path, exists = self._metadata_provider.realpath(path)
364 if exists:
365 raise FileExistsError(
366 f"The file at path '{virtual_path}' already exists; "
367 f"overwriting is not yet allowed when using a metadata provider."
368 )
369 if self._metadata_provider:
370 # if metadata provider is present, we only write attributes to the metadata provider
371 self._storage_provider.put_object(path, body, attributes=None)
372 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
373 obj_metadata = self._storage_provider.get_object_metadata(path)
374 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
375 with self._metadata_provider_lock or contextlib.nullcontext():
376 self._metadata_provider.add_file(virtual_path, obj_metadata)
377 else:
378 self._storage_provider.put_object(path, body, attributes=attributes)
379
[docs]
380 def copy(self, src_path: str, dest_path: str) -> None:
381 """
382 Copies an object from source to destination path.
383
384 :param src_path: The logical path of the source object to copy.
385 :param dest_path: The logical path of the destination.
386 """
387 virtual_dest_path = dest_path
388 if self._metadata_provider:
389 src_path, exists = self._metadata_provider.realpath(src_path)
390 if not exists:
391 raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
392
393 dest_path, exists = self._metadata_provider.realpath(dest_path)
394 if exists:
395 raise FileExistsError(
396 f"The file at path '{virtual_dest_path}' already exists; "
397 f"overwriting is not yet allowed when using a metadata provider."
398 )
399
400 self._storage_provider.copy_object(src_path, dest_path)
401 if self._metadata_provider:
402 metadata = self._storage_provider.get_object_metadata(dest_path)
403 with self._metadata_provider_lock or contextlib.nullcontext():
404 self._metadata_provider.add_file(virtual_dest_path, metadata)
405
[docs]
406 def delete(self, path: str, recursive: bool = False) -> None:
407 """
408 Deletes an object at the specified path.
409
410 :param path: The logical path of the object or directory to delete.
411 :param recursive: Whether to delete objects in the path recursively.
412 """
413 obj_metadata = self.info(path)
414 is_dir = obj_metadata and obj_metadata.type == "directory"
415 is_file = obj_metadata and obj_metadata.type == "file"
416 if recursive and is_dir:
417 self.sync_from(
418 NullStorageClient(),
419 path,
420 path,
421 delete_unmatched_files=True,
422 num_worker_processes=1,
423 description="Deleting",
424 )
425 # If this is a posix storage provider, we need to also delete remaining directory stubs.
426 if self._is_posix_file_storage_provider():
427 posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
428 posix_storage_provider.rmtree(path)
429 return
430 else:
431 # 1) If path is a file: delete the file
432 # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
433 if is_file:
434 virtual_path = path
435 if self._metadata_provider:
436 path, exists = self._metadata_provider.realpath(path)
437 if not exists:
438 raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")
439 with self._metadata_provider_lock or contextlib.nullcontext():
440 self._metadata_provider.remove_file(virtual_path)
441
442 # Delete the cached file if it exists
443 if self._is_cache_enabled():
444 if self._cache_manager is None:
445 raise RuntimeError("Cache manager is not initialized")
446 self._cache_manager.delete(virtual_path)
447 self._storage_provider.delete_object(path)
448
449 # Delete from replicas if replica manager exists
450 if self._replica_manager:
451 self._replica_manager.delete_from_replicas(virtual_path)
452 elif is_dir:
453 raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
454 else:
455 raise FileNotFoundError(f"The file at '{path}' was not found.")
456
[docs]
457 def glob(
458 self,
459 pattern: str,
460 include_url_prefix: bool = False,
461 attribute_filter_expression: Optional[str] = None,
462 ) -> list[str]:
463 """
464 Matches and retrieves a list of objects in the storage provider that
465 match the specified pattern.
466
467 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
468 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
469 :param attribute_filter_expression: The attribute filter expression to apply to the result.
470
471 :return: A list of object paths that match the pattern.
472 """
473 if self._metadata_provider:
474 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
475 else:
476 results = self._storage_provider.glob(pattern, attribute_filter_expression)
477
478 if include_url_prefix:
479 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
480
481 return results
482
[docs]
483 def list(
484 self,
485 prefix: str = "",
486 path: str = "",
487 start_after: Optional[str] = None,
488 end_at: Optional[str] = None,
489 include_directories: bool = False,
490 include_url_prefix: bool = False,
491 attribute_filter_expression: Optional[str] = None,
492 show_attributes: bool = False,
493 follow_symlinks: bool = True,
494 ) -> Iterator[ObjectMetadata]:
495 """
496 Lists objects in the storage provider under the specified path.
497
498 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
499 deprecated and will be removed in a future version.
500
501 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
502 :param path: The directory or file path to list objects under. This should be a
503 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
504 Cannot be used together with ``prefix``.
505 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
506 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
507 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
508 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
509 :param attribute_filter_expression: The attribute filter expression to apply to the result.
510 :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to True.
511 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When False, symlinks are skipped during listing.
512
513 :return: An iterator over objects.
514
515 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
516 """
517 # Parameter validation - either path or prefix, not both
518 if path and prefix:
519 raise ValueError(
520 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
521 f"Please use only the 'path' parameter for new code. "
522 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
523 )
524 elif prefix:
525 logger.debug(
526 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
527 f"Please use the 'path' parameter instead. "
528 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
529 )
530
531 # Use path if provided, otherwise fall back to prefix
532 effective_path = path if path else prefix
533 if effective_path and not self.is_file(effective_path):
534 effective_path = (
535 effective_path.rstrip("/") + "/"
536 ) # assume it's a directory if the effective path is not empty
537
538 if self._metadata_provider:
539 objects = self._metadata_provider.list_objects(
540 effective_path,
541 start_after=start_after,
542 end_at=end_at,
543 include_directories=include_directories,
544 attribute_filter_expression=attribute_filter_expression,
545 show_attributes=show_attributes,
546 )
547 else:
548 objects = self._storage_provider.list_objects(
549 effective_path,
550 start_after=start_after,
551 end_at=end_at,
552 include_directories=include_directories,
553 attribute_filter_expression=attribute_filter_expression,
554 show_attributes=show_attributes,
555 follow_symlinks=follow_symlinks,
556 )
557
558 for object in objects:
559 if include_url_prefix:
560 if self.is_default_profile():
561 object.key = str(PurePosixPath("/") / object.key)
562 else:
563 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
564 yield object
565
[docs]
566 def open(
567 self,
568 path: str,
569 mode: str = "rb",
570 buffering: int = -1,
571 encoding: Optional[str] = None,
572 disable_read_cache: bool = False,
573 memory_load_limit: int = MEMORY_LOAD_LIMIT,
574 atomic: bool = True,
575 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
576 attributes: Optional[dict[str, str]] = None,
577 prefetch_file: bool = True,
578 ) -> Union[PosixFile, ObjectFile]:
579 """
580 Returns a file-like object from the specified path.
581
582 :param path: The logical path of the object to read.
583 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
584 :param buffering: The buffering mode. Only applies to PosixFile.
585 :param encoding: The encoding to use for text files.
586 :param disable_read_cache: When set to True, disables caching for the file content.
587 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
588 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
589 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
590 :param atomic: When set to True, the file will be written atomically (rename upon close).
591 This parameter is only applicable to PosixFile in write mode.
592 :param check_source_version: Whether to check the source version of cached objects.
593 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
594 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
595 """
596 if self._is_posix_file_storage_provider():
597 return PosixFile(
598 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
599 )
600 else:
601 if atomic is False:
602 logger.warning("Non-atomic writes are not supported for object storage providers.")
603
604 return ObjectFile(
605 self,
606 remote_path=path,
607 mode=mode,
608 encoding=encoding,
609 disable_read_cache=disable_read_cache,
610 memory_load_limit=memory_load_limit,
611 check_source_version=check_source_version,
612 attributes=attributes,
613 prefetch_file=prefetch_file,
614 )
615
[docs]
616 def is_file(self, path: str) -> bool:
617 """
618 Checks whether the specified path points to a file (rather than a directory or folder).
619
620 :param path: The logical path to check.
621
622 :return: ``True`` if the path points to a file, ``False`` otherwise.
623 """
624 if self._metadata_provider:
625 _, exists = self._metadata_provider.realpath(path)
626 return exists
627 return self._storage_provider.is_file(path)
628
646
[docs]
647 def is_empty(self, path: str) -> bool:
648 """
649 Checks whether the specified path is empty. A path is considered empty if there are no
650 objects whose keys start with the given path as a prefix.
651
652 :param path: The logical path to check. This is typically a prefix representing a directory or folder.
653
654 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
655 """
656 if self._metadata_provider:
657 objects = self._metadata_provider.list_objects(path)
658 else:
659 objects = self._storage_provider.list_objects(path)
660
661 try:
662 return next(objects) is None
663 except StopIteration:
664 pass
665 return True
666
667 def __getstate__(self) -> dict[str, Any]:
668 state = self.__dict__.copy()
669 del state["_credentials_provider"]
670 del state["_storage_provider"]
671 del state["_metadata_provider"]
672 del state["_cache_manager"]
673
674 if "_metadata_provider_lock" in state:
675 del state["_metadata_provider_lock"]
676
677 if "_replicas" in state:
678 del state["_replicas"]
679
680 # Replica manager could be disabled if it's set to None in the state.
681 if "_replica_manager" in state:
682 if state["_replica_manager"] is not None:
683 del state["_replica_manager"]
684
685 return state
686
687 def __setstate__(self, state: dict[str, Any]) -> None:
688 config = state["_config"]
689 self._initialize_providers(config)
690
691 # Replica manager could be disabled if it's set to None in the state.
692 if "_replica_manager" in state and state["_replica_manager"] is None:
693 self._replica_manager = None
694 else:
695 self._initialize_replicas(config.replicas)
696
697 if self._metadata_provider:
698 self._metadata_provider_lock = threading.Lock()
699
[docs]
700 def sync_from(
701 self,
702 source_client: "StorageClient",
703 source_path: str = "",
704 target_path: str = "",
705 delete_unmatched_files: bool = False,
706 description: str = "Syncing",
707 num_worker_processes: Optional[int] = None,
708 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
709 patterns: Optional[PatternList] = None,
710 preserve_source_attributes: bool = False,
711 follow_symlinks: bool = True,
712 ) -> None:
713 """
714 Syncs files from the source storage client to "path/".
715
716 :param source_client: The source storage client.
717 :param source_path: The logical path to sync from.
718 :param target_path: The logical path to sync to.
719 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
720 :param description: Description of sync process for logging purposes.
721 :param num_worker_processes: The number of worker processes to use.
722 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
723 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
724 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
725 When False (default), only file content is copied. When True, custom metadata attributes are also preserved.
726
727 .. warning::
728 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
729 request for each object to retrieve attributes, which can significantly impact performance on large-scale
730 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
731 :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is True.
732 """
733 # Create internal PatternMatcher from patterns if provided
734 pattern_matcher = PatternMatcher(patterns) if patterns else None
735
736 # Disable the replica manager during sync
737 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
738 source_client = StorageClient(source_client._config)
739 source_client._replica_manager = None
740
741 m = SyncManager(source_client, source_path, self, target_path)
742
743 m.sync_objects(
744 execution_mode=execution_mode,
745 description=description,
746 num_worker_processes=num_worker_processes,
747 delete_unmatched_files=delete_unmatched_files,
748 pattern_matcher=pattern_matcher,
749 preserve_source_attributes=preserve_source_attributes,
750 follow_symlinks=follow_symlinks,
751 )
752
[docs]
753 def sync_replicas(
754 self,
755 source_path: str,
756 replica_indices: Optional[List[int]] = None,
757 delete_unmatched_files: bool = False,
758 description: str = "Syncing replica",
759 num_worker_processes: Optional[int] = None,
760 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
761 patterns: Optional[PatternList] = None,
762 ) -> None:
763 """
764 Sync files from the source storage client to target replicas.
765
766 :param source_path: The logical path to sync from.
767 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
768 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
769 :param description: Description of sync process for logging purposes.
770 :param num_worker_processes: The number of worker processes to use.
771 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
772 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
773 """
774 if not self._replicas:
775 logger.warning(
776 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
777 "secondary storage locations for redundancy and performance.",
778 self._config.profile,
779 )
780 return None
781
782 if replica_indices:
783 try:
784 replicas = [self._replicas[i] for i in replica_indices]
785 except IndexError as e:
786 raise ValueError(f"Replica index out of range: {replica_indices}") from e
787 else:
788 replicas = self._replicas
789
790 # Disable the replica manager during sync
791 if self._replica_manager:
792 source_client = StorageClient(self._config)
793 source_client._replica_manager = None
794 else:
795 source_client = self
796
797 for replica in replicas:
798 replica.sync_from(
799 source_client,
800 source_path,
801 source_path,
802 delete_unmatched_files=delete_unmatched_files,
803 description=f"{description} ({replica.profile})",
804 num_worker_processes=num_worker_processes,
805 execution_mode=execution_mode,
806 patterns=patterns,
807 )