Source code for multistorageclient.client.single

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import contextlib
import logging
import os
import threading
from collections.abc import Iterator
from datetime import datetime, timezone
from io import BytesIO
from pathlib import PurePosixPath
from typing import IO, Any, List, Optional, Union, cast

from ..config import StorageClientConfig
from ..constants import MEMORY_LOAD_LIMIT
from ..file import ObjectFile, PosixFile
from ..providers.posix_file import PosixFileStorageProvider
from ..replica_manager import ReplicaManager
from ..retry import retry
from ..sync import SyncManager
from ..types import (
    AWARE_DATETIME_MIN,
    MSC_PROTOCOL,
    ExecutionMode,
    ObjectMetadata,
    PatternList,
    Range,
    Replica,
    ResolvedPathState,
    SourceVersionCheckMode,
    StorageProvider,
    SyncResult,
)
from ..utils import NullStorageClient, PatternMatcher, join_paths
from .types import AbstractStorageClient

logger = logging.getLogger(__name__)


class SingleStorageClient(AbstractStorageClient):
    """
    Storage client for single-backend configurations.

    Supports full read and write operations against a single storage provider.
    """

    _config: StorageClientConfig
    _storage_provider: StorageProvider
    _metadata_provider_lock: Optional[threading.Lock] = None
    _stop_event: Optional[threading.Event] = None
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set.
        :raises ValueError: If the config has storage_provider_profiles (multi-backend).
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        if config.storage_provider_profiles:
            raise ValueError(
                "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
                "Use CompositeStorageClient for multi-backend configurations."
            )

        if config.storage_provider is None:
            raise ValueError("SingleStorageClient requires storage_provider to be set.")

        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = cast(StorageProvider, self._config.storage_provider)
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-committer thread")

                if self._autocommit_config.interval_minutes:
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances (facade)."""
        # Import here to avoid a circular dependency
        from .client import StorageClient as StorageClientFacade

        # Sort replicas by read_priority; the first one is the primary replica.
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClientFacade(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self):
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get the etag from the metadata provider or the storage provider.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return getattr(self._storage_provider, "_rust_client", None) is not None

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from a replica or the primary storage provider. Uses BytesIO to avoid creating temporary files.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # The replica manager can be disabled by setting it to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        config = state["_config"]
        self._initialize_providers(config)

        # The replica manager can be disabled by setting it to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._config.profile

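    # Illustrative note (comment only, not executed): __getstate__/__setstate__ above make
    # the client picklable by dropping the provider objects and rebuilding them from the
    # retained _config on the receiving side. A minimal sketch, assuming `client` exists:
    #
    #   import pickle
    #   clone = pickle.loads(pickle.dumps(client))  # providers re-initialized from _config
    #
    # This is what allows a client to be shipped to worker processes.
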
    def is_default_profile(self) -> bool:
        """
        :return: ``True`` if the storage client is using the reserved POSIX profile, ``False`` otherwise.
        """
        return self._config.profile == "__filesystem__"

    @property
    def replicas(self) -> List[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._replicas

    @retry
    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            if not resolved.exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")
            path = resolved.physical_path

        # Handle caching logic
        if self._is_cache_enabled() and self._cache_manager:
            if byte_range:
                # Range request with cache
                try:
                    # Fetch metadata for source version checking (if needed)
                    metadata = None
                    source_version = None
                    if check_source_version == SourceVersionCheckMode.ENABLE:
                        metadata = self._storage_provider.get_object_metadata(path)
                        source_version = metadata.etag
                    elif check_source_version == SourceVersionCheckMode.INHERIT:
                        if self._cache_manager.check_source_version():
                            metadata = self._storage_provider.get_object_metadata(path)
                            source_version = metadata.etag

                    # Optimization: for full-file reads (offset=0, size >= file_size), cache the whole
                    # file instead of chunking. This avoids creating many small chunks when the user
                    # requests the entire file. Only apply this optimization when metadata is already
                    # available (i.e., when version checking is enabled), to respect the user's choice
                    # to disable version checking and avoid extra HEAD requests.
                    if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length:
                        full_file_data = self._storage_provider.get_object(path)
                        self._cache_manager.set(path, full_file_data, source_version)
                        return full_file_data[: metadata.content_length]

                    # Use chunk-based caching for partial reads or when the optimization doesn't apply
                    data = self._cache_manager.read(
                        key=path,
                        source_version=source_version,
                        byte_range=byte_range,
                        storage_provider=self._storage_provider,
                    )
                    if data is not None:
                        return data
                    # Fallback (should not normally happen)
                    return self._storage_provider.get_object(path, byte_range=byte_range)
                except Exception:
                    # Fall back to a direct read if metadata fetching fails
                    # (FileNotFoundError is covered by Exception)
                    return self._storage_provider.get_object(path, byte_range=byte_range)
            else:
                # Full file read with cache.
                # Only fetch the source version if check_source_version is enabled.
                source_version = None
                if check_source_version == SourceVersionCheckMode.ENABLE:
                    source_version = self._get_source_version(path)
                elif check_source_version == SourceVersionCheckMode.INHERIT:
                    if self._cache_manager.check_source_version():
                        source_version = self._get_source_version(path)

                data = self._cache_manager.read(path, source_version)
                if data is None:
                    if self._replica_manager:
                        data = self._read_from_replica_or_primary(path)
                    else:
                        data = self._storage_provider.get_object(path)
                    self._cache_manager.set(path, data, source_version)
                return data
        elif self._replica_manager:
            # No cache, but replicas available
            return self._read_from_replica_or_primary(path)
        else:
            # No cache, no replicas: direct storage provider read
            return self._storage_provider.get_object(path, byte_range=byte_range)

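    # Usage sketch (illustrative comment, not executed). The config dict, profile name,
    # and object keys are hypothetical; StorageClientConfig.from_dict is the constructor
    # this module itself uses in _initialize_replicas:
    #
    #   config = StorageClientConfig.from_dict(config_dict=my_config_dict, profile="my-profile")
    #   client = SingleStorageClient(config)
    #   data = client.read("datasets/shard-000.bin")  # full read; served from cache when enabled
    #   head = client.read("datasets/shard-000.bin", byte_range=Range(offset=0, size=1024))
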
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        if not path or path == ".":  # for the empty path provided by the user
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # With a metadata provider, first check if the path exists as a file, then fall back
        # to detecting whether the path is a directory.
        # TODO: Consider passing the strict argument to the metadata provider.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if the path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                pass
            raise  # Re-raise the original FileNotFoundError

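    # Usage sketch (illustrative comment, not executed). The key is hypothetical:
    #
    #   meta = client.info("datasets/shard-000.bin")
    #   print(meta.key, meta.type, meta.content_length, meta.last_modified)
    #
    # For a directory path with a metadata provider, the FileNotFoundError fallback above
    # returns the matching "directory" entry from the parent listing instead of raising.
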
    @retry
    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
        """
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if not resolved.exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")

            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
        elif self._replica_manager:
            self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(
        self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, str]] = None
    ) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param local_path: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
        """
        virtual_path = remote_path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate a path for the overwrite (future: may return a different path for versioning)
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=True
                ).physical_path
            else:
                # New file: generate a path
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=False
                ).physical_path

            # If a metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)

    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        """
        virtual_path = path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate a path for the overwrite (future: may return a different path for versioning)
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
            else:
                # New file: generate a path
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path

            # If a metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)

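    # Usage sketch (illustrative comment, not executed). The key, payload, and attributes
    # are hypothetical; with a metadata provider configured, the attributes are recorded
    # only in the metadata layer, not on the physical object:
    #
    #   client.write("models/run-1/weights.bin", payload, attributes={"stage": "draft"})
    #   assert client.read("models/run-1/weights.bin") == payload
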
    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from a source path to a destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        """
        virtual_dest_path = dest_path
        if self._metadata_provider:
            # Source: must exist
            src_resolved = self._metadata_provider.realpath(src_path)
            if not src_resolved.exists:
                raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
            src_path = src_resolved.physical_path

            # Destination: check for overwrites
            dest_resolved = self._metadata_provider.realpath(dest_path)
            if dest_resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # Destination exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_dest_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate a path for the overwrite
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
            else:
                # New file: generate a path
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path

            self._storage_provider.copy_object(src_path, dest_path)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(dest_path)
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
        else:
            self._storage_provider.copy_object(src_path, dest_path)

    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            self.sync_from(
                cast(AbstractStorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a POSIX storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify the metadata provider of the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If the path is a file: delete the file
            # 2) If the path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    # Check if soft-delete is enabled
                    if not self._metadata_provider.should_use_soft_delete():
                        # Hard delete: remove both the physical file and the metadata
                        self._storage_provider.delete_object(resolved.physical_path)

                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if a replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete the entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")

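    # Usage sketch (illustrative comment, not executed). Paths are hypothetical:
    #
    #   client.delete("models/run-1/weights.bin")       # single file
    #   client.delete("models/run-1", recursive=True)   # directory, via sync_from against
    #                                                   # a NullStorageClient as the source
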
    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :return: A list of object paths that match the specified pattern.
        """
        if self._metadata_provider:
            results = self._metadata_provider.glob(pattern, attribute_filter_expression)
        else:
            results = self._storage_provider.glob(pattern, attribute_filter_expression)

        if include_url_prefix:
            results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]

        return results

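    # Usage sketch (illustrative comment, not executed). The pattern and profile name are
    # hypothetical; wildcard support beyond "*" depends on the underlying provider:
    #
    #   for key in client.glob("datasets/*.parquet", include_url_prefix=True):
    #       print(key)  # e.g. msc://my-profile/datasets/part-0.parquet
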
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on the implementation, there may be a performance impact when this is set to ``True``.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable to POSIX file storage providers. When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Parameter validation: either path or prefix, not both
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )
        elif prefix:
            logger.debug(
                f"The 'prefix' parameter is deprecated and will be removed in a future version. "
                f"Please use the 'path' parameter instead. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        # Apply patterns to the objects
        pattern_matcher = PatternMatcher(patterns) if patterns else None

        # Use path if provided, otherwise fall back to prefix
        effective_path = path if path else prefix

        if effective_path:
            if self.is_file(effective_path):
                if pattern_matcher and not pattern_matcher.should_include_file(effective_path):
                    return iter([])

                try:
                    object_metadata = self.info(effective_path)
                    if include_url_prefix:
                        if self.is_default_profile():
                            object_metadata.key = str(PurePosixPath("/") / object_metadata.key)
                        else:
                            object_metadata.key = join_paths(
                                f"{MSC_PROTOCOL}{self._config.profile}", object_metadata.key
                            )
                    yield object_metadata  # short-circuit if the path is a file
                    return
                except FileNotFoundError:
                    pass
            else:
                effective_path = effective_path.rstrip("/") + "/"

        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
            )
        else:
            objects = self._storage_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
                follow_symlinks=follow_symlinks,
            )

        for object in objects:
            # Skip objects that do not match the patterns
            if pattern_matcher and not pattern_matcher.should_include_file(object.key):
                continue

            if include_url_prefix:
                if self.is_default_profile():
                    object.key = str(PurePosixPath("/") / object.key)
                else:
                    object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)

            yield object

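    # Usage sketch (illustrative comment, not executed). The path is hypothetical; note
    # that list() is a generator, so results stream lazily:
    #
    #   for obj in client.list(path="datasets/2024/", include_directories=True):
    #       print(obj.key, obj.content_length)
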
    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading or writing.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to ``True``, disables caching for the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param atomic: When set to ``True``, the file will be written atomically (renamed upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: Attributes to add to the file.
            This parameter is only applicable when the mode is "w", "wb", "a", or "ab". Defaults to None.
        :param prefetch_file: Whether to prefetch the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileNotFoundError: If the file does not exist (read mode).
        """
        if self._is_posix_file_storage_provider():
            return PosixFile(
                self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
            )
        else:
            if atomic is False:
                logger.warning("Non-atomic writes are not supported for object storage providers.")

            return ObjectFile(
                self,
                remote_path=path,
                mode=mode,
                encoding=encoding,
                disable_read_cache=disable_read_cache,
                memory_load_limit=memory_load_limit,
                check_source_version=check_source_version,
                attributes=attributes,
                prefetch_file=prefetch_file,
            )

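    # Usage sketch (illustrative comment, not executed). Paths are hypothetical, and the
    # context-manager usage assumes PosixFile/ObjectFile behave like standard file objects:
    #
    #   with client.open("notes/readme.txt", mode="w", encoding="utf-8") as f:
    #       f.write("hello")
    #   with client.open("notes/readme.txt", mode="rb") as f:
    #       raw = f.read()
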
    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Returns the physical POSIX filesystem path for POSIX storage providers.

        :param path: The path to resolve (may be a symlink or virtual path).
        :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
        """
        if not self._is_posix_file_storage_provider():
            return None

        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            realpath = resolved.physical_path
        else:
            realpath = path

        return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a folder or directory).

        :param path: The logical path to check.
        :return: ``True`` if the key points to a file, ``False`` otherwise.
        """
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            return resolved.exists

        return self._storage_provider.is_file(path)

    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        if self._metadata_provider:
            with self._metadata_provider_lock or contextlib.nullcontext():
                if prefix:
                    base_resolved = self._metadata_provider.generate_physical_path("")
                    physical_base = base_resolved.physical_path

                    prefix_resolved = self._metadata_provider.generate_physical_path(prefix)
                    physical_prefix = prefix_resolved.physical_path

                    for obj in self._storage_provider.list_objects(physical_prefix):
                        virtual_path = obj.key[len(physical_base) :].lstrip("/")
                        self._metadata_provider.add_file(virtual_path, obj)
                self._metadata_provider.commit_updates()

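    # Usage sketch (illustrative comment, not executed). The prefix is hypothetical; the
    # prefix form re-scans physical objects and registers them with the metadata provider:
    #
    #   client.commit_metadata()                    # flush pending metadata updates
    #   client.commit_metadata(prefix="datasets/")  # scan the prefix, then commit
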
    def is_empty(self, path: str) -> bool:
        """
        Check whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The logical path to check (typically a directory or folder prefix).
        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(path)
        else:
            objects = self._storage_provider.list_objects(path)

        # The path is empty if the listing yields nothing at all.
        try:
            next(objects)
        except StopIteration:
            return True

        return False

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[List[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
    ) -> SyncResult:
        """
        Syncs files from the source storage client to the target path.

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of the sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
        :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after the sync completes.
            Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
        :raises ValueError: If both source_files and patterns are provided.
        :raises RuntimeError: If errors occur during sync operations. The sync will stop on the first error (fail-fast).
        """
        if source_files and patterns:
            raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")

        pattern_matcher = PatternMatcher(patterns) if patterns else None

        # Disable the replica manager during sync
        if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
            # Import here to avoid a circular dependency
            from .client import StorageClient as StorageClientFacade

            source_client = StorageClientFacade(source_client._config)
            source_client._replica_manager = None

        m = SyncManager(source_client, source_path, self, target_path)

        return m.sync_objects(
            execution_mode=execution_mode,
            description=description,
            num_worker_processes=num_worker_processes,
            delete_unmatched_files=delete_unmatched_files,
            pattern_matcher=pattern_matcher,
            preserve_source_attributes=preserve_source_attributes,
            follow_symlinks=follow_symlinks,
            source_files=source_files,
            ignore_hidden=ignore_hidden,
            commit_metadata=commit_metadata,
        )

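    # Usage sketch (illustrative comment, not executed). `posix_client` and the paths are
    # hypothetical; this mirrors a source tree into this client's backend:
    #
    #   result = client.sync_from(
    #       posix_client,
    #       source_path="checkpoints/",
    #       target_path="checkpoints/",
    #       delete_unmatched_files=True,
    #   )
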
    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[List[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Sync files from this client to its replica storage clients.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
        :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in the source.
        :param description: Description of the sync process for logging purposes.
        :param num_worker_processes: Number of worker processes for parallel sync.
        :param execution_mode: Execution mode (LOCAL or REMOTE).
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
        """
        if not self._replicas:
            logger.warning(
                "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
                "secondary storage locations for redundancy and performance.",
                self._config.profile,
            )
            return None

        if replica_indices:
            try:
                replicas = [self._replicas[i] for i in replica_indices]
            except IndexError as e:
                raise ValueError(f"Replica index out of range: {replica_indices}") from e
        else:
            replicas = self._replicas

        # Disable the replica manager during sync
        if self._replica_manager:
            # Import here to avoid a circular dependency
            from .client import StorageClient as StorageClientFacade

            source_client = StorageClientFacade(self._config)
            source_client._replica_manager = None
        else:
            source_client = self

        for replica in replicas:
            replica.sync_from(
                source_client,
                source_path,
                source_path,
                delete_unmatched_files=delete_unmatched_files,
                description=f"{description} ({replica.profile})",
                num_worker_processes=num_worker_processes,
                execution_mode=execution_mode,
                patterns=patterns,
                ignore_hidden=ignore_hidden,
            )
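
    # Usage sketch (illustrative comment, not executed). Assumes the profile config
    # declares a 'replicas' section; the index is hypothetical:
    #
    #   client.sync_replicas("datasets/", replica_indices=[0])  # first replica only
    #   client.sync_replicas("datasets/")                       # all replicas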