Source code for multistorageclient.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import contextlib
 18import logging
 19import os
 20import threading
 21from collections.abc import Iterator
 22from datetime import datetime, timezone
 23from io import BytesIO
 24from pathlib import PurePosixPath
 25from typing import IO, Any, List, Optional, Union, cast
 26
 27from .config import StorageClientConfig
 28from .constants import MEMORY_LOAD_LIMIT
 29from .file import ObjectFile, PosixFile
 30from .providers.posix_file import PosixFileStorageProvider
 31from .replica_manager import ReplicaManager
 32from .retry import retry
 33from .sync import SyncManager
 34from .types import (
 35    AWARE_DATETIME_MIN,
 36    MSC_PROTOCOL,
 37    ExecutionMode,
 38    ObjectMetadata,
 39    PatternList,
 40    Range,
 41    Replica,
 42    SourceVersionCheckMode,
 43    StorageProvider,
 44)
 45from .utils import NullStorageClient, PatternMatcher, join_paths
 46
 47logger = logging.getLogger(__name__)
 48
 49
class StorageClient:
    """
    A client for interacting with different storage providers.
    """

    # Class-level annotations; instances populate these in _initialize_providers().
    _config: StorageClientConfig
    _storage_provider: StorageProvider
    # Serializes metadata-provider mutations against the auto-commit thread.
    _metadata_provider_lock: Optional[threading.Lock] = None
    # Signals the background committer thread to stop; None when auto-commit is off.
    _stop_event: Optional[threading.Event] = None
    # Set only when replicas are configured; None otherwise.
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Copy provider/config references from ``config`` onto this instance and,
        when auto-commit is configured together with a metadata provider, start
        the periodic committer thread and/or register an at-exit commit hook.

        :param config: The configuration object for the storage client.
        :raises NotImplementedError: If the config uses ``storage_provider_profiles``.
        """
        # TODO: Support composite clients with storage_provider_profiles
        if config.storage_provider is None or config.storage_provider_profiles:
            raise NotImplementedError("StorageClient with storage_provider_profiles is not yet supported.")

        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = cast(StorageProvider, self._config.storage_provider)
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            # Auto-commit only makes sense when there is a metadata provider to commit to.
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                if self._autocommit_config.interval_minutes:
                    # Daemon thread so it never blocks interpreter shutdown.
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    # NOTE(review): atexit keeps a reference to this instance until
                    # interpreter exit — confirm that is acceptable for long-lived apps.
                    atexit.register(self._commit_on_exit)

                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances."""
        # Sort replicas by read_priority, the first one is the primary replica
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            # Each replica is a full StorageClient built from its own profile
            # out of the same raw config dictionary.
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClient(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
        """
        Background loop that commits metadata every ``commit_interval_minutes``
        minutes until ``stop_event`` is set.

        :param commit_interval_minutes: Interval between commits, in minutes.
        :param stop_event: Event used to request the loop to terminate.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self):
        """Commit pending metadata one final time at interpreter shutdown (atexit hook)."""
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.

        :param path: The path of the object.
        :return: The object's etag (may be None if the provider reports none).
        """
        # Prefer the metadata provider when configured.
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        """Return True when a cache manager exists and the backend is not a POSIX provider."""
        enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
        return enabled

    def _is_posix_file_storage_provider(self) -> bool:
        """Return True when the storage provider is a PosixFileStorageProvider."""
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        Return True if the storage provider is using the Rust client.
        """
        return hasattr(self._storage_provider, "_rust_client")

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.

        :param path: The path of the object to read.
        :return: The full content of the object.
        :raises RuntimeError: If no replica manager is configured.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        # Stop the auto-commit thread (if one was started) and give it a
        # bounded amount of time to exit; _commit_thread exists whenever
        # _stop_event was created in _initialize_providers.
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        """
        Prepare the instance for pickling by dropping members that are not
        picklable or must be rebuilt (providers, cache manager, lock, replica
        clients); __setstate__ reconstructs them from the retained config.
        """
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Restore a pickled instance: re-create providers from the saved config
        and rebuild replicas unless they were explicitly disabled (None).
        """
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()
[docs] 210 def is_default_profile(self) -> bool: 211 """ 212 Return True if the storage client is using the default profile. 213 """ 214 return self._config.profile == "default"
215 216 @property 217 def profile(self) -> str: 218 """ 219 :return: The profile name of the storage client. 220 """ 221 return self._config.profile 222 223 @property 224 def replicas(self) -> list["StorageClient"]: 225 """ 226 :return: StorageClient instances for all replicas, sorted by read priority. 227 """ 228 return self._replicas 229 230 @retry 231 def read( 232 self, 233 path: str, 234 byte_range: Optional[Range] = None, 235 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 236 ) -> bytes: 237 """ 238 Reads an object from the specified logical path. 239 240 :param path: The logical path of the object to read. 241 :param byte_range: Optional byte range to read. 242 :param check_source_version: Whether to check the source version of cached objects. 243 :param size: Optional file size for range reads. 244 :return: The content of the object. 245 """ 246 if self._metadata_provider: 247 resolved = self._metadata_provider.realpath(path) 248 if not resolved.exists: 249 raise FileNotFoundError(f"The file at path '{path}' was not found.") 250 path = resolved.physical_path 251 252 # Handle caching logic 253 if self._is_cache_enabled() and self._cache_manager: 254 if byte_range: 255 # Range request with cache 256 try: 257 if check_source_version == SourceVersionCheckMode.ENABLE: 258 metadata = self._storage_provider.get_object_metadata(path) 259 source_version = metadata.etag 260 elif check_source_version == SourceVersionCheckMode.INHERIT: 261 if self._cache_manager.check_source_version(): 262 metadata = self._storage_provider.get_object_metadata(path) 263 source_version = metadata.etag 264 else: 265 metadata = None 266 source_version = None 267 else: 268 metadata = None 269 source_version = None 270 271 data = self._cache_manager.read( 272 key=path, 273 source_version=source_version, 274 byte_range=byte_range, 275 storage_provider=self._storage_provider, 276 ) 277 if data is not None: 278 return data 279 # Fallback (should not normally 
happen) 280 return self._storage_provider.get_object(path, byte_range=byte_range) 281 except (FileNotFoundError, Exception): 282 # Fall back to direct read if metadata fetching fails 283 return self._storage_provider.get_object(path, byte_range=byte_range) 284 else: 285 # Full file read with cache 286 source_version = self._get_source_version(path) 287 data = self._cache_manager.read(path, source_version) 288 289 # Read from cache if the file exists 290 if self._is_cache_enabled(): 291 if self._cache_manager is None: 292 raise RuntimeError("Cache manager is not initialized") 293 source_version = self._get_source_version(path) 294 data = self._cache_manager.read(path, source_version) 295 296 if data is None: 297 if self._replica_manager: 298 data = self._read_from_replica_or_primary(path) 299 else: 300 data = self._storage_provider.get_object(path) 301 self._cache_manager.set(path, data, source_version) 302 return data 303 elif self._replica_manager: 304 # No cache, but replicas available 305 return self._read_from_replica_or_primary(path) 306 else: 307 # No cache, no replicas - direct storage provider read 308 return self._storage_provider.get_object(path, byte_range=byte_range) 309
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The logical path to the object for which metadata or information is being retrieved.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: A dictionary containing metadata about the object.
        :raises FileNotFoundError: If the path is neither a known file nor a listable directory.
        """
        if not path or path == ".":  # for the empty path provided by the user
            # Synthesize a "directory" entry representing the root of the profile.
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        # TODO: Consider passing strict argument to the metadata provider.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Best-effort directory detection; fall through to the original error.
                pass
            raise  # Raise original FileNotFoundError
    @retry
    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Downloads a file to the local file system.

        :param remote_path: The logical path of the file in the storage provider.
        :param local_path: The local path where the file should be downloaded.
        :raises FileNotFoundError: If a metadata provider is configured and the remote path does not exist.
        """
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if not resolved.exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")

            # Pass the known metadata along so the provider can use it directly.
            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
        elif self._replica_manager:
            self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Uploads a file from the local file system.

        :param remote_path: The logical path where the file should be stored.
        :param local_path: The local path of the file to upload.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If the file exists and the metadata provider forbids overwrites.
        """
        virtual_path = remote_path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if resolved.exists:
                # File exists
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=True
                ).physical_path
            else:
                # New file - generate path
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=False
                ).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            # Merge caller-supplied attributes over whatever the provider reports.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)

    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Writes an object at the specified path.

        :param path: The logical path where the object should be written.
        :param body: The content to write to the object.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If the file exists and the metadata provider forbids overwrites.
        """
        virtual_path = path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            if resolved.exists:
                # File exists
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            # Merge caller-supplied attributes over whatever the provider reports.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)
    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copies an object from source to destination path.

        :param src_path: The logical path of the source object to copy.
        :param dest_path: The logical path of the destination.
        :raises FileNotFoundError: If a metadata provider is configured and the source does not exist.
        :raises FileExistsError: If the destination exists and the metadata provider forbids overwrites.
        """
        virtual_dest_path = dest_path
        if self._metadata_provider:
            # Source: must exist
            src_resolved = self._metadata_provider.realpath(src_path)
            if not src_resolved.exists:
                raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
            src_path = src_resolved.physical_path

            # Destination: check for overwrites
            dest_resolved = self._metadata_provider.realpath(dest_path)
            if dest_resolved.exists:
                # Destination exists
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_dest_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path

            self._storage_provider.copy_object(src_path, dest_path)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(dest_path)
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
        else:
            self._storage_provider.copy_object(src_path, dest_path)
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If the path is a directory and ``recursive`` is False.
        :raises FileNotFoundError: If the path does not exist.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive delete is implemented as syncing from an empty (null)
            # source with deletion of unmatched (i.e. all) target files.
            self.sync_from(
                cast(StorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider for the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    self._storage_provider.delete_object(resolved.physical_path)

                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
[docs] 539 def glob( 540 self, 541 pattern: str, 542 include_url_prefix: bool = False, 543 attribute_filter_expression: Optional[str] = None, 544 ) -> list[str]: 545 """ 546 Matches and retrieves a list of objects in the storage provider that 547 match the specified pattern. 548 549 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``). 550 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 551 :param attribute_filter_expression: The attribute filter expression to apply to the result. 552 553 :return: A list of object paths that match the pattern. 554 """ 555 if self._metadata_provider: 556 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 557 else: 558 results = self._storage_provider.glob(pattern, attribute_filter_expression) 559 560 if include_url_prefix: 561 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 562 563 return results
564
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
                     complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
                     Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to True.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When False, symlinks are skipped during listing.

        :return: An iterator over objects.

        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Parameter validation - either path or prefix, not both
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )
        elif prefix:
            logger.debug(
                f"The 'prefix' parameter is deprecated and will be removed in a future version. "
                f"Please use the 'path' parameter instead. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        # Use path if provided, otherwise fall back to prefix
        effective_path = path if path else prefix

        if effective_path:
            if self.is_file(effective_path):
                try:
                    object_metadata = self.info(effective_path)
                    if include_url_prefix:
                        # Default profile uses an absolute POSIX-style key; other
                        # profiles get the msc://profile prefix.
                        if self.is_default_profile():
                            object_metadata.key = str(PurePosixPath("/") / object_metadata.key)
                        else:
                            object_metadata.key = join_paths(
                                f"{MSC_PROTOCOL}{self._config.profile}", object_metadata.key
                            )
                    yield object_metadata  # short circuit if the path is a file
                    return
                except FileNotFoundError:
                    pass
            else:
                # Normalize to a directory-style prefix (exactly one trailing slash).
                effective_path = effective_path.rstrip("/") + "/"

        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
            )
        else:
            objects = self._storage_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
                follow_symlinks=follow_symlinks,
            )

        for object in objects:
            if include_url_prefix:
                if self.is_default_profile():
                    object.key = str(PurePosixPath("/") / object.key)
                else:
                    object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
            yield object
    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Returns a file-like object from the specified path.

        :param path: The logical path of the object to read.
        :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to True, disables caching for the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param atomic: When set to True, the file will be written atomically (rename upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
        :param prefetch_file: Forwarded to ObjectFile only (PosixFile does not receive it).
            NOTE(review): presumably controls whether the object content is prefetched on open — confirm in ObjectFile.

        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        """
        if self._is_posix_file_storage_provider():
            return PosixFile(
                self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
            )
        else:
            # Object stores cannot rename in place, so atomic=False is meaningless here.
            if atomic is False:
                logger.warning("Non-atomic writes are not supported for object storage providers.")

            return ObjectFile(
                self,
                remote_path=path,
                mode=mode,
                encoding=encoding,
                disable_read_cache=disable_read_cache,
                memory_load_limit=memory_load_limit,
                check_source_version=check_source_version,
                attributes=attributes,
                prefetch_file=prefetch_file,
            )
