Source code for multistorageclient.client.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17from collections.abc import Iterator
 18from typing import IO, Any, List, Optional, Union
 19
 20from ..config import StorageClientConfig
 21from ..constants import MEMORY_LOAD_LIMIT
 22from ..file import ObjectFile, PosixFile
 23from ..types import (
 24    ExecutionMode,
 25    MetadataProvider,
 26    ObjectMetadata,
 27    PatternList,
 28    Range,
 29    SourceVersionCheckMode,
 30    StorageProvider,
 31    SyncResult,
 32)
 33from .composite import CompositeStorageClient
 34from .single import SingleStorageClient
 35from .types import AbstractStorageClient
 36
 37logger = logging.getLogger(__name__)
 38
 39
class StorageClient(AbstractStorageClient):
    """
    Unified storage client facade.

    Automatically delegates to:
    - SingleStorageClient: For single-backend configurations (full read/write)
    - CompositeStorageClient: For multi-backend configurations (read-only)

    The delegate type is chosen from the configuration. A config with non-empty
    ``storage_provider_profiles`` gets a CompositeStorageClient; any other config gets a
    SingleStorageClient. The same rule is re-applied when an instance is unpickled.
    """

    _delegate: Union[SingleStorageClient, CompositeStorageClient]

    @staticmethod
    def _uses_composite_delegate(config: StorageClientConfig) -> bool:
        """
        Decide whether ``config`` must be served by a CompositeStorageClient.

        Shared by :py:meth:`__init__` and :py:meth:`__setstate__`, so that construction
        and unpickling always agree on the delegate type.

        :param config: The storage client configuration.
        :return: ``True`` if ``config.storage_provider_profiles`` is non-empty, ``False`` otherwise.
        """
        return bool(config.storage_provider_profiles)

    def __init__(self, config: StorageClientConfig):
        """
        Create the facade and the delegate that matches ``config``.

        :param config: The storage client configuration.
        """
        if self._uses_composite_delegate(config):
            self._delegate = CompositeStorageClient(config)
            # %-style args: formatting is deferred until the DEBUG record is actually emitted.
            logger.debug("StorageClient '%s' using CompositeStorageClient (read-only)", config.profile)
        else:
            self._delegate = SingleStorageClient(config)
            logger.debug("StorageClient '%s' using SingleStorageClient", config.profile)

    @property
    def delegate(self) -> Union[SingleStorageClient, CompositeStorageClient]:
        """
        Access to underlying delegate storage client.

        :return: SingleStorageClient or CompositeStorageClient.
        """
        return self._delegate

    @property
    def _config(self) -> StorageClientConfig:
        """
        :return: The configuration for the underlying storage client.
        """
        return self._delegate._config

    @property
    def _storage_provider(self) -> Optional[StorageProvider]:
        """
        :return: The storage provider for the underlying storage client. None for CompositeStorageClient.
        """
        return self._delegate._storage_provider

    @_storage_provider.setter
    def _storage_provider(self, value: StorageProvider) -> None:
        """
        Allow mutation of storage provider for testing purposes.

        Only applied when the delegate is a SingleStorageClient; for a
        CompositeStorageClient the assignment is silently ignored.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._storage_provider = value

    @property
    def _metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider for the underlying storage client.
        """
        return self._delegate._metadata_provider

    @_metadata_provider.setter
    def _metadata_provider(self, value: Optional[MetadataProvider]) -> None:
        """
        Allow mutation of metadata provider for DSS compatibility.

        :raises ValueError: If ``value`` is None and the delegate is a CompositeStorageClient,
            which needs a metadata provider for routing decisions.
        """
        if isinstance(self._delegate, CompositeStorageClient) and value is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")
        self._delegate._metadata_provider = value  # type: ignore[assignment]

    @property
    def _metadata_provider_lock(self):
        """
        Access to metadata provider lock for DSS compatibility.

        :return: The lock for the metadata provider.
        """
        return self._delegate._metadata_provider_lock

    @_metadata_provider_lock.setter
    def _metadata_provider_lock(self, value):
        """Allow mutation of metadata provider lock for DSS compatibility."""
        self._delegate._metadata_provider_lock = value

    @property
    def _credentials_provider(self):
        """
        :return: The credentials provider for the underlying storage client.
        """
        return self._delegate._credentials_provider

    @property
    def _retry_config(self):
        """
        :return: The retry configuration for the underlying storage client.
        """
        return self._delegate._retry_config

    @property
    def _cache_manager(self):
        """
        :return: The cache manager for the underlying storage client.
        """
        return self._delegate._cache_manager

    @property
    def _replica_manager(self):
        """
        :return: The replica manager for the underlying storage client.
        """
        return self._delegate._replica_manager

    @_replica_manager.setter
    def _replica_manager(self, value):
        """
        Allow mutation of replica manager for testing purposes.

        Only applied when the delegate is a SingleStorageClient; for a
        CompositeStorageClient the assignment is silently ignored.

        :param value: The new replica manager.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._replica_manager = value

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._delegate.profile

    @property
    def replicas(self) -> List[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._delegate.replicas

    def is_default_profile(self) -> bool:
        """
        :return: ``True`` if the storage client is using the default profile, ``False`` otherwise.
        """
        return self._delegate.is_default_profile()

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return self._delegate._is_rust_client_enabled()

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return self._delegate._is_posix_file_storage_provider()

    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Returns the physical POSIX filesystem path for POSIX storage providers.

        :param path: The path to resolve (may be a symlink or virtual path).
        :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
        """
        return self._delegate.get_posix_path(path)

    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.read(path, byte_range, check_source_version)

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading or writing.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to ``True``, disables caching for file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
            Defaults to ``MEMORY_LOAD_LIMIT`` (512MB).
        :param atomic: When set to ``True``, file will be written atomically (rename upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: Attributes to add to the file.
            This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None.
        :param prefetch_file: Whether to prefetch the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileNotFoundError: If the file does not exist (read mode).
        :raises NotImplementedError: If the operation is not supported (e.g., write on CompositeStorageClient).
        """
        # Forwarded positionally, in the same order as this signature.
        return self._delegate.open(
            path,
            mode,
            buffering,
            encoding,
            disable_read_cache,
            memory_load_limit,
            atomic,
            check_source_version,
            attributes,
            prefetch_file,
        )

    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
        """
        return self._delegate.download_file(remote_path, local_path)

    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> List[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :return: A list of object paths that match the specified pattern.
        """
        return self._delegate.glob(pattern, include_url_prefix, attribute_filter_expression)

    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories are
            returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on the
            implementation, setting this to ``True`` may have a performance impact.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers.
            When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Forwarded positionally, in the same order as this signature.
        return self._delegate.list(
            prefix,
            path,
            start_after,
            end_at,
            include_directories,
            include_url_prefix,
            attribute_filter_expression,
            show_attributes,
            follow_symlinks,
            patterns,
        )

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a folder or directory).

        :param path: The logical path to check.
        :return: ``True`` if the key points to a file, ``False`` otherwise.
        """
        return self._delegate.is_file(path)

    def is_empty(self, path: str) -> bool:
        """
        Check whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The logical path to check (typically a directory or folder prefix).
        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        return self._delegate.is_empty(path)

    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.info(path, strict)

    def write(
        self,
        path: str,
        body: bytes,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        :raises NotImplementedError: If write operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.write(path, body, attributes)

    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Delete a file or directory at the specified path.

        :param path: The logical path of the object to delete.
        :param recursive: When ``True``, delete the directory and all its contents recursively.
        :raises FileNotFoundError: If the file or directory does not exist.
        :raises NotImplementedError: If delete operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.delete(path, recursive)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        :raises NotImplementedError: If copy operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.copy(src_path, dest_path)

    def upload_file(
        self,
        remote_path: str,
        local_path: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Upload a local file to remote storage.

        :param remote_path: The logical path where the file will be uploaded.
        :param local_path: The local file path or file-like object to upload.
        :param attributes: Optional attributes to add to the file.
        :raises FileNotFoundError: If the local file does not exist.
        :raises NotImplementedError: If upload operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.upload_file(remote_path, local_path, attributes)

    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        return self._delegate.commit_metadata(prefix)

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[List[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
    ) -> SyncResult:
        """
        Sync files from ``source_client`` (under ``source_path``) into this client (under ``target_path``).

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also
            preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on
                large-scale sync operations. For production use at scale, configure a ``metadata_provider`` in your
                storage profile.

        :param follow_symlinks: If the source client uses a POSIX file storage provider, whether to follow symbolic
            links. Default is ``True``.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with
            patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
        :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync
            completes. Set to ``False`` to skip the commit, allowing batching of multiple sync operations before
            committing manually.
        :return: The result of the sync operation.
        :raises ValueError: If both source_files and patterns are provided.
        :raises NotImplementedError: If sync operations are not supported (e.g., CompositeStorageClient as target).
        """
        # Forwarded positionally, in the same order as this signature.
        return self._delegate.sync_from(
            source_client,
            source_path,
            target_path,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            preserve_source_attributes,
            follow_symlinks,
            source_files,
            ignore_hidden,
            commit_metadata,
        )

    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[List[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Sync files from this client to its replica storage clients.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
        :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: Number of worker processes for parallel sync.
        :param execution_mode: The execution mode to use (see :py:class:`ExecutionMode`).
            Defaults to ``ExecutionMode.LOCAL``.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
        """
        # Forwarded positionally, in the same order as this signature.
        return self._delegate.sync_replicas(
            source_path,
            replica_indices,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            ignore_hidden,
        )

    def __getstate__(self) -> dict[str, Any]:
        """
        Support for pickling: the facade's pickled state is exactly its delegate's state.

        :return: The delegate's state dictionary.
        """
        return self._delegate.__getstate__()

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Support for unpickling: rebuild the delegate from its pickled state.

        The delegate type is derived from ``state["_config"]`` with the same rule used by
        :py:meth:`__init__`. The delegate is allocated with ``__new__`` (skipping its
        ``__init__``) and restored through its own ``__setstate__``.

        :param state: The state produced by :py:meth:`__getstate__`.
        """
        config = state["_config"]
        delegate_cls = CompositeStorageClient if self._uses_composite_delegate(config) else SingleStorageClient
        delegate = delegate_cls.__new__(delegate_cls)
        delegate.__setstate__(state)
        # Attach only after a successful restore, so a failed unpickle never leaves a half-built delegate behind.
        self._delegate = delegate