Source code for multistorageclient.client.single

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
  16import atexit
  17import contextlib
  18import logging
  19import os
  20import threading
  21from collections.abc import Iterator, Sequence
  22from concurrent.futures import ThreadPoolExecutor, as_completed
  23from datetime import datetime, timezone
  24from io import BytesIO
  25from pathlib import PurePosixPath
  26from typing import IO, Any, Optional, Union, cast
  27
  28from ..config import StorageClientConfig
  29from ..constants import DEFAULT_SYNC_BATCH_SIZE, MEMORY_LOAD_LIMIT
  30from ..file import ObjectFile, PosixFile
  31from ..providers.posix_file import PosixFileStorageProvider
  32from ..replica_manager import ReplicaManager
  33from ..retry import retry
  34from ..sync import SyncManager
  35from ..types import (
  36    AWARE_DATETIME_MIN,
  37    MSC_PROTOCOL,
  38    ExecutionMode,
  39    ObjectMetadata,
  40    PatternList,
  41    Range,
  42    Replica,
  43    ResolvedPathState,
  44    SignerType,
  45    SourceVersionCheckMode,
  46    StorageProvider,
  47    SymlinkHandling,
  48    SyncResult,
  49)
  50from ..utils import NullStorageClient, PatternMatcher, join_paths, resolve_symlink_handling
  51from .types import AbstractStorageClient
  52
  53logger = logging.getLogger(__name__)
  54
  55
