Source code for multistorageclient.client.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17from collections.abc import Iterator
 18from typing import IO, Any, Optional, Union
 19
 20from ..config import StorageClientConfig
 21from ..constants import MEMORY_LOAD_LIMIT
 22from ..file import ObjectFile, PosixFile
 23from ..types import (
 24    ExecutionMode,
 25    MetadataProvider,
 26    ObjectMetadata,
 27    PatternList,
 28    Range,
 29    SourceVersionCheckMode,
 30    StorageProvider,
 31    SyncResult,
 32)
 33from .composite import CompositeStorageClient
 34from .single import SingleStorageClient
 35from .types import AbstractStorageClient
 36
 37logger = logging.getLogger(__name__)
 38
 39
class StorageClient(AbstractStorageClient):
    """
    Unified storage client facade.

    Automatically delegates to:
    - SingleStorageClient: For single-backend configurations (full read/write)
    - CompositeStorageClient: For multi-backend configurations (read-only)

    The delegate type is chosen from ``config.storage_provider_profiles``: a truthy value selects
    :py:class:`CompositeStorageClient`, otherwise :py:class:`SingleStorageClient` is used. The same
    rule is applied when an instance is unpickled.
    """

    _delegate: Union[SingleStorageClient, CompositeStorageClient]

    def __init__(self, config: StorageClientConfig):
        """
        :param config: The storage client configuration. When it defines ``storage_provider_profiles``,
            operations are routed through a CompositeStorageClient; otherwise a SingleStorageClient is used.
        """
        if config.storage_provider_profiles:
            self._delegate = CompositeStorageClient(config)
            # Lazy %-style arguments: the message is only formatted when DEBUG logging is enabled.
            logger.debug("StorageClient '%s' using CompositeStorageClient (read-only)", config.profile)
        else:
            self._delegate = SingleStorageClient(config)
            logger.debug("StorageClient '%s' using SingleStorageClient", config.profile)

    @property
    def delegate(self) -> Union[SingleStorageClient, CompositeStorageClient]:
        """
        Access to underlying delegate storage client.

        :return: SingleStorageClient or CompositeStorageClient.
        """
        return self._delegate

    @property
    def _config(self) -> StorageClientConfig:
        """
        :return: The configuration for the underlying storage client.
        """
        return self._delegate._config

    @property
    def _storage_provider(self) -> Optional[StorageProvider]:
        """
        :return: The storage provider for the underlying storage client. None for CompositeStorageClient.
        """
        return self._delegate._storage_provider

    @_storage_provider.setter
    def _storage_provider(self, value: StorageProvider) -> None:
        """
        Allow mutation of storage provider for testing purposes.

        Only applied when the delegate is a SingleStorageClient; for a CompositeStorageClient the
        assignment is silently ignored.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._storage_provider = value

    @property
    def _metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider for the underlying storage client.
        """
        return self._delegate._metadata_provider

    @_metadata_provider.setter
    def _metadata_provider(self, value: Optional[MetadataProvider]) -> None:
        """
        Allow mutation of metadata provider for DSS compatibility.

        :raises ValueError: If the delegate is a CompositeStorageClient and ``value`` is None,
            because the composite client needs a metadata provider to route requests.
        """
        if isinstance(self._delegate, CompositeStorageClient) and value is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")
        self._delegate._metadata_provider = value  # type: ignore[assignment]

    @property
    def _metadata_provider_lock(self):
        """
        Access to metadata provider lock for DSS compatibility.

        :return: The lock for the metadata provider.
        """
        return self._delegate._metadata_provider_lock

    @_metadata_provider_lock.setter
    def _metadata_provider_lock(self, value):
        """Allow mutation of metadata provider lock for DSS compatibility."""
        self._delegate._metadata_provider_lock = value

    @property
    def _credentials_provider(self):
        """
        :return: The credentials provider for the underlying storage client.
        """
        return self._delegate._credentials_provider

    @property
    def _retry_config(self):
        """
        :return: The retry configuration for the underlying storage client.
        """
        return self._delegate._retry_config

    @property
    def _cache_manager(self):
        """
        :return: The cache manager for the underlying storage client.
        """
        return self._delegate._cache_manager

    @property
    def _replica_manager(self):
        """
        :return: The replica manager for the underlying storage client.
        """
        return self._delegate._replica_manager

    @_replica_manager.setter
    def _replica_manager(self, value):
        """
        Allow mutation of replica manager for testing purposes.

        Only applied when the delegate is a SingleStorageClient; for a CompositeStorageClient the
        assignment is silently ignored.

        :param value: The new replica manager.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._replica_manager = value

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._delegate.profile

    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._delegate.replicas

    def is_default_profile(self) -> bool:
        """
        :return: ``True`` if the storage client is using the default profile, ``False`` otherwise.
        """
        return self._delegate.is_default_profile()

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return self._delegate._is_rust_client_enabled()

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return self._delegate._is_posix_file_storage_provider()

    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Returns the physical POSIX filesystem path for POSIX storage providers.

        :param path: The path to resolve (may be a symlink or virtual path).
        :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
        """
        return self._delegate.get_posix_path(path)

    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.read(path, byte_range, check_source_version)

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading or writing.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to ``True``, disables caching for file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
            Defaults to ``MEMORY_LOAD_LIMIT``.
        :param atomic: When set to ``True``, file will be written atomically (rename upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: Attributes to add to the file.
            This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None.
        :param prefetch_file: Whether to prefetch the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileNotFoundError: If the file does not exist (read mode).
        :raises NotImplementedError: If the operation is not supported (e.g., write on CompositeStorageClient).
        """
        return self._delegate.open(
            path,
            mode,
            buffering,
            encoding,
            disable_read_cache,
            memory_load_limit,
            atomic,
            check_source_version,
            attributes,
            prefetch_file,
        )

    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
        """
        return self._delegate.download_file(remote_path, local_path)

    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :return: A list of object paths that match the specified pattern.
        """
        return self._delegate.glob(pattern, include_url_prefix, attribute_filter_expression)

    def list_recursive(
        self,
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        max_workers: int = 32,
        look_ahead: int = 2,
        include_url_prefix: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List files recursively in the storage provider under the specified path.

        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param max_workers: Maximum concurrent workers for provider-level recursive listing.
        :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage
            providers. When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching files.
        """
        return self._delegate.list_recursive(
            path=path,
            start_after=start_after,
            end_at=end_at,
            max_workers=max_workers,
            look_ahead=look_ahead,
            include_url_prefix=include_url_prefix,
            follow_symlinks=follow_symlinks,
            patterns=patterns,
        )

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a folder or directory).

        :param path: The logical path to check.
        :return: ``True`` if the key points to a file, ``False`` otherwise.
        """
        return self._delegate.is_file(path)

    def is_empty(self, path: str) -> bool:
        """
        Check whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The logical path to check (typically a directory or folder prefix).
        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        return self._delegate.is_empty(path)

    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.info(path, strict)

    def write(
        self,
        path: str,
        body: bytes,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        :raises NotImplementedError: If write operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.write(path, body, attributes)

    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Delete a file or directory at the specified path.

        :param path: The logical path of the object to delete.
        :param recursive: When True, delete directory and all its contents recursively.
        :raises FileNotFoundError: If the file or directory does not exist.
        :raises NotImplementedError: If delete operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.delete(path, recursive)

    def delete_many(self, paths: list[str]) -> None:
        """
        Delete multiple files at the specified paths. Only files are supported; directories are not deleted.

        :param paths: List of logical paths of the files to delete.
        :raises NotImplementedError: If delete operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.delete_many(paths)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        :raises NotImplementedError: If copy operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.copy(src_path, dest_path)

    def upload_file(
        self,
        remote_path: str,
        local_path: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Upload a local file to remote storage.

        :param remote_path: The logical path where the file will be uploaded.
        :param local_path: The local file path or file-like object to upload.
        :param attributes: Optional attributes to add to the file.
        :raises FileNotFoundError: If the local file does not exist.
        :raises NotImplementedError: If upload operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.upload_file(remote_path, local_path, attributes)

    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        return self._delegate.commit_metadata(prefix)

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[list[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
    ) -> SyncResult:
        """
        Syncs files from ``source_path`` in the source storage client to ``target_path`` in this client.

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
        :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes.
            Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
        :return: The result of the sync operation.
        :raises ValueError: If both source_files and patterns are provided.
        :raises NotImplementedError: If sync operations are not supported (e.g., CompositeStorageClient as target).
        """
        return self._delegate.sync_from(
            source_client,
            source_path,
            target_path,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            preserve_source_attributes,
            follow_symlinks,
            source_files,
            ignore_hidden,
            commit_metadata,
        )

    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[list[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Sync files from this client to its replica storage clients.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
        :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: Number of worker processes for parallel sync.
        :param execution_mode: The execution mode to use (defaults to ``ExecutionMode.LOCAL``),
            with the same options as :py:meth:`sync_from`.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
        """
        return self._delegate.sync_replicas(
            source_path,
            replica_indices,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            ignore_hidden,
        )

    # NOTE: ``list`` must stay defined after every method whose annotations use ``list[...]``.
    # Annotations in this module are evaluated eagerly at ``def`` time. Once this ``def`` runs,
    # the name ``list`` in the class body refers to this method, not the builtin. A later
    # annotation such as ``list[str]`` would then fail while the class is being created.
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories
            are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation,
            there may be a performance impact if this is set to ``True``.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers.
            When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        return self._delegate.list(
            prefix,
            path,
            start_after,
            end_at,
            include_directories,
            include_url_prefix,
            attribute_filter_expression,
            show_attributes,
            follow_symlinks,
            patterns,
        )

    def __getstate__(self) -> dict[str, Any]:
        """
        Support for pickling (forward to delegate).

        :return: The delegate's pickled state. :py:meth:`__setstate__` reads ``"_config"`` from it.
        """
        return self._delegate.__getstate__()

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Support for unpickling - reconstruct the delegate.

        The delegate type is selected with the same rule as ``__init__`` (based on
        ``state["_config"].storage_provider_profiles``). The delegate is allocated without
        running its ``__init__`` and then restored from ``state``.

        :param state: The state previously produced by :py:meth:`__getstate__`.
        """
        config = state["_config"]
        delegate_cls = CompositeStorageClient if config.storage_provider_profiles else SingleStorageClient
        delegate = delegate_cls.__new__(delegate_cls)
        delegate.__setstate__(state)
        # Publish the delegate only after it has been fully restored, so a failed restore
        # does not leave a half-initialized delegate attached to this facade.
        self._delegate = delegate