Source code for multistorageclient.client.single

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import contextlib
 18import logging
 19import os
 20import threading
 21from collections.abc import Iterator
 22from datetime import datetime, timezone
 23from io import BytesIO
 24from pathlib import PurePosixPath
 25from typing import IO, Any, List, Optional, Union, cast
 26
 27from ..config import StorageClientConfig
 28from ..constants import MEMORY_LOAD_LIMIT
 29from ..file import ObjectFile, PosixFile
 30from ..providers.posix_file import PosixFileStorageProvider
 31from ..replica_manager import ReplicaManager
 32from ..retry import retry
 33from ..sync import SyncManager
 34from ..types import (
 35    AWARE_DATETIME_MIN,
 36    MSC_PROTOCOL,
 37    ExecutionMode,
 38    ObjectMetadata,
 39    PatternList,
 40    Range,
 41    Replica,
 42    ResolvedPathState,
 43    SourceVersionCheckMode,
 44    StorageProvider,
 45)
 46from ..utils import NullStorageClient, PatternMatcher, join_paths
 47from .types import AbstractStorageClient
 48
 49logger = logging.getLogger(__name__)
 50
 51
class SingleStorageClient(AbstractStorageClient):
    """
    Storage client for single-backend configurations.

    Supports full read and write operations against a single storage provider.
    """

    # Class-level defaults; instance values are bound in _initialize_providers /
    # _initialize_replicas so that unpickling (__setstate__) can rebuild them.
    _config: StorageClientConfig
    _storage_provider: StorageProvider
    _metadata_provider_lock: Optional[threading.Lock] = None
    _stop_event: Optional[threading.Event] = None
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set
        :raises ValueError: If config has storage_provider_profiles (multi-backend)
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Validate the config and bind provider/cache/retry state onto this client.

        Also starts the auto-commit machinery (background thread and/or atexit hook)
        when both an autocommit config and a metadata provider are present.

        :param config: Storage client configuration to install.
        :raises ValueError: If the config is multi-backend or has no storage provider.
        """
        if config.storage_provider_profiles:
            raise ValueError(
                "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
                "Use CompositeStorageClient for multi-backend configurations."
            )

        if config.storage_provider is None:
            raise ValueError("SingleStorageClient requires storage_provider to be set.")

        self._config = config
        self._credentials_provider = self._config.credentials_provider
        # cast: storage_provider was checked to be non-None above.
        self._storage_provider = cast(StorageProvider, self._config.storage_provider)
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                # Periodic commit: daemon thread wakes every interval_minutes and
                # commits pending metadata updates until _stop_event is set.
                if self._autocommit_config.interval_minutes:
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                # Final commit on interpreter shutdown.
                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                # Serializes metadata-provider mutations between user calls and
                # the background committer thread.
                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances (facade)."""
        # Import here to avoid circular dependency
        from .client import StorageClient as StorageClientFacade

        # Sort replicas by read_priority, the first one is the primary replica.
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            # Each replica is a full StorageClient built from its own profile.
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClientFacade(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        # Replica manager only exists when at least one replica is configured.
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event) -> None:
        """
        Background loop: commit metadata every ``commit_interval_minutes`` until stopped.

        :param commit_interval_minutes: Minutes to wait between commits.
        :param stop_event: Event that terminates the loop when set.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self) -> None:
        """atexit hook: flush any pending metadata updates before shutdown."""
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        """Return ``True`` when a cache manager exists and the backend is not POSIX (local files need no cache)."""
        enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
        return enabled

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return hasattr(self._storage_provider, "_rust_client")

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.

        :param path: Physical path to download.
        :return: The object content as bytes.
        :raises RuntimeError: If no replica manager is configured.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        # Stop the auto-commit thread if one was started. _stop_event is only
        # assigned when _commit_thread was created, so the attribute exists here.
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        """Drop unpicklable provider/lock/replica objects; they are rebuilt from _config on unpickle."""
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild providers, replicas, and the metadata lock from the pickled _config."""
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._config.profile
[docs] 233 def is_default_profile(self) -> bool: 234 """ 235 :return: ``True`` if the storage client is using the default profile, ``False`` otherwise. 236 """ 237 return self._config.profile == "default"
    @property
    def replicas(self) -> List[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._replicas

    @retry
    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        # Resolve logical -> physical path via the metadata provider, if any.
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            if not resolved.exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")
            path = resolved.physical_path

        # Handle caching logic
        if self._is_cache_enabled() and self._cache_manager:
            if byte_range:
                # Range request with cache
                try:
                    # Determine whether a fresh etag is needed for cache validation.
                    if check_source_version == SourceVersionCheckMode.ENABLE:
                        metadata = self._storage_provider.get_object_metadata(path)
                        source_version = metadata.etag
                    elif check_source_version == SourceVersionCheckMode.INHERIT:
                        # INHERIT defers the decision to the cache manager's config.
                        if self._cache_manager.check_source_version():
                            metadata = self._storage_provider.get_object_metadata(path)
                            source_version = metadata.etag
                        else:
                            metadata = None
                            source_version = None
                    else:
                        metadata = None
                        source_version = None

                    data = self._cache_manager.read(
                        key=path,
                        source_version=source_version,
                        byte_range=byte_range,
                        storage_provider=self._storage_provider,
                    )
                    if data is not None:
                        return data
                    # Fallback (should not normally happen)
                    return self._storage_provider.get_object(path, byte_range=byte_range)
                # NOTE(review): FileNotFoundError is redundant here since Exception
                # already covers it — the net effect is "catch everything".
                except (FileNotFoundError, Exception):
                    # Fall back to direct read if metadata fetching fails
                    return self._storage_provider.get_object(path, byte_range=byte_range)
            else:
                # Full file read with cache
                source_version = self._get_source_version(path)
                data = self._cache_manager.read(path, source_version)
                if data is None:
                    # Cache miss: fetch from a replica (if configured) or the
                    # primary provider, then populate the cache.
                    if self._replica_manager:
                        data = self._read_from_replica_or_primary(path)
                    else:
                        data = self._storage_provider.get_object(path)
                    self._cache_manager.set(path, data, source_version)
                return data
        elif self._replica_manager:
            # No cache, but replicas available
            return self._read_from_replica_or_primary(path)
        else:
            # No cache, no replicas - direct storage provider read
            return self._storage_provider.get_object(path, byte_range=byte_range)
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        if not path or path == ".":  # for the empty path provided by the user
            # Root of the namespace: synthesize a directory entry. For POSIX we
            # can report a real mtime; otherwise use the aware-datetime minimum.
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        # TODO: Consider passing strict argument to the metadata provider.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Best-effort directory probe; fall through to the original error.
                pass
            raise  # Raise original FileNotFoundError
    @retry
    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
        """
        if self._metadata_provider:
            # Resolve logical -> physical path and verify existence first.
            resolved = self._metadata_provider.realpath(remote_path)
            if not resolved.exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")

            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
        elif self._replica_manager:
            # Prefer replicas (sorted by read priority) with primary as fallback.
            self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(
        self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, str]] = None
    ) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param local_path: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
        :raises FileExistsError: If the target exists and the metadata provider disallows overwrites.
        """
        virtual_path = remote_path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=True
                ).physical_path
            else:
                # New file - generate path
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=False
                ).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            # Merge caller attributes over any provider-reported metadata.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)

    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        :raises FileExistsError: If the target exists and the metadata provider disallows overwrites.
        """
        virtual_path = path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            # Merge caller attributes over any provider-reported metadata.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)
    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        :raises FileExistsError: If the destination exists and the metadata provider disallows overwrites.
        """
        virtual_dest_path = dest_path
        if self._metadata_provider:
            # Source: must exist
            src_resolved = self._metadata_provider.realpath(src_path)
            if not src_resolved.exists:
                raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
            src_path = src_resolved.physical_path

            # Destination: check for overwrites
            dest_resolved = self._metadata_provider.realpath(dest_path)
            if dest_resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # Destination exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_dest_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path

            self._storage_provider.copy_object(src_path, dest_path)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(dest_path)
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
        else:
            self._storage_provider.copy_object(src_path, dest_path)