[docs] 713 def get_posix_path(self, path: str) -> Optional[str]: 714 """ 715 Returns the physical POSIX filesystem path for POSIX storage providers. 716 717 :param path: The path to resolve (may be a symlink or virtual path). 718 :return: Physical POSIX filesystem path if POSIX storage, None otherwise. 719 """ 720 if not self._is_posix_file_storage_provider(): 721 return None 722 723 if self._metadata_provider: 724 resolved = self._metadata_provider.realpath(path) 725 realpath = resolved.physical_path 726 else: 727 realpath = path 728 729 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
730
[docs] 731 def is_file(self, path: str) -> bool: 732 """ 733 Checks whether the specified path points to a file (rather than a directory or folder). 734 735 :param path: The logical path to check. 736 737 :return: ``True`` if the path points to a file, ``False`` otherwise. 738 """ 739 if self._metadata_provider: 740 resolved = self._metadata_provider.realpath(path) 741 return resolved.exists 742 743 return self._storage_provider.is_file(path)
744
    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        if self._metadata_provider:
            # Serialize against the auto-commit thread when it is running.
            with self._metadata_provider_lock or contextlib.nullcontext():
                if prefix:
                    # The logical path for each item will be the physical path with
                    # the base physical path removed from the beginning.
                    base_resolved = self._metadata_provider.realpath("")
                    physical_base = base_resolved.physical_path

                    prefix_resolved = self._metadata_provider.realpath(prefix)
                    physical_prefix = prefix_resolved.physical_path

                    # Register every object found under the prefix before committing.
                    for obj in self._storage_provider.list_objects(physical_prefix):
                        virtual_path = obj.key[len(physical_base) :].lstrip("/")
                        self._metadata_provider.add_file(virtual_path, obj)
                self._metadata_provider.commit_updates()
