# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from datetime import datetime, timezone
23from io import BytesIO
24from pathlib import PurePosixPath
25from typing import IO, Any, List, Optional, Union, cast
26
27from .config import StorageClientConfig
28from .constants import MEMORY_LOAD_LIMIT
29from .file import ObjectFile, PosixFile
30from .instrumentation.utils import instrumented
31from .providers.posix_file import PosixFileStorageProvider
32from .replica_manager import ReplicaManager
33from .retry import retry
34from .sync import SyncManager
35from .types import (
36 AWARE_DATETIME_MIN,
37 MSC_PROTOCOL,
38 ExecutionMode,
39 ObjectMetadata,
40 Range,
41 Replica,
42 SourceVersionCheckMode,
43)
44from .utils import NullStorageClient, join_paths
45
46logger = logging.getLogger(__name__)
47
48
[docs]
49@instrumented
50class StorageClient:
51 """
52 A client for interacting with different storage providers.
53 """
54
55 _config: StorageClientConfig
56 _metadata_provider_lock: Optional[threading.Lock] = None
57 _stop_event: Optional[threading.Event] = None
58 _replica_manager: Optional[ReplicaManager] = None
59
    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        # Providers must be initialized first: replica initialization reads
        # self._config, which _initialize_providers() assigns.
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)
68
69 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
70 if not stop_event:
71 raise RuntimeError("Stop event not set")
72
73 while not stop_event.is_set():
74 # Wait with the ability to exit early
75 if stop_event.wait(timeout=commit_interval_minutes * 60):
76 break
77 logger.debug("Auto-committing to metadata provider")
78 self.commit_metadata()
79
    def _commit_on_exit(self):
        """atexit hook: flush pending metadata changes one final time at interpreter shutdown."""
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()
83
    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Bind all providers and config sub-objects from ``config`` onto this instance,
        and — when auto-commit is configured together with a metadata provider —
        start the periodic commit thread and/or register an atexit commit hook.

        :param config: The configuration object for the storage client.
        """
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                if self._autocommit_config.interval_minutes:
                    # Daemon thread so it never blocks interpreter shutdown; the
                    # stop event lets __del__ shut it down early.
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    # Flush uncommitted metadata on normal process exit.
                    atexit.register(self._commit_on_exit)

                # Serializes metadata-provider mutations between user calls and
                # the background committer thread.
                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")
113
114 def __del__(self):
115 if self._stop_event:
116 self._stop_event.set()
117 if self._commit_thread.is_alive():
118 self._commit_thread.join(timeout=5.0)
119
120 def _get_source_version(self, path: str) -> Optional[str]:
121 """
122 Get etag from metadata provider or storage provider.
123 """
124 if self._metadata_provider:
125 metadata = self._metadata_provider.get_object_metadata(path)
126 else:
127 metadata = self._storage_provider.get_object_metadata(path)
128 return metadata.etag
129
130 def _is_cache_enabled(self) -> bool:
131 return self._cache_manager is not None and not self._is_posix_file_storage_provider()
132
133 def _is_posix_file_storage_provider(self) -> bool:
134 return isinstance(self._storage_provider, PosixFileStorageProvider)
135
[docs]
136 def is_default_profile(self) -> bool:
137 """
138 Return True if the storage client is using the default profile.
139 """
140 return self._config.profile == "default"
141
    @property
    def profile(self) -> str:
        """The name of the configuration profile this client was created from."""
        return self._config.profile
145
146 def _initialize_replicas(self, replicas: list[Replica]) -> None:
147 """Initialize replica StorageClient instances."""
148 # Sort replicas by read_priority, the first one is the primary replica
149 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)
150
151 replica_clients = []
152 for replica in sorted_replicas:
153 if self._config._config_dict is None:
154 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
155 replica_config = StorageClientConfig.from_dict(
156 config_dict=self._config._config_dict, profile=replica.replica_profile
157 )
158
159 storage_client = StorageClient(config=replica_config)
160 replica_clients.append(storage_client)
161
162 self._replicas = replica_clients
163 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None
164
    @property
    def replicas(self) -> list["StorageClient"]:
        """Return StorageClient instances for all replicas, sorted by read priority."""
        # Populated by _initialize_replicas(); empty list when none are configured.
        return self._replicas
169
170 @retry
171 def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
172 """
173 Reads an object from the specified logical path.
174
175 :param path: The logical path of the object to read.
176 :return: The content of the object.
177 """
178 if self._metadata_provider:
179 path, exists = self._metadata_provider.realpath(path)
180 if not exists:
181 raise FileNotFoundError(f"The file at path '{path}' was not found.")
182
183 # Never cache range-read requests
184 if byte_range:
185 return self._storage_provider.get_object(path, byte_range=byte_range)
186
187 # Read from cache if the file exists
188 if self._is_cache_enabled():
189 assert self._cache_manager is not None
190 source_version = self._get_source_version(path)
191 data = self._cache_manager.read(path, source_version)
192
193 if data is None:
194 if self._replica_manager:
195 data = self._read_from_replica_or_primary(path)
196 else:
197 data = self._storage_provider.get_object(path)
198 self._cache_manager.set(path, data, source_version)
199 return data
200 elif self._replica_manager:
201 return self._read_from_replica_or_primary(path)
202
203 return self._storage_provider.get_object(path, byte_range=byte_range)
204
205 def _read_from_replica_or_primary(self, path: str) -> bytes:
206 """
207 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
208 """
209 assert self._replica_manager is not None, "Replica manager is not initialized"
210 file_obj = BytesIO()
211 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
212 return file_obj.getvalue()
213
[docs]
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The logical path to the object for which metadata or information is being retrieved.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: A dictionary containing metadata about the object.
        :raises FileNotFoundError: If the path exists neither as a file nor as a directory.
        """

        if not path or path == ".":  # for the empty path provided by the user
            # Synthesize a root-directory entry. POSIX providers can report the real
            # mtime of the current directory; other backends use AWARE_DATETIME_MIN.
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent  # "" means the storage root
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Best-effort directory probe; fall through and re-raise the
                # original FileNotFoundError below.
                pass
            raise  # Raise original FileNotFoundError
