Source code for multistorageclient.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import contextlib
 18import logging
 19import os
 20import threading
 21from collections.abc import Iterator
 22from datetime import datetime, timezone
 23from io import BytesIO
 24from pathlib import PurePosixPath
 25from typing import IO, Any, List, Optional, Union, cast
 26
 27from .config import StorageClientConfig
 28from .constants import MEMORY_LOAD_LIMIT
 29from .file import ObjectFile, PosixFile
 30from .instrumentation.utils import instrumented
 31from .providers.posix_file import PosixFileStorageProvider
 32from .replica_manager import ReplicaManager
 33from .retry import retry
 34from .sync import SyncManager
 35from .types import (
 36    AWARE_DATETIME_MIN,
 37    MSC_PROTOCOL,
 38    ExecutionMode,
 39    ObjectMetadata,
 40    Range,
 41    Replica,
 42    SourceVersionCheckMode,
 43)
 44from .utils import NullStorageClient, join_paths
 45
 46logger = logging.getLogger(__name__)
 47
 48
@instrumented
class StorageClient:
    """
    A client for interacting with different storage providers.
    """

    # Class-level defaults so partially-initialized/unpickled instances still
    # have these attributes defined.
    _config: StorageClientConfig
    _metadata_provider_lock: Optional[threading.Lock] = None
    _stop_event: Optional[threading.Event] = None
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event) -> None:
        """
        Background loop that periodically commits pending metadata updates.

        :param commit_interval_minutes: Minutes to wait between commits.
        :param stop_event: Event used to request a clean shutdown of the loop.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                # Event was set while waiting: exit without a final commit here
                # (the atexit hook, if registered, handles the last commit).
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self) -> None:
        # atexit hook: flush pending metadata updates at interpreter shutdown.
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Bind providers/managers from ``config`` onto this instance and, when
        auto-commit is configured together with a metadata provider, start the
        background committer thread and/or register an atexit commit hook.
        """
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                if self._autocommit_config.interval_minutes:
                    # Daemon thread so it never blocks interpreter exit.
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                # Serializes metadata-provider mutations between the committer
                # thread and callers (upload/write/delete/commit paths).
                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def __del__(self):
        # Stop the committer thread, if one was started, and give it bounded
        # time to exit; _commit_thread is always set when _stop_event is set.
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.

        :param path: The path of the object to query.
        :return: The object's etag, or ``None`` if the provider reports none.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        # Caching is skipped for POSIX-backed providers: the data is already on
        # the local filesystem.
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        # True when the underlying provider reads/writes the local filesystem.
        return isinstance(self._storage_provider, PosixFileStorageProvider)