[docs] 767 def is_empty(self, path: str) -> bool: 768 """ 769 Checks whether the specified path is empty. A path is considered empty if there are no 770 objects whose keys start with the given path as a prefix. 771 772 :param path: The logical path to check. This is typically a prefix representing a directory or folder. 773 774 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 775 """ 776 if self._metadata_provider: 777 objects = self._metadata_provider.list_objects(path) 778 else: 779 objects = self._storage_provider.list_objects(path) 780 781 try: 782 return next(objects) is None 783 except StopIteration: 784 pass 785 786 return True
787
    def sync_from(
        self,
        source_client: "StorageClient",
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[List[str]] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Syncs files from the source storage client to "path/".

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When False (default), only file content is copied. When True, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is True.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is True.
        :raises ValueError: If both source_files and patterns are provided.
        """
        if source_files and patterns:
            raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")

        pattern_matcher = PatternMatcher(patterns) if patterns else None

        # Disable the replica manager during sync
        if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
            # Build a fresh client so the caller's instance keeps its replica manager.
            source_client = StorageClient(source_client._config)
            source_client._replica_manager = None

        m = SyncManager(source_client, source_path, self, target_path)

        m.sync_objects(
            execution_mode=execution_mode,
            description=description,
            num_worker_processes=num_worker_processes,
            delete_unmatched_files=delete_unmatched_files,
            pattern_matcher=pattern_matcher,
            preserve_source_attributes=preserve_source_attributes,
            follow_symlinks=follow_symlinks,
            source_files=source_files,
            ignore_hidden=ignore_hidden,
        )
[docs] 853 def sync_replicas( 854 self, 855 source_path: str, 856 replica_indices: Optional[List[int]] = None, 857 delete_unmatched_files: bool = False, 858 description: str = "Syncing replica", 859 num_worker_processes: Optional[int] = None, 860 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 861 patterns: Optional[PatternList] = None, 862 ignore_hidden: bool = True, 863 ) -> None: 864 """ 865 Sync files from the source storage client to target replicas. 866 867 :param source_path: The logical path to sync from. 868 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0. 869 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 870 :param description: Description of sync process for logging purposes. 871 :param num_worker_processes: The number of worker processes to use. 872 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 873 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 874 :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True. 875 """ 876 if not self._replicas: 877 logger.warning( 878 "No replicas found in profile '%s'. 
Add a 'replicas' section to your profile configuration to enable " 879 "secondary storage locations for redundancy and performance.", 880 self._config.profile, 881 ) 882 return None 883 884 if replica_indices: 885 try: 886 replicas = [self._replicas[i] for i in replica_indices] 887 except IndexError as e: 888 raise ValueError(f"Replica index out of range: {replica_indices}") from e 889 else: 890 replicas = self._replicas 891 892 # Disable the replica manager during sync 893 if self._replica_manager: 894 source_client = StorageClient(self._config) 895 source_client._replica_manager = None 896 else: 897 source_client = self 898 899 for replica in replicas: 900 replica.sync_from( 901 source_client, 902 source_path, 903 source_path, 904 delete_unmatched_files=delete_unmatched_files, 905 description=f"{description} ({replica.profile})", 906 num_worker_processes=num_worker_processes, 907 execution_mode=execution_mode, 908 patterns=patterns, 909 ignore_hidden=ignore_hidden, 910 )