Source code for multistorageclient.client.composite

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17from collections.abc import Iterator, Sequence
 18from typing import IO, Any, Optional, Union
 19
 20from ..config import StorageClientConfig
 21from ..constants import MEMORY_LOAD_LIMIT
 22from ..file import ObjectFile, PosixFile
 23from ..types import (
 24    MSC_PROTOCOL,
 25    ExecutionMode,
 26    MetadataProvider,
 27    ObjectMetadata,
 28    PatternList,
 29    Range,
 30    SignerType,
 31    SourceVersionCheckMode,
 32    SymlinkHandling,
 33    SyncResult,
 34)
 35from ..utils import PatternMatcher, join_paths
 36from .single import SingleStorageClient
 37from .types import AbstractStorageClient
 38
 39logger = logging.getLogger(__name__)
 40
 41
class CompositeStorageClient(AbstractStorageClient):
    """
    READ-ONLY storage client for multi-backend configurations.

    Routes read operations to child SingleStorageClient instances based on
    metadata provider's routing information (ResolvedPath.profile).

    Write operations raise NotImplementedError with clear error messages.
    """

    _metadata_provider: MetadataProvider

    def __init__(self, config: StorageClientConfig):
        """
        Initialize a composite storage client.

        :param config: Storage client configuration with storage_provider_profiles set
        :raises ValueError: If config doesn't have storage_provider_profiles
        """
        if not config.storage_provider_profiles:
            raise ValueError(
                "CompositeStorageClient requires storage_provider_profiles. "
                "Use SingleStorageClient for single-backend configurations."
            )

        if config.metadata_provider is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")

        # Mirror the attribute layout used by SingleStorageClient so that code
        # shared between client types can rely on the same fields existing.
        self._config = config
        self._profile = config.profile
        self._metadata_provider = config.metadata_provider
        self._metadata_provider_lock = None
        self._storage_provider = config.storage_provider
        self._credentials_provider = config.credentials_provider
        self._retry_config = config.retry_config
        self._cache_manager = config.cache_manager
        self._replica_manager = None

        self._child_profile_names = config.storage_provider_profiles
        self._child_clients: dict[str, SingleStorageClient] = {}

        if config.child_configs:
            # ProviderBundleV2 path: child configs are pre-built
            self._child_clients = {
                name: SingleStorageClient(child_cfg) for name, child_cfg in config.child_configs.items()
            }
        else:
            # Config-dict path: child profiles have their own credentials defined
            if not config._config_dict:
                raise ValueError("CompositeStorageClient requires _config_dict to build child clients")

            self._child_clients = {
                name: SingleStorageClient(
                    StorageClientConfig.from_dict(
                        config_dict=config._config_dict,
                        profile=name,
                    )
                )
                for name in self._child_profile_names
            }

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._profile

    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients (always empty for CompositeStorageClient).
        """
        return []
[docs] 113 def is_default_profile(self) -> bool: 114 return self._config.profile == "__filesystem__"
115 116 def _is_rust_client_enabled(self) -> bool: 117 """ 118 Check if Rust client is enabled for all child storage clients. 119 120 When all child backends are Rust-enabled, MSC can use single-process 121 multi-threaded mode instead of multi-process mode, as Rust handles 122 parallelism internally via async I/O without Python GIL contention. 123 124 :return: True if all child clients are Rust-enabled, False otherwise. 125 """ 126 return all(client._is_rust_client_enabled() for client in self._child_clients.values()) 127 128 def _is_posix_file_storage_provider(self) -> bool: 129 """ 130 Check if using POSIX file storage provider. 131 132 :return: False - composite client doesn't have a single storage provider. 133 """ 134 return False 135
[docs] 136 def get_posix_path(self, path: str) -> Optional[str]: 137 """ 138 Get the POSIX filesystem path for a given logical path. 139 140 :param path: The logical path to resolve. 141 :return: None - composite client doesn't support POSIX path resolution. 142 """ 143 return None
144 145 def _get_child_client(self, profile: Optional[str]) -> SingleStorageClient: 146 """ 147 Get the child client for the specified profile. 148 149 :param profile: Profile name from ResolvedPath 150 :return: Child SingleStorageClient instance 151 :raises ValueError: If profile is None or not found 152 """ 153 if profile is None: 154 raise ValueError( 155 "CompositeStorageClient requires profile from ResolvedPath for routing. " 156 "Metadata provider must return ResolvedPath with profile set." 157 ) 158 159 if profile not in self._child_clients: 160 raise ValueError( 161 f"Profile '{profile}' not found in composite client. " 162 f"Available profiles: {list(self._child_clients.keys())}" 163 ) 164 165 return self._child_clients[profile] 166
[docs] 167 def read( 168 self, 169 path: str, 170 byte_range: Optional[Range] = None, 171 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 172 ) -> bytes: 173 resolved = self._metadata_provider.realpath(path) 174 if not resolved.exists: 175 raise FileNotFoundError(f"Path '{path}' not found") 176 177 child = self._get_child_client(resolved.profile) 178 return child.read(resolved.physical_path, byte_range, check_source_version)
179
[docs] 180 def open( 181 self, 182 path: str, 183 mode: str = "rb", 184 buffering: int = -1, 185 encoding: Optional[str] = None, 186 disable_read_cache: bool = False, 187 memory_load_limit: int = MEMORY_LOAD_LIMIT, 188 atomic: bool = True, 189 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 190 attributes: Optional[dict[str, Any]] = None, 191 prefetch_file: Optional[bool] = None, 192 ) -> Union[PosixFile, ObjectFile]: 193 """ 194 Open a file for reading or writing. 195 196 :param path: The logical path of the object to open. 197 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab". 198 :param buffering: The buffering mode. Only applies to PosixFile. 199 :param encoding: The encoding to use for text files. 200 :param disable_read_cache: When set to ``True``, disables caching for file content. 201 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 202 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 203 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB. 204 :param atomic: When set to ``True``, file will be written atomically (rename upon close). 205 This parameter is only applicable to PosixFile in write mode. 206 :param check_source_version: Whether to check the source version of cached objects. 207 :param attributes: Attributes to add to the file. 208 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None. 209 :param prefetch_file: Whether to prefetch the file content. 210 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 211 If None, inherits from cache configuration. 212 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 213 :raises FileNotFoundError: If the file does not exist (read mode). 214 :raises NotImplementedError: If the operation is not supported (e.g., write on CompositeStorageClient). 
215 """ 216 if mode not in ["r", "rb"]: 217 raise NotImplementedError( 218 f"CompositeStorageClient only supports read mode (got '{mode}'). " 219 "Write operations are not implemented for multi-location datasets." 220 ) 221 222 resolved = self._metadata_provider.realpath(path) 223 if not resolved.exists: 224 raise FileNotFoundError(f"Path '{path}' not found") 225 226 child = self._get_child_client(resolved.profile) 227 return child.open( 228 resolved.physical_path, 229 mode, 230 buffering, 231 encoding, 232 disable_read_cache, 233 memory_load_limit, 234 atomic, 235 check_source_version, 236 attributes, 237 prefetch_file, 238 )
239
[docs] 240 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None: 241 resolved = self._metadata_provider.realpath(remote_path) 242 if not resolved.exists: 243 raise FileNotFoundError(f"Path '{remote_path}' not found") 244 245 child = self._get_child_client(resolved.profile) 246 child.download_file(resolved.physical_path, local_path)
247
[docs] 248 def download_files( 249 self, 250 remote_paths: list[str], 251 local_paths: list[str], 252 metadata: Optional[Sequence[Optional[ObjectMetadata]]] = None, 253 max_workers: int = 16, 254 ) -> None: 255 if len(remote_paths) != len(local_paths): 256 raise ValueError("remote_paths and local_paths must have the same length") 257 258 groups: dict[str, tuple[list[str], list[str], list[Optional[ObjectMetadata]]]] = {} 259 for i, (remote_path, local_path) in enumerate(zip(remote_paths, local_paths)): 260 resolved = self._metadata_provider.realpath(remote_path) 261 if not resolved.exists: 262 raise FileNotFoundError(f"Path '{remote_path}' not found") 263 264 profile = resolved.profile 265 if profile is None: 266 raise ValueError( 267 "CompositeStorageClient requires profile from ResolvedPath for routing. " 268 "Metadata provider must return ResolvedPath with profile set." 269 ) 270 if profile not in groups: 271 groups[profile] = ([], [], []) 272 groups[profile][0].append(resolved.physical_path) 273 groups[profile][1].append(local_path) 274 groups[profile][2].append(metadata[i] if metadata is not None else None) 275 276 for profile, (physical_paths, group_local_paths, group_metadata) in groups.items(): 277 child = self._get_child_client(profile) 278 has_any_metadata = any(m is not None for m in group_metadata) 279 child.download_files( 280 physical_paths, group_local_paths, group_metadata if has_any_metadata else None, max_workers 281 )
282
[docs] 283 def glob( 284 self, 285 pattern: str, 286 include_url_prefix: bool = False, 287 attribute_filter_expression: Optional[str] = None, 288 ) -> list[str]: 289 results = self._metadata_provider.glob( 290 pattern, 291 attribute_filter_expression=attribute_filter_expression, 292 ) 293 294 if include_url_prefix: 295 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 296 297 return results
298
[docs] 299 def list_recursive( 300 self, 301 path: str = "", 302 start_after: Optional[str] = None, 303 end_at: Optional[str] = None, 304 max_workers: int = 32, 305 look_ahead: int = 2, 306 include_url_prefix: bool = False, 307 follow_symlinks: Optional[bool] = None, 308 patterns: Optional[PatternList] = None, 309 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 310 ) -> Iterator[ObjectMetadata]: 311 yield from self.list( 312 path=path, 313 start_after=start_after, 314 end_at=end_at, 315 include_directories=False, 316 include_url_prefix=include_url_prefix, 317 follow_symlinks=follow_symlinks, 318 patterns=patterns, 319 symlink_handling=symlink_handling, 320 )
321
[docs] 322 def is_file(self, path: str) -> bool: 323 resolved = self._metadata_provider.realpath(path) 324 return resolved.exists
325
[docs] 326 def is_empty(self, path: str) -> bool: 327 objects = self._metadata_provider.list_objects(path) 328 329 try: 330 return next(objects) is None 331 except StopIteration: 332 pass 333 334 return True
335
[docs] 336 def info(self, path: str, strict: bool = True) -> ObjectMetadata: 337 return self._metadata_provider.get_object_metadata(path, include_pending=not strict)
338
[docs] 339 def write( 340 self, 341 path: str, 342 body: bytes, 343 attributes: Optional[dict[str, Any]] = None, 344 ) -> None: 345 """Write operations not supported in read-only mode.""" 346 raise NotImplementedError( 347 "CompositeStorageClient is read-only. " 348 "Write operations are not implemented for multi-location datasets. " 349 "Use a single-location dataset for write operations." 350 )
351
[docs] 352 def delete(self, path: str, recursive: bool = False) -> None: 353 """Delete operations not supported in read-only mode.""" 354 raise NotImplementedError( 355 "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets." 356 )
357
[docs] 358 def delete_many(self, paths: list[str]) -> None: 359 """Delete operations not supported in read-only mode.""" 360 raise NotImplementedError( 361 "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets." 362 )
363
[docs] 364 def copy(self, src_path: str, dest_path: str) -> None: 365 """Copy operations not supported in read-only mode.""" 366 raise NotImplementedError( 367 "CompositeStorageClient is read-only. Copy operations are not implemented for multi-location datasets." 368 )
369 375
[docs] 376 def upload_file( 377 self, 378 remote_path: str, 379 local_path: Union[str, IO], 380 attributes: Optional[dict[str, Any]] = None, 381 ) -> None: 382 """Upload operations not supported in read-only mode.""" 383 raise NotImplementedError( 384 "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets." 385 )
386
[docs] 387 def upload_files( 388 self, 389 remote_paths: list[str], 390 local_paths: list[str], 391 attributes: Optional[Sequence[Optional[dict[str, Any]]]] = None, 392 max_workers: int = 16, 393 ) -> None: 394 """Upload operations not supported in read-only mode.""" 395 raise NotImplementedError( 396 "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets." 397 )
398
[docs] 399 def commit_metadata(self, prefix: Optional[str] = None) -> None: 400 """No-op for read-only client.""" 401 pass
402
[docs] 403 def sync_from( 404 self, 405 source_client: AbstractStorageClient, 406 source_path: str = "", 407 target_path: str = "", 408 delete_unmatched_files: bool = False, 409 description: str = "Syncing", 410 num_worker_processes: Optional[int] = None, 411 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 412 patterns: Optional[PatternList] = None, 413 preserve_source_attributes: bool = False, 414 follow_symlinks: Optional[bool] = None, 415 source_files: Optional[list[str]] = None, 416 ignore_hidden: bool = True, 417 commit_metadata: bool = True, 418 dryrun: bool = False, 419 dryrun_output_path: Optional[str] = None, 420 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 421 ) -> SyncResult: 422 raise NotImplementedError( 423 "CompositeStorageClient cannot be used as sync target (write operation). " 424 "Use CompositeStorageClient as source only, or use a single-location dataset as target." 425 )
426
[docs] 427 def sync_replicas( 428 self, 429 source_path: str, 430 replica_indices: Optional[list[int]] = None, 431 delete_unmatched_files: bool = False, 432 description: str = "Syncing replica", 433 num_worker_processes: Optional[int] = None, 434 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 435 patterns: Optional[PatternList] = None, 436 ignore_hidden: bool = True, 437 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 438 ) -> None: 439 """No-op for read-only client.""" 440 pass
441
[docs] 442 def list( 443 self, 444 prefix: str = "", 445 path: str = "", 446 start_after: Optional[str] = None, 447 end_at: Optional[str] = None, 448 include_directories: bool = False, 449 include_url_prefix: bool = False, 450 attribute_filter_expression: Optional[str] = None, 451 show_attributes: bool = False, 452 follow_symlinks: Optional[bool] = None, 453 patterns: Optional[PatternList] = None, 454 symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW, 455 ) -> Iterator[ObjectMetadata]: 456 # Parameter validation - either path or prefix, not both 457 if path and prefix: 458 raise ValueError( 459 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). " 460 f"Please use only the 'path' parameter for new code. " 461 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 462 ) 463 464 # Use path if provided, otherwise fall back to prefix 465 effective_path = path if path else prefix 466 467 # Apply patterns to the objects 468 pattern_matcher = PatternMatcher(patterns) if patterns else None 469 470 # Delegate to metadata provider (always present for CompositeStorageClient) 471 for obj in self._metadata_provider.list_objects( 472 effective_path, 473 start_after=start_after, 474 end_at=end_at, 475 include_directories=include_directories, 476 attribute_filter_expression=attribute_filter_expression, 477 show_attributes=show_attributes, 478 ): 479 # Skip objects that do not match the patterns 480 if pattern_matcher and not pattern_matcher.should_include_file(obj.key): 481 continue 482 483 if include_url_prefix: 484 obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key) 485 486 yield obj
487
[docs] 488 def generate_presigned_url( 489 self, 490 path: str, 491 *, 492 method: str = "GET", 493 signer_type: Optional[SignerType] = None, 494 signer_options: Optional[dict[str, Any]] = None, 495 ) -> str: 496 resolved = self._metadata_provider.realpath(path) 497 if not resolved.exists: 498 raise FileNotFoundError(f"Path '{path}' not found") 499 500 child = self._get_child_client(resolved.profile) 501 return child.generate_presigned_url( 502 resolved.physical_path, method=method, signer_type=signer_type, signer_options=signer_options 503 )
504 505 def __getstate__(self) -> dict[str, Any]: 506 """Support for pickling.""" 507 return {"_config": self._config} 508 509 def __setstate__(self, state: dict[str, Any]) -> None: 510 """Support for unpickling - reinitialize from config.""" 511 config = state["_config"] 512 self.__init__(config)