# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from datetime import datetime, timezone
23from io import BytesIO
24from pathlib import PurePosixPath
25from typing import IO, Any, List, Optional, Union, cast
26
27from .config import StorageClientConfig
28from .constants import MEMORY_LOAD_LIMIT
29from .file import ObjectFile, PosixFile
30from .instrumentation.utils import instrumented
31from .providers.posix_file import PosixFileStorageProvider
32from .replica_manager import ReplicaManager
33from .retry import retry
34from .sync import SyncManager
35from .types import (
36 AWARE_DATETIME_MIN,
37 MSC_PROTOCOL,
38 ExecutionMode,
39 ObjectMetadata,
40 Range,
41 Replica,
42 SourceVersionCheckMode,
43)
44from .utils import NullStorageClient, join_paths
45
# Module-level logger for this module's diagnostics (e.g. auto-commit progress,
# deprecation notices from StorageClient.list, and missing-replica warnings).
logger = logging.getLogger(__name__)
47
48
[docs]
49@instrumented
50class StorageClient:
51 """
52 A client for interacting with different storage providers.
53 """
54
55 _config: StorageClientConfig
56 _metadata_provider_lock: Optional[threading.Lock] = None
57 _stop_event: Optional[threading.Event] = None
58 _replica_manager: Optional[ReplicaManager] = None
59
60 def __init__(self, config: StorageClientConfig):
61 """
62 Initializes the :py:class:`StorageClient` with the given configuration.
63
64 :param config: The configuration object for the storage client.
65 """
66 self._initialize_providers(config)
67 self._initialize_replicas(config.replicas)
68
69 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
70 if not stop_event:
71 raise RuntimeError("Stop event not set")
72
73 while not stop_event.is_set():
74 # Wait with the ability to exit early
75 if stop_event.wait(timeout=commit_interval_minutes * 60):
76 break
77 logger.debug("Auto-committing to metadata provider")
78 self.commit_metadata()
79
80 def _commit_on_exit(self):
81 logger.debug("Shutting down, committing metadata one last time...")
82 self.commit_metadata()
83
84 def _initialize_providers(self, config: StorageClientConfig) -> None:
85 self._config = config
86 self._credentials_provider = self._config.credentials_provider
87 self._storage_provider = self._config.storage_provider
88 self._metadata_provider = self._config.metadata_provider
89 self._cache_config = self._config.cache_config
90 self._retry_config = self._config.retry_config
91 self._cache_manager = self._config.cache_manager
92 self._autocommit_config = self._config.autocommit_config
93
94 if self._autocommit_config:
95 if self._metadata_provider:
96 logger.debug("Creating auto-commiter thread")
97
98 if self._autocommit_config.interval_minutes:
99 self._stop_event = threading.Event()
100 self._commit_thread = threading.Thread(
101 target=self._committer_thread,
102 daemon=True,
103 args=(self._autocommit_config.interval_minutes, self._stop_event),
104 )
105 self._commit_thread.start()
106
107 if self._autocommit_config.at_exit:
108 atexit.register(self._commit_on_exit)
109
110 self._metadata_provider_lock = threading.Lock()
111 else:
112 logger.debug("No metadata provider configured, auto-commit will not be enabled")
113
114 def __del__(self):
115 if self._stop_event:
116 self._stop_event.set()
117 if self._commit_thread.is_alive():
118 self._commit_thread.join(timeout=5.0)
119
120 def _get_source_version(self, path: str) -> Optional[str]:
121 """
122 Get etag from metadata provider or storage provider.
123 """
124 if self._metadata_provider:
125 metadata = self._metadata_provider.get_object_metadata(path)
126 else:
127 metadata = self._storage_provider.get_object_metadata(path)
128 return metadata.etag
129
130 def _is_cache_enabled(self) -> bool:
131 return self._cache_manager is not None and not self._is_posix_file_storage_provider()
132
133 def _is_posix_file_storage_provider(self) -> bool:
134 return isinstance(self._storage_provider, PosixFileStorageProvider)
135
[docs]
136 def is_default_profile(self) -> bool:
137 """
138 Return True if the storage client is using the default profile.
139 """
140 return self._config.profile == "default"
141
142 def _is_rust_client_enabled(self) -> bool:
143 """
144 Return True if the storage provider is using the Rust client.
145 """
146 return hasattr(self._storage_provider, "_rust_client")
147
148 @property
149 def profile(self) -> str:
150 return self._config.profile
151
152 def _initialize_replicas(self, replicas: list[Replica]) -> None:
153 """Initialize replica StorageClient instances."""
154 # Sort replicas by read_priority, the first one is the primary replica
155 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)
156
157 replica_clients = []
158 for replica in sorted_replicas:
159 if self._config._config_dict is None:
160 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
161 replica_config = StorageClientConfig.from_dict(
162 config_dict=self._config._config_dict, profile=replica.replica_profile
163 )
164
165 storage_client = StorageClient(config=replica_config)
166 replica_clients.append(storage_client)
167
168 self._replicas = replica_clients
169 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None
170
171 @property
172 def replicas(self) -> list["StorageClient"]:
173 """Return StorageClient instances for all replicas, sorted by read priority."""
174 return self._replicas
175
176 @retry
177 def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
178 """
179 Reads an object from the specified logical path.
180
181 :param path: The logical path of the object to read.
182 :return: The content of the object.
183 """
184 if self._metadata_provider:
185 path, exists = self._metadata_provider.realpath(path)
186 if not exists:
187 raise FileNotFoundError(f"The file at path '{path}' was not found.")
188
189 # Never cache range-read requests
190 if byte_range:
191 return self._storage_provider.get_object(path, byte_range=byte_range)
192
193 # Read from cache if the file exists
194 if self._is_cache_enabled():
195 assert self._cache_manager is not None
196 source_version = self._get_source_version(path)
197 data = self._cache_manager.read(path, source_version)
198
199 if data is None:
200 if self._replica_manager:
201 data = self._read_from_replica_or_primary(path)
202 else:
203 data = self._storage_provider.get_object(path)
204 self._cache_manager.set(path, data, source_version)
205 return data
206 elif self._replica_manager:
207 return self._read_from_replica_or_primary(path)
208
209 return self._storage_provider.get_object(path, byte_range=byte_range)
210
211 def _read_from_replica_or_primary(self, path: str) -> bytes:
212 """
213 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
214 """
215 assert self._replica_manager is not None, "Replica manager is not initialized"
216 file_obj = BytesIO()
217 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
218 return file_obj.getvalue()
219
[docs]
220 def info(self, path: str, strict: bool = True) -> ObjectMetadata:
221 """
222 Retrieves metadata or information about an object stored at the specified path.
223
224 :param path: The logical path to the object for which metadata or information is being retrieved.
225 :param strict: If True, performs additional validation to determine whether the path refers to a directory.
226
227 :return: A dictionary containing metadata about the object.
228 """
229
230 if not path or path == ".": # for the empty path provided by the user
231 if self._is_posix_file_storage_provider():
232 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
233 else:
234 last_modified = AWARE_DATETIME_MIN
235 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)
236
237 if not self._metadata_provider:
238 return self._storage_provider.get_object_metadata(path, strict=strict)
239
240 # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
241 try:
242 return self._metadata_provider.get_object_metadata(path)
243 except FileNotFoundError:
244 # Try listing from the parent to determine if path is a valid directory
245 parent = os.path.dirname(path.rstrip("/")) + "/"
246 parent = "" if parent == "/" else parent
247 target = path.rstrip("/") + "/"
248
249 try:
250 entries = self._metadata_provider.list_objects(parent, include_directories=True)
251 for entry in entries:
252 if entry.key == target and entry.type == "directory":
253 return entry
254 except Exception:
255 pass
256 raise # Raise original FileNotFoundError
257
258 @retry
259 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
260 """
261 Downloads a file to the local file system.
262
263 :param remote_path: The logical path of the file in the storage provider.
264 :param local_path: The local path where the file should be downloaded.
265 """
266
267 if self._metadata_provider:
268 real_path, exists = self._metadata_provider.realpath(remote_path)
269 if not exists:
270 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
271 metadata = self._metadata_provider.get_object_metadata(remote_path)
272 self._storage_provider.download_file(real_path, local_path, metadata)
273 elif self._replica_manager:
274 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
275 else:
276 self._storage_provider.download_file(remote_path, local_path)
277
278 @retry
279 def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
280 """
281 Uploads a file from the local file system.
282
283 :param remote_path: The logical path where the file should be stored.
284 :param local_path: The local path of the file to upload.
285 :param attributes: The attributes to add to the file.
286 """
287 virtual_path = remote_path
288 if self._metadata_provider:
289 remote_path, exists = self._metadata_provider.realpath(remote_path)
290 if exists:
291 raise FileExistsError(
292 f"The file at path '{virtual_path}' already exists; "
293 f"overwriting is not yet allowed when using a metadata provider."
294 )
295 if self._metadata_provider:
296 # if metdata provider is present, we only write attributes to the metadata provider
297 self._storage_provider.upload_file(remote_path, local_path, attributes=None)
298 obj_metadata = self._storage_provider.get_object_metadata(remote_path)
299 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
300 with self._metadata_provider_lock or contextlib.nullcontext():
301 self._metadata_provider.add_file(virtual_path, obj_metadata)
302 else:
303 self._storage_provider.upload_file(remote_path, local_path, attributes)
304
305 @retry
306 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
307 """
308 Writes an object at the specified path.
309
310 :param path: The logical path where the object should be written.
311 :param body: The content to write to the object.
312 :param attributes: The attributes to add to the file.
313 """
314 virtual_path = path
315 if self._metadata_provider:
316 path, exists = self._metadata_provider.realpath(path)
317 if exists:
318 raise FileExistsError(
319 f"The file at path '{virtual_path}' already exists; "
320 f"overwriting is not yet allowed when using a metadata provider."
321 )
322 if self._metadata_provider:
323 # if metadata provider is present, we only write attributes to the metadata provider
324 self._storage_provider.put_object(path, body, attributes=None)
325 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
326 obj_metadata = self._storage_provider.get_object_metadata(path)
327 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
328 with self._metadata_provider_lock or contextlib.nullcontext():
329 self._metadata_provider.add_file(virtual_path, obj_metadata)
330 else:
331 self._storage_provider.put_object(path, body, attributes=attributes)
332
[docs]
333 def copy(self, src_path: str, dest_path: str) -> None:
334 """
335 Copies an object from source to destination path.
336
337 :param src_path: The logical path of the source object to copy.
338 :param dest_path: The logical path of the destination.
339 """
340 virtual_dest_path = dest_path
341 if self._metadata_provider:
342 src_path, exists = self._metadata_provider.realpath(src_path)
343 if not exists:
344 raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
345
346 dest_path, exists = self._metadata_provider.realpath(dest_path)
347 if exists:
348 raise FileExistsError(
349 f"The file at path '{virtual_dest_path}' already exists; "
350 f"overwriting is not yet allowed when using a metadata provider."
351 )
352
353 self._storage_provider.copy_object(src_path, dest_path)
354 if self._metadata_provider:
355 metadata = self._storage_provider.get_object_metadata(dest_path)
356 with self._metadata_provider_lock or contextlib.nullcontext():
357 self._metadata_provider.add_file(virtual_dest_path, metadata)
358
[docs]
359 def delete(self, path: str, recursive: bool = False) -> None:
360 """
361 Deletes an object at the specified path.
362
363 :param path: The logical path of the object or directory to delete.
364 :param recursive: Whether to delete objects in the path recursively.
365 """
366 obj_metadata = self.info(path)
367 is_dir = obj_metadata and obj_metadata.type == "directory"
368 is_file = obj_metadata and obj_metadata.type == "file"
369 if recursive and is_dir:
370 self.sync_from(
371 NullStorageClient(),
372 path,
373 path,
374 delete_unmatched_files=True,
375 num_worker_processes=1,
376 description="Deleting",
377 )
378 # If this is a posix storage provider, we need to also delete remaining directory stubs.
379 if self._is_posix_file_storage_provider():
380 posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
381 posix_storage_provider.rmtree(path)
382 return
383 else:
384 # 1) If path is a file: delete the file
385 # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
386 if is_file:
387 virtual_path = path
388 if self._metadata_provider:
389 path, exists = self._metadata_provider.realpath(path)
390 if not exists:
391 raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")
392 with self._metadata_provider_lock or contextlib.nullcontext():
393 self._metadata_provider.remove_file(virtual_path)
394
395 # Delete the cached file if it exists
396 if self._is_cache_enabled():
397 assert self._cache_manager is not None
398 self._cache_manager.delete(virtual_path)
399 self._storage_provider.delete_object(path)
400
401 # Delete from replicas if replica manager exists
402 if self._replica_manager:
403 self._replica_manager.delete_from_replicas(virtual_path)
404 elif is_dir:
405 raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
406 else:
407 raise FileNotFoundError(f"The file at '{path}' was not found.")
408
[docs]
409 def glob(
410 self,
411 pattern: str,
412 include_url_prefix: bool = False,
413 attribute_filter_expression: Optional[str] = None,
414 ) -> list[str]:
415 """
416 Matches and retrieves a list of objects in the storage provider that
417 match the specified pattern.
418
419 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
420 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
421 :param attribute_filter_expression: The attribute filter expression to apply to the result.
422
423 :return: A list of object paths that match the pattern.
424 """
425 if self._metadata_provider:
426 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
427 else:
428 results = self._storage_provider.glob(pattern, attribute_filter_expression)
429
430 if include_url_prefix:
431 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
432
433 return results
434
[docs]
435 def list(
436 self,
437 prefix: str = "",
438 path: str = "",
439 start_after: Optional[str] = None,
440 end_at: Optional[str] = None,
441 include_directories: bool = False,
442 include_url_prefix: bool = False,
443 attribute_filter_expression: Optional[str] = None,
444 show_attributes: bool = False,
445 ) -> Iterator[ObjectMetadata]:
446 """
447 Lists objects in the storage provider under the specified path.
448
449 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
450 deprecated and will be removed in a future version.
451
452 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
453 :param path: The directory or file path to list objects under. This should be a
454 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
455 Cannot be used together with ``prefix``.
456 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
457 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
458 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
459 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
460 :param attribute_filter_expression: The attribute filter expression to apply to the result.
461 :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to True.
462
463 :return: An iterator over objects.
464
465 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
466 """
467 # Parameter validation - either path or prefix, not both
468 if path and prefix:
469 raise ValueError(
470 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
471 f"Please use only the 'path' parameter for new code. "
472 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
473 )
474 elif prefix:
475 logger.debug(
476 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
477 f"Please use the 'path' parameter instead. "
478 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
479 )
480
481 # Use path if provided, otherwise fall back to prefix
482 effective_path = path if path else prefix
483 if effective_path and not self.is_file(effective_path):
484 effective_path = (
485 effective_path.rstrip("/") + "/"
486 ) # assume it's a directory if the effective path is not empty
487
488 if self._metadata_provider:
489 objects = self._metadata_provider.list_objects(
490 effective_path,
491 start_after=start_after,
492 end_at=end_at,
493 include_directories=include_directories,
494 attribute_filter_expression=attribute_filter_expression,
495 show_attributes=show_attributes,
496 )
497 else:
498 objects = self._storage_provider.list_objects(
499 effective_path,
500 start_after=start_after,
501 end_at=end_at,
502 include_directories=include_directories,
503 attribute_filter_expression=attribute_filter_expression,
504 show_attributes=show_attributes,
505 )
506
507 for object in objects:
508 if include_url_prefix:
509 if self.is_default_profile():
510 object.key = str(PurePosixPath("/") / object.key)
511 else:
512 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
513 yield object
514
[docs]
515 def open(
516 self,
517 path: str,
518 mode: str = "rb",
519 buffering: int = -1,
520 encoding: Optional[str] = None,
521 disable_read_cache: bool = False,
522 memory_load_limit: int = MEMORY_LOAD_LIMIT,
523 atomic: bool = True,
524 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
525 attributes: Optional[dict[str, str]] = None,
526 ) -> Union[PosixFile, ObjectFile]:
527 """
528 Returns a file-like object from the specified path.
529
530 :param path: The logical path of the object to read.
531 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
532 :param buffering: The buffering mode. Only applies to PosixFile.
533 :param encoding: The encoding to use for text files.
534 :param disable_read_cache: When set to True, disables caching for the file content.
535 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
536 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
537 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
538 :param atomic: When set to True, the file will be written atomically (rename upon close).
539 This parameter is only applicable to PosixFile in write mode.
540 :param check_source_version: Whether to check the source version of cached objects.
541 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
542 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
543 """
544 if self._is_posix_file_storage_provider():
545 return PosixFile(
546 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
547 )
548 else:
549 if atomic is False:
550 logger.warning("Non-atomic writes are not supported for object storage providers.")
551
552 return ObjectFile(
553 self,
554 remote_path=path,
555 mode=mode,
556 encoding=encoding,
557 disable_read_cache=disable_read_cache,
558 memory_load_limit=memory_load_limit,
559 check_source_version=check_source_version,
560 attributes=attributes,
561 )
562
[docs]
563 def is_file(self, path: str) -> bool:
564 """
565 Checks whether the specified path points to a file (rather than a directory or folder).
566
567 :param path: The logical path to check.
568
569 :return: ``True`` if the path points to a file, ``False`` otherwise.
570 """
571 if self._metadata_provider:
572 _, exists = self._metadata_provider.realpath(path)
573 return exists
574 return self._storage_provider.is_file(path)
575
593
[docs]
594 def is_empty(self, path: str) -> bool:
595 """
596 Checks whether the specified path is empty. A path is considered empty if there are no
597 objects whose keys start with the given path as a prefix.
598
599 :param path: The logical path to check. This is typically a prefix representing a directory or folder.
600
601 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
602 """
603 if self._metadata_provider:
604 objects = self._metadata_provider.list_objects(path)
605 else:
606 objects = self._storage_provider.list_objects(path)
607
608 try:
609 return next(objects) is None
610 except StopIteration:
611 pass
612 return True
613
614 def __getstate__(self) -> dict[str, Any]:
615 state = self.__dict__.copy()
616 del state["_credentials_provider"]
617 del state["_storage_provider"]
618 del state["_metadata_provider"]
619 del state["_cache_manager"]
620
621 if "_metadata_provider_lock" in state:
622 del state["_metadata_provider_lock"]
623
624 if "_replicas" in state:
625 del state["_replicas"]
626
627 # Replica manager could be disabled if it's set to None in the state.
628 if "_replica_manager" in state:
629 if state["_replica_manager"] is not None:
630 del state["_replica_manager"]
631
632 return state
633
634 def __setstate__(self, state: dict[str, Any]) -> None:
635 config = state["_config"]
636 self._initialize_providers(config)
637
638 # Replica manager could be disabled if it's set to None in the state.
639 if "_replica_manager" in state and state["_replica_manager"] is None:
640 self._replica_manager = None
641 else:
642 self._initialize_replicas(config.replicas)
643
644 if self._metadata_provider:
645 self._metadata_provider_lock = threading.Lock()
646
[docs]
647 def sync_from(
648 self,
649 source_client: "StorageClient",
650 source_path: str = "",
651 target_path: str = "",
652 delete_unmatched_files: bool = False,
653 description: str = "Syncing",
654 num_worker_processes: Optional[int] = None,
655 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
656 ) -> None:
657 """
658 Syncs files from the source storage client to "path/".
659
660 :param source_client: The source storage client.
661 :param source_path: The logical path to sync from.
662 :param target_path: The logical path to sync to.
663 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
664 :param description: Description of sync process for logging purposes.
665 :param num_worker_processes: The number of worker processes to use.
666 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
667 """
668 # Disable the replica manager during sync
669 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
670 source_client = StorageClient(source_client._config)
671 source_client._replica_manager = None
672
673 m = SyncManager(source_client, source_path, self, target_path)
674
675 m.sync_objects(
676 execution_mode=execution_mode,
677 description=description,
678 num_worker_processes=num_worker_processes,
679 delete_unmatched_files=delete_unmatched_files,
680 )
681
[docs]
682 def sync_replicas(
683 self,
684 source_path: str,
685 replica_indices: Optional[List[int]] = None,
686 delete_unmatched_files: bool = False,
687 description: str = "Syncing replica",
688 num_worker_processes: Optional[int] = None,
689 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
690 ) -> None:
691 """
692 Sync files from the source storage client to target replicas.
693
694 :param source_path: The logical path to sync from.
695 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
696 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
697 :param description: Description of sync process for logging purposes.
698 :param num_worker_processes: The number of worker processes to use.
699 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
700 """
701 if not self._replicas:
702 logger.warning(
703 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
704 "secondary storage locations for redundancy and performance.",
705 self._config.profile,
706 )
707 return None
708
709 if replica_indices:
710 try:
711 replicas = [self._replicas[i] for i in replica_indices]
712 except IndexError as e:
713 raise ValueError(f"Replica index out of range: {replica_indices}") from e
714 else:
715 replicas = self._replicas
716
717 # Disable the replica manager during sync
718 if self._replica_manager:
719 source_client = StorageClient(self._config)
720 source_client._replica_manager = None
721 else:
722 source_client = self
723
724 for replica in replicas:
725 replica.sync_from(
726 source_client,
727 source_path,
728 source_path,
729 delete_unmatched_files=delete_unmatched_files,
730 description=f"{description} ({replica.profile})",
731 num_worker_processes=num_worker_processes,
732 execution_mode=execution_mode,
733 )