Source code for multistorageclient.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import contextlib
 18import logging
 19import os
 20import threading
 21from collections.abc import Iterator
 22from datetime import datetime, timezone
 23from io import BytesIO
 24from pathlib import PurePosixPath
 25from typing import IO, Any, List, Optional, Union, cast
 26
 27from .config import StorageClientConfig
 28from .constants import MEMORY_LOAD_LIMIT
 29from .file import ObjectFile, PosixFile
 30from .instrumentation.utils import instrumented
 31from .providers.posix_file import PosixFileStorageProvider
 32from .replica_manager import ReplicaManager
 33from .retry import retry
 34from .sync import SyncManager
 35from .types import (
 36    AWARE_DATETIME_MIN,
 37    MSC_PROTOCOL,
 38    ExecutionMode,
 39    ObjectMetadata,
 40    PatternList,
 41    Range,
 42    Replica,
 43    SourceVersionCheckMode,
 44)
 45from .utils import NullStorageClient, PatternMatcher, join_paths
 46
 47logger = logging.getLogger(__name__)
 48
 49
@instrumented
class StorageClient:
    """
    A client for interacting with different storage providers.
    """

    _config: StorageClientConfig
    _metadata_provider_lock: Optional[threading.Lock] = None
    _stop_event: Optional[threading.Event] = None
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event) -> None:
        """
        Background loop that periodically commits pending metadata updates.

        :param commit_interval_minutes: Minutes to wait between commits.
        :param stop_event: Event used to request an early, clean shutdown.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early: wait() returns True when the
            # event is set, i.e. a shutdown was requested.
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self) -> None:
        """Commit pending metadata updates one last time at interpreter shutdown (atexit hook)."""
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Copy provider/config references onto the instance and, when auto-commit is
        configured together with a metadata provider, start the periodic committer
        thread and/or register the atexit commit hook.
        """
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                if self._autocommit_config.interval_minutes:
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def __del__(self):
        # Signal the committer thread (if any) to stop and give it bounded time
        # to finish. Use getattr defensively: __del__ can run on a partially
        # constructed instance (e.g. Thread(...) raised after the stop event was
        # created), in which case _commit_thread does not exist.
        if self._stop_event:
            self._stop_event.set()
            commit_thread = getattr(self, "_commit_thread", None)
            if commit_thread is not None and commit_thread.is_alive():
                commit_thread.join(timeout=5.0)

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        # Caching is skipped for POSIX-backed providers since reads are already local.
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        return isinstance(self._storage_provider, PosixFileStorageProvider)
[docs] 138 def is_default_profile(self) -> bool: 139 """ 140 Return True if the storage client is using the default profile. 141 """ 142 return self._config.profile == "default"
143 144 def _is_rust_client_enabled(self) -> bool: 145 """ 146 Return True if the storage provider is using the Rust client. 147 """ 148 return hasattr(self._storage_provider, "_rust_client") 149 150 @property 151 def profile(self) -> str: 152 return self._config.profile 153 154 def _initialize_replicas(self, replicas: list[Replica]) -> None: 155 """Initialize replica StorageClient instances.""" 156 # Sort replicas by read_priority, the first one is the primary replica 157 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority) 158 159 replica_clients = [] 160 for replica in sorted_replicas: 161 if self._config._config_dict is None: 162 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config") 163 replica_config = StorageClientConfig.from_dict( 164 config_dict=self._config._config_dict, profile=replica.replica_profile 165 ) 166 167 storage_client = StorageClient(config=replica_config) 168 replica_clients.append(storage_client) 169 170 self._replicas = replica_clients 171 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None 172 173 @property 174 def replicas(self) -> list["StorageClient"]: 175 """Return StorageClient instances for all replicas, sorted by read priority.""" 176 return self._replicas 177 178 @retry 179 def read( 180 self, 181 path: str, 182 byte_range: Optional[Range] = None, 183 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 184 ) -> bytes: 185 """ 186 Reads an object from the specified logical path. 187 188 :param path: The logical path of the object to read. 189 :param byte_range: Optional byte range to read. 190 :param check_source_version: Whether to check the source version of cached objects. 191 :param size: Optional file size for range reads. 192 :return: The content of the object. 
193 """ 194 if self._metadata_provider: 195 path, exists = self._metadata_provider.realpath(path) 196 if not exists: 197 raise FileNotFoundError(f"The file at path '{path}' was not found.") 198 199 # Handle caching logic 200 if self._is_cache_enabled() and self._cache_manager: 201 if byte_range: 202 # Range request with cache 203 try: 204 if check_source_version == SourceVersionCheckMode.ENABLE: 205 metadata = self._storage_provider.get_object_metadata(path) 206 source_version = metadata.etag 207 elif check_source_version == SourceVersionCheckMode.INHERIT: 208 if self._cache_manager.check_source_version(): 209 metadata = self._storage_provider.get_object_metadata(path) 210 source_version = metadata.etag 211 else: 212 metadata = None 213 source_version = None 214 else: 215 metadata = None 216 source_version = None 217 218 data = self._cache_manager.read( 219 key=path, 220 source_version=source_version, 221 byte_range=byte_range, 222 storage_provider=self._storage_provider, 223 ) 224 if data is not None: 225 return data 226 # Fallback (should not normally happen) 227 return self._storage_provider.get_object(path, byte_range=byte_range) 228 except (FileNotFoundError, Exception): 229 # Fall back to direct read if metadata fetching fails 230 return self._storage_provider.get_object(path, byte_range=byte_range) 231 else: 232 # Full file read with cache 233 source_version = self._get_source_version(path) 234 data = self._cache_manager.read(path, source_version) 235 236 # Read from cache if the file exists 237 if self._is_cache_enabled(): 238 if self._cache_manager is None: 239 raise RuntimeError("Cache manager is not initialized") 240 source_version = self._get_source_version(path) 241 data = self._cache_manager.read(path, source_version) 242 243 if data is None: 244 if self._replica_manager: 245 data = self._read_from_replica_or_primary(path) 246 else: 247 data = self._storage_provider.get_object(path) 248 self._cache_manager.set(path, data, source_version) 249 return 
data 250 elif self._replica_manager: 251 # No cache, but replicas available 252 return self._read_from_replica_or_primary(path) 253 else: 254 # No cache, no replicas - direct storage provider read 255 return self._storage_provider.get_object(path, byte_range=byte_range) 256 257 def _read_from_replica_or_primary(self, path: str) -> bytes: 258 """ 259 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files. 260 """ 261 if self._replica_manager is None: 262 raise RuntimeError("Replica manager is not initialized") 263 file_obj = BytesIO() 264 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider) 265 return file_obj.getvalue() 266
[docs] 267 def info(self, path: str, strict: bool = True) -> ObjectMetadata: 268 """ 269 Retrieves metadata or information about an object stored at the specified path. 270 271 :param path: The logical path to the object for which metadata or information is being retrieved. 272 :param strict: If True, performs additional validation to determine whether the path refers to a directory. 273 274 :return: A dictionary containing metadata about the object. 275 """ 276 277 if not path or path == ".": # for the empty path provided by the user 278 if self._is_posix_file_storage_provider(): 279 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc) 280 else: 281 last_modified = AWARE_DATETIME_MIN 282 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified) 283 284 if not self._metadata_provider: 285 return self._storage_provider.get_object_metadata(path, strict=strict) 286 287 # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory. 288 try: 289 return self._metadata_provider.get_object_metadata(path) 290 except FileNotFoundError: 291 # Try listing from the parent to determine if path is a valid directory 292 parent = os.path.dirname(path.rstrip("/")) + "/" 293 parent = "" if parent == "/" else parent 294 target = path.rstrip("/") + "/" 295 296 try: 297 entries = self._metadata_provider.list_objects(parent, include_directories=True) 298 for entry in entries: 299 if entry.key == target and entry.type == "directory": 300 return entry 301 except Exception: 302 pass 303 raise # Raise original FileNotFoundError
304 305 @retry 306 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None: 307 """ 308 Downloads a file to the local file system. 309 310 :param remote_path: The logical path of the file in the storage provider. 311 :param local_path: The local path where the file should be downloaded. 312 """ 313 314 if self._metadata_provider: 315 real_path, exists = self._metadata_provider.realpath(remote_path) 316 if not exists: 317 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.") 318 metadata = self._metadata_provider.get_object_metadata(remote_path) 319 self._storage_provider.download_file(real_path, local_path, metadata) 320 elif self._replica_manager: 321 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider) 322 else: 323 self._storage_provider.download_file(remote_path, local_path) 324 325 @retry 326 def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None: 327 """ 328 Uploads a file from the local file system. 329 330 :param remote_path: The logical path where the file should be stored. 331 :param local_path: The local path of the file to upload. 332 :param attributes: The attributes to add to the file. 333 """ 334 virtual_path = remote_path 335 if self._metadata_provider: 336 remote_path, exists = self._metadata_provider.realpath(remote_path) 337 if exists: 338 raise FileExistsError( 339 f"The file at path '{virtual_path}' already exists; " 340 f"overwriting is not yet allowed when using a metadata provider." 
341 ) 342 if self._metadata_provider: 343 # if metdata provider is present, we only write attributes to the metadata provider 344 self._storage_provider.upload_file(remote_path, local_path, attributes=None) 345 obj_metadata = self._storage_provider.get_object_metadata(remote_path) 346 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {}) 347 with self._metadata_provider_lock or contextlib.nullcontext(): 348 self._metadata_provider.add_file(virtual_path, obj_metadata) 349 else: 350 self._storage_provider.upload_file(remote_path, local_path, attributes) 351 352 @retry 353 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None: 354 """ 355 Writes an object at the specified path. 356 357 :param path: The logical path where the object should be written. 358 :param body: The content to write to the object. 359 :param attributes: The attributes to add to the file. 360 """ 361 virtual_path = path 362 if self._metadata_provider: 363 path, exists = self._metadata_provider.realpath(path) 364 if exists: 365 raise FileExistsError( 366 f"The file at path '{virtual_path}' already exists; " 367 f"overwriting is not yet allowed when using a metadata provider." 368 ) 369 if self._metadata_provider: 370 # if metadata provider is present, we only write attributes to the metadata provider 371 self._storage_provider.put_object(path, body, attributes=None) 372 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait. 373 obj_metadata = self._storage_provider.get_object_metadata(path) 374 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {}) 375 with self._metadata_provider_lock or contextlib.nullcontext(): 376 self._metadata_provider.add_file(virtual_path, obj_metadata) 377 else: 378 self._storage_provider.put_object(path, body, attributes=attributes) 379
[docs] 380 def copy(self, src_path: str, dest_path: str) -> None: 381 """ 382 Copies an object from source to destination path. 383 384 :param src_path: The logical path of the source object to copy. 385 :param dest_path: The logical path of the destination. 386 """ 387 virtual_dest_path = dest_path 388 if self._metadata_provider: 389 src_path, exists = self._metadata_provider.realpath(src_path) 390 if not exists: 391 raise FileNotFoundError(f"The file at path '{src_path}' was not found.") 392 393 dest_path, exists = self._metadata_provider.realpath(dest_path) 394 if exists: 395 raise FileExistsError( 396 f"The file at path '{virtual_dest_path}' already exists; " 397 f"overwriting is not yet allowed when using a metadata provider." 398 ) 399 400 self._storage_provider.copy_object(src_path, dest_path) 401 if self._metadata_provider: 402 metadata = self._storage_provider.get_object_metadata(dest_path) 403 with self._metadata_provider_lock or contextlib.nullcontext(): 404 self._metadata_provider.add_file(virtual_dest_path, metadata)
405
[docs] 406 def delete(self, path: str, recursive: bool = False) -> None: 407 """ 408 Deletes an object at the specified path. 409 410 :param path: The logical path of the object or directory to delete. 411 :param recursive: Whether to delete objects in the path recursively. 412 """ 413 obj_metadata = self.info(path) 414 is_dir = obj_metadata and obj_metadata.type == "directory" 415 is_file = obj_metadata and obj_metadata.type == "file" 416 if recursive and is_dir: 417 self.sync_from( 418 NullStorageClient(), 419 path, 420 path, 421 delete_unmatched_files=True, 422 num_worker_processes=1, 423 description="Deleting", 424 ) 425 # If this is a posix storage provider, we need to also delete remaining directory stubs. 426 if self._is_posix_file_storage_provider(): 427 posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider) 428 posix_storage_provider.rmtree(path) 429 return 430 else: 431 # 1) If path is a file: delete the file 432 # 2) If path is a directory: raise an error to prompt the user to use the recursive flag 433 if is_file: 434 virtual_path = path 435 if self._metadata_provider: 436 path, exists = self._metadata_provider.realpath(path) 437 if not exists: 438 raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.") 439 with self._metadata_provider_lock or contextlib.nullcontext(): 440 self._metadata_provider.remove_file(virtual_path) 441 442 # Delete the cached file if it exists 443 if self._is_cache_enabled(): 444 if self._cache_manager is None: 445 raise RuntimeError("Cache manager is not initialized") 446 self._cache_manager.delete(virtual_path) 447 self._storage_provider.delete_object(path) 448 449 # Delete from replicas if replica manager exists 450 if self._replica_manager: 451 self._replica_manager.delete_from_replicas(virtual_path) 452 elif is_dir: 453 raise ValueError(f"'{path}' is a directory. 
Set recursive=True to delete entire directory.") 454 else: 455 raise FileNotFoundError(f"The file at '{path}' was not found.")
456
[docs] 457 def glob( 458 self, 459 pattern: str, 460 include_url_prefix: bool = False, 461 attribute_filter_expression: Optional[str] = None, 462 ) -> list[str]: 463 """ 464 Matches and retrieves a list of objects in the storage provider that 465 match the specified pattern. 466 467 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``). 468 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 469 :param attribute_filter_expression: The attribute filter expression to apply to the result. 470 471 :return: A list of object paths that match the pattern. 472 """ 473 if self._metadata_provider: 474 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 475 else: 476 results = self._storage_provider.glob(pattern, attribute_filter_expression) 477 478 if include_url_prefix: 479 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 480 481 return results
482
[docs] 483 def list( 484 self, 485 prefix: str = "", 486 path: str = "", 487 start_after: Optional[str] = None, 488 end_at: Optional[str] = None, 489 include_directories: bool = False, 490 include_url_prefix: bool = False, 491 attribute_filter_expression: Optional[str] = None, 492 show_attributes: bool = False, 493 ) -> Iterator[ObjectMetadata]: 494 """ 495 Lists objects in the storage provider under the specified path. 496 497 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is 498 deprecated and will be removed in a future version. 499 500 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under. 501 :param path: The directory or file path to list objects under. This should be a 502 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/"). 503 Cannot be used together with ``prefix``. 504 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 505 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 506 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects. 507 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 508 :param attribute_filter_expression: The attribute filter expression to apply to the result. 509 :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to True. 510 511 :return: An iterator over objects. 512 513 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty). 514 """ 515 # Parameter validation - either path or prefix, not both 516 if path and prefix: 517 raise ValueError( 518 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). " 519 f"Please use only the 'path' parameter for new code. 
" 520 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 521 ) 522 elif prefix: 523 logger.debug( 524 f"The 'prefix' parameter is deprecated and will be removed in a future version. " 525 f"Please use the 'path' parameter instead. " 526 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 527 ) 528 529 # Use path if provided, otherwise fall back to prefix 530 effective_path = path if path else prefix 531 if effective_path and not self.is_file(effective_path): 532 effective_path = ( 533 effective_path.rstrip("/") + "/" 534 ) # assume it's a directory if the effective path is not empty 535 536 if self._metadata_provider: 537 objects = self._metadata_provider.list_objects( 538 effective_path, 539 start_after=start_after, 540 end_at=end_at, 541 include_directories=include_directories, 542 attribute_filter_expression=attribute_filter_expression, 543 show_attributes=show_attributes, 544 ) 545 else: 546 objects = self._storage_provider.list_objects( 547 effective_path, 548 start_after=start_after, 549 end_at=end_at, 550 include_directories=include_directories, 551 attribute_filter_expression=attribute_filter_expression, 552 show_attributes=show_attributes, 553 ) 554 555 for object in objects: 556 if include_url_prefix: 557 if self.is_default_profile(): 558 object.key = str(PurePosixPath("/") / object.key) 559 else: 560 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key) 561 yield object
562
[docs] 563 def open( 564 self, 565 path: str, 566 mode: str = "rb", 567 buffering: int = -1, 568 encoding: Optional[str] = None, 569 disable_read_cache: bool = False, 570 memory_load_limit: int = MEMORY_LOAD_LIMIT, 571 atomic: bool = True, 572 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 573 attributes: Optional[dict[str, str]] = None, 574 prefetch_file: bool = True, 575 ) -> Union[PosixFile, ObjectFile]: 576 """ 577 Returns a file-like object from the specified path. 578 579 :param path: The logical path of the object to read. 580 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported. 581 :param buffering: The buffering mode. Only applies to PosixFile. 582 :param encoding: The encoding to use for text files. 583 :param disable_read_cache: When set to True, disables caching for the file content. 584 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 585 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 586 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 587 :param atomic: When set to True, the file will be written atomically (rename upon close). 588 This parameter is only applicable to PosixFile in write mode. 589 :param check_source_version: Whether to check the source version of cached objects. 590 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". 591 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 
592 """ 593 if self._is_posix_file_storage_provider(): 594 return PosixFile( 595 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 596 ) 597 else: 598 if atomic is False: 599 logger.warning("Non-atomic writes are not supported for object storage providers.") 600 601 return ObjectFile( 602 self, 603 remote_path=path, 604 mode=mode, 605 encoding=encoding, 606 disable_read_cache=disable_read_cache, 607 memory_load_limit=memory_load_limit, 608 check_source_version=check_source_version, 609 attributes=attributes, 610 prefetch_file=prefetch_file, 611 )
612
[docs] 613 def is_file(self, path: str) -> bool: 614 """ 615 Checks whether the specified path points to a file (rather than a directory or folder). 616 617 :param path: The logical path to check. 618 619 :return: ``True`` if the path points to a file, ``False`` otherwise. 620 """ 621 if self._metadata_provider: 622 _, exists = self._metadata_provider.realpath(path) 623 return exists 624 return self._storage_provider.is_file(path)
625
[docs] 626 def commit_metadata(self, prefix: Optional[str] = None) -> None: 627 """ 628 Commits any pending updates to the metadata provider. No-op if not using a metadata provider. 629 630 :param prefix: If provided, scans the prefix to find files to commit. 631 """ 632 if self._metadata_provider: 633 with self._metadata_provider_lock or contextlib.nullcontext(): 634 if prefix: 635 # The logical path for each item will be the physical path with 636 # the base physical path removed from the beginning. 637 physical_base, _ = self._metadata_provider.realpath("") 638 physical_prefix, _ = self._metadata_provider.realpath(prefix) 639 for obj in self._storage_provider.list_objects(physical_prefix): 640 virtual_path = obj.key[len(physical_base) :].lstrip("/") 641 self._metadata_provider.add_file(virtual_path, obj) 642 self._metadata_provider.commit_updates()
643
[docs] 644 def is_empty(self, path: str) -> bool: 645 """ 646 Checks whether the specified path is empty. A path is considered empty if there are no 647 objects whose keys start with the given path as a prefix. 648 649 :param path: The logical path to check. This is typically a prefix representing a directory or folder. 650 651 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 652 """ 653 if self._metadata_provider: 654 objects = self._metadata_provider.list_objects(path) 655 else: 656 objects = self._storage_provider.list_objects(path) 657 658 try: 659 return next(objects) is None 660 except StopIteration: 661 pass 662 return True
663 664 def __getstate__(self) -> dict[str, Any]: 665 state = self.__dict__.copy() 666 del state["_credentials_provider"] 667 del state["_storage_provider"] 668 del state["_metadata_provider"] 669 del state["_cache_manager"] 670 671 if "_metadata_provider_lock" in state: 672 del state["_metadata_provider_lock"] 673 674 if "_replicas" in state: 675 del state["_replicas"] 676 677 # Replica manager could be disabled if it's set to None in the state. 678 if "_replica_manager" in state: 679 if state["_replica_manager"] is not None: 680 del state["_replica_manager"] 681 682 return state 683 684 def __setstate__(self, state: dict[str, Any]) -> None: 685 config = state["_config"] 686 self._initialize_providers(config) 687 688 # Replica manager could be disabled if it's set to None in the state. 689 if "_replica_manager" in state and state["_replica_manager"] is None: 690 self._replica_manager = None 691 else: 692 self._initialize_replicas(config.replicas) 693 694 if self._metadata_provider: 695 self._metadata_provider_lock = threading.Lock() 696
[docs] 697 def sync_from( 698 self, 699 source_client: "StorageClient", 700 source_path: str = "", 701 target_path: str = "", 702 delete_unmatched_files: bool = False, 703 description: str = "Syncing", 704 num_worker_processes: Optional[int] = None, 705 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 706 patterns: Optional[PatternList] = None, 707 preserve_source_attributes: bool = False, 708 ) -> None: 709 """ 710 Syncs files from the source storage client to "path/". 711 712 :param source_client: The source storage client. 713 :param source_path: The logical path to sync from. 714 :param target_path: The logical path to sync to. 715 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 716 :param description: Description of sync process for logging purposes. 717 :param num_worker_processes: The number of worker processes to use. 718 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 719 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 720 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization. 721 When False (default), only file content is copied. When True, custom metadata attributes are also preserved. 722 723 .. warning:: 724 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD 725 request for each object to retrieve attributes, which can significantly impact performance on large-scale 726 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile. 
727 """ 728 # Create internal PatternMatcher from patterns if provided 729 pattern_matcher = PatternMatcher(patterns) if patterns else None 730 731 # Disable the replica manager during sync 732 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager: 733 source_client = StorageClient(source_client._config) 734 source_client._replica_manager = None 735 736 m = SyncManager(source_client, source_path, self, target_path) 737 738 m.sync_objects( 739 execution_mode=execution_mode, 740 description=description, 741 num_worker_processes=num_worker_processes, 742 delete_unmatched_files=delete_unmatched_files, 743 pattern_matcher=pattern_matcher, 744 preserve_source_attributes=preserve_source_attributes, 745 )
746
[docs] 747 def sync_replicas( 748 self, 749 source_path: str, 750 replica_indices: Optional[List[int]] = None, 751 delete_unmatched_files: bool = False, 752 description: str = "Syncing replica", 753 num_worker_processes: Optional[int] = None, 754 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 755 patterns: Optional[PatternList] = None, 756 ) -> None: 757 """ 758 Sync files from the source storage client to target replicas. 759 760 :param source_path: The logical path to sync from. 761 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0. 762 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 763 :param description: Description of sync process for logging purposes. 764 :param num_worker_processes: The number of worker processes to use. 765 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 766 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 767 """ 768 if not self._replicas: 769 logger.warning( 770 "No replicas found in profile '%s'. 
Add a 'replicas' section to your profile configuration to enable " 771 "secondary storage locations for redundancy and performance.", 772 self._config.profile, 773 ) 774 return None 775 776 if replica_indices: 777 try: 778 replicas = [self._replicas[i] for i in replica_indices] 779 except IndexError as e: 780 raise ValueError(f"Replica index out of range: {replica_indices}") from e 781 else: 782 replicas = self._replicas 783 784 # Disable the replica manager during sync 785 if self._replica_manager: 786 source_client = StorageClient(self._config) 787 source_client._replica_manager = None 788 else: 789 source_client = self 790 791 for replica in replicas: 792 replica.sync_from( 793 source_client, 794 source_path, 795 source_path, 796 delete_unmatched_files=delete_unmatched_files, 797 description=f"{description} ({replica.profile})", 798 num_worker_processes=num_worker_processes, 799 execution_mode=execution_mode, 800 patterns=patterns, 801 )