class SingleStorageClient(AbstractStorageClient):
    """
    Storage client for single-backend configurations.

    Supports full read and write operations against a single storage provider.
    """

    # Full client configuration this instance was built from.
    _config: StorageClientConfig
    # The single backend all reads/writes go through.
    _storage_provider: StorageProvider
    # Guards metadata-provider mutations; created only when a metadata provider
    # plus auto-commit are configured (or after unpickling with a metadata provider).
    _metadata_provider_lock: Optional[threading.Lock] = None
    # Signals the background auto-commit thread to stop; set in __del__.
    _stop_event: Optional[threading.Event] = None
    # Present only when replicas are configured; None disables replica routing.
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set
        :raises ValueError: If config has storage_provider_profiles (multi-backend)
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Validate the configuration and wire up providers, cache, retry and
        auto-commit machinery. Shared by ``__init__`` and ``__setstate__``.

        :param config: Storage client configuration with ``storage_provider`` set.
        :raises ValueError: If the config is multi-backend or has no storage provider.
        """
        if config.storage_provider_profiles:
            raise ValueError(
                "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
                "Use CompositeStorageClient for multi-backend configurations."
            )

        if config.storage_provider is None:
            raise ValueError("SingleStorageClient requires storage_provider to be set.")

        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = cast(StorageProvider, self._config.storage_provider)
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        # Auto-commit only makes sense when there is a metadata provider to commit to.
        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                if self._autocommit_config.interval_minutes:
                    # Periodic commit: daemon thread woken every interval_minutes.
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    # Flush pending metadata one final time at interpreter shutdown.
                    atexit.register(self._commit_on_exit)

                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances (facade)."""
        # Import here to avoid circular dependency
        from .client import StorageClient as StorageClientFacade

        # Sort replicas by read_priority, the first one is the primary replica.
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClientFacade(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event) -> None:
        """
        Background loop that commits metadata every ``commit_interval_minutes``
        until ``stop_event`` is set.

        :param commit_interval_minutes: Interval between commits, in minutes.
        :param stop_event: Event used to request shutdown of the loop.
        :raises RuntimeError: If ``stop_event`` is None.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self) -> None:
        """atexit hook: flush any pending metadata before interpreter shutdown."""
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.

        :param path: The (already resolved) path of the object.
        :return: The etag reported for the object, or None if the provider has none.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        # Caching is skipped entirely for local POSIX files.
        enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
        return enabled

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return getattr(self._storage_provider, "_rust_client", None) is not None

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.

        :raises RuntimeError: If no replica manager is configured.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        # Stop the auto-commit thread (if any) before the instance goes away.
        # _commit_thread is always created together with _stop_event.
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        # Drop unpicklable provider/lock/replica state; __setstate__ rebuilds it
        # from _config, which stays in the state dict.
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Rebuild providers and replicas from the pickled configuration.
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._config.profile
[docs] 237 def is_default_profile(self) -> bool: 238 """ 239 :return: ``True`` if the storage client is using the reserved POSIX profile, ``False`` otherwise. 240 """ 241 return self._config.profile == "__filesystem__"
    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._replicas

    # -- Metadata resolution helpers --

    def _resolve_read_path(self, logical_path: str) -> str:
        """
        Resolve a logical path to its physical storage path for read operations.

        :param logical_path: The user-facing logical path.
        :return: The physical storage path.
        :raises FileNotFoundError: If the file does not exist in the metadata provider.
        """
        assert self._metadata_provider is not None
        resolved = self._metadata_provider.realpath(logical_path)
        if not resolved.exists:
            raise FileNotFoundError(f"The file at path '{logical_path}' was not found by metadata provider.")
        return resolved.physical_path

    def _resolve_write_path(self, logical_path: str) -> str:
        """
        Resolve a logical path to its physical storage path for write operations.

        Checks overwrite policy and generates the physical path via the metadata provider.

        :param logical_path: The user-facing logical path.
        :return: The physical storage path to write to.
        :raises FileExistsError: If the file exists and overwrites are not allowed.
        """
        assert self._metadata_provider is not None
        resolved = self._metadata_provider.realpath(logical_path)
        # EXISTS and DELETED both count as "occupied" for overwrite-policy purposes.
        if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
            if not self._metadata_provider.allow_overwrites():
                raise FileExistsError(
                    f"The file at path '{logical_path}' already exists; "
                    f"overwriting is not allowed when using a metadata provider."
                )
            return self._metadata_provider.generate_physical_path(logical_path, for_overwrite=True).physical_path
        return self._metadata_provider.generate_physical_path(logical_path, for_overwrite=False).physical_path

    def _register_written_file(
        self, virtual_path: str, physical_path: str, attributes: Optional[dict[str, Any]] = None
    ) -> None:
        """
        Register a written file with the metadata provider.

        Fetches metadata from the storage provider, optionally merges custom attributes,
        and registers the file with the metadata provider.

        Protects metadata provider mutation with ``_metadata_provider_lock`` when configured.

        .. note::
            TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
        """
        assert self._metadata_provider is not None
        obj_metadata = self._storage_provider.get_object_metadata(physical_path)
        if attributes:
            # Caller-supplied attributes win over whatever the provider reported.
            obj_metadata.metadata = (obj_metadata.metadata or {}) | attributes
        with self._metadata_provider_lock or contextlib.nullcontext():
            self._metadata_provider.add_file(virtual_path, obj_metadata)

    def _register_written_files(
        self,
        virtual_paths: Sequence[str],
        physical_paths: Sequence[str],
        attributes: Optional[Sequence[Optional[dict[str, Any]]]] = None,
        max_workers: int = 16,
    ) -> None:
        """Register multiple written files with the metadata provider concurrently."""
        if not virtual_paths:
            return

        def _register_one(index: int, virtual_path: str, physical_path: str) -> None:
            # attributes is positional: attributes[i] belongs to virtual_paths[i].
            file_attrs = attributes[index] if attributes is not None else None
            self._register_written_file(virtual_path, physical_path, file_attrs)

        worker_count = max(1, min(len(virtual_paths), max_workers))
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            future_to_virtual_path = {
                executor.submit(_register_one, i, virtual_path, physical_path): virtual_path
                for i, (virtual_path, physical_path) in enumerate(zip(virtual_paths, physical_paths))
            }
            for future in as_completed(future_to_virtual_path):
                # Re-raise the first exception any worker hit.
                future.result()

    @retry
    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        if self._metadata_provider:
            path = self._resolve_read_path(path)

        # Handle caching logic
        if self._is_cache_enabled() and self._cache_manager:
            if byte_range:
                # Range request with cache
                try:
                    # Fetch metadata for source version checking (if needed)
                    metadata = None
                    source_version = None
                    if check_source_version == SourceVersionCheckMode.ENABLE:
                        metadata = self._storage_provider.get_object_metadata(path)
                        source_version = metadata.etag
                    elif check_source_version == SourceVersionCheckMode.INHERIT:
                        if self._cache_manager.check_source_version():
                            metadata = self._storage_provider.get_object_metadata(path)
                            source_version = metadata.etag

                    # Optimization: For full-file reads (offset=0, size >= file_size), cache whole file instead of chunking
                    # This avoids creating many small chunks when the user requests the entire file.
                    # Only apply this optimization when metadata is already available (i.e., when version checking is enabled),
                    # to respect the user's choice to disable version checking and avoid extra HEAD requests.
                    if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length:
                        full_file_data = self._storage_provider.get_object(path)
                        self._cache_manager.set(path, full_file_data, source_version)
                        return full_file_data[: metadata.content_length]

                    # Use chunk-based caching for partial reads or when optimization doesn't apply
                    data = self._cache_manager.read(
                        key=path,
                        source_version=source_version,
                        byte_range=byte_range,
                        storage_provider=self._storage_provider,
                        source_size=metadata.content_length if metadata else None,
                    )
                    if data is not None:
                        return data
                    # Fallback (should not normally happen)
                    return self._storage_provider.get_object(path, byte_range=byte_range)
                except (FileNotFoundError, Exception):
                    # NOTE(review): ``Exception`` already subsumes ``FileNotFoundError``;
                    # this swallows *every* cache-path error and falls back to a
                    # direct read — confirm the broad catch is intentional.
                    # Fall back to direct read if metadata fetching fails
                    return self._storage_provider.get_object(path, byte_range=byte_range)
            else:
                # Full file read with cache
                # Only fetch source version if check_source_version is enabled
                source_version = None
                if check_source_version == SourceVersionCheckMode.ENABLE:
                    source_version = self._get_source_version(path)
                elif check_source_version == SourceVersionCheckMode.INHERIT:
                    if self._cache_manager.check_source_version():
                        source_version = self._get_source_version(path)

                data = self._cache_manager.read(path, source_version)
                if data is None:
                    # Cache miss: fetch from replicas (preferred) or primary, then populate the cache.
                    if self._replica_manager:
                        data = self._read_from_replica_or_primary(path)
                    else:
                        data = self._storage_provider.get_object(path)
                    self._cache_manager.set(path, data, source_version)
                return data
        elif self._replica_manager:
            # No cache, but replicas available
            return self._read_from_replica_or_primary(path)
        else:
            # No cache, no replicas - direct storage provider read
            return self._storage_provider.get_object(path, byte_range=byte_range)
[docs] 416 def info(self, path: str, strict: bool = True) -> ObjectMetadata: 417 """ 418 Get metadata for a file at the specified path. 419 420 :param path: The logical path of the object. 421 :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes. 422 :return: ObjectMetadata containing file information (size, last modified, etc.). 423 :raises FileNotFoundError: If the file at the specified path does not exist. 424 """ 425 if not path or path == ".": # empty path or '.' provided by the user 426 if self._is_posix_file_storage_provider(): 427 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc) 428 else: 429 last_modified = AWARE_DATETIME_MIN 430 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified) 431 432 if not self._metadata_provider: 433 return self._storage_provider.get_object_metadata(path, strict=strict) 434 435 return self._metadata_provider.get_object_metadata(path, include_pending=not strict)
436 437 @retry 438 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None: 439 """ 440 Download a remote file to a local path or file-like object. 441 442 :param remote_path: The logical path of the remote file to download. 443 :param local_path: The local file path or file-like object to write to. 444 :raises FileNotFoundError: If the remote file does not exist. 445 """ 446 if self._metadata_provider: 447 physical_path = self._resolve_read_path(remote_path) 448 metadata = self._metadata_provider.get_object_metadata(remote_path) 449 self._storage_provider.download_file(physical_path, local_path, metadata) 450 elif self._replica_manager: 451 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider) 452 else: 453 self._storage_provider.download_file(remote_path, local_path) 454
[docs] 455 def download_files( 456 self, 457 remote_paths: list[str], 458 local_paths: list[str], 459 metadata: Optional[Sequence[Optional[ObjectMetadata]]] = None, 460 max_workers: int = 16, 461 ) -> None: 462 """ 463 Download multiple remote files to local paths. 464 465 :param remote_paths: List of logical paths of remote files to download. 466 :param local_paths: List of local file paths to save the downloaded files to. 467 :param metadata: Optional per-file metadata used to decide between regular and multipart download. 468 :param max_workers: Maximum number of concurrent download workers (default: 16). 469 :raises ValueError: If remote_paths and local_paths have different lengths. 470 :raises FileNotFoundError: If any remote file does not exist. 471 """ 472 if len(remote_paths) != len(local_paths): 473 raise ValueError("remote_paths and local_paths must have the same length") 474 475 if self._metadata_provider: 476 physical_paths = [self._resolve_read_path(rp) for rp in remote_paths] 477 self._storage_provider.download_files(physical_paths, local_paths, metadata, max_workers) 478 elif self._replica_manager: 479 for remote_path, local_path in zip(remote_paths, local_paths): 480 self.download_file(remote_path, local_path) 481 else: 482 self._storage_provider.download_files(remote_paths, local_paths, metadata, max_workers)
483 484 @retry 485 def upload_file( 486 self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, Any]] = None 487 ) -> None: 488 """ 489 Uploads a file from the local file system to the storage provider. 490 491 :param remote_path: The path where the object will be stored. 492 :param local_path: The source file to upload. This can either be a string representing the local 493 file path, or a file-like object (e.g., an open file handle). 494 :param attributes: The attributes to add to the file if a new file is created. 495 """ 496 virtual_path = remote_path 497 if self._metadata_provider: 498 physical_path = self._resolve_write_path(remote_path) 499 self._storage_provider.upload_file(physical_path, local_path, attributes) 500 self._register_written_file(virtual_path, physical_path, attributes) 501 else: 502 self._storage_provider.upload_file(remote_path, local_path, attributes) 503
[docs] 504 def upload_files( 505 self, 506 remote_paths: list[str], 507 local_paths: list[str], 508 attributes: Optional[Sequence[Optional[dict[str, Any]]]] = None, 509 max_workers: int = 16, 510 ) -> None: 511 """ 512 Upload multiple local files to remote storage. 513 514 :param remote_paths: List of logical paths where the files will be uploaded. 515 :param local_paths: List of local file paths to upload. 516 :param attributes: Optional list of per-file attributes to add. When provided, must have the same length 517 as remote_paths/local_paths. Each element may be ``None`` for files that need no attributes. 518 :param max_workers: Maximum number of concurrent upload workers (default: 16). 519 :raises ValueError: If remote_paths and local_paths have different lengths. 520 :raises ValueError: If attributes is provided and has a different length than remote_paths. 521 """ 522 if len(remote_paths) != len(local_paths): 523 raise ValueError("remote_paths and local_paths must have the same length") 524 if attributes is not None and len(attributes) != len(remote_paths): 525 raise ValueError("attributes must have the same length as remote_paths and local_paths") 526 527 if self._metadata_provider: 528 physical_paths = [self._resolve_write_path(rp) for rp in remote_paths] 529 self._storage_provider.upload_files(local_paths, physical_paths, attributes, max_workers) 530 self._register_written_files(remote_paths, physical_paths, attributes, max_workers) 531 else: 532 self._storage_provider.upload_files(local_paths, remote_paths, attributes, max_workers)
533 534 @retry 535 def write(self, path: str, body: bytes, attributes: Optional[dict[str, Any]] = None) -> None: 536 """ 537 Write bytes to a file at the specified path. 538 539 :param path: The logical path where the object will be written. 540 :param body: The content to write as bytes. 541 :param attributes: Optional attributes to add to the file. 542 """ 543 virtual_path = path 544 if self._metadata_provider: 545 physical_path = self._resolve_write_path(path) 546 self._storage_provider.put_object(physical_path, body, attributes=attributes) 547 self._register_written_file(virtual_path, physical_path, attributes) 548 else: 549 self._storage_provider.put_object(path, body, attributes=attributes) 550
[docs] 551 def copy(self, src_path: str, dest_path: str) -> None: 552 """ 553 Copy a file from source path to destination path. 554 555 :param src_path: The logical path of the source object. 556 :param dest_path: The logical path where the object will be copied to. 557 :raises FileNotFoundError: If the source file does not exist. 558 """ 559 virtual_dest_path = dest_path 560 if self._metadata_provider: 561 src_path = self._resolve_read_path(src_path) 562 dest_path = self._resolve_write_path(dest_path) 563 self._storage_provider.copy_object(src_path, dest_path) 564 self._register_written_file(virtual_dest_path, dest_path) 565 else: 566 self._storage_provider.copy_object(src_path, dest_path)
    # NOTE(extraction): lines 568-594 of the original module are not present in this extract.
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If ``path`` is a directory and ``recursive`` is False.
        :raises FileNotFoundError: If nothing exists at ``path``.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive directory delete: sync against a null client with
            # delete_unmatched_files=True, which removes every object under path.
            self.sync_from(
                cast(AbstractStorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider of the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    # Check if soft-delete is enabled
                    if not self._metadata_provider.should_use_soft_delete():
                        # Hard delete: remove both physical file and metadata
                        self._storage_provider.delete_object(resolved.physical_path)

                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
654
[docs] 655 def delete_many(self, paths: list[str]) -> None: 656 """ 657 Delete multiple files at the specified paths. Only files are supported; directories are not deleted. 658 659 :param paths: List of logical paths of the files to delete. 660 """ 661 physical_paths_to_delete: list[str] = [] 662 for path in paths: 663 if self._metadata_provider: 664 resolved = self._metadata_provider.realpath(path) 665 if not resolved.exists: 666 raise FileNotFoundError(f"The file at path '{path}' was not found.") 667 if not self._metadata_provider.should_use_soft_delete(): 668 physical_paths_to_delete.append(resolved.physical_path) 669 else: 670 physical_paths_to_delete.append(path) 671 672 if physical_paths_to_delete: 673 self._storage_provider.delete_objects(physical_paths_to_delete) 674 675 for path in paths: 676 virtual_path = path 677 if self._metadata_provider: 678 with self._metadata_provider_lock or contextlib.nullcontext(): 679 self._metadata_provider.remove_file(virtual_path) 680 if self._is_cache_enabled(): 681 if self._cache_manager is None: 682 raise RuntimeError("Cache manager is not initialized") 683 self._cache_manager.delete(virtual_path) 684 if self._replica_manager: 685 self._replica_manager.delete_from_replicas(virtual_path)
686
[docs] 687 def glob( 688 self, 689 pattern: str, 690 include_url_prefix: bool = False, 691 attribute_filter_expression: Optional[str] = None, 692 ) -> list[str]: 693 """ 694 Matches and retrieves a list of object keys in the storage provider that match the specified pattern. 695 696 :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``). 697 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 698 :param attribute_filter_expression: The attribute filter expression to apply to the result. 699 :return: A list of object paths that match the specified pattern. 700 """ 701 if self._metadata_provider: 702 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 703 else: 704 results = self._storage_provider.glob(pattern, attribute_filter_expression) 705 706 if include_url_prefix: 707 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 708 709 return results
710 711 def _resolve_single_file( 712 self, 713 path: str, 714 start_after: Optional[str], 715 end_at: Optional[str], 716 include_url_prefix: bool, 717 pattern_matcher: Optional[PatternMatcher], 718 ) -> tuple[Optional[ObjectMetadata], Optional[str]]: 719 """ 720 Resolve whether ``path`` should be handled as a single-file listing result. 721 722 :param path: Candidate file path or directory prefix to resolve. 723 :param start_after: Exclusive lower bound for file key filtering. 724 :param end_at: Inclusive upper bound for file key filtering. 725 :param include_url_prefix: Whether to prefix returned keys with ``msc://profile``. 726 :param pattern_matcher: Optional include/exclude matcher for file keys. 727 :return: A tuple of ``(single_file, normalized_path)``. Returns file metadata and 728 the original path when ``path`` resolves to a file that passes filters; 729 returns ``(None, normalized_directory_path)`` when the caller should 730 continue with directory listing; returns ``(None, None)`` when filtering 731 excludes the single-file candidate and listing should stop. 
732 """ 733 if not path: 734 return None, path 735 736 if self.is_file(path): 737 if pattern_matcher and not pattern_matcher.should_include_file(path): 738 return None, None 739 740 try: 741 object_metadata = self.info(path) 742 if start_after and object_metadata.key <= start_after: 743 return None, None 744 if end_at and object_metadata.key > end_at: 745 return None, None 746 if include_url_prefix: 747 self._prepend_url_prefix(object_metadata) 748 return object_metadata, path 749 except FileNotFoundError: 750 return None, path.rstrip("/") + "/" 751 else: 752 return None, path.rstrip("/") + "/" 753 754 def _prepend_url_prefix(self, obj: ObjectMetadata) -> None: 755 if self.is_default_profile(): 756 obj.key = str(PurePosixPath("/") / obj.key) 757 else: 758 obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key) 759 760 def _filter_and_decorate( 761 self, 762 objects: Iterator[ObjectMetadata], 763 include_url_prefix: bool, 764 pattern_matcher: Optional[PatternMatcher], 765 ) -> Iterator[ObjectMetadata]: 766 for obj in objects: 767 if pattern_matcher and not pattern_matcher.should_include_file(obj.key): 768 continue 769 if include_url_prefix: 770 self._prepend_url_prefix(obj) 771 yield obj 772
[docs] 773 def list_recursive( 774 self, 775 path: str = "", 776 start_after: Optional[str] = None, 777 end_at: Optional[str] = None, 778 max_workers: int = 32, 779 look_ahead: int = 2, 780 include_url_prefix: bool = False, 781 follow_symlinks: Optional[bool] = None, 782 patterns: Optional[PatternList] = None, 783 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 784 ) -> Iterator[ObjectMetadata]: 785 """ 786 List files recursively in the storage provider under the specified path. 787 788 :param path: The directory or file path to list objects under. This should be a 789 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/"). 790 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 791 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 792 :param max_workers: Maximum concurrent workers for provider-level recursive listing. 793 :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing. 794 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 795 :param follow_symlinks: **Deprecated.** Use ``symlink_handling`` instead. 796 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 797 :param symlink_handling: How to handle symbolic links during listing. Only applicable for POSIX file storage providers. 798 :return: An iterator over ObjectMetadata for matching files. 
799 """ 800 symlink_handling = resolve_symlink_handling(follow_symlinks, symlink_handling) 801 802 pattern_matcher = PatternMatcher(patterns) if patterns else None 803 804 single_file, effective_path = self._resolve_single_file( 805 path, start_after, end_at, include_url_prefix, pattern_matcher 806 ) 807 if single_file is not None: 808 yield single_file 809 return 810 if effective_path is None: 811 return 812 813 if self._metadata_provider: 814 objects = self._metadata_provider.list_objects( 815 effective_path, 816 start_after=start_after, 817 end_at=end_at, 818 include_directories=False, 819 ) 820 else: 821 objects = self._storage_provider.list_objects_recursive( 822 effective_path, 823 start_after=start_after, 824 end_at=end_at, 825 max_workers=max_workers, 826 look_ahead=look_ahead, 827 symlink_handling=symlink_handling, 828 ) 829 830 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
831
[docs] 832 def open( 833 self, 834 path: str, 835 mode: str = "rb", 836 buffering: int = -1, 837 encoding: Optional[str] = None, 838 disable_read_cache: bool = False, 839 memory_load_limit: int = MEMORY_LOAD_LIMIT, 840 atomic: bool = True, 841 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 842 attributes: Optional[dict[str, Any]] = None, 843 prefetch_file: Optional[bool] = None, 844 ) -> Union[PosixFile, ObjectFile]: 845 """ 846 Open a file for reading or writing. 847 848 :param path: The logical path of the object to open. 849 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab". 850 :param buffering: The buffering mode. Only applies to PosixFile. 851 :param encoding: The encoding to use for text files. 852 :param disable_read_cache: When set to ``True``, disables caching for file content. 853 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 854 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 855 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB. 856 :param atomic: When set to ``True``, file will be written atomically (rename upon close). 857 This parameter is only applicable to PosixFile in write mode. 858 :param check_source_version: Whether to check the source version of cached objects. 859 :param attributes: Attributes to add to the file. 860 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None. 861 :param prefetch_file: Whether to prefetch the file content. 862 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 863 If None, inherits from cache configuration. 864 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 865 :raises FileNotFoundError: If the file does not exist (read mode). 
866 """ 867 if self._is_posix_file_storage_provider(): 868 return PosixFile( 869 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 870 ) 871 else: 872 if atomic is False: 873 logger.warning("Non-atomic writes are not supported for object storage providers.") 874 875 return ObjectFile( 876 self, 877 remote_path=path, 878 mode=mode, 879 encoding=encoding, 880 disable_read_cache=disable_read_cache, 881 memory_load_limit=memory_load_limit, 882 check_source_version=check_source_version, 883 attributes=attributes, 884 prefetch_file=prefetch_file, 885 )
886
[docs] 887 def get_posix_path(self, path: str) -> Optional[str]: 888 """ 889 Returns the physical POSIX filesystem path for POSIX storage providers. 890 891 :param path: The path to resolve (may be a symlink or virtual path). 892 :return: Physical POSIX filesystem path if POSIX storage, None otherwise. 893 """ 894 if not self._is_posix_file_storage_provider(): 895 return None 896 897 if self._metadata_provider: 898 resolved = self._metadata_provider.realpath(path) 899 realpath = resolved.physical_path 900 else: 901 realpath = path 902 903 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
904
[docs] 905 def is_file(self, path: str) -> bool: 906 """ 907 Checks whether the specified path points to a file (rather than a folder or directory). 908 909 :param path: The logical path to check. 910 :return: ``True`` if the key points to a file, ``False`` otherwise. 911 """ 912 if self._metadata_provider: 913 resolved = self._metadata_provider.realpath(path) 914 return resolved.exists 915 916 return self._storage_provider.is_file(path)
917
[docs] 918 def commit_metadata(self, prefix: Optional[str] = None) -> None: 919 """ 920 Commits any pending updates to the metadata provider. No-op if not using a metadata provider. 921 922 :param prefix: If provided, scans the prefix to find files to commit. 923 """ 924 if self._metadata_provider: 925 with self._metadata_provider_lock or contextlib.nullcontext(): 926 if prefix: 927 base_resolved = self._metadata_provider.generate_physical_path("") 928 physical_base = base_resolved.physical_path 929 930 prefix_resolved = self._metadata_provider.generate_physical_path(prefix) 931 physical_prefix = prefix_resolved.physical_path 932 933 for obj in self._storage_provider.list_objects(physical_prefix): 934 virtual_path = obj.key[len(physical_base) :].lstrip("/") 935 self._metadata_provider.add_file(virtual_path, obj) 936 self._metadata_provider.commit_updates()
937
[docs] 938 def is_empty(self, path: str) -> bool: 939 """ 940 Check whether the specified path is empty. A path is considered empty if there are no 941 objects whose keys start with the given path as a prefix. 942 943 :param path: The logical path to check (typically a directory or folder prefix). 944 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 945 """ 946 if self._metadata_provider: 947 objects = self._metadata_provider.list_objects(path) 948 else: 949 objects = self._storage_provider.list_objects(path) 950 951 try: 952 return next(objects) is None 953 except StopIteration: 954 pass 955 956 return True
957
[docs] 958 def sync_from( 959 self, 960 source_client: AbstractStorageClient, 961 source_path: str = "", 962 target_path: str = "", 963 delete_unmatched_files: bool = False, 964 description: str = "Syncing", 965 num_worker_processes: Optional[int] = None, 966 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 967 patterns: Optional[PatternList] = None, 968 preserve_source_attributes: bool = False, 969 follow_symlinks: Optional[bool] = None, 970 source_files: Optional[list[str]] = None, 971 ignore_hidden: bool = True, 972 commit_metadata: bool = True, 973 dryrun: bool = False, 974 dryrun_output_path: Optional[str] = None, 975 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 976 ) -> SyncResult: 977 """ 978 Syncs files from the source storage client to "path/". 979 980 :param source_client: The source storage client. 981 :param source_path: The logical path to sync from. 982 :param target_path: The logical path to sync to. 983 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 984 :param description: Description of sync process for logging purposes. 985 :param num_worker_processes: The number of worker processes to use. 986 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 987 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 988 Cannot be used together with source_files. 989 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization. 990 When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved. 991 992 .. warning:: 993 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD 994 request for each object to retrieve attributes, which can significantly impact performance on large-scale 995 sync operations. 
For production use at scale, configure a ``metadata_provider`` in your storage profile. 996 997 :param follow_symlinks: **Deprecated.** Use ``symlink_handling`` instead. If the source StorageClient is PosixFile, 998 whether to follow symbolic links. ``True`` maps to :py:attr:`SymlinkHandling.FOLLOW`; ``False`` maps to 999 :py:attr:`SymlinkHandling.SKIP`. Cannot be combined with a non-default ``symlink_handling``. 1000 :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these 1001 specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns. 1002 :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``. 1003 :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes. 1004 Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually. 1005 :param dryrun: If ``True``, only enumerate and compare objects without performing any copy/delete operations. 1006 The returned :py:class:`SyncResult` will include a :py:class:`DryrunResult` with paths to JSONL files. 1007 :param dryrun_output_path: Directory to write dryrun JSONL files into. If ``None`` (default), a temporary 1008 directory is created automatically. Ignored when ``dryrun`` is ``False``. 1009 :param symlink_handling: How to handle symbolic links during sync. 1010 :py:attr:`SymlinkHandling.FOLLOW` (default) dereferences symlinks and copies the target's bytes. 1011 :py:attr:`SymlinkHandling.SKIP` excludes symlinks from the sync. 1012 :py:attr:`SymlinkHandling.PRESERVE` recreates symlinks on the target via :py:meth:`make_symlink` 1013 instead of copying bytes (required for round-trip preservation of symlinks). 1014 :raises ValueError: If both source_files and patterns are provided, or if ``follow_symlinks`` is combined 1015 with a non-default ``symlink_handling``. 
1016 :raises RuntimeError: If errors occur during sync operations. The sync will stop on first error (fail-fast). 1017 """ 1018 if source_files and patterns: 1019 raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.") 1020 1021 symlink_handling = resolve_symlink_handling(follow_symlinks, symlink_handling) 1022 1023 pattern_matcher = PatternMatcher(patterns) if patterns else None 1024 1025 # Disable the replica manager during sync 1026 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager: 1027 # Import here to avoid circular dependency 1028 from .client import StorageClient as StorageClientFacade 1029 1030 source_client = StorageClientFacade(source_client._config) 1031 source_client._replica_manager = None 1032 1033 m = SyncManager(source_client, source_path, self, target_path) 1034 batch_size = int(os.environ.get("MSC_SYNC_BATCH_SIZE", DEFAULT_SYNC_BATCH_SIZE)) 1035 1036 return m.sync_objects( 1037 execution_mode=execution_mode, 1038 description=description, 1039 num_worker_processes=num_worker_processes, 1040 delete_unmatched_files=delete_unmatched_files, 1041 pattern_matcher=pattern_matcher, 1042 preserve_source_attributes=preserve_source_attributes, 1043 symlink_handling=symlink_handling, 1044 source_files=source_files, 1045 ignore_hidden=ignore_hidden, 1046 commit_metadata=commit_metadata, 1047 batch_size=batch_size, 1048 dryrun=dryrun, 1049 dryrun_output_path=dryrun_output_path, 1050 )
1051
[docs] 1052 def sync_replicas( 1053 self, 1054 source_path: str, 1055 replica_indices: Optional[list[int]] = None, 1056 delete_unmatched_files: bool = False, 1057 description: str = "Syncing replica", 1058 num_worker_processes: Optional[int] = None, 1059 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 1060 patterns: Optional[PatternList] = None, 1061 ignore_hidden: bool = True, 1062 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 1063 ) -> None: 1064 """ 1065 Sync files from this client to its replica storage clients. 1066 1067 :param source_path: The logical path to sync from. 1068 :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas. 1069 :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source. 1070 :param description: Description of sync process for logging purposes. 1071 :param num_worker_processes: Number of worker processes for parallel sync. 1072 :param execution_mode: Execution mode (LOCAL or REMOTE). 1073 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 1074 :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``. 1075 :param symlink_handling: How to handle symbolic links during sync. 1076 :py:attr:`SymlinkHandling.FOLLOW` (default) dereferences symlinks and copies the target's bytes. 1077 :py:attr:`SymlinkHandling.SKIP` excludes symlinks from the sync. 1078 :py:attr:`SymlinkHandling.PRESERVE` recreates symlinks on each replica via 1079 :py:meth:`make_symlink` instead of copying bytes. 1080 """ 1081 if not self._replicas: 1082 logger.warning( 1083 "No replicas found in profile '%s'. 
Add a 'replicas' section to your profile configuration to enable " 1084 "secondary storage locations for redundancy and performance.", 1085 self._config.profile, 1086 ) 1087 return None 1088 1089 if replica_indices: 1090 try: 1091 replicas = [self._replicas[i] for i in replica_indices] 1092 except IndexError as e: 1093 raise ValueError(f"Replica index out of range: {replica_indices}") from e 1094 else: 1095 replicas = self._replicas 1096 1097 # Disable the replica manager during sync 1098 if self._replica_manager: 1099 # Import here to avoid circular dependency 1100 from .client import StorageClient as StorageClientFacade 1101 1102 source_client = StorageClientFacade(self._config) 1103 source_client._replica_manager = None 1104 else: 1105 source_client = self 1106 1107 for replica in replicas: 1108 replica.sync_from( 1109 source_client, 1110 source_path, 1111 source_path, 1112 delete_unmatched_files=delete_unmatched_files, 1113 description=f"{description} ({replica.profile})", 1114 num_worker_processes=num_worker_processes, 1115 execution_mode=execution_mode, 1116 patterns=patterns, 1117 ignore_hidden=ignore_hidden, 1118 symlink_handling=symlink_handling, 1119 )
1120
[docs] 1121 def list( 1122 self, 1123 prefix: str = "", 1124 path: str = "", 1125 start_after: Optional[str] = None, 1126 end_at: Optional[str] = None, 1127 include_directories: bool = False, 1128 include_url_prefix: bool = False, 1129 attribute_filter_expression: Optional[str] = None, 1130 show_attributes: bool = False, 1131 follow_symlinks: Optional[bool] = None, 1132 patterns: Optional[PatternList] = None, 1133 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 1134 ) -> Iterator[ObjectMetadata]: 1135 """ 1136 List objects in the storage provider under the specified path. 1137 1138 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is 1139 deprecated and will be removed in a future version. 1140 1141 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under. 1142 :param path: The directory or file path to list objects under. This should be a 1143 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/"). 1144 Cannot be used together with ``prefix``. 1145 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 1146 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 1147 :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects. 1148 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 1149 :param attribute_filter_expression: The attribute filter expression to apply to the result. 1150 :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation, there may be a performance impact if this is set to ``True``. 1151 :param follow_symlinks: **Deprecated.** Use ``symlink_handling`` instead. 1152 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 
1153 :param symlink_handling: How to handle symbolic links during listing. Only applicable for POSIX file storage providers. 1154 :return: An iterator over ObjectMetadata for matching objects. 1155 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty). 1156 """ 1157 symlink_handling = resolve_symlink_handling(follow_symlinks, symlink_handling) 1158 1159 # Parameter validation - either path or prefix, not both 1160 if path and prefix: 1161 raise ValueError( 1162 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). " 1163 f"Please use only the 'path' parameter for new code. " 1164 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 1165 ) 1166 elif prefix: 1167 logger.debug( 1168 f"The 'prefix' parameter is deprecated and will be removed in a future version. " 1169 f"Please use the 'path' parameter instead. " 1170 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 1171 ) 1172 1173 pattern_matcher = PatternMatcher(patterns) if patterns else None 1174 effective_path = path if path else prefix 1175 1176 single_file, effective_path = self._resolve_single_file( 1177 effective_path, start_after, end_at, include_url_prefix, pattern_matcher 1178 ) 1179 if single_file is not None: 1180 yield single_file 1181 return 1182 if effective_path is None: 1183 return 1184 1185 if self._metadata_provider: 1186 objects = self._metadata_provider.list_objects( 1187 effective_path, 1188 start_after=start_after, 1189 end_at=end_at, 1190 include_directories=include_directories, 1191 attribute_filter_expression=attribute_filter_expression, 1192 show_attributes=show_attributes, 1193 ) 1194 else: 1195 objects = self._storage_provider.list_objects( 1196 effective_path, 1197 start_after=start_after, 1198 end_at=end_at, 1199 include_directories=include_directories, 1200 attribute_filter_expression=attribute_filter_expression, 1201 show_attributes=show_attributes, 1202 
symlink_handling=symlink_handling, 1203 ) 1204 1205 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
1206
[docs] 1207 def generate_presigned_url( 1208 self, 1209 path: str, 1210 *, 1211 method: str = "GET", 1212 signer_type: Optional[SignerType] = None, 1213 signer_options: Optional[dict[str, Any]] = None, 1214 ) -> str: 1215 return self._storage_provider.generate_presigned_url( 1216 path, method=method, signer_type=signer_type, signer_options=signer_options 1217 )