495
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If ``path`` is a directory and ``recursive`` is ``False``.
        :raises FileNotFoundError: If ``path`` does not exist.
        """
        # info() raises FileNotFoundError when nothing exists at the path.
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive directory delete: sync against a null source with
            # delete_unmatched_files=True, which removes everything under path.
            self.sync_from(
                cast(AbstractStorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider for the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    self._storage_provider.delete_object(resolved.physical_path)

                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
551
[docs] 552 def glob( 553 self, 554 pattern: str, 555 include_url_prefix: bool = False, 556 attribute_filter_expression: Optional[str] = None, 557 ) -> list[str]: 558 """ 559 Matches and retrieves a list of object keys in the storage provider that match the specified pattern. 560 561 :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``). 562 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 563 :param attribute_filter_expression: The attribute filter expression to apply to the result. 564 :return: A list of object paths that match the specified pattern. 565 """ 566 if self._metadata_provider: 567 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 568 else: 569 results = self._storage_provider.glob(pattern, attribute_filter_expression) 570 571 if include_url_prefix: 572 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 573 574 return results
575
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. when ``True``, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to ``True``.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Parameter validation - either path or prefix, not both
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )
        elif prefix:
            logger.debug(
                f"The 'prefix' parameter is deprecated and will be removed in a future version. "
                f"Please use the 'path' parameter instead. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        # Use path if provided, otherwise fall back to prefix
        effective_path = path if path else prefix

        if effective_path:
            if self.is_file(effective_path):
                try:
                    object_metadata = self.info(effective_path)
                    if include_url_prefix:
                        # Default profile uses a bare absolute path; named
                        # profiles get the msc://profile URL prefix.
                        if self.is_default_profile():
                            object_metadata.key = str(PurePosixPath("/") / object_metadata.key)
                        else:
                            object_metadata.key = join_paths(
                                f"{MSC_PROTOCOL}{self._config.profile}", object_metadata.key
                            )
                    yield object_metadata  # short circuit if the path is a file
                    return
                except FileNotFoundError:
                    # Raced with a delete; fall through to a prefix listing.
                    pass
            else:
                # Normalize directory paths to a single trailing slash.
                effective_path = effective_path.rstrip("/") + "/"

        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
            )
        else:
            objects = self._storage_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
                follow_symlinks=follow_symlinks,
            )

        for object in objects:
            if include_url_prefix:
                if self.is_default_profile():
                    object.key = str(PurePosixPath("/") / object.key)
                else:
                    object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
            yield object
670
[docs] 671 def open( 672 self, 673 path: str, 674 mode: str = "rb", 675 buffering: int = -1, 676 encoding: Optional[str] = None, 677 disable_read_cache: bool = False, 678 memory_load_limit: int = MEMORY_LOAD_LIMIT, 679 atomic: bool = True, 680 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 681 attributes: Optional[dict[str, str]] = None, 682 prefetch_file: bool = True, 683 ) -> Union[PosixFile, ObjectFile]: 684 """ 685 Open a file for reading or writing. 686 687 :param path: The logical path of the object to open. 688 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab". 689 :param buffering: The buffering mode. Only applies to PosixFile. 690 :param encoding: The encoding to use for text files. 691 :param disable_read_cache: When set to ``True``, disables caching for file content. 692 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 693 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 694 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB. 695 :param atomic: When set to ``True``, file will be written atomically (rename upon close). 696 This parameter is only applicable to PosixFile in write mode. 697 :param check_source_version: Whether to check the source version of cached objects. 698 :param attributes: Attributes to add to the file. 699 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None. 700 :param prefetch_file: Whether to prefetch the file content. 701 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True. 702 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 703 :raises FileNotFoundError: If the file does not exist (read mode). 
704 """ 705 if self._is_posix_file_storage_provider(): 706 return PosixFile( 707 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 708 ) 709 else: 710 if atomic is False: 711 logger.warning("Non-atomic writes are not supported for object storage providers.") 712 713 return ObjectFile( 714 self, 715 remote_path=path, 716 mode=mode, 717 encoding=encoding, 718 disable_read_cache=disable_read_cache, 719 memory_load_limit=memory_load_limit, 720 check_source_version=check_source_version, 721 attributes=attributes, 722 prefetch_file=prefetch_file, 723 )
724
[docs] 725 def get_posix_path(self, path: str) -> Optional[str]: 726 """ 727 Returns the physical POSIX filesystem path for POSIX storage providers. 728 729 :param path: The path to resolve (may be a symlink or virtual path). 730 :return: Physical POSIX filesystem path if POSIX storage, None otherwise. 731 """ 732 if not self._is_posix_file_storage_provider(): 733 return None 734 735 if self._metadata_provider: 736 resolved = self._metadata_provider.realpath(path) 737 realpath = resolved.physical_path 738 else: 739 realpath = path 740 741 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
742
[docs] 743 def is_file(self, path: str) -> bool: 744 """ 745 Checks whether the specified path points to a file (rather than a folder or directory). 746 747 :param path: The logical path to check. 748 :return: ``True`` if the key points to a file, ``False`` otherwise. 749 """ 750 if self._metadata_provider: 751 resolved = self._metadata_provider.realpath(path) 752 return resolved.exists 753 754 return self._storage_provider.is_file(path)
755
    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        if self._metadata_provider:
            # Lock guards against concurrent commits from the auto-commit
            # thread; nullcontext() when no lock was created.
            with self._metadata_provider_lock or contextlib.nullcontext():
                if prefix:
                    # The logical path for each item will be the physical path with
                    # the base physical path removed from the beginning.
                    base_resolved = self._metadata_provider.realpath("")
                    physical_base = base_resolved.physical_path

                    prefix_resolved = self._metadata_provider.realpath(prefix)
                    physical_prefix = prefix_resolved.physical_path

                    # Register every object found under the prefix before committing.
                    for obj in self._storage_provider.list_objects(physical_prefix):
                        virtual_path = obj.key[len(physical_base) :].lstrip("/")
                        self._metadata_provider.add_file(virtual_path, obj)
                self._metadata_provider.commit_updates()
777
[docs] 778 def is_empty(self, path: str) -> bool: 779 """ 780 Check whether the specified path is empty. A path is considered empty if there are no 781 objects whose keys start with the given path as a prefix. 782 783 :param path: The logical path to check (typically a directory or folder prefix). 784 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 785 """ 786 if self._metadata_provider: 787 objects = self._metadata_provider.list_objects(path) 788 else: 789 objects = self._storage_provider.list_objects(path) 790 791 try: 792 return next(objects) is None 793 except StopIteration: 794 pass 795 796 return True
797
[docs] 798 def sync_from( 799 self, 800 source_client: AbstractStorageClient, 801 source_path: str = "", 802 target_path: str = "", 803 delete_unmatched_files: bool = False, 804 description: str = "Syncing", 805 num_worker_processes: Optional[int] = None, 806 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 807 patterns: Optional[PatternList] = None, 808 preserve_source_attributes: bool = False, 809 follow_symlinks: bool = True, 810 source_files: Optional[List[str]] = None, 811 ignore_hidden: bool = True, 812 commit_metadata: bool = True, 813 ) -> None: 814 """ 815 Syncs files from the source storage client to "path/". 816 817 :param source_client: The source storage client. 818 :param source_path: The logical path to sync from. 819 :param target_path: The logical path to sync to. 820 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 821 :param description: Description of sync process for logging purposes. 822 :param num_worker_processes: The number of worker processes to use. 823 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 824 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 825 Cannot be used together with source_files. 826 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization. 827 When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved. 828 829 .. warning:: 830 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD 831 request for each object to retrieve attributes, which can significantly impact performance on large-scale 832 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile. 833 834 :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``. 
835 :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these 836 specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns. 837 :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``. 838 :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes. 839 Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually. 840 :raises ValueError: If both source_files and patterns are provided. 841 :raises RuntimeError: If errors occur during sync operations. The sync will stop on first error (fail-fast). 842 """ 843 if source_files and patterns: 844 raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.") 845 846 pattern_matcher = PatternMatcher(patterns) if patterns else None 847 848 # Disable the replica manager during sync 849 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager: 850 # Import here to avoid circular dependency 851 from .client import StorageClient as StorageClientFacade 852 853 source_client = StorageClientFacade(source_client._config) 854 source_client._replica_manager = None 855 856 m = SyncManager(source_client, source_path, self, target_path) 857 858 m.sync_objects( 859 execution_mode=execution_mode, 860 description=description, 861 num_worker_processes=num_worker_processes, 862 delete_unmatched_files=delete_unmatched_files, 863 pattern_matcher=pattern_matcher, 864 preserve_source_attributes=preserve_source_attributes, 865 follow_symlinks=follow_symlinks, 866 source_files=source_files, 867 ignore_hidden=ignore_hidden, 868 commit_metadata=commit_metadata, 869 )
870
[docs] 871 def sync_replicas( 872 self, 873 source_path: str, 874 replica_indices: Optional[List[int]] = None, 875 delete_unmatched_files: bool = False, 876 description: str = "Syncing replica", 877 num_worker_processes: Optional[int] = None, 878 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 879 patterns: Optional[PatternList] = None, 880 ignore_hidden: bool = True, 881 ) -> None: 882 """ 883 Sync files from this client to its replica storage clients. 884 885 :param source_path: The logical path to sync from. 886 :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas. 887 :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source. 888 :param description: Description of sync process for logging purposes. 889 :param num_worker_processes: Number of worker processes for parallel sync. 890 :param execution_mode: Execution mode (LOCAL or REMOTE). 891 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 892 :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``. 893 """ 894 if not self._replicas: 895 logger.warning( 896 "No replicas found in profile '%s'. 
Add a 'replicas' section to your profile configuration to enable " 897 "secondary storage locations for redundancy and performance.", 898 self._config.profile, 899 ) 900 return None 901 902 if replica_indices: 903 try: 904 replicas = [self._replicas[i] for i in replica_indices] 905 except IndexError as e: 906 raise ValueError(f"Replica index out of range: {replica_indices}") from e 907 else: 908 replicas = self._replicas 909 910 # Disable the replica manager during sync 911 if self._replica_manager: 912 # Import here to avoid circular dependency 913 from .client import StorageClient as StorageClientFacade 914 915 source_client = StorageClientFacade(self._config) 916 source_client._replica_manager = None 917 else: 918 source_client = self 919 920 for replica in replicas: 921 replica.sync_from( 922 source_client, 923 source_path, 924 source_path, 925 delete_unmatched_files=delete_unmatched_files, 926 description=f"{description} ({replica.profile})", 927 num_worker_processes=num_worker_processes, 928 execution_mode=execution_mode, 929 patterns=patterns, 930 ignore_hidden=ignore_hidden, 931 )