251
252 @retry
253 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
254 """
255 Downloads a file to the local file system.
256
257 :param remote_path: The logical path of the file in the storage provider.
258 :param local_path: The local path where the file should be downloaded.
259 """
260
261 if self._metadata_provider:
262 real_path, exists = self._metadata_provider.realpath(remote_path)
263 if not exists:
264 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
265 metadata = self._metadata_provider.get_object_metadata(remote_path)
266 self._storage_provider.download_file(real_path, local_path, metadata)
267 elif self._replica_manager:
268 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
269 else:
270 self._storage_provider.download_file(remote_path, local_path)
271
272 @retry
273 def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
274 """
275 Uploads a file from the local file system.
276
277 :param remote_path: The logical path where the file should be stored.
278 :param local_path: The local path of the file to upload.
279 :param attributes: The attributes to add to the file.
280 """
281 virtual_path = remote_path
282 if self._metadata_provider:
283 remote_path, exists = self._metadata_provider.realpath(remote_path)
284 if exists:
285 raise FileExistsError(
286 f"The file at path '{virtual_path}' already exists; "
287 f"overwriting is not yet allowed when using a metadata provider."
288 )
289 if self._metadata_provider:
290 # if metdata provider is present, we only write attributes to the metadata provider
291 self._storage_provider.upload_file(remote_path, local_path, attributes=None)
292 obj_metadata = self._storage_provider.get_object_metadata(remote_path)
293 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
294 with self._metadata_provider_lock or contextlib.nullcontext():
295 self._metadata_provider.add_file(virtual_path, obj_metadata)
296 else:
297 self._storage_provider.upload_file(remote_path, local_path, attributes)
298
299 @retry
300 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
301 """
302 Writes an object at the specified path.
303
304 :param path: The logical path where the object should be written.
305 :param body: The content to write to the object.
306 :param attributes: The attributes to add to the file.
307 """
308 virtual_path = path
309 if self._metadata_provider:
310 path, exists = self._metadata_provider.realpath(path)
311 if exists:
312 raise FileExistsError(
313 f"The file at path '{virtual_path}' already exists; "
314 f"overwriting is not yet allowed when using a metadata provider."
315 )
316 if self._metadata_provider:
317 # if metadata provider is present, we only write attributes to the metadata provider
318 self._storage_provider.put_object(path, body, attributes=None)
319 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
320 obj_metadata = self._storage_provider.get_object_metadata(path)
321 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
322 with self._metadata_provider_lock or contextlib.nullcontext():
323 self._metadata_provider.add_file(virtual_path, obj_metadata)
324 else:
325 self._storage_provider.put_object(path, body, attributes=attributes)
326
[docs]
327 def copy(self, src_path: str, dest_path: str) -> None:
328 """
329 Copies an object from source to destination path.
330
331 :param src_path: The logical path of the source object to copy.
332 :param dest_path: The logical path of the destination.
333 """
334 virtual_dest_path = dest_path
335 if self._metadata_provider:
336 src_path, exists = self._metadata_provider.realpath(src_path)
337 if not exists:
338 raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
339
340 dest_path, exists = self._metadata_provider.realpath(dest_path)
341 if exists:
342 raise FileExistsError(
343 f"The file at path '{virtual_dest_path}' already exists; "
344 f"overwriting is not yet allowed when using a metadata provider."
345 )
346
347 self._storage_provider.copy_object(src_path, dest_path)
348 if self._metadata_provider:
349 metadata = self._storage_provider.get_object_metadata(dest_path)
350 with self._metadata_provider_lock or contextlib.nullcontext():
351 self._metadata_provider.add_file(virtual_dest_path, metadata)
352
[docs]
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If ``path`` is a directory and ``recursive`` is False.
        :raises FileNotFoundError: If nothing exists at ``path``.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive delete is a sync from a null (always-empty) source with
            # delete_unmatched_files=True: every object under path gets removed.
            self.sync_from(
                NullStorageClient(),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    # Resolve the logical path to its physical location, and remove
                    # the logical entry under the metadata-provider lock.
                    path, exists = self._metadata_provider.realpath(path)
                    if not exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")
                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    assert self._cache_manager is not None
                    self._cache_manager.delete(virtual_path)
                self._storage_provider.delete_object(path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
402
[docs]
403 def glob(
404 self,
405 pattern: str,
406 include_url_prefix: bool = False,
407 attribute_filter_expression: Optional[str] = None,
408 ) -> list[str]:
409 """
410 Matches and retrieves a list of objects in the storage provider that
411 match the specified pattern.
412
413 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
414 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
415 :param attribute_filter_expression: The attribute filter expression to apply to the result.
416
417 :return: A list of object paths that match the pattern.
418 """
419 if self._metadata_provider:
420 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
421 else:
422 results = self._storage_provider.glob(pattern, attribute_filter_expression)
423
424 if include_url_prefix:
425 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
426
427 return results
428
[docs]
429 def list(
430 self,
431 prefix: str = "",
432 path: str = "",
433 start_after: Optional[str] = None,
434 end_at: Optional[str] = None,
435 include_directories: bool = False,
436 include_url_prefix: bool = False,
437 attribute_filter_expression: Optional[str] = None,
438 show_attributes: bool = False,
439 ) -> Iterator[ObjectMetadata]:
440 """
441 Lists objects in the storage provider under the specified path.
442
443 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
444 deprecated and will be removed in a future version.
445
446 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
447 :param path: The directory or file path to list objects under. This should be a
448 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
449 Cannot be used together with ``prefix``.
450 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
451 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
452 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
453 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
454 :param attribute_filter_expression: The attribute filter expression to apply to the result.
455 :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to True.
456
457 :return: An iterator over objects.
458
459 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
460 """
461 # Parameter validation - either path or prefix, not both
462 if path and prefix:
463 raise ValueError(
464 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
465 f"Please use only the 'path' parameter for new code. "
466 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
467 )
468 elif prefix:
469 logger.debug(
470 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
471 f"Please use the 'path' parameter instead. "
472 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
473 )
474
475 # Use path if provided, otherwise fall back to prefix
476 effective_path = path if path else prefix
477 if effective_path and not self.is_file(effective_path):
478 effective_path = (
479 effective_path.rstrip("/") + "/"
480 ) # assume it's a directory if the effective path is not empty
481
482 if self._metadata_provider:
483 objects = self._metadata_provider.list_objects(
484 effective_path,
485 start_after=start_after,
486 end_at=end_at,
487 include_directories=include_directories,
488 attribute_filter_expression=attribute_filter_expression,
489 show_attributes=show_attributes,
490 )
491 else:
492 objects = self._storage_provider.list_objects(
493 effective_path,
494 start_after=start_after,
495 end_at=end_at,
496 include_directories=include_directories,
497 attribute_filter_expression=attribute_filter_expression,
498 show_attributes=show_attributes,
499 )
500
501 for object in objects:
502 if include_url_prefix:
503 if self.is_default_profile():
504 object.key = str(PurePosixPath("/") / object.key)
505 else:
506 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
507 yield object
508
[docs]
509 def open(
510 self,
511 path: str,
512 mode: str = "rb",
513 buffering: int = -1,
514 encoding: Optional[str] = None,
515 disable_read_cache: bool = False,
516 memory_load_limit: int = MEMORY_LOAD_LIMIT,
517 atomic: bool = True,
518 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
519 attributes: Optional[dict[str, str]] = None,
520 ) -> Union[PosixFile, ObjectFile]:
521 """
522 Returns a file-like object from the specified path.
523
524 :param path: The logical path of the object to read.
525 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
526 :param buffering: The buffering mode. Only applies to PosixFile.
527 :param encoding: The encoding to use for text files.
528 :param disable_read_cache: When set to True, disables caching for the file content.
529 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
530 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
531 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
532 :param atomic: When set to True, the file will be written atomically (rename upon close).
533 This parameter is only applicable to PosixFile in write mode.
534 :param check_source_version: Whether to check the source version of cached objects.
535 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
536 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
537 """
538 if self._is_posix_file_storage_provider():
539 return PosixFile(
540 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
541 )
542 else:
543 if atomic is False:
544 logger.warning("Non-atomic writes are not supported for object storage providers.")
545
546 return ObjectFile(
547 self,
548 remote_path=path,
549 mode=mode,
550 encoding=encoding,
551 disable_read_cache=disable_read_cache,
552 memory_load_limit=memory_load_limit,
553 check_source_version=check_source_version,
554 attributes=attributes,
555 )
556
[docs]
557 def is_file(self, path: str) -> bool:
558 """
559 Checks whether the specified path points to a file (rather than a directory or folder).
560
561 :param path: The logical path to check.
562
563 :return: ``True`` if the path points to a file, ``False`` otherwise.
564 """
565 if self._metadata_provider:
566 _, exists = self._metadata_provider.realpath(path)
567 return exists
568 return self._storage_provider.is_file(path)
569
587
[docs]
588 def is_empty(self, path: str) -> bool:
589 """
590 Checks whether the specified path is empty. A path is considered empty if there are no
591 objects whose keys start with the given path as a prefix.
592
593 :param path: The logical path to check. This is typically a prefix representing a directory or folder.
594
595 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
596 """
597 if self._metadata_provider:
598 objects = self._metadata_provider.list_objects(path)
599 else:
600 objects = self._storage_provider.list_objects(path)
601
602 try:
603 return next(objects) is None
604 except StopIteration:
605 pass
606 return True
607
608 def __getstate__(self) -> dict[str, Any]:
609 state = self.__dict__.copy()
610 del state["_credentials_provider"]
611 del state["_storage_provider"]
612 del state["_metadata_provider"]
613 del state["_cache_manager"]
614 if "_metadata_provider_lock" in state:
615 del state["_metadata_provider_lock"]
616 if "_replicas" in state:
617 del state["_replicas"]
618 if "_replica_manager" in state:
619 del state["_replica_manager"]
620 return state
621
    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Restore a pickled client by re-running provider and replica initialization
        from the serialized configuration.

        NOTE(review): only ``_config`` is consumed from ``state``; any other entries
        that survived __getstate__ are discarded — presumably intentional, verify.
        """
        config = state["_config"]
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)
        if self._metadata_provider:
            # Locks are not picklable, so the commit lock is recreated here.
            self._metadata_provider_lock = threading.Lock()
628
[docs]
629 def sync_from(
630 self,
631 source_client: "StorageClient",
632 source_path: str = "",
633 target_path: str = "",
634 delete_unmatched_files: bool = False,
635 description: str = "Syncing",
636 num_worker_processes: Optional[int] = None,
637 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
638 ) -> None:
639 """
640 Syncs files from the source storage client to "path/".
641
642 :param source_client: The source storage client.
643 :param source_path: The logical path to sync from.
644 :param target_path: The logical path to sync to.
645 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
646 :param description: Description of sync process for logging purposes.
647 :param num_worker_processes: The number of worker processes to use.
648 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
649 """
650 # Disable the replica manager during sync
651 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
652 source_client = StorageClient(source_client._config)
653 source_client._replica_manager = None
654
655 m = SyncManager(source_client, source_path, self, target_path)
656
657 m.sync_objects(
658 execution_mode=execution_mode,
659 description=description,
660 num_worker_processes=num_worker_processes,
661 delete_unmatched_files=delete_unmatched_files,
662 )
663
[docs]
664 def sync_replicas(
665 self,
666 source_path: str,
667 replica_indices: Optional[List[int]] = None,
668 delete_unmatched_files: bool = False,
669 description: str = "Syncing replicas",
670 num_worker_processes: Optional[int] = None,
671 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
672 ) -> None:
673 """
674 Sync files from the source storage client to target replicas.
675
676 :param source_path: The logical path to sync from.
677 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
678 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
679 :param description: Description of sync process for logging purposes.
680 :param num_worker_processes: The number of worker processes to use.
681 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
682 """
683 if not self._replicas:
684 logger.warning(
685 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
686 "secondary storage locations for redundancy and performance.",
687 self._config.profile,
688 )
689 return None
690
691 if replica_indices:
692 try:
693 replicas = [self._replicas[i] for i in replica_indices]
694 except IndexError as e:
695 raise ValueError(f"Replica index out of range: {replica_indices}") from e
696 else:
697 replicas = self._replicas
698
699 # Disable the replica manager during sync
700 if self._replica_manager:
701 source_client = StorageClient(self._config)
702 source_client._replica_manager = None
703 else:
704 source_client = self
705
706 for replica in replicas:
707 replica.sync_from(
708 source_client,
709 source_path,
710 source_path,
711 delete_unmatched_files=delete_unmatched_files,
712 description=description,
713 num_worker_processes=num_worker_processes,
714 execution_mode=execution_mode,
715 )