[docs] 136 def is_default_profile(self) -> bool: 137 """ 138 Return True if the storage client is using the default profile. 139 """ 140 return self._config.profile == "default"
    def _is_rust_client_enabled(self) -> bool:
        """
        Return True if the storage provider is using the Rust client.
        """
        # Rust-backed providers expose a private `_rust_client` attribute.
        return hasattr(self._storage_provider, "_rust_client")

    @property
    def profile(self) -> str:
        # Name of the configuration profile this client was created from.
        return self._config.profile

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances."""
        # Sort replicas by read_priority, the first one is the primary replica
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            # Each replica is a full StorageClient built from its own profile of
            # the same configuration dictionary.
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClient(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    @property
    def replicas(self) -> list["StorageClient"]:
        """Return StorageClient instances for all replicas, sorted by read priority."""
        return self._replicas

    @retry
    def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Reads an object from the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read; range reads always bypass the cache.
        :return: The content of the object.
        :raises FileNotFoundError: If a metadata provider is configured and the path does not exist.
        """
        if self._metadata_provider:
            # Resolve the logical path to the physical one; everything below
            # (including the cache key) uses the physical path.
            path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        # Never cache range-read requests
        if byte_range:
            return self._storage_provider.get_object(path, byte_range=byte_range)

        # Read from cache if the file exists
        if self._is_cache_enabled():
            assert self._cache_manager is not None
            source_version = self._get_source_version(path)
            data = self._cache_manager.read(path, source_version)

            if data is None:
                # Cache miss: fetch from a replica (preferred) or the primary,
                # then populate the cache for subsequent reads.
                if self._replica_manager:
                    data = self._read_from_replica_or_primary(path)
                else:
                    data = self._storage_provider.get_object(path)
                self._cache_manager.set(path, data, source_version)
            return data
        elif self._replica_manager:
            return self._read_from_replica_or_primary(path)

        return self._storage_provider.get_object(path, byte_range=byte_range)

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.

        :param path: The physical path of the object to read.
        :return: The content of the object.
        """
        assert self._replica_manager is not None, "Replica manager is not initialized"
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The logical path to the object for which metadata or information is being retrieved.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: A dictionary containing metadata about the object.
        :raises FileNotFoundError: If the path exists neither as a file nor as a directory.
        """

        if not path or path == ".":  # for the empty path provided by the user
            # Synthesize a root-directory entry; only POSIX providers have a real
            # local mtime to report.
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Deliberate best-effort: any listing failure falls through to
                # re-raising the original FileNotFoundError below.
                pass
            raise  # Raise original FileNotFoundError
    @retry
    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Downloads a file to the local file system.

        :param remote_path: The logical path of the file in the storage provider.
        :param local_path: The local path (or writable file object) where the file should be downloaded.
        :raises FileNotFoundError: If a metadata provider is configured and the path does not exist.
        """

        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(remote_path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(real_path, local_path, metadata)
        elif self._replica_manager:
            # Prefer replicas (ordered by read priority), falling back to the primary.
            self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Uploads a file from the local file system.

        :param remote_path: The logical path where the file should be stored.
        :param local_path: The local path of the file to upload.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If a metadata provider is configured and the path already exists.
        """
        virtual_path = remote_path
        if self._metadata_provider:
            remote_path, exists = self._metadata_provider.realpath(remote_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{virtual_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        if self._metadata_provider:
            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            # Merge provider-reported attributes with caller-supplied ones
            # (caller values win on key collisions).
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)

    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Writes an object at the specified path.

        :param path: The logical path where the object should be written.
        :param body: The content to write to the object.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If a metadata provider is configured and the path already exists.
        """
        virtual_path = path
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{virtual_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        if self._metadata_provider:
            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)
            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            # Merge provider-reported attributes with caller-supplied ones.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)
[docs] 333 def copy(self, src_path: str, dest_path: str) -> None: 334 """ 335 Copies an object from source to destination path. 336 337 :param src_path: The logical path of the source object to copy. 338 :param dest_path: The logical path of the destination. 339 """ 340 virtual_dest_path = dest_path 341 if self._metadata_provider: 342 src_path, exists = self._metadata_provider.realpath(src_path) 343 if not exists: 344 raise FileNotFoundError(f"The file at path '{src_path}' was not found.") 345 346 dest_path, exists = self._metadata_provider.realpath(dest_path) 347 if exists: 348 raise FileExistsError( 349 f"The file at path '{virtual_dest_path}' already exists; " 350 f"overwriting is not yet allowed when using a metadata provider." 351 ) 352 353 self._storage_provider.copy_object(src_path, dest_path) 354 if self._metadata_provider: 355 metadata = self._storage_provider.get_object_metadata(dest_path) 356 with self._metadata_provider_lock or contextlib.nullcontext(): 357 self._metadata_provider.add_file(virtual_dest_path, metadata)
358
[docs] 359 def delete(self, path: str, recursive: bool = False) -> None: 360 """ 361 Deletes an object at the specified path. 362 363 :param path: The logical path of the object or directory to delete. 364 :param recursive: Whether to delete objects in the path recursively. 365 """ 366 obj_metadata = self.info(path) 367 is_dir = obj_metadata and obj_metadata.type == "directory" 368 is_file = obj_metadata and obj_metadata.type == "file" 369 if recursive and is_dir: 370 self.sync_from( 371 NullStorageClient(), 372 path, 373 path, 374 delete_unmatched_files=True, 375 num_worker_processes=1, 376 description="Deleting", 377 ) 378 # If this is a posix storage provider, we need to also delete remaining directory stubs. 379 if self._is_posix_file_storage_provider(): 380 posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider) 381 posix_storage_provider.rmtree(path) 382 return 383 else: 384 # 1) If path is a file: delete the file 385 # 2) If path is a directory: raise an error to prompt the user to use the recursive flag 386 if is_file: 387 virtual_path = path 388 if self._metadata_provider: 389 path, exists = self._metadata_provider.realpath(path) 390 if not exists: 391 raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.") 392 with self._metadata_provider_lock or contextlib.nullcontext(): 393 self._metadata_provider.remove_file(virtual_path) 394 395 # Delete the cached file if it exists 396 if self._is_cache_enabled(): 397 assert self._cache_manager is not None 398 self._cache_manager.delete(virtual_path) 399 self._storage_provider.delete_object(path) 400 401 # Delete from replicas if replica manager exists 402 if self._replica_manager: 403 self._replica_manager.delete_from_replicas(virtual_path) 404 elif is_dir: 405 raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.") 406 else: 407 raise FileNotFoundError(f"The file at '{path}' was not found.")
408
[docs] 409 def glob( 410 self, 411 pattern: str, 412 include_url_prefix: bool = False, 413 attribute_filter_expression: Optional[str] = None, 414 ) -> list[str]: 415 """ 416 Matches and retrieves a list of objects in the storage provider that 417 match the specified pattern. 418 419 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``). 420 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 421 :param attribute_filter_expression: The attribute filter expression to apply to the result. 422 423 :return: A list of object paths that match the pattern. 424 """ 425 if self._metadata_provider: 426 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 427 else: 428 results = self._storage_provider.glob(pattern, attribute_filter_expression) 429 430 if include_url_prefix: 431 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 432 433 return results
434
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on the implementation, there may be a performance impact if this is set to True.

        :return: An iterator over objects.

        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Parameter validation - either path or prefix, not both
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )
        elif prefix:
            # NOTE(review): the deprecation is only logged at debug level, so
            # most users will never see it — consider warnings.warn in a follow-up.
            logger.debug(
                f"The 'prefix' parameter is deprecated and will be removed in a future version. "
                f"Please use the 'path' parameter instead. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        # Use path if provided, otherwise fall back to prefix
        effective_path = path if path else prefix
        if effective_path and not self.is_file(effective_path):
            effective_path = (
                effective_path.rstrip("/") + "/"
            )  # assume it's a directory if the effective path is not empty

        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
            )
        else:
            objects = self._storage_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
            )

        for object in objects:
            if include_url_prefix:
                # Default-profile keys render as absolute POSIX paths; named
                # profiles get the msc://<profile> URL prefix instead.
                if self.is_default_profile():
                    object.key = str(PurePosixPath("/") / object.key)
                else:
                    object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
            yield object
514
[docs] 515 def open( 516 self, 517 path: str, 518 mode: str = "rb", 519 buffering: int = -1, 520 encoding: Optional[str] = None, 521 disable_read_cache: bool = False, 522 memory_load_limit: int = MEMORY_LOAD_LIMIT, 523 atomic: bool = True, 524 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 525 attributes: Optional[dict[str, str]] = None, 526 ) -> Union[PosixFile, ObjectFile]: 527 """ 528 Returns a file-like object from the specified path. 529 530 :param path: The logical path of the object to read. 531 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported. 532 :param buffering: The buffering mode. Only applies to PosixFile. 533 :param encoding: The encoding to use for text files. 534 :param disable_read_cache: When set to True, disables caching for the file content. 535 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 536 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 537 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 538 :param atomic: When set to True, the file will be written atomically (rename upon close). 539 This parameter is only applicable to PosixFile in write mode. 540 :param check_source_version: Whether to check the source version of cached objects. 541 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". 542 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 
543 """ 544 if self._is_posix_file_storage_provider(): 545 return PosixFile( 546 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 547 ) 548 else: 549 if atomic is False: 550 logger.warning("Non-atomic writes are not supported for object storage providers.") 551 552 return ObjectFile( 553 self, 554 remote_path=path, 555 mode=mode, 556 encoding=encoding, 557 disable_read_cache=disable_read_cache, 558 memory_load_limit=memory_load_limit, 559 check_source_version=check_source_version, 560 attributes=attributes, 561 )
562
[docs] 563 def is_file(self, path: str) -> bool: 564 """ 565 Checks whether the specified path points to a file (rather than a directory or folder). 566 567 :param path: The logical path to check. 568 569 :return: ``True`` if the path points to a file, ``False`` otherwise. 570 """ 571 if self._metadata_provider: 572 _, exists = self._metadata_provider.realpath(path) 573 return exists 574 return self._storage_provider.is_file(path)
575
[docs] 576 def commit_metadata(self, prefix: Optional[str] = None) -> None: 577 """ 578 Commits any pending updates to the metadata provider. No-op if not using a metadata provider. 579 580 :param prefix: If provided, scans the prefix to find files to commit. 581 """ 582 if self._metadata_provider: 583 with self._metadata_provider_lock or contextlib.nullcontext(): 584 if prefix: 585 # The logical path for each item will be the physical path with 586 # the base physical path removed from the beginning. 587 physical_base, _ = self._metadata_provider.realpath("") 588 physical_prefix, _ = self._metadata_provider.realpath(prefix) 589 for obj in self._storage_provider.list_objects(physical_prefix): 590 virtual_path = obj.key[len(physical_base) :].lstrip("/") 591 self._metadata_provider.add_file(virtual_path, obj) 592 self._metadata_provider.commit_updates()
593
[docs] 594 def is_empty(self, path: str) -> bool: 595 """ 596 Checks whether the specified path is empty. A path is considered empty if there are no 597 objects whose keys start with the given path as a prefix. 598 599 :param path: The logical path to check. This is typically a prefix representing a directory or folder. 600 601 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 602 """ 603 if self._metadata_provider: 604 objects = self._metadata_provider.list_objects(path) 605 else: 606 objects = self._storage_provider.list_objects(path) 607 608 try: 609 return next(objects) is None 610 except StopIteration: 611 pass 612 return True
    def __getstate__(self) -> dict[str, Any]:
        """
        Build a picklable state dict: providers, cache manager, lock and replica
        clients are stripped here and re-created from ``_config`` on unpickle.
        """
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Rebuild the client from the pickled ``_config``: providers, cache and
        replicas are re-initialized rather than restored.
        """
        # NOTE(review): remaining entries of ``state`` are not merged into
        # ``__dict__`` — everything is reconstructed from ``_config``; confirm
        # no other pickled attributes are expected to survive.
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            # Locks are not picklable; recreate a fresh one.
            self._metadata_provider_lock = threading.Lock()
    def sync_from(
        self,
        source_client: "StorageClient",
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    ) -> None:
        """
        Syncs files from the source storage client to "path/".

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        """
        # Disable the replica manager during sync
        # (a fresh client is built from the same config so the caller's client
        # is left untouched; NullStorageClient carries no replicas to disable).
        if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
            source_client = StorageClient(source_client._config)
            source_client._replica_manager = None

        m = SyncManager(source_client, source_path, self, target_path)

        m.sync_objects(
            execution_mode=execution_mode,
            description=description,
            num_worker_processes=num_worker_processes,
            delete_unmatched_files=delete_unmatched_files,
        )
681
    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[List[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    ) -> None:
        """
        Sync files from the source storage client to target replicas.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        """
        if not self._replicas:
            logger.warning(
                "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
                "secondary storage locations for redundancy and performance.",
                self._config.profile,
            )
            return None

        # NOTE(review): an empty replica_indices list falls through to "all
        # replicas", and negative indices are accepted by Python indexing —
        # confirm both are intended.
        if replica_indices:
            try:
                replicas = [self._replicas[i] for i in replica_indices]
            except IndexError as e:
                raise ValueError(f"Replica index out of range: {replica_indices}") from e
        else:
            replicas = self._replicas

        # Disable the replica manager during sync
        # (fresh client from the same config so this instance is untouched).
        if self._replica_manager:
            source_client = StorageClient(self._config)
            source_client._replica_manager = None
        else:
            source_client = self

        # Each replica syncs source_path to the same logical path on its side.
        for replica in replicas:
            replica.sync_from(
                source_client,
                source_path,
                source_path,
                delete_unmatched_files=delete_unmatched_files,
                description=f"{description} ({replica.profile})",
                num_worker_processes=num_worker_processes,
                execution_mode=execution_mode,
            )