Source code for multistorageclient.client.single

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
  16import atexit
  17import contextlib
  18import logging
  19import os
  20import threading
  21from collections.abc import Iterator
  22from datetime import datetime, timezone
  23from io import BytesIO
  24from pathlib import PurePosixPath
  25from typing import IO, Any, Optional, Union, cast
  26
  27from ..config import StorageClientConfig
  28from ..constants import DEFAULT_SYNC_BATCH_SIZE, MEMORY_LOAD_LIMIT
  29from ..file import ObjectFile, PosixFile
  30from ..providers.posix_file import PosixFileStorageProvider
  31from ..replica_manager import ReplicaManager
  32from ..retry import retry
  33from ..sync import SyncManager
  34from ..types import (
  35    AWARE_DATETIME_MIN,
  36    MSC_PROTOCOL,
  37    ExecutionMode,
  38    ObjectMetadata,
  39    PatternList,
  40    Range,
  41    Replica,
  42    ResolvedPathState,
  43    SignerType,
  44    SourceVersionCheckMode,
  45    StorageProvider,
  46    SyncResult,
  47)
  48from ..utils import NullStorageClient, PatternMatcher, join_paths
  49from .types import AbstractStorageClient
  50
  51logger = logging.getLogger(__name__)
  52
  53
class SingleStorageClient(AbstractStorageClient):
    """
    Storage client for single-backend configurations.

    Supports full read and write operations against a single storage provider.
    """

    # Declared at class level so __del__ and the pickling helpers can rely on
    # the attributes existing even when __init__ has not fully run.
    _config: StorageClientConfig
    _storage_provider: StorageProvider
    _metadata_provider_lock: Optional[threading.Lock] = None
    _stop_event: Optional[threading.Event] = None
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set
        :raises ValueError: If config has storage_provider_profiles (multi-backend)
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Populate provider/cache/retry/auto-commit state from ``config``.

        Also invoked by ``__setstate__`` on unpickling, so it must be safe to
        run against a config restored from a pickle.

        :param config: Storage client configuration with storage_provider set.
        :raises ValueError: If the config is multi-backend or has no storage provider.
        """
        if config.storage_provider_profiles:
            raise ValueError(
                "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
                "Use CompositeStorageClient for multi-backend configurations."
            )

        if config.storage_provider is None:
            raise ValueError("SingleStorageClient requires storage_provider to be set.")

        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = cast(StorageProvider, self._config.storage_provider)
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        # Auto-commit only takes effect when a metadata provider is configured:
        # optionally a periodic daemon thread, optionally an atexit hook.
        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                if self._autocommit_config.interval_minutes:
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                # Serializes metadata-provider mutations between the committer
                # thread and user-facing write/delete paths.
                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances (facade)."""
        # Import here to avoid circular dependency
        from .client import StorageClient as StorageClientFacade

        # Sort replicas by read_priority, the first one is the primary replica.
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            # Each replica is a full storage client built from the same raw
            # config dict but under the replica's own profile.
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClientFacade(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event) -> None:
        """
        Background loop: commit metadata every ``commit_interval_minutes`` until
        ``stop_event`` is set.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early: wait() returns True when the
            # stop event fires, False on timeout (i.e. time to commit).
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self) -> None:
        # atexit hook: flush pending metadata once at interpreter shutdown.
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        # Caching is pointless for local POSIX files, so it is disabled there
        # even when a cache manager is configured.
        enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
        return enabled

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return getattr(self._storage_provider, "_rust_client", None) is not None

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        # Stop the auto-commit thread, if one was started; _commit_thread only
        # exists when _stop_event was set in _initialize_providers.
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        """Drop unpicklable/provider state; it is rebuilt from the config on load."""
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        # A None entry is kept deliberately so __setstate__ can tell "disabled"
        # apart from "rebuild from config".
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Rebuild providers and replicas from the pickled config."""
        # NOTE(review): only _config is consumed from `state`; any other
        # surviving entries are discarded and everything is re-derived from the
        # config — confirm this is intended for all picklable attributes.
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._config.profile
[docs] 235 def is_default_profile(self) -> bool: 236 """ 237 :return: ``True`` if the storage client is using the reserved POSIX profile, ``False`` otherwise. 238 """ 239 return self._config.profile == "__filesystem__"
240 241 @property 242 def replicas(self) -> list[AbstractStorageClient]: 243 """ 244 :return: List of replica storage clients, sorted by read priority. 245 """ 246 return self._replicas 247 248 @retry 249 def read( 250 self, 251 path: str, 252 byte_range: Optional[Range] = None, 253 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 254 ) -> bytes: 255 """ 256 Read bytes from a file at the specified logical path. 257 258 :param path: The logical path of the object to read. 259 :param byte_range: Optional byte range to read (offset and length). 260 :param check_source_version: Whether to check the source version of cached objects. 261 :return: The content of the object as bytes. 262 :raises FileNotFoundError: If the file at the specified path does not exist. 263 """ 264 if self._metadata_provider: 265 resolved = self._metadata_provider.realpath(path) 266 if not resolved.exists: 267 raise FileNotFoundError(f"The file at path '{path}' was not found.") 268 path = resolved.physical_path 269 270 # Handle caching logic 271 if self._is_cache_enabled() and self._cache_manager: 272 if byte_range: 273 # Range request with cache 274 try: 275 # Fetch metadata for source version checking (if needed) 276 metadata = None 277 source_version = None 278 if check_source_version == SourceVersionCheckMode.ENABLE: 279 metadata = self._storage_provider.get_object_metadata(path) 280 source_version = metadata.etag 281 elif check_source_version == SourceVersionCheckMode.INHERIT: 282 if self._cache_manager.check_source_version(): 283 metadata = self._storage_provider.get_object_metadata(path) 284 source_version = metadata.etag 285 286 # Optimization: For full-file reads (offset=0, size >= file_size), cache whole file instead of chunking 287 # This avoids creating many small chunks when the user requests the entire file. 
288 # Only apply this optimization when metadata is already available (i.e., when version checking is enabled), 289 # to respect the user's choice to disable version checking and avoid extra HEAD requests. 290 if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length: 291 full_file_data = self._storage_provider.get_object(path) 292 self._cache_manager.set(path, full_file_data, source_version) 293 return full_file_data[: metadata.content_length] 294 295 # Use chunk-based caching for partial reads or when optimization doesn't apply 296 data = self._cache_manager.read( 297 key=path, 298 source_version=source_version, 299 byte_range=byte_range, 300 storage_provider=self._storage_provider, 301 source_size=metadata.content_length if metadata else None, 302 ) 303 if data is not None: 304 return data 305 # Fallback (should not normally happen) 306 return self._storage_provider.get_object(path, byte_range=byte_range) 307 except (FileNotFoundError, Exception): 308 # Fall back to direct read if metadata fetching fails 309 return self._storage_provider.get_object(path, byte_range=byte_range) 310 else: 311 # Full file read with cache 312 # Only fetch source version if check_source_version is enabled 313 source_version = None 314 if check_source_version == SourceVersionCheckMode.ENABLE: 315 source_version = self._get_source_version(path) 316 elif check_source_version == SourceVersionCheckMode.INHERIT: 317 if self._cache_manager.check_source_version(): 318 source_version = self._get_source_version(path) 319 320 data = self._cache_manager.read(path, source_version) 321 if data is None: 322 if self._replica_manager: 323 data = self._read_from_replica_or_primary(path) 324 else: 325 data = self._storage_provider.get_object(path) 326 self._cache_manager.set(path, data, source_version) 327 return data 328 elif self._replica_manager: 329 # No cache, but replicas available 330 return self._read_from_replica_or_primary(path) 331 else: 332 # No cache, no 
replicas - direct storage provider read 333 return self._storage_provider.get_object(path, byte_range=byte_range) 334
[docs] 335 def info(self, path: str, strict: bool = True) -> ObjectMetadata: 336 """ 337 Get metadata for a file at the specified path. 338 339 :param path: The logical path of the object. 340 :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes. 341 :return: ObjectMetadata containing file information (size, last modified, etc.). 342 :raises FileNotFoundError: If the file at the specified path does not exist. 343 """ 344 if not path or path == ".": # empty path or '.' provided by the user 345 if self._is_posix_file_storage_provider(): 346 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc) 347 else: 348 last_modified = AWARE_DATETIME_MIN 349 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified) 350 351 if not self._metadata_provider: 352 return self._storage_provider.get_object_metadata(path, strict=strict) 353 354 return self._metadata_provider.get_object_metadata(path, include_pending=not strict)
    @retry
    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
        """
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if not resolved.exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")

            # Pass the provider-tracked metadata so the download can use it
            # (e.g. for size/etag) instead of an extra HEAD.
            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
        elif self._replica_manager:
            # Try replicas in read-priority order, falling back to the primary.
            self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(
        self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, str]] = None
    ) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param local_path: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
        :raises FileExistsError: If the target exists and the metadata provider disallows overwrites.
        """
        # NOTE: this mirrors write(); keep the overwrite/attribute handling of
        # the two methods in sync.
        virtual_path = remote_path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=True
                ).physical_path
            else:
                # New file - generate path
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=False
                ).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            # Merge provider-reported metadata with the caller's attributes.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)

    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        :raises FileExistsError: If the target exists and the metadata provider disallows overwrites.
        """
        # NOTE: this mirrors upload_file(); keep the overwrite/attribute
        # handling of the two methods in sync.
        virtual_path = path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            # Merge provider-reported metadata with the caller's attributes.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)
    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        :raises FileExistsError: If the destination exists and the metadata provider disallows overwrites.
        """
        virtual_dest_path = dest_path
        if self._metadata_provider:
            # Source: must exist
            src_resolved = self._metadata_provider.realpath(src_path)
            if not src_resolved.exists:
                raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
            src_path = src_resolved.physical_path

            # Destination: check for overwrites (same policy as write/upload_file)
            dest_resolved = self._metadata_provider.realpath(dest_path)
            if dest_resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # Destination exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_dest_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path

            self._storage_provider.copy_object(src_path, dest_path)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            # Record the new object under its virtual destination path.
            obj_metadata = self._storage_provider.get_object_metadata(dest_path)
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
        else:
            self._storage_provider.copy_object(src_path, dest_path)
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If ``path`` is a directory and ``recursive`` is ``False``.
        :raises FileNotFoundError: If nothing exists at ``path``.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive delete is implemented as a sync against a null source
            # with delete_unmatched_files=True: every file under `path` is
            # unmatched against the empty source and therefore removed.
            self.sync_from(
                cast(AbstractStorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider of the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    # Check if soft-delete is enabled
                    if not self._metadata_provider.should_use_soft_delete():
                        # Hard delete: remove both physical file and metadata
                        self._storage_provider.delete_object(resolved.physical_path)

                    # Metadata entry is removed either way; soft delete keeps
                    # the physical object around.
                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
[docs] 555 def delete_many(self, paths: list[str]) -> None: 556 """ 557 Delete multiple files at the specified paths. Only files are supported; directories are not deleted. 558 559 :param paths: List of logical paths of the files to delete. 560 """ 561 physical_paths_to_delete: list[str] = [] 562 for path in paths: 563 if self._metadata_provider: 564 resolved = self._metadata_provider.realpath(path) 565 if not resolved.exists: 566 raise FileNotFoundError(f"The file at path '{path}' was not found.") 567 if not self._metadata_provider.should_use_soft_delete(): 568 physical_paths_to_delete.append(resolved.physical_path) 569 else: 570 physical_paths_to_delete.append(path) 571 572 if physical_paths_to_delete: 573 self._storage_provider.delete_objects(physical_paths_to_delete) 574 575 for path in paths: 576 virtual_path = path 577 if self._metadata_provider: 578 with self._metadata_provider_lock or contextlib.nullcontext(): 579 self._metadata_provider.remove_file(virtual_path) 580 if self._is_cache_enabled(): 581 if self._cache_manager is None: 582 raise RuntimeError("Cache manager is not initialized") 583 self._cache_manager.delete(virtual_path) 584 if self._replica_manager: 585 self._replica_manager.delete_from_replicas(virtual_path)
586
[docs] 587 def glob( 588 self, 589 pattern: str, 590 include_url_prefix: bool = False, 591 attribute_filter_expression: Optional[str] = None, 592 ) -> list[str]: 593 """ 594 Matches and retrieves a list of object keys in the storage provider that match the specified pattern. 595 596 :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``). 597 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 598 :param attribute_filter_expression: The attribute filter expression to apply to the result. 599 :return: A list of object paths that match the specified pattern. 600 """ 601 if self._metadata_provider: 602 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 603 else: 604 results = self._storage_provider.glob(pattern, attribute_filter_expression) 605 606 if include_url_prefix: 607 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 608 609 return results
    def _resolve_single_file(
        self,
        path: str,
        start_after: Optional[str],
        end_at: Optional[str],
        include_url_prefix: bool,
        pattern_matcher: Optional[PatternMatcher],
    ) -> tuple[Optional[ObjectMetadata], Optional[str]]:
        """
        Resolve whether ``path`` should be handled as a single-file listing result.

        :param path: Candidate file path or directory prefix to resolve.
        :param start_after: Exclusive lower bound for file key filtering.
        :param end_at: Inclusive upper bound for file key filtering.
        :param include_url_prefix: Whether to prefix returned keys with ``msc://profile``.
        :param pattern_matcher: Optional include/exclude matcher for file keys.
        :return: A tuple of ``(single_file, normalized_path)``. Returns file metadata and
            the original path when ``path`` resolves to a file that passes filters;
            returns ``(None, normalized_directory_path)`` when the caller should
            continue with directory listing; returns ``(None, None)`` when filtering
            excludes the single-file candidate and listing should stop.
        """
        if not path:
            # Empty prefix: never a single file; list everything.
            return None, path

        if self.is_file(path):
            # Pattern exclusion of the single candidate terminates the listing.
            if pattern_matcher and not pattern_matcher.should_include_file(path):
                return None, None

            try:
                object_metadata = self.info(path)
                # Apply the same key-range bounds as directory listing.
                if start_after and object_metadata.key <= start_after:
                    return None, None
                if end_at and object_metadata.key > end_at:
                    return None, None
                if include_url_prefix:
                    self._prepend_url_prefix(object_metadata)
                return object_metadata, path
            except FileNotFoundError:
                # Raced away between is_file() and info(): fall back to
                # treating the path as a directory prefix.
                return None, path.rstrip("/") + "/"
        else:
            # Not a file: normalize to a trailing-slash directory prefix.
            return None, path.rstrip("/") + "/"

    def _prepend_url_prefix(self, obj: ObjectMetadata) -> None:
        # Default (POSIX) profile keys become absolute filesystem paths;
        # everything else gets the msc://profile URL prefix.
        if self.is_default_profile():
            obj.key = str(PurePosixPath("/") / obj.key)
        else:
            obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)

    def _filter_and_decorate(
        self,
        objects: Iterator[ObjectMetadata],
        include_url_prefix: bool,
        pattern_matcher: Optional[PatternMatcher],
    ) -> Iterator[ObjectMetadata]:
        # Lazily apply pattern filtering and (optionally) URL-prefix rewriting
        # to a listing stream; mutates each yielded object's key in place.
        for obj in objects:
            if pattern_matcher and not pattern_matcher.should_include_file(obj.key):
                continue
            if include_url_prefix:
                self._prepend_url_prefix(obj)
            yield obj
[docs] 673 def list_recursive( 674 self, 675 path: str = "", 676 start_after: Optional[str] = None, 677 end_at: Optional[str] = None, 678 max_workers: int = 32, 679 look_ahead: int = 2, 680 include_url_prefix: bool = False, 681 follow_symlinks: bool = True, 682 patterns: Optional[PatternList] = None, 683 ) -> Iterator[ObjectMetadata]: 684 """ 685 List files recursively in the storage provider under the specified path. 686 687 :param path: The directory or file path to list objects under. This should be a 688 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/"). 689 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 690 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 691 :param max_workers: Maximum concurrent workers for provider-level recursive listing. 692 :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing. 693 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 694 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing. 695 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 696 :return: An iterator over ObjectMetadata for matching files. 
697 """ 698 pattern_matcher = PatternMatcher(patterns) if patterns else None 699 700 single_file, effective_path = self._resolve_single_file( 701 path, start_after, end_at, include_url_prefix, pattern_matcher 702 ) 703 if single_file is not None: 704 yield single_file 705 return 706 if effective_path is None: 707 return 708 709 if self._metadata_provider: 710 objects = self._metadata_provider.list_objects( 711 effective_path, 712 start_after=start_after, 713 end_at=end_at, 714 include_directories=False, 715 ) 716 else: 717 objects = self._storage_provider.list_objects_recursive( 718 effective_path, 719 start_after=start_after, 720 end_at=end_at, 721 max_workers=max_workers, 722 look_ahead=look_ahead, 723 follow_symlinks=follow_symlinks, 724 ) 725 726 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
727
[docs] 728 def open( 729 self, 730 path: str, 731 mode: str = "rb", 732 buffering: int = -1, 733 encoding: Optional[str] = None, 734 disable_read_cache: bool = False, 735 memory_load_limit: int = MEMORY_LOAD_LIMIT, 736 atomic: bool = True, 737 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 738 attributes: Optional[dict[str, str]] = None, 739 prefetch_file: bool = True, 740 ) -> Union[PosixFile, ObjectFile]: 741 """ 742 Open a file for reading or writing. 743 744 :param path: The logical path of the object to open. 745 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab". 746 :param buffering: The buffering mode. Only applies to PosixFile. 747 :param encoding: The encoding to use for text files. 748 :param disable_read_cache: When set to ``True``, disables caching for file content. 749 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 750 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 751 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB. 752 :param atomic: When set to ``True``, file will be written atomically (rename upon close). 753 This parameter is only applicable to PosixFile in write mode. 754 :param check_source_version: Whether to check the source version of cached objects. 755 :param attributes: Attributes to add to the file. 756 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None. 757 :param prefetch_file: Whether to prefetch the file content. 758 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True. 759 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 760 :raises FileNotFoundError: If the file does not exist (read mode). 
761 """ 762 if self._is_posix_file_storage_provider(): 763 return PosixFile( 764 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 765 ) 766 else: 767 if atomic is False: 768 logger.warning("Non-atomic writes are not supported for object storage providers.") 769 770 return ObjectFile( 771 self, 772 remote_path=path, 773 mode=mode, 774 encoding=encoding, 775 disable_read_cache=disable_read_cache, 776 memory_load_limit=memory_load_limit, 777 check_source_version=check_source_version, 778 attributes=attributes, 779 prefetch_file=prefetch_file, 780 )
781
[docs] 782 def get_posix_path(self, path: str) -> Optional[str]: 783 """ 784 Returns the physical POSIX filesystem path for POSIX storage providers. 785 786 :param path: The path to resolve (may be a symlink or virtual path). 787 :return: Physical POSIX filesystem path if POSIX storage, None otherwise. 788 """ 789 if not self._is_posix_file_storage_provider(): 790 return None 791 792 if self._metadata_provider: 793 resolved = self._metadata_provider.realpath(path) 794 realpath = resolved.physical_path 795 else: 796 realpath = path 797 798 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
799
[docs] 800 def is_file(self, path: str) -> bool: 801 """ 802 Checks whether the specified path points to a file (rather than a folder or directory). 803 804 :param path: The logical path to check. 805 :return: ``True`` if the key points to a file, ``False`` otherwise. 806 """ 807 if self._metadata_provider: 808 resolved = self._metadata_provider.realpath(path) 809 return resolved.exists 810 811 return self._storage_provider.is_file(path)
812
[docs] 813 def commit_metadata(self, prefix: Optional[str] = None) -> None: 814 """ 815 Commits any pending updates to the metadata provider. No-op if not using a metadata provider. 816 817 :param prefix: If provided, scans the prefix to find files to commit. 818 """ 819 if self._metadata_provider: 820 with self._metadata_provider_lock or contextlib.nullcontext(): 821 if prefix: 822 base_resolved = self._metadata_provider.generate_physical_path("") 823 physical_base = base_resolved.physical_path 824 825 prefix_resolved = self._metadata_provider.generate_physical_path(prefix) 826 physical_prefix = prefix_resolved.physical_path 827 828 for obj in self._storage_provider.list_objects(physical_prefix): 829 virtual_path = obj.key[len(physical_base) :].lstrip("/") 830 self._metadata_provider.add_file(virtual_path, obj) 831 self._metadata_provider.commit_updates()
832
[docs] 833 def is_empty(self, path: str) -> bool: 834 """ 835 Check whether the specified path is empty. A path is considered empty if there are no 836 objects whose keys start with the given path as a prefix. 837 838 :param path: The logical path to check (typically a directory or folder prefix). 839 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 840 """ 841 if self._metadata_provider: 842 objects = self._metadata_provider.list_objects(path) 843 else: 844 objects = self._storage_provider.list_objects(path) 845 846 try: 847 return next(objects) is None 848 except StopIteration: 849 pass 850 851 return True
852
[docs] 853 def sync_from( 854 self, 855 source_client: AbstractStorageClient, 856 source_path: str = "", 857 target_path: str = "", 858 delete_unmatched_files: bool = False, 859 description: str = "Syncing", 860 num_worker_processes: Optional[int] = None, 861 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 862 patterns: Optional[PatternList] = None, 863 preserve_source_attributes: bool = False, 864 follow_symlinks: bool = True, 865 source_files: Optional[list[str]] = None, 866 ignore_hidden: bool = True, 867 commit_metadata: bool = True, 868 dryrun: bool = False, 869 dryrun_output_path: Optional[str] = None, 870 ) -> SyncResult: 871 """ 872 Syncs files from the source storage client to "path/". 873 874 :param source_client: The source storage client. 875 :param source_path: The logical path to sync from. 876 :param target_path: The logical path to sync to. 877 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 878 :param description: Description of sync process for logging purposes. 879 :param num_worker_processes: The number of worker processes to use. 880 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 881 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 882 Cannot be used together with source_files. 883 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization. 884 When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved. 885 886 .. warning:: 887 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD 888 request for each object to retrieve attributes, which can significantly impact performance on large-scale 889 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile. 
890 891 :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``. 892 :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these 893 specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns. 894 :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``. 895 :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes. 896 Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually. 897 :param dryrun: If ``True``, only enumerate and compare objects without performing any copy/delete operations. 898 The returned :py:class:`SyncResult` will include a :py:class:`DryrunResult` with paths to JSONL files. 899 :param dryrun_output_path: Directory to write dryrun JSONL files into. If ``None`` (default), a temporary 900 directory is created automatically. Ignored when ``dryrun`` is ``False``. 901 :raises ValueError: If both source_files and patterns are provided. 902 :raises RuntimeError: If errors occur during sync operations. The sync will stop on first error (fail-fast). 903 """ 904 if source_files and patterns: 905 raise ValueError("Cannot specify both 'source_files' and 'patterns'. 
Please use only one filtering method.") 906 907 pattern_matcher = PatternMatcher(patterns) if patterns else None 908 909 # Disable the replica manager during sync 910 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager: 911 # Import here to avoid circular dependency 912 from .client import StorageClient as StorageClientFacade 913 914 source_client = StorageClientFacade(source_client._config) 915 source_client._replica_manager = None 916 917 m = SyncManager(source_client, source_path, self, target_path) 918 batch_size = int(os.environ.get("MSC_SYNC_BATCH_SIZE", DEFAULT_SYNC_BATCH_SIZE)) 919 920 return m.sync_objects( 921 execution_mode=execution_mode, 922 description=description, 923 num_worker_processes=num_worker_processes, 924 delete_unmatched_files=delete_unmatched_files, 925 pattern_matcher=pattern_matcher, 926 preserve_source_attributes=preserve_source_attributes, 927 follow_symlinks=follow_symlinks, 928 source_files=source_files, 929 ignore_hidden=ignore_hidden, 930 commit_metadata=commit_metadata, 931 batch_size=batch_size, 932 dryrun=dryrun, 933 dryrun_output_path=dryrun_output_path, 934 )
935
[docs] 936 def sync_replicas( 937 self, 938 source_path: str, 939 replica_indices: Optional[list[int]] = None, 940 delete_unmatched_files: bool = False, 941 description: str = "Syncing replica", 942 num_worker_processes: Optional[int] = None, 943 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 944 patterns: Optional[PatternList] = None, 945 ignore_hidden: bool = True, 946 ) -> None: 947 """ 948 Sync files from this client to its replica storage clients. 949 950 :param source_path: The logical path to sync from. 951 :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas. 952 :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source. 953 :param description: Description of sync process for logging purposes. 954 :param num_worker_processes: Number of worker processes for parallel sync. 955 :param execution_mode: Execution mode (LOCAL or REMOTE). 956 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 957 :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``. 958 """ 959 if not self._replicas: 960 logger.warning( 961 "No replicas found in profile '%s'. 
Add a 'replicas' section to your profile configuration to enable " 962 "secondary storage locations for redundancy and performance.", 963 self._config.profile, 964 ) 965 return None 966 967 if replica_indices: 968 try: 969 replicas = [self._replicas[i] for i in replica_indices] 970 except IndexError as e: 971 raise ValueError(f"Replica index out of range: {replica_indices}") from e 972 else: 973 replicas = self._replicas 974 975 # Disable the replica manager during sync 976 if self._replica_manager: 977 # Import here to avoid circular dependency 978 from .client import StorageClient as StorageClientFacade 979 980 source_client = StorageClientFacade(self._config) 981 source_client._replica_manager = None 982 else: 983 source_client = self 984 985 for replica in replicas: 986 replica.sync_from( 987 source_client, 988 source_path, 989 source_path, 990 delete_unmatched_files=delete_unmatched_files, 991 description=f"{description} ({replica.profile})", 992 num_worker_processes=num_worker_processes, 993 execution_mode=execution_mode, 994 patterns=patterns, 995 ignore_hidden=ignore_hidden, 996 )
997
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation, there may be a performance impact if this is set to ``True``.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Parameter validation - either path or prefix, not both
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )
        elif prefix:
            # Deprecation notice is logged at debug level only, so callers are
            # not spammed on every listing.
            logger.debug(
                f"The 'prefix' parameter is deprecated and will be removed in a future version. "
                f"Please use the 'path' parameter instead. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        pattern_matcher = PatternMatcher(patterns) if patterns else None
        # Whichever of path/prefix was supplied (validation above guarantees at most one).
        effective_path = path if path else prefix

        # _resolve_single_file detects the case where effective_path names one
        # exact file: it either returns that file's metadata directly, or a
        # (possibly rewritten) path to enumerate.
        single_file, effective_path = self._resolve_single_file(
            effective_path, start_after, end_at, include_url_prefix, pattern_matcher
        )
        if single_file is not None:
            # Exact-file hit: yield the single entry and stop.
            yield single_file
            return
        if effective_path is None:
            # Resolution determined there is nothing to list.
            return

        # Metadata provider takes precedence when configured; note it does not
        # accept follow_symlinks (that option only applies to POSIX backends).
        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
            )
        else:
            objects = self._storage_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=include_directories,
                attribute_filter_expression=attribute_filter_expression,
                show_attributes=show_attributes,
                follow_symlinks=follow_symlinks,
            )

        # Apply pattern filtering and optional msc:// URL decoration lazily.
        yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
1079
[docs] 1080 def generate_presigned_url( 1081 self, 1082 path: str, 1083 *, 1084 method: str = "GET", 1085 signer_type: Optional[SignerType] = None, 1086 signer_options: Optional[dict[str, Any]] = None, 1087 ) -> str: 1088 return self._storage_provider.generate_presigned_url( 1089 path, method=method, signer_type=signer_type, signer_options=signer_options 1090 )