Source code for multistorageclient.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import contextlib
 18import logging
 19import os
 20import threading
 21from collections.abc import Iterator
 22from datetime import datetime, timezone
 23from io import BytesIO
 24from pathlib import PurePosixPath
 25from typing import IO, Any, List, Optional, Union, cast
 26
 27from .config import StorageClientConfig
 28from .constants import MEMORY_LOAD_LIMIT
 29from .file import ObjectFile, PosixFile
 30from .providers.posix_file import PosixFileStorageProvider
 31from .replica_manager import ReplicaManager
 32from .retry import retry
 33from .sync import SyncManager
 34from .types import (
 35    AWARE_DATETIME_MIN,
 36    MSC_PROTOCOL,
 37    ExecutionMode,
 38    ObjectMetadata,
 39    PatternList,
 40    Range,
 41    Replica,
 42    SourceVersionCheckMode,
 43)
 44from .utils import NullStorageClient, PatternMatcher, join_paths
 45
 46logger = logging.getLogger(__name__)
 47
 48
class StorageClient:
    """
    A client for interacting with different storage providers.
    """

    # Class-level annotations/defaults; instance values are assigned in
    # _initialize_providers / _initialize_replicas.
    _config: StorageClientConfig
    _metadata_provider_lock: Optional[threading.Lock] = None
    _stop_event: Optional[threading.Event] = None
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Copy provider handles out of the config and, when configured, start the
        metadata auto-commit machinery (periodic thread and/or atexit hook).

        :param config: The configuration object for the storage client.
        """
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                if self._autocommit_config.interval_minutes:
                    # Periodic commit: a daemon thread wakes up every
                    # interval_minutes and commits pending metadata updates.
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    # Final best-effort commit at interpreter shutdown.
                    atexit.register(self._commit_on_exit)

                # Serializes metadata-provider mutations between the committer
                # thread and user-facing write paths.
                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances."""
        # Sort replicas by read_priority, the first one is the primary replica
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            # Each replica is a full StorageClient built from the same raw
            # config dict under the replica's own profile name.
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClient(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
        """
        Background loop that commits metadata updates every
        ``commit_interval_minutes`` until ``stop_event`` is set.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self):
        # Registered with atexit when autocommit_config.at_exit is enabled.
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        # Caching only applies to non-POSIX providers: POSIX files are already
        # on the local filesystem.
        enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
        return enabled

    def _is_posix_file_storage_provider(self) -> bool:
        # True when the backing provider reads/writes the local filesystem.
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        Return True if the storage provider is using the Rust client.
        """
        return hasattr(self._storage_provider, "_rust_client")

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        # Stop the auto-commit thread (if one was started) and give it a
        # bounded amount of time to wind down.
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        # Drop members that cannot (or should not) cross a pickle boundary;
        # they are rebuilt from _config in __setstate__.
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # NOTE(review): providers and replicas are rebuilt entirely from
        # state["_config"]; other pickled entries are not copied back onto
        # self — confirm this is intended.
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()
[docs] 204 def is_default_profile(self) -> bool: 205 """ 206 Return True if the storage client is using the default profile. 207 """ 208 return self._config.profile == "default"
209 210 @property 211 def profile(self) -> str: 212 """ 213 :return: The profile name of the storage client. 214 """ 215 return self._config.profile 216 217 @property 218 def replicas(self) -> list["StorageClient"]: 219 """ 220 :return: StorageClient instances for all replicas, sorted by read priority. 221 """ 222 return self._replicas 223 224 @retry 225 def read( 226 self, 227 path: str, 228 byte_range: Optional[Range] = None, 229 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 230 ) -> bytes: 231 """ 232 Reads an object from the specified logical path. 233 234 :param path: The logical path of the object to read. 235 :param byte_range: Optional byte range to read. 236 :param check_source_version: Whether to check the source version of cached objects. 237 :param size: Optional file size for range reads. 238 :return: The content of the object. 239 """ 240 if self._metadata_provider: 241 path, exists = self._metadata_provider.realpath(path) 242 if not exists: 243 raise FileNotFoundError(f"The file at path '{path}' was not found.") 244 245 # Handle caching logic 246 if self._is_cache_enabled() and self._cache_manager: 247 if byte_range: 248 # Range request with cache 249 try: 250 if check_source_version == SourceVersionCheckMode.ENABLE: 251 metadata = self._storage_provider.get_object_metadata(path) 252 source_version = metadata.etag 253 elif check_source_version == SourceVersionCheckMode.INHERIT: 254 if self._cache_manager.check_source_version(): 255 metadata = self._storage_provider.get_object_metadata(path) 256 source_version = metadata.etag 257 else: 258 metadata = None 259 source_version = None 260 else: 261 metadata = None 262 source_version = None 263 264 data = self._cache_manager.read( 265 key=path, 266 source_version=source_version, 267 byte_range=byte_range, 268 storage_provider=self._storage_provider, 269 ) 270 if data is not None: 271 return data 272 # Fallback (should not normally happen) 273 return 
self._storage_provider.get_object(path, byte_range=byte_range) 274 except (FileNotFoundError, Exception): 275 # Fall back to direct read if metadata fetching fails 276 return self._storage_provider.get_object(path, byte_range=byte_range) 277 else: 278 # Full file read with cache 279 source_version = self._get_source_version(path) 280 data = self._cache_manager.read(path, source_version) 281 282 # Read from cache if the file exists 283 if self._is_cache_enabled(): 284 if self._cache_manager is None: 285 raise RuntimeError("Cache manager is not initialized") 286 source_version = self._get_source_version(path) 287 data = self._cache_manager.read(path, source_version) 288 289 if data is None: 290 if self._replica_manager: 291 data = self._read_from_replica_or_primary(path) 292 else: 293 data = self._storage_provider.get_object(path) 294 self._cache_manager.set(path, data, source_version) 295 return data 296 elif self._replica_manager: 297 # No cache, but replicas available 298 return self._read_from_replica_or_primary(path) 299 else: 300 # No cache, no replicas - direct storage provider read 301 return self._storage_provider.get_object(path, byte_range=byte_range) 302
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The logical path to the object for which metadata or information is being retrieved.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: A dictionary containing metadata about the object.
        :raises FileNotFoundError: If the path is neither a known file nor a detectable directory.
        """
        if not path or path == ".":  # for the empty path provided by the user
            # Synthesize a "directory" entry for the storage root. POSIX
            # providers can report a real mtime; object stores cannot.
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        # TODO: Consider passing strict argument to the metadata provider.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent  # "" means list from the root
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Directory detection is best-effort; fall through to re-raise.
                pass
            raise  # Raise original FileNotFoundError
    @retry
    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Downloads a file to the local file system.

        :param remote_path: The logical path of the file in the storage provider.
        :param local_path: The local path where the file should be downloaded.
        :raises FileNotFoundError: If a metadata provider is configured and the path is unknown.
        """
        if self._metadata_provider:
            # Resolve the logical path to the physical one before downloading.
            real_path, exists = self._metadata_provider.realpath(remote_path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")

            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(real_path, local_path, metadata)
        elif self._replica_manager:
            # Prefer reading from the highest-priority replica when available.
            self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Uploads a file from the local file system.

        :param remote_path: The logical path where the file should be stored.
        :param local_path: The local path of the file to upload.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If the destination already exists and a metadata provider is in use.
        """
        virtual_path = remote_path
        if self._metadata_provider:
            remote_path, exists = self._metadata_provider.realpath(remote_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{virtual_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            # Merge caller-supplied attributes into the freshly fetched metadata.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            # Lock only exists when auto-commit is enabled; nullcontext otherwise.
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)

    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Writes an object at the specified path.

        :param path: The logical path where the object should be written.
        :param body: The content to write to the object.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If the destination already exists and a metadata provider is in use.
        """
        virtual_path = path
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{virtual_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            # Merge caller-supplied attributes into the freshly fetched metadata.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)
[docs] 419 def copy(self, src_path: str, dest_path: str) -> None: 420 """ 421 Copies an object from source to destination path. 422 423 :param src_path: The logical path of the source object to copy. 424 :param dest_path: The logical path of the destination. 425 """ 426 virtual_dest_path = dest_path 427 if self._metadata_provider: 428 src_path, exists = self._metadata_provider.realpath(src_path) 429 if not exists: 430 raise FileNotFoundError(f"The file at path '{src_path}' was not found.") 431 432 dest_path, exists = self._metadata_provider.realpath(dest_path) 433 if exists: 434 raise FileExistsError( 435 f"The file at path '{virtual_dest_path}' already exists; " 436 f"overwriting is not yet allowed when using a metadata provider." 437 ) 438 439 self._storage_provider.copy_object(src_path, dest_path) 440 441 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait. 442 obj_metadata = self._storage_provider.get_object_metadata(dest_path) 443 with self._metadata_provider_lock or contextlib.nullcontext(): 444 self._metadata_provider.add_file(virtual_dest_path, obj_metadata) 445 else: 446 self._storage_provider.copy_object(src_path, dest_path)
447
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If ``path`` is a directory and ``recursive`` is False.
        :raises FileNotFoundError: If nothing exists at ``path``.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive directory delete is implemented as a sync from an empty
            # ("null") source with deletion of unmatched files.
            self.sync_from(
                cast(StorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider for the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    path, exists = self._metadata_provider.realpath(path)
                    if not exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    self._storage_provider.delete_object(path)

                    # Lock only exists when auto-commit is enabled; nullcontext otherwise.
                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                # NOTE(review): read() keys the cache by the physical (realpath)
                # key, while this deletes the logical path — verify behavior when
                # a metadata provider remaps paths.
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
503
[docs] 504 def glob( 505 self, 506 pattern: str, 507 include_url_prefix: bool = False, 508 attribute_filter_expression: Optional[str] = None, 509 ) -> list[str]: 510 """ 511 Matches and retrieves a list of objects in the storage provider that 512 match the specified pattern. 513 514 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``). 515 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 516 :param attribute_filter_expression: The attribute filter expression to apply to the result. 517 518 :return: A list of object paths that match the pattern. 519 """ 520 if self._metadata_provider: 521 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 522 else: 523 results = self._storage_provider.glob(pattern, attribute_filter_expression) 524 525 if include_url_prefix: 526 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 527 528 return results
529
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on the implementation, there might be a performance impact if this is set to True.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When False, symlinks are skipped during listing.

        :return: An iterator over objects.

        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Parameter validation - either path or prefix, not both
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )
        elif prefix:
            logger.debug(
                f"The 'prefix' parameter is deprecated and will be removed in a future version. "
                f"Please use the 'path' parameter instead. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        # Use path if provided, otherwise fall back to prefix
        effective_path = path if path else prefix
        if effective_path and not self.is_file(effective_path):
            effective_path = (
                effective_path.rstrip("/") + "/"
            )  # assume it's a directory if the effective path is not empty

        # The metadata provider does not take follow_symlinks; that option only
        # applies to direct (POSIX-capable) storage provider listings.
        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
            )
        else:
            objects = self._storage_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
                follow_symlinks=follow_symlinks,
            )

        for object in objects:
            if include_url_prefix:
                # Default profile: render as an absolute POSIX-style path;
                # named profiles: prepend the msc://profile URL prefix.
                if self.is_default_profile():
                    object.key = str(PurePosixPath("/") / object.key)
                else:
                    object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
            yield object
612
    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Returns a file-like object from the specified path.

        :param path: The logical path of the object to read.
        :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to True, disables caching for the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param atomic: When set to True, the file will be written atomically (rename upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
        :param prefetch_file: Whether ObjectFile should prefetch the file content.
            This parameter is only applicable to ObjectFile.

        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        """
        # POSIX-backed providers get a PosixFile; everything else is wrapped
        # in an ObjectFile.
        if self._is_posix_file_storage_provider():
            return PosixFile(
                self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
            )
        else:
            if atomic is False:
                logger.warning("Non-atomic writes are not supported for object storage providers.")

            return ObjectFile(
                self,
                remote_path=path,
                mode=mode,
                encoding=encoding,
                disable_read_cache=disable_read_cache,
                memory_load_limit=memory_load_limit,
                check_source_version=check_source_version,
                attributes=attributes,
                prefetch_file=prefetch_file,
            )
663
[docs] 664 def get_posix_path(self, path: str) -> Optional[str]: 665 """ 666 Returns the physical POSIX filesystem path for POSIX storage providers. 667 668 :param path: The path to resolve (may be a symlink or virtual path). 669 :return: Physical POSIX filesystem path if POSIX storage, None otherwise. 670 """ 671 if not self._is_posix_file_storage_provider(): 672 return None 673 674 if self._metadata_provider: 675 realpath, _ = self._metadata_provider.realpath(path) 676 else: 677 realpath = path 678 679 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
680
[docs] 681 def is_file(self, path: str) -> bool: 682 """ 683 Checks whether the specified path points to a file (rather than a directory or folder). 684 685 :param path: The logical path to check. 686 687 :return: ``True`` if the path points to a file, ``False`` otherwise. 688 """ 689 if self._metadata_provider: 690 _, exists = self._metadata_provider.realpath(path) 691 return exists 692 693 return self._storage_provider.is_file(path)
694
    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        if self._metadata_provider:
            # Serialize against the auto-commit thread when one is running;
            # the lock is None when auto-commit is disabled.
            with self._metadata_provider_lock or contextlib.nullcontext():
                if prefix:
                    # The logical path for each item will be the physical path with
                    # the base physical path removed from the beginning.
                    physical_base, _ = self._metadata_provider.realpath("")
                    physical_prefix, _ = self._metadata_provider.realpath(prefix)
                    for obj in self._storage_provider.list_objects(physical_prefix):
                        virtual_path = obj.key[len(physical_base) :].lstrip("/")
                        self._metadata_provider.add_file(virtual_path, obj)
                self._metadata_provider.commit_updates()
712
[docs] 713 def is_empty(self, path: str) -> bool: 714 """ 715 Checks whether the specified path is empty. A path is considered empty if there are no 716 objects whose keys start with the given path as a prefix. 717 718 :param path: The logical path to check. This is typically a prefix representing a directory or folder. 719 720 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 721 """ 722 if self._metadata_provider: 723 objects = self._metadata_provider.list_objects(path) 724 else: 725 objects = self._storage_provider.list_objects(path) 726 727 try: 728 return next(objects) is None 729 except StopIteration: 730 pass 731 732 return True
733
    def sync_from(
        self,
        source_client: "StorageClient",
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[List[str]] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Syncs files from the source storage client to "path/".

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When False (default), only file content is copied. When True, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is True.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is True.
        :raises ValueError: If both source_files and patterns are provided.
        """
        if source_files and patterns:
            raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")

        pattern_matcher = PatternMatcher(patterns) if patterns else None

        # Disable the replica manager during sync
        # (a fresh client is built so the caller's client is left untouched).
        if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
            source_client = StorageClient(source_client._config)
            source_client._replica_manager = None

        m = SyncManager(source_client, source_path, self, target_path)

        m.sync_objects(
            execution_mode=execution_mode,
            description=description,
            num_worker_processes=num_worker_processes,
            delete_unmatched_files=delete_unmatched_files,
            pattern_matcher=pattern_matcher,
            preserve_source_attributes=preserve_source_attributes,
            follow_symlinks=follow_symlinks,
            source_files=source_files,
            ignore_hidden=ignore_hidden,
        )
798
    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[List[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Sync files from the source storage client to target replicas.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True.
        :raises ValueError: If a replica index is out of range.
        """
        if not self._replicas:
            logger.warning(
                "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
                "secondary storage locations for redundancy and performance.",
                self._config.profile,
            )
            return None

        if replica_indices:
            # Negative indices would silently wrap via Python list indexing;
            # only out-of-range positives raise IndexError here.
            try:
                replicas = [self._replicas[i] for i in replica_indices]
            except IndexError as e:
                raise ValueError(f"Replica index out of range: {replica_indices}") from e
        else:
            replicas = self._replicas

        # Disable the replica manager during sync
        # (build a fresh client so this instance is left untouched).
        if self._replica_manager:
            source_client = StorageClient(self._config)
            source_client._replica_manager = None
        else:
            source_client = self

        # Each replica pulls from the replica-free source client using the same
        # logical path on both sides.
        for replica in replicas:
            replica.sync_from(
                source_client,
                source_path,
                source_path,
                delete_unmatched_files=delete_unmatched_files,
                description=f"{description} ({replica.profile})",
                num_worker_processes=num_worker_processes,
                execution_mode=execution_mode,
                patterns=patterns,
                ignore_hidden=ignore_hidden,
            )