Source code for multistorageclient.client.composite

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17from collections.abc import Iterator, Sequence
 18from typing import IO, Any, Optional, Union
 19
 20from ..config import StorageClientConfig
 21from ..constants import MEMORY_LOAD_LIMIT
 22from ..file import ObjectFile, PosixFile
 23from ..types import (
 24    MSC_PROTOCOL,
 25    ExecutionMode,
 26    MetadataProvider,
 27    ObjectMetadata,
 28    PatternList,
 29    Range,
 30    SignerType,
 31    SourceVersionCheckMode,
 32    SyncResult,
 33)
 34from ..utils import PatternMatcher, join_paths
 35from .single import SingleStorageClient
 36from .types import AbstractStorageClient
 37
 38logger = logging.getLogger(__name__)
 39
 40
class CompositeStorageClient(AbstractStorageClient):
    """
    READ-ONLY storage client for multi-backend configurations.

    Routes read operations to child SingleStorageClient instances based on
    metadata provider's routing information (ResolvedPath.profile).

    Write operations raise NotImplementedError with clear error messages.
    """

    # Always set (enforced in __init__); every read path is routed through it.
    _metadata_provider: MetadataProvider

    def __init__(self, config: StorageClientConfig):
        """
        Initialize a composite storage client.

        :param config: Storage client configuration with storage_provider_profiles set
        :raises ValueError: If config doesn't have storage_provider_profiles, has no
            metadata_provider, or (config-dict path) has no ``_config_dict`` to build
            child clients from.
        """
        if not config.storage_provider_profiles:
            raise ValueError(
                "CompositeStorageClient requires storage_provider_profiles. "
                "Use SingleStorageClient for single-backend configurations."
            )

        if config.metadata_provider is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")

        self._config = config
        self._profile = config.profile
        self._metadata_provider = config.metadata_provider
        self._metadata_provider_lock = None
        self._storage_provider = config.storage_provider
        self._credentials_provider = config.credentials_provider
        self._retry_config = config.retry_config
        self._cache_manager = config.cache_manager
        # Composite clients never have replicas (see `replicas` property below).
        self._replica_manager = None

        # Map of child profile name -> SingleStorageClient handling that backend.
        self._child_clients: dict[str, SingleStorageClient] = {}
        self._child_profile_names = config.storage_provider_profiles

        if config.child_configs:
            # ProviderBundleV2 path: child configs are pre-built
            for child_name, child_config in config.child_configs.items():
                self._child_clients[child_name] = SingleStorageClient(child_config)
        else:
            # Config-dict path: child profiles have their own credentials defined
            if not config._config_dict:
                raise ValueError("CompositeStorageClient requires _config_dict to build child clients")

            for child_profile in self._child_profile_names:
                child_config = StorageClientConfig.from_dict(
                    config_dict=config._config_dict,
                    profile=child_profile,
                )
                self._child_clients[child_profile] = SingleStorageClient(child_config)

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._profile

    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients (empty list for CompositeStorageClient).
        """
        return []
[docs] 112 def is_default_profile(self) -> bool: 113 return self._config.profile == "__filesystem__"
114 115 def _is_rust_client_enabled(self) -> bool: 116 """ 117 Check if Rust client is enabled for all child storage clients. 118 119 When all child backends are Rust-enabled, MSC can use single-process 120 multi-threaded mode instead of multi-process mode, as Rust handles 121 parallelism internally via async I/O without Python GIL contention. 122 123 :return: True if all child clients are Rust-enabled, False otherwise. 124 """ 125 return all(client._is_rust_client_enabled() for client in self._child_clients.values()) 126 127 def _is_posix_file_storage_provider(self) -> bool: 128 """ 129 Check if using POSIX file storage provider. 130 131 :return: False - composite client doesn't have a single storage provider. 132 """ 133 return False 134
[docs] 135 def get_posix_path(self, path: str) -> Optional[str]: 136 """ 137 Get the POSIX filesystem path for a given logical path. 138 139 :param path: The logical path to resolve. 140 :return: None - composite client doesn't support POSIX path resolution. 141 """ 142 return None
143 144 def _get_child_client(self, profile: Optional[str]) -> SingleStorageClient: 145 """ 146 Get the child client for the specified profile. 147 148 :param profile: Profile name from ResolvedPath 149 :return: Child SingleStorageClient instance 150 :raises ValueError: If profile is None or not found 151 """ 152 if profile is None: 153 raise ValueError( 154 "CompositeStorageClient requires profile from ResolvedPath for routing. " 155 "Metadata provider must return ResolvedPath with profile set." 156 ) 157 158 if profile not in self._child_clients: 159 raise ValueError( 160 f"Profile '{profile}' not found in composite client. " 161 f"Available profiles: {list(self._child_clients.keys())}" 162 ) 163 164 return self._child_clients[profile] 165
[docs] 166 def read( 167 self, 168 path: str, 169 byte_range: Optional[Range] = None, 170 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 171 ) -> bytes: 172 resolved = self._metadata_provider.realpath(path) 173 if not resolved.exists: 174 raise FileNotFoundError(f"Path '{path}' not found") 175 176 child = self._get_child_client(resolved.profile) 177 return child.read(resolved.physical_path, byte_range, check_source_version)
178
[docs] 179 def open( 180 self, 181 path: str, 182 mode: str = "rb", 183 buffering: int = -1, 184 encoding: Optional[str] = None, 185 disable_read_cache: bool = False, 186 memory_load_limit: int = MEMORY_LOAD_LIMIT, 187 atomic: bool = True, 188 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 189 attributes: Optional[dict[str, str]] = None, 190 prefetch_file: bool = True, 191 ) -> Union[PosixFile, ObjectFile]: 192 """ 193 Open a file for reading or writing. 194 195 :param path: The logical path of the object to open. 196 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab". 197 :param buffering: The buffering mode. Only applies to PosixFile. 198 :param encoding: The encoding to use for text files. 199 :param disable_read_cache: When set to ``True``, disables caching for file content. 200 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 201 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 202 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB. 203 :param atomic: When set to ``True``, file will be written atomically (rename upon close). 204 This parameter is only applicable to PosixFile in write mode. 205 :param check_source_version: Whether to check the source version of cached objects. 206 :param attributes: Attributes to add to the file. 207 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None. 208 :param prefetch_file: Whether to prefetch the file content. 209 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True. 210 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 211 :raises FileNotFoundError: If the file does not exist (read mode). 212 :raises NotImplementedError: If the operation is not supported (e.g., write on CompositeStorageClient). 
213 """ 214 if mode not in ["r", "rb"]: 215 raise NotImplementedError( 216 f"CompositeStorageClient only supports read mode (got '{mode}'). " 217 "Write operations are not implemented for multi-location datasets." 218 ) 219 220 resolved = self._metadata_provider.realpath(path) 221 if not resolved.exists: 222 raise FileNotFoundError(f"Path '{path}' not found") 223 224 child = self._get_child_client(resolved.profile) 225 return child.open( 226 resolved.physical_path, 227 mode, 228 buffering, 229 encoding, 230 disable_read_cache, 231 memory_load_limit, 232 atomic, 233 check_source_version, 234 attributes, 235 prefetch_file, 236 )
237
[docs] 238 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None: 239 resolved = self._metadata_provider.realpath(remote_path) 240 if not resolved.exists: 241 raise FileNotFoundError(f"Path '{remote_path}' not found") 242 243 child = self._get_child_client(resolved.profile) 244 child.download_file(resolved.physical_path, local_path)
245
[docs] 246 def download_files( 247 self, 248 remote_paths: list[str], 249 local_paths: list[str], 250 metadata: Optional[Sequence[Optional[ObjectMetadata]]] = None, 251 max_workers: int = 16, 252 ) -> None: 253 if len(remote_paths) != len(local_paths): 254 raise ValueError("remote_paths and local_paths must have the same length") 255 256 groups: dict[str, tuple[list[str], list[str], list[Optional[ObjectMetadata]]]] = {} 257 for i, (remote_path, local_path) in enumerate(zip(remote_paths, local_paths)): 258 resolved = self._metadata_provider.realpath(remote_path) 259 if not resolved.exists: 260 raise FileNotFoundError(f"Path '{remote_path}' not found") 261 262 profile = resolved.profile 263 if profile is None: 264 raise ValueError( 265 "CompositeStorageClient requires profile from ResolvedPath for routing. " 266 "Metadata provider must return ResolvedPath with profile set." 267 ) 268 if profile not in groups: 269 groups[profile] = ([], [], []) 270 groups[profile][0].append(resolved.physical_path) 271 groups[profile][1].append(local_path) 272 groups[profile][2].append(metadata[i] if metadata is not None else None) 273 274 for profile, (physical_paths, group_local_paths, group_metadata) in groups.items(): 275 child = self._get_child_client(profile) 276 has_any_metadata = any(m is not None for m in group_metadata) 277 child.download_files( 278 physical_paths, group_local_paths, group_metadata if has_any_metadata else None, max_workers 279 )
280
[docs] 281 def glob( 282 self, 283 pattern: str, 284 include_url_prefix: bool = False, 285 attribute_filter_expression: Optional[str] = None, 286 ) -> list[str]: 287 results = self._metadata_provider.glob( 288 pattern, 289 attribute_filter_expression=attribute_filter_expression, 290 ) 291 292 if include_url_prefix: 293 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 294 295 return results
296
[docs] 297 def list_recursive( 298 self, 299 path: str = "", 300 start_after: Optional[str] = None, 301 end_at: Optional[str] = None, 302 max_workers: int = 32, 303 look_ahead: int = 2, 304 include_url_prefix: bool = False, 305 follow_symlinks: bool = True, 306 patterns: Optional[PatternList] = None, 307 ) -> Iterator[ObjectMetadata]: 308 309 yield from self.list( 310 path=path, 311 start_after=start_after, 312 end_at=end_at, 313 include_directories=False, 314 include_url_prefix=include_url_prefix, 315 follow_symlinks=follow_symlinks, 316 patterns=patterns, 317 )
318
[docs] 319 def is_file(self, path: str) -> bool: 320 resolved = self._metadata_provider.realpath(path) 321 return resolved.exists
322
[docs] 323 def is_empty(self, path: str) -> bool: 324 objects = self._metadata_provider.list_objects(path) 325 326 try: 327 return next(objects) is None 328 except StopIteration: 329 pass 330 331 return True
332
[docs] 333 def info(self, path: str, strict: bool = True) -> ObjectMetadata: 334 return self._metadata_provider.get_object_metadata(path, include_pending=not strict)
335
[docs] 336 def write( 337 self, 338 path: str, 339 body: bytes, 340 attributes: Optional[dict[str, str]] = None, 341 ) -> None: 342 """Write operations not supported in read-only mode.""" 343 raise NotImplementedError( 344 "CompositeStorageClient is read-only. " 345 "Write operations are not implemented for multi-location datasets. " 346 "Use a single-location dataset for write operations." 347 )
348
[docs] 349 def delete(self, path: str, recursive: bool = False) -> None: 350 """Delete operations not supported in read-only mode.""" 351 raise NotImplementedError( 352 "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets." 353 )
354
[docs] 355 def delete_many(self, paths: list[str]) -> None: 356 """Delete operations not supported in read-only mode.""" 357 raise NotImplementedError( 358 "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets." 359 )
360
[docs] 361 def copy(self, src_path: str, dest_path: str) -> None: 362 """Copy operations not supported in read-only mode.""" 363 raise NotImplementedError( 364 "CompositeStorageClient is read-only. Copy operations are not implemented for multi-location datasets." 365 )
366
[docs] 367 def upload_file( 368 self, 369 remote_path: str, 370 local_path: Union[str, IO], 371 attributes: Optional[dict[str, str]] = None, 372 ) -> None: 373 """Upload operations not supported in read-only mode.""" 374 raise NotImplementedError( 375 "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets." 376 )
377
[docs] 378 def upload_files( 379 self, 380 remote_paths: list[str], 381 local_paths: list[str], 382 attributes: Optional[Sequence[Optional[dict[str, str]]]] = None, 383 max_workers: int = 16, 384 ) -> None: 385 """Upload operations not supported in read-only mode.""" 386 raise NotImplementedError( 387 "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets." 388 )
389
[docs] 390 def commit_metadata(self, prefix: Optional[str] = None) -> None: 391 """No-op for read-only client.""" 392 pass
393
[docs] 394 def sync_from( 395 self, 396 source_client: AbstractStorageClient, 397 source_path: str = "", 398 target_path: str = "", 399 delete_unmatched_files: bool = False, 400 description: str = "Syncing", 401 num_worker_processes: Optional[int] = None, 402 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 403 patterns: Optional[PatternList] = None, 404 preserve_source_attributes: bool = False, 405 follow_symlinks: bool = True, 406 source_files: Optional[list[str]] = None, 407 ignore_hidden: bool = True, 408 commit_metadata: bool = True, 409 dryrun: bool = False, 410 dryrun_output_path: Optional[str] = None, 411 ) -> SyncResult: 412 raise NotImplementedError( 413 "CompositeStorageClient cannot be used as sync target (write operation). " 414 "Use CompositeStorageClient as source only, or use a single-location dataset as target." 415 )
416
[docs] 417 def sync_replicas( 418 self, 419 source_path: str, 420 replica_indices: Optional[list[int]] = None, 421 delete_unmatched_files: bool = False, 422 description: str = "Syncing replica", 423 num_worker_processes: Optional[int] = None, 424 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 425 patterns: Optional[PatternList] = None, 426 ignore_hidden: bool = True, 427 ) -> None: 428 """No-op for read-only client.""" 429 pass
430
[docs] 431 def list( 432 self, 433 prefix: str = "", 434 path: str = "", 435 start_after: Optional[str] = None, 436 end_at: Optional[str] = None, 437 include_directories: bool = False, 438 include_url_prefix: bool = False, 439 attribute_filter_expression: Optional[str] = None, 440 show_attributes: bool = False, 441 follow_symlinks: bool = True, 442 patterns: Optional[PatternList] = None, 443 ) -> Iterator[ObjectMetadata]: 444 # Parameter validation - either path or prefix, not both 445 if path and prefix: 446 raise ValueError( 447 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). " 448 f"Please use only the 'path' parameter for new code. " 449 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})" 450 ) 451 452 # Use path if provided, otherwise fall back to prefix 453 effective_path = path if path else prefix 454 455 # Apply patterns to the objects 456 pattern_matcher = PatternMatcher(patterns) if patterns else None 457 458 # Delegate to metadata provider (always present for CompositeStorageClient) 459 for obj in self._metadata_provider.list_objects( 460 effective_path, 461 start_after=start_after, 462 end_at=end_at, 463 include_directories=include_directories, 464 attribute_filter_expression=attribute_filter_expression, 465 show_attributes=show_attributes, 466 ): 467 # Skip objects that do not match the patterns 468 if pattern_matcher and not pattern_matcher.should_include_file(obj.key): 469 continue 470 471 if include_url_prefix: 472 obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key) 473 474 yield obj
475
[docs] 476 def generate_presigned_url( 477 self, 478 path: str, 479 *, 480 method: str = "GET", 481 signer_type: Optional[SignerType] = None, 482 signer_options: Optional[dict[str, Any]] = None, 483 ) -> str: 484 resolved = self._metadata_provider.realpath(path) 485 if not resolved.exists: 486 raise FileNotFoundError(f"Path '{path}' not found") 487 488 child = self._get_child_client(resolved.profile) 489 return child.generate_presigned_url( 490 resolved.physical_path, method=method, signer_type=signer_type, signer_options=signer_options 491 )
492 493 def __getstate__(self) -> dict[str, Any]: 494 """Support for pickling.""" 495 return {"_config": self._config} 496 497 def __setstate__(self, state: dict[str, Any]) -> None: 498 """Support for unpickling - reinitialize from config.""" 499 config = state["_config"] 500 self.__init__(config)