Source code for multistorageclient.client.single

   1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
   2# SPDX-License-Identifier: Apache-2.0
   3#
   4# Licensed under the Apache License, Version 2.0 (the "License");
   5# you may not use this file except in compliance with the License.
   6# You may obtain a copy of the License at
   7#
   8# http://www.apache.org/licenses/LICENSE-2.0
   9#
  10# Unless required by applicable law or agreed to in writing, software
  11# distributed under the License is distributed on an "AS IS" BASIS,
  12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13# See the License for the specific language governing permissions and
  14# limitations under the License.
  15
  16import atexit
  17import contextlib
  18import logging
  19import os
  20import threading
  21from collections.abc import Iterator, Sequence
  22from datetime import datetime, timezone
  23from io import BytesIO
  24from pathlib import PurePosixPath
  25from typing import IO, Any, Optional, Union, cast
  26
  27from ..config import StorageClientConfig
  28from ..constants import DEFAULT_SYNC_BATCH_SIZE, MEMORY_LOAD_LIMIT
  29from ..file import ObjectFile, PosixFile
  30from ..providers.posix_file import PosixFileStorageProvider
  31from ..replica_manager import ReplicaManager
  32from ..retry import retry
  33from ..sync import SyncManager
  34from ..types import (
  35    AWARE_DATETIME_MIN,
  36    MSC_PROTOCOL,
  37    ExecutionMode,
  38    ObjectMetadata,
  39    PatternList,
  40    Range,
  41    Replica,
  42    ResolvedPathState,
  43    SignerType,
  44    SourceVersionCheckMode,
  45    StorageProvider,
  46    SyncResult,
  47)
  48from ..utils import NullStorageClient, PatternMatcher, join_paths
  49from .types import AbstractStorageClient
  50
  51logger = logging.getLogger(__name__)
  52
  53
class SingleStorageClient(AbstractStorageClient):
    """
    Storage client for single-backend configurations.

    Supports full read and write operations against a single storage provider.
    """

    # Parsed configuration this client was built from.
    _config: StorageClientConfig
    # The single backing storage provider (POSIX filesystem or object store).
    _storage_provider: StorageProvider
    # Serializes metadata-provider mutations against the background auto-commit
    # thread; None when no metadata provider / auto-commit is configured.
    _metadata_provider_lock: Optional[threading.Lock] = None
    # Signals the auto-commit thread to stop; None when auto-commit is disabled.
    _stop_event: Optional[threading.Event] = None
    # Coordinates reads/deletes across replicas; None when no replicas are configured.
    _replica_manager: Optional[ReplicaManager] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set
        :raises ValueError: If config has storage_provider_profiles (multi-backend)
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        # Builds all provider-related state from ``config``. Also reused by
        # ``__setstate__`` to rebuild the unpicklable members after unpickling.
        if config.storage_provider_profiles:
            raise ValueError(
                "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
                "Use CompositeStorageClient for multi-backend configurations."
            )

        if config.storage_provider is None:
            raise ValueError("SingleStorageClient requires storage_provider to be set.")

        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = cast(StorageProvider, self._config.storage_provider)
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        # Auto-commit only makes sense when a metadata provider tracks pending updates.
        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                # Periodic commits: a daemon thread wakes every ``interval_minutes``
                # and flushes pending metadata updates (see ``_committer_thread``).
                if self._autocommit_config.interval_minutes:
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                # Final flush on interpreter shutdown.
                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances (facade)."""
        # Import here to avoid circular dependency
        from .client import StorageClient as StorageClientFacade

        # Sort replicas by read_priority, the first one is the primary replica.
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            # Replica clients are constructed from the same raw config dict,
            # just under the replica's profile name.
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClientFacade(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
        # Background loop: commit metadata every ``commit_interval_minutes``
        # until ``stop_event`` is set (done in ``__del__``).
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self):
        # atexit hook: flush any pending metadata updates before shutdown.
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        # Caching applies only to non-POSIX providers that have a cache manager;
        # local files gain nothing from an extra cache layer.
        enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
        return enabled

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return getattr(self._storage_provider, "_rust_client", None) is not None

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        # Stop the auto-commit thread so it does not outlive the client.
        # ``_commit_thread`` exists whenever ``_stop_event`` was set in
        # ``_initialize_providers``.
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        # Drop members that are not picklable (live providers, locks, replica
        # clients); ``__setstate__`` rebuilds them from ``_config``.
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        # Rebuild all providers from the pickled config rather than restoring
        # the raw state dict.
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._config.profile

[docs] 235 def is_default_profile(self) -> bool: 236 """ 237 :return: ``True`` if the storage client is using the reserved POSIX profile, ``False`` otherwise. 238 """ 239 return self._config.profile == "__filesystem__"
240 241 @property 242 def replicas(self) -> list[AbstractStorageClient]: 243 """ 244 :return: List of replica storage clients, sorted by read priority. 245 """ 246 return self._replicas 247 248 # -- Metadata resolution helpers -- 249 250 def _resolve_read_path(self, logical_path: str) -> str: 251 """ 252 Resolve a logical path to its physical storage path for read operations. 253 254 :param logical_path: The user-facing logical path. 255 :return: The physical storage path. 256 :raises FileNotFoundError: If the file does not exist in the metadata provider. 257 """ 258 assert self._metadata_provider is not None 259 resolved = self._metadata_provider.realpath(logical_path) 260 if not resolved.exists: 261 raise FileNotFoundError(f"The file at path '{logical_path}' was not found by metadata provider.") 262 return resolved.physical_path 263 264 def _resolve_write_path(self, logical_path: str) -> str: 265 """ 266 Resolve a logical path to its physical storage path for write operations. 267 268 Checks overwrite policy and generates the physical path via the metadata provider. 269 270 :param logical_path: The user-facing logical path. 271 :return: The physical storage path to write to. 272 :raises FileExistsError: If the file exists and overwrites are not allowed. 273 """ 274 assert self._metadata_provider is not None 275 resolved = self._metadata_provider.realpath(logical_path) 276 if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED): 277 if not self._metadata_provider.allow_overwrites(): 278 raise FileExistsError( 279 f"The file at path '{logical_path}' already exists; " 280 f"overwriting is not allowed when using a metadata provider." 
281 ) 282 return self._metadata_provider.generate_physical_path(logical_path, for_overwrite=True).physical_path 283 return self._metadata_provider.generate_physical_path(logical_path, for_overwrite=False).physical_path 284 285 def _register_written_file( 286 self, virtual_path: str, physical_path: str, attributes: Optional[dict[str, str]] = None 287 ) -> None: 288 """ 289 Register a written file with the metadata provider. 290 291 Fetches metadata from the storage provider, optionally merges custom attributes, 292 and registers the file with the metadata provider. 293 294 Caller must hold ``_metadata_provider_lock`` if thread-safety is required. 295 296 .. note:: 297 TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait. 298 """ 299 assert self._metadata_provider is not None 300 obj_metadata = self._storage_provider.get_object_metadata(physical_path) 301 if attributes: 302 obj_metadata.metadata = (obj_metadata.metadata or {}) | attributes 303 self._metadata_provider.add_file(virtual_path, obj_metadata) 304 305 @retry 306 def read( 307 self, 308 path: str, 309 byte_range: Optional[Range] = None, 310 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 311 ) -> bytes: 312 """ 313 Read bytes from a file at the specified logical path. 314 315 :param path: The logical path of the object to read. 316 :param byte_range: Optional byte range to read (offset and length). 317 :param check_source_version: Whether to check the source version of cached objects. 318 :return: The content of the object as bytes. 319 :raises FileNotFoundError: If the file at the specified path does not exist. 
320 """ 321 if self._metadata_provider: 322 path = self._resolve_read_path(path) 323 324 # Handle caching logic 325 if self._is_cache_enabled() and self._cache_manager: 326 if byte_range: 327 # Range request with cache 328 try: 329 # Fetch metadata for source version checking (if needed) 330 metadata = None 331 source_version = None 332 if check_source_version == SourceVersionCheckMode.ENABLE: 333 metadata = self._storage_provider.get_object_metadata(path) 334 source_version = metadata.etag 335 elif check_source_version == SourceVersionCheckMode.INHERIT: 336 if self._cache_manager.check_source_version(): 337 metadata = self._storage_provider.get_object_metadata(path) 338 source_version = metadata.etag 339 340 # Optimization: For full-file reads (offset=0, size >= file_size), cache whole file instead of chunking 341 # This avoids creating many small chunks when the user requests the entire file. 342 # Only apply this optimization when metadata is already available (i.e., when version checking is enabled), 343 # to respect the user's choice to disable version checking and avoid extra HEAD requests. 
344 if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length: 345 full_file_data = self._storage_provider.get_object(path) 346 self._cache_manager.set(path, full_file_data, source_version) 347 return full_file_data[: metadata.content_length] 348 349 # Use chunk-based caching for partial reads or when optimization doesn't apply 350 data = self._cache_manager.read( 351 key=path, 352 source_version=source_version, 353 byte_range=byte_range, 354 storage_provider=self._storage_provider, 355 source_size=metadata.content_length if metadata else None, 356 ) 357 if data is not None: 358 return data 359 # Fallback (should not normally happen) 360 return self._storage_provider.get_object(path, byte_range=byte_range) 361 except (FileNotFoundError, Exception): 362 # Fall back to direct read if metadata fetching fails 363 return self._storage_provider.get_object(path, byte_range=byte_range) 364 else: 365 # Full file read with cache 366 # Only fetch source version if check_source_version is enabled 367 source_version = None 368 if check_source_version == SourceVersionCheckMode.ENABLE: 369 source_version = self._get_source_version(path) 370 elif check_source_version == SourceVersionCheckMode.INHERIT: 371 if self._cache_manager.check_source_version(): 372 source_version = self._get_source_version(path) 373 374 data = self._cache_manager.read(path, source_version) 375 if data is None: 376 if self._replica_manager: 377 data = self._read_from_replica_or_primary(path) 378 else: 379 data = self._storage_provider.get_object(path) 380 self._cache_manager.set(path, data, source_version) 381 return data 382 elif self._replica_manager: 383 # No cache, but replicas available 384 return self._read_from_replica_or_primary(path) 385 else: 386 # No cache, no replicas - direct storage provider read 387 return self._storage_provider.get_object(path, byte_range=byte_range) 388
[docs] 389 def info(self, path: str, strict: bool = True) -> ObjectMetadata: 390 """ 391 Get metadata for a file at the specified path. 392 393 :param path: The logical path of the object. 394 :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes. 395 :return: ObjectMetadata containing file information (size, last modified, etc.). 396 :raises FileNotFoundError: If the file at the specified path does not exist. 397 """ 398 if not path or path == ".": # empty path or '.' provided by the user 399 if self._is_posix_file_storage_provider(): 400 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc) 401 else: 402 last_modified = AWARE_DATETIME_MIN 403 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified) 404 405 if not self._metadata_provider: 406 return self._storage_provider.get_object_metadata(path, strict=strict) 407 408 return self._metadata_provider.get_object_metadata(path, include_pending=not strict)
409 410 @retry 411 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None: 412 """ 413 Download a remote file to a local path or file-like object. 414 415 :param remote_path: The logical path of the remote file to download. 416 :param local_path: The local file path or file-like object to write to. 417 :raises FileNotFoundError: If the remote file does not exist. 418 """ 419 if self._metadata_provider: 420 physical_path = self._resolve_read_path(remote_path) 421 metadata = self._metadata_provider.get_object_metadata(remote_path) 422 self._storage_provider.download_file(physical_path, local_path, metadata) 423 elif self._replica_manager: 424 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider) 425 else: 426 self._storage_provider.download_file(remote_path, local_path) 427
[docs] 428 def download_files( 429 self, 430 remote_paths: list[str], 431 local_paths: list[str], 432 metadata: Optional[Sequence[Optional[ObjectMetadata]]] = None, 433 max_workers: int = 16, 434 ) -> None: 435 """ 436 Download multiple remote files to local paths. 437 438 :param remote_paths: List of logical paths of remote files to download. 439 :param local_paths: List of local file paths to save the downloaded files to. 440 :param metadata: Optional per-file metadata used to decide between regular and multipart download. 441 :param max_workers: Maximum number of concurrent download workers (default: 16). 442 :raises ValueError: If remote_paths and local_paths have different lengths. 443 :raises FileNotFoundError: If any remote file does not exist. 444 """ 445 if len(remote_paths) != len(local_paths): 446 raise ValueError("remote_paths and local_paths must have the same length") 447 448 if self._metadata_provider: 449 physical_paths = [self._resolve_read_path(rp) for rp in remote_paths] 450 self._storage_provider.download_files(physical_paths, local_paths, metadata, max_workers) 451 elif self._replica_manager: 452 for remote_path, local_path in zip(remote_paths, local_paths): 453 self.download_file(remote_path, local_path) 454 else: 455 self._storage_provider.download_files(remote_paths, local_paths, metadata, max_workers)
456 457 @retry 458 def upload_file( 459 self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, str]] = None 460 ) -> None: 461 """ 462 Uploads a file from the local file system to the storage provider. 463 464 :param remote_path: The path where the object will be stored. 465 :param local_path: The source file to upload. This can either be a string representing the local 466 file path, or a file-like object (e.g., an open file handle). 467 :param attributes: The attributes to add to the file if a new file is created. 468 """ 469 virtual_path = remote_path 470 if self._metadata_provider: 471 physical_path = self._resolve_write_path(remote_path) 472 self._storage_provider.upload_file(physical_path, local_path, attributes=None) 473 with self._metadata_provider_lock or contextlib.nullcontext(): 474 self._register_written_file(virtual_path, physical_path, attributes) 475 else: 476 self._storage_provider.upload_file(remote_path, local_path, attributes) 477
[docs] 478 def upload_files( 479 self, 480 remote_paths: list[str], 481 local_paths: list[str], 482 attributes: Optional[Sequence[Optional[dict[str, str]]]] = None, 483 max_workers: int = 16, 484 ) -> None: 485 """ 486 Upload multiple local files to remote storage. 487 488 :param remote_paths: List of logical paths where the files will be uploaded. 489 :param local_paths: List of local file paths to upload. 490 :param attributes: Optional list of per-file attributes to add. When provided, must have the same length 491 as remote_paths/local_paths. Each element may be ``None`` for files that need no attributes. 492 :param max_workers: Maximum number of concurrent upload workers (default: 16). 493 :raises ValueError: If remote_paths and local_paths have different lengths. 494 :raises ValueError: If attributes is provided and has a different length than remote_paths. 495 """ 496 if len(remote_paths) != len(local_paths): 497 raise ValueError("remote_paths and local_paths must have the same length") 498 if attributes is not None and len(attributes) != len(remote_paths): 499 raise ValueError("attributes must have the same length as remote_paths and local_paths") 500 501 if self._metadata_provider: 502 physical_paths = [self._resolve_write_path(rp) for rp in remote_paths] 503 self._storage_provider.upload_files(local_paths, physical_paths, attributes, max_workers) 504 with self._metadata_provider_lock or contextlib.nullcontext(): 505 for i, (virtual_path, physical_path) in enumerate(zip(remote_paths, physical_paths)): 506 file_attrs = attributes[i] if attributes is not None else None 507 self._register_written_file(virtual_path, physical_path, file_attrs) 508 else: 509 self._storage_provider.upload_files(local_paths, remote_paths, attributes, max_workers)
510 511 @retry 512 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None: 513 """ 514 Write bytes to a file at the specified path. 515 516 :param path: The logical path where the object will be written. 517 :param body: The content to write as bytes. 518 :param attributes: Optional attributes to add to the file. 519 """ 520 virtual_path = path 521 if self._metadata_provider: 522 physical_path = self._resolve_write_path(path) 523 self._storage_provider.put_object(physical_path, body, attributes=None) 524 with self._metadata_provider_lock or contextlib.nullcontext(): 525 self._register_written_file(virtual_path, physical_path, attributes) 526 else: 527 self._storage_provider.put_object(path, body, attributes=attributes) 528
[docs] 529 def copy(self, src_path: str, dest_path: str) -> None: 530 """ 531 Copy a file from source path to destination path. 532 533 :param src_path: The logical path of the source object. 534 :param dest_path: The logical path where the object will be copied to. 535 :raises FileNotFoundError: If the source file does not exist. 536 """ 537 virtual_dest_path = dest_path 538 if self._metadata_provider: 539 src_path = self._resolve_read_path(src_path) 540 dest_path = self._resolve_write_path(dest_path) 541 self._storage_provider.copy_object(src_path, dest_path) 542 with self._metadata_provider_lock or contextlib.nullcontext(): 543 self._register_written_file(virtual_dest_path, dest_path) 544 else: 545 self._storage_provider.copy_object(src_path, dest_path)
546
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises FileNotFoundError: If nothing exists at ``path``.
        :raises ValueError: If ``path`` is a directory and ``recursive`` is ``False``.
        """
        # info() raises FileNotFoundError when the path does not exist at all.
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive directory delete: sync against a null client with
            # delete_unmatched_files=True, which removes every object under the path.
            self.sync_from(
                cast(AbstractStorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider of the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    # Check if soft-delete is enabled
                    if not self._metadata_provider.should_use_soft_delete():
                        # Hard delete: remove both physical file and metadata
                        self._storage_provider.delete_object(resolved.physical_path)

                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
605
    def delete_many(self, paths: list[str]) -> None:
        """
        Delete multiple files at the specified paths. Only files are supported; directories are not deleted.

        :param paths: List of logical paths of the files to delete.
        :raises FileNotFoundError: If any path is not known to the metadata provider.
        """
        # Pass 1: resolve every path and collect the physical objects to delete.
        physical_paths_to_delete: list[str] = []
        for path in paths:
            if self._metadata_provider:
                resolved = self._metadata_provider.realpath(path)
                if not resolved.exists:
                    raise FileNotFoundError(f"The file at path '{path}' was not found.")
                # Soft-deleted files keep their physical object; only metadata changes.
                if not self._metadata_provider.should_use_soft_delete():
                    physical_paths_to_delete.append(resolved.physical_path)
            else:
                physical_paths_to_delete.append(path)

        # Batch-delete the physical objects in a single provider call.
        if physical_paths_to_delete:
            self._storage_provider.delete_objects(physical_paths_to_delete)

        # Pass 2: per-path bookkeeping - metadata entries, cache, and replicas.
        for path in paths:
            virtual_path = path
            if self._metadata_provider:
                with self._metadata_provider_lock or contextlib.nullcontext():
                    self._metadata_provider.remove_file(virtual_path)
            if self._is_cache_enabled():
                if self._cache_manager is None:
                    raise RuntimeError("Cache manager is not initialized")
                self._cache_manager.delete(virtual_path)
            if self._replica_manager:
                self._replica_manager.delete_from_replicas(virtual_path)
637
[docs] 638 def glob( 639 self, 640 pattern: str, 641 include_url_prefix: bool = False, 642 attribute_filter_expression: Optional[str] = None, 643 ) -> list[str]: 644 """ 645 Matches and retrieves a list of object keys in the storage provider that match the specified pattern. 646 647 :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``). 648 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 649 :param attribute_filter_expression: The attribute filter expression to apply to the result. 650 :return: A list of object paths that match the specified pattern. 651 """ 652 if self._metadata_provider: 653 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 654 else: 655 results = self._storage_provider.glob(pattern, attribute_filter_expression) 656 657 if include_url_prefix: 658 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 659 660 return results
661 662 def _resolve_single_file( 663 self, 664 path: str, 665 start_after: Optional[str], 666 end_at: Optional[str], 667 include_url_prefix: bool, 668 pattern_matcher: Optional[PatternMatcher], 669 ) -> tuple[Optional[ObjectMetadata], Optional[str]]: 670 """ 671 Resolve whether ``path`` should be handled as a single-file listing result. 672 673 :param path: Candidate file path or directory prefix to resolve. 674 :param start_after: Exclusive lower bound for file key filtering. 675 :param end_at: Inclusive upper bound for file key filtering. 676 :param include_url_prefix: Whether to prefix returned keys with ``msc://profile``. 677 :param pattern_matcher: Optional include/exclude matcher for file keys. 678 :return: A tuple of ``(single_file, normalized_path)``. Returns file metadata and 679 the original path when ``path`` resolves to a file that passes filters; 680 returns ``(None, normalized_directory_path)`` when the caller should 681 continue with directory listing; returns ``(None, None)`` when filtering 682 excludes the single-file candidate and listing should stop. 
683 """ 684 if not path: 685 return None, path 686 687 if self.is_file(path): 688 if pattern_matcher and not pattern_matcher.should_include_file(path): 689 return None, None 690 691 try: 692 object_metadata = self.info(path) 693 if start_after and object_metadata.key <= start_after: 694 return None, None 695 if end_at and object_metadata.key > end_at: 696 return None, None 697 if include_url_prefix: 698 self._prepend_url_prefix(object_metadata) 699 return object_metadata, path 700 except FileNotFoundError: 701 return None, path.rstrip("/") + "/" 702 else: 703 return None, path.rstrip("/") + "/" 704 705 def _prepend_url_prefix(self, obj: ObjectMetadata) -> None: 706 if self.is_default_profile(): 707 obj.key = str(PurePosixPath("/") / obj.key) 708 else: 709 obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key) 710 711 def _filter_and_decorate( 712 self, 713 objects: Iterator[ObjectMetadata], 714 include_url_prefix: bool, 715 pattern_matcher: Optional[PatternMatcher], 716 ) -> Iterator[ObjectMetadata]: 717 for obj in objects: 718 if pattern_matcher and not pattern_matcher.should_include_file(obj.key): 719 continue 720 if include_url_prefix: 721 self._prepend_url_prefix(obj) 722 yield obj 723
[docs] 724 def list_recursive( 725 self, 726 path: str = "", 727 start_after: Optional[str] = None, 728 end_at: Optional[str] = None, 729 max_workers: int = 32, 730 look_ahead: int = 2, 731 include_url_prefix: bool = False, 732 follow_symlinks: bool = True, 733 patterns: Optional[PatternList] = None, 734 ) -> Iterator[ObjectMetadata]: 735 """ 736 List files recursively in the storage provider under the specified path. 737 738 :param path: The directory or file path to list objects under. This should be a 739 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/"). 740 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 741 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 742 :param max_workers: Maximum concurrent workers for provider-level recursive listing. 743 :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing. 744 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 745 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing. 746 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 747 :return: An iterator over ObjectMetadata for matching files. 
748 """ 749 pattern_matcher = PatternMatcher(patterns) if patterns else None 750 751 single_file, effective_path = self._resolve_single_file( 752 path, start_after, end_at, include_url_prefix, pattern_matcher 753 ) 754 if single_file is not None: 755 yield single_file 756 return 757 if effective_path is None: 758 return 759 760 if self._metadata_provider: 761 objects = self._metadata_provider.list_objects( 762 effective_path, 763 start_after=start_after, 764 end_at=end_at, 765 include_directories=False, 766 ) 767 else: 768 objects = self._storage_provider.list_objects_recursive( 769 effective_path, 770 start_after=start_after, 771 end_at=end_at, 772 max_workers=max_workers, 773 look_ahead=look_ahead, 774 follow_symlinks=follow_symlinks, 775 ) 776 777 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
778
[docs] 779 def open( 780 self, 781 path: str, 782 mode: str = "rb", 783 buffering: int = -1, 784 encoding: Optional[str] = None, 785 disable_read_cache: bool = False, 786 memory_load_limit: int = MEMORY_LOAD_LIMIT, 787 atomic: bool = True, 788 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 789 attributes: Optional[dict[str, str]] = None, 790 prefetch_file: bool = True, 791 ) -> Union[PosixFile, ObjectFile]: 792 """ 793 Open a file for reading or writing. 794 795 :param path: The logical path of the object to open. 796 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab". 797 :param buffering: The buffering mode. Only applies to PosixFile. 798 :param encoding: The encoding to use for text files. 799 :param disable_read_cache: When set to ``True``, disables caching for file content. 800 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 801 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 802 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB. 803 :param atomic: When set to ``True``, file will be written atomically (rename upon close). 804 This parameter is only applicable to PosixFile in write mode. 805 :param check_source_version: Whether to check the source version of cached objects. 806 :param attributes: Attributes to add to the file. 807 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None. 808 :param prefetch_file: Whether to prefetch the file content. 809 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True. 810 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 811 :raises FileNotFoundError: If the file does not exist (read mode). 
812 """ 813 if self._is_posix_file_storage_provider(): 814 return PosixFile( 815 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 816 ) 817 else: 818 if atomic is False: 819 logger.warning("Non-atomic writes are not supported for object storage providers.") 820 821 return ObjectFile( 822 self, 823 remote_path=path, 824 mode=mode, 825 encoding=encoding, 826 disable_read_cache=disable_read_cache, 827 memory_load_limit=memory_load_limit, 828 check_source_version=check_source_version, 829 attributes=attributes, 830 prefetch_file=prefetch_file, 831 )
832
[docs] 833 def get_posix_path(self, path: str) -> Optional[str]: 834 """ 835 Returns the physical POSIX filesystem path for POSIX storage providers. 836 837 :param path: The path to resolve (may be a symlink or virtual path). 838 :return: Physical POSIX filesystem path if POSIX storage, None otherwise. 839 """ 840 if not self._is_posix_file_storage_provider(): 841 return None 842 843 if self._metadata_provider: 844 resolved = self._metadata_provider.realpath(path) 845 realpath = resolved.physical_path 846 else: 847 realpath = path 848 849 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
850
[docs] 851 def is_file(self, path: str) -> bool: 852 """ 853 Checks whether the specified path points to a file (rather than a folder or directory). 854 855 :param path: The logical path to check. 856 :return: ``True`` if the key points to a file, ``False`` otherwise. 857 """ 858 if self._metadata_provider: 859 resolved = self._metadata_provider.realpath(path) 860 return resolved.exists 861 862 return self._storage_provider.is_file(path)
863
[docs] 864 def commit_metadata(self, prefix: Optional[str] = None) -> None: 865 """ 866 Commits any pending updates to the metadata provider. No-op if not using a metadata provider. 867 868 :param prefix: If provided, scans the prefix to find files to commit. 869 """ 870 if self._metadata_provider: 871 with self._metadata_provider_lock or contextlib.nullcontext(): 872 if prefix: 873 base_resolved = self._metadata_provider.generate_physical_path("") 874 physical_base = base_resolved.physical_path 875 876 prefix_resolved = self._metadata_provider.generate_physical_path(prefix) 877 physical_prefix = prefix_resolved.physical_path 878 879 for obj in self._storage_provider.list_objects(physical_prefix): 880 virtual_path = obj.key[len(physical_base) :].lstrip("/") 881 self._metadata_provider.add_file(virtual_path, obj) 882 self._metadata_provider.commit_updates()
883
[docs] 884 def is_empty(self, path: str) -> bool: 885 """ 886 Check whether the specified path is empty. A path is considered empty if there are no 887 objects whose keys start with the given path as a prefix. 888 889 :param path: The logical path to check (typically a directory or folder prefix). 890 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 891 """ 892 if self._metadata_provider: 893 objects = self._metadata_provider.list_objects(path) 894 else: 895 objects = self._storage_provider.list_objects(path) 896 897 try: 898 return next(objects) is None 899 except StopIteration: 900 pass 901 902 return True
903
    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[list[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
        dryrun: bool = False,
        dryrun_output_path: Optional[str] = None,
    ) -> SyncResult:
        """
        Syncs files from the source storage client to "path/".

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
        :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes.
            Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
        :param dryrun: If ``True``, only enumerate and compare objects without performing any copy/delete operations.
            The returned :py:class:`SyncResult` will include a :py:class:`DryrunResult` with paths to JSONL files.
        :param dryrun_output_path: Directory to write dryrun JSONL files into. If ``None`` (default), a temporary
            directory is created automatically. Ignored when ``dryrun`` is ``False``.
        :return: A :py:class:`SyncResult` describing the outcome of the sync operation.
        :raises ValueError: If both source_files and patterns are provided.
        :raises RuntimeError: If errors occur during sync operations. The sync will stop on first error (fail-fast).
        """
        # source_files and patterns are mutually exclusive filtering mechanisms.
        if source_files and patterns:
            raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")

        pattern_matcher = PatternMatcher(patterns) if patterns else None

        # Disable the replica manager during sync: rebuild the source client from
        # its config with the replica manager cleared, so reads issued by the sync
        # go through the source's primary backend only.
        if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
            # Import here to avoid circular dependency
            from .client import StorageClient as StorageClientFacade

            source_client = StorageClientFacade(source_client._config)
            source_client._replica_manager = None

        m = SyncManager(source_client, source_path, self, target_path)
        # Batch size is overridable via the MSC_SYNC_BATCH_SIZE environment variable.
        batch_size = int(os.environ.get("MSC_SYNC_BATCH_SIZE", DEFAULT_SYNC_BATCH_SIZE))

        return m.sync_objects(
            execution_mode=execution_mode,
            description=description,
            num_worker_processes=num_worker_processes,
            delete_unmatched_files=delete_unmatched_files,
            pattern_matcher=pattern_matcher,
            preserve_source_attributes=preserve_source_attributes,
            follow_symlinks=follow_symlinks,
            source_files=source_files,
            ignore_hidden=ignore_hidden,
            commit_metadata=commit_metadata,
            batch_size=batch_size,
            dryrun=dryrun,
            dryrun_output_path=dryrun_output_path,
        )
986
[docs] 987 def sync_replicas( 988 self, 989 source_path: str, 990 replica_indices: Optional[list[int]] = None, 991 delete_unmatched_files: bool = False, 992 description: str = "Syncing replica", 993 num_worker_processes: Optional[int] = None, 994 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 995 patterns: Optional[PatternList] = None, 996 ignore_hidden: bool = True, 997 ) -> None: 998 """ 999 Sync files from this client to its replica storage clients. 1000 1001 :param source_path: The logical path to sync from. 1002 :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas. 1003 :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source. 1004 :param description: Description of sync process for logging purposes. 1005 :param num_worker_processes: Number of worker processes for parallel sync. 1006 :param execution_mode: Execution mode (LOCAL or REMOTE). 1007 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 1008 :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``. 1009 """ 1010 if not self._replicas: 1011 logger.warning( 1012 "No replicas found in profile '%s'. 
Add a 'replicas' section to your profile configuration to enable " 1013 "secondary storage locations for redundancy and performance.", 1014 self._config.profile, 1015 ) 1016 return None 1017 1018 if replica_indices: 1019 try: 1020 replicas = [self._replicas[i] for i in replica_indices] 1021 except IndexError as e: 1022 raise ValueError(f"Replica index out of range: {replica_indices}") from e 1023 else: 1024 replicas = self._replicas 1025 1026 # Disable the replica manager during sync 1027 if self._replica_manager: 1028 # Import here to avoid circular dependency 1029 from .client import StorageClient as StorageClientFacade 1030 1031 source_client = StorageClientFacade(self._config) 1032 source_client._replica_manager = None 1033 else: 1034 source_client = self 1035 1036 for replica in replicas: 1037 replica.sync_from( 1038 source_client, 1039 source_path, 1040 source_path, 1041 delete_unmatched_files=delete_unmatched_files, 1042 description=f"{description} ({replica.profile})", 1043 num_worker_processes=num_worker_processes, 1044 execution_mode=execution_mode, 1045 patterns=patterns, 1046 ignore_hidden=ignore_hidden, 1047 )
1048
[docs] 1049 def list( 1050 self, 1051 prefix: str = "", 1052 path: str = "", 1053 start_after: Optional[str] = None, 1054 end_at: Optional[str] = None, 1055 include_directories: bool = False, 1056 include_url_prefix: bool = False, 1057 attribute_filter_expression: Optional[str] = None, 1058 show_attributes: bool = False, 1059 follow_symlinks: bool = True, 1060 patterns: Optional[PatternList] = None, 1061 ) -> Iterator[ObjectMetadata]: 1062 """ 1063 List objects in the storage provider under the specified path. 1064 1065 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is 1066 deprecated and will be removed in a future version. 1067 1068 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under. 1069 :param path: The directory or file path to list objects under. This should be a 1070 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/"). 1071 Cannot be used together with ``prefix``. 1072 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 1073 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 1074 :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects. 1075 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 1076 :param attribute_filter_expression: The attribute filter expression to apply to the result. 1077 :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation, there may be a performance impact if this is set to ``True``. 1078 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing. 1079 :param patterns: PatternList for include/exclude filtering. If None, all files are included. 
1080 :return: An iterator over ObjectMetadata for matching objects. 1081 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty). 1082 """ 1083 # Parameter validation - either path or prefix, not both 1084 if path and prefix: 1085 raise ValueError( 1086 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). " 1087 f"Please use only the 'path' parameter for new code. " 1088 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 1089 ) 1090 elif prefix: 1091 logger.debug( 1092 f"The 'prefix' parameter is deprecated and will be removed in a future version. " 1093 f"Please use the 'path' parameter instead. " 1094 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 1095 ) 1096 1097 pattern_matcher = PatternMatcher(patterns) if patterns else None 1098 effective_path = path if path else prefix 1099 1100 single_file, effective_path = self._resolve_single_file( 1101 effective_path, start_after, end_at, include_url_prefix, pattern_matcher 1102 ) 1103 if single_file is not None: 1104 yield single_file 1105 return 1106 if effective_path is None: 1107 return 1108 1109 if self._metadata_provider: 1110 objects = self._metadata_provider.list_objects( 1111 effective_path, 1112 start_after=start_after, 1113 end_at=end_at, 1114 include_directories=include_directories, 1115 attribute_filter_expression=attribute_filter_expression, 1116 show_attributes=show_attributes, 1117 ) 1118 else: 1119 objects = self._storage_provider.list_objects( 1120 effective_path, 1121 start_after=start_after, 1122 end_at=end_at, 1123 include_directories=include_directories, 1124 attribute_filter_expression=attribute_filter_expression, 1125 show_attributes=show_attributes, 1126 follow_symlinks=follow_symlinks, 1127 ) 1128 1129 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
1130
    def generate_presigned_url(
        self,
        path: str,
        *,
        method: str = "GET",
        signer_type: Optional[SignerType] = None,
        signer_options: Optional[dict[str, Any]] = None,
    ) -> str:
        """
        Generate a presigned URL for the object at the given path.

        Delegates directly to the underlying storage provider.

        :param path: The logical path of the object to sign.
        :param method: The HTTP method the URL should authorize. Defaults to "GET".
        :param signer_type: Optional signer type to use; provider-specific semantics.
        :param signer_options: Optional signer-specific options; provider-specific semantics.
        :return: The presigned URL as a string.
        """
        return self._storage_provider.generate_presigned_url(
            path, method=method, signer_type=signer_type, signer_options=signer_options
        )