Source code for multistorageclient.client.composite

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17from collections.abc import Iterator
 18from typing import IO, Any, Optional, Union
 19
 20from ..config import StorageClientConfig
 21from ..constants import MEMORY_LOAD_LIMIT
 22from ..file import ObjectFile, PosixFile
 23from ..types import (
 24    MSC_PROTOCOL,
 25    ExecutionMode,
 26    MetadataProvider,
 27    ObjectMetadata,
 28    PatternList,
 29    Range,
 30    SignerType,
 31    SourceVersionCheckMode,
 32    SyncResult,
 33)
 34from ..utils import PatternMatcher, join_paths
 35from .single import SingleStorageClient
 36from .types import AbstractStorageClient
 37
 38logger = logging.getLogger(__name__)
 39
 40
class CompositeStorageClient(AbstractStorageClient):
    """
    Read-only storage client that fronts several storage backends.

    Reads are resolved through the metadata provider; the ``profile`` carried by
    the resolved path selects which child :py:class:`SingleStorageClient` serves
    the request. Listing-style calls (``list``, ``glob``, ``info``, ...) are
    answered by the metadata provider directly.

    Mutating operations (write, delete, copy, upload, acting as a sync target)
    raise ``NotImplementedError``. ``commit_metadata`` and ``sync_replicas`` are
    accepted but do nothing.
    """

    _metadata_provider: MetadataProvider

    def __init__(self, config: StorageClientConfig):
        """
        Build the composite client and one child client per backend profile.

        :param config: Storage client configuration. It must carry
            ``storage_provider_profiles`` and a ``metadata_provider``, plus either
            pre-built ``child_configs`` or a ``_config_dict`` to derive them from.
        :raises ValueError: If ``storage_provider_profiles`` is empty, if
            ``metadata_provider`` is ``None``, or if neither ``child_configs`` nor
            ``_config_dict`` is available to build the child clients.
        """
        if not config.storage_provider_profiles:
            raise ValueError(
                "CompositeStorageClient requires storage_provider_profiles. "
                "Use SingleStorageClient for single-backend configurations."
            )

        if config.metadata_provider is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")

        # Mirror the relevant pieces of the configuration on the instance.
        self._config = config
        self._profile = config.profile
        self._metadata_provider = config.metadata_provider
        self._metadata_provider_lock = None
        self._storage_provider = config.storage_provider
        self._credentials_provider = config.credentials_provider
        self._retry_config = config.retry_config
        self._cache_manager = config.cache_manager
        self._replica_manager = None

        self._child_clients: dict[str, SingleStorageClient] = {}
        self._child_profile_names = config.storage_provider_profiles

        prebuilt_configs = config.child_configs
        if prebuilt_configs:
            # ProviderBundleV2 path: the child configs already exist, just wrap them.
            for name, prebuilt in prebuilt_configs.items():
                self._child_clients[name] = SingleStorageClient(prebuilt)
            return

        # Config-dict path: each child profile is defined (with its own credentials)
        # in the raw config dictionary, so derive one config per profile name.
        raw_config = config._config_dict
        if not raw_config:
            raise ValueError("CompositeStorageClient requires _config_dict to build child clients")

        for name in self._child_profile_names:
            derived = StorageClientConfig.from_dict(
                config_dict=raw_config,
                profile=name,
            )
            self._child_clients[name] = SingleStorageClient(derived)

    @property
    def profile(self) -> str:
        """
        :return: The profile name this composite client was configured with.
        """
        return self._profile

    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: Always an empty list; the composite client exposes no replicas.
        """
        return []

    def is_default_profile(self) -> bool:
        """
        :return: ``True`` when the configured profile is ``"__filesystem__"``.
        """
        return self._config.profile == "__filesystem__"

    def _is_rust_client_enabled(self) -> bool:
        """
        Report whether every child client has the Rust client enabled.

        The result is the conjunction over all children, so a single child
        without Rust support makes this ``False``. With no children at all the
        result is ``True``.

        :return: ``True`` if all child clients are Rust-enabled, ``False`` otherwise.
        """
        for child in self._child_clients.values():
            if not child._is_rust_client_enabled():
                return False
        return True

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: Always ``False``; the composite client is not tied to one storage provider.
        """
        return False

    def get_posix_path(self, path: str) -> Optional[str]:
        """
        POSIX path resolution is not offered by the composite client.

        :param path: The logical path (ignored).
        :return: Always ``None``.
        """
        return None

    def _get_child_client(self, profile: Optional[str]) -> SingleStorageClient:
        """
        Look up the child client registered under ``profile``.

        :param profile: Profile name taken from a resolved path.
        :return: The matching child :py:class:`SingleStorageClient`.
        :raises ValueError: If ``profile`` is ``None`` or no child is registered under it.
        """
        if profile is None:
            raise ValueError(
                "CompositeStorageClient requires profile from ResolvedPath for routing. "
                "Metadata provider must return ResolvedPath with profile set."
            )

        children = self._child_clients
        if profile in children:
            return children[profile]

        raise ValueError(
            f"Profile '{profile}' not found in composite client. "
            f"Available profiles: {list(children)}"
        )

    def _resolve_child(self, path: str) -> tuple[SingleStorageClient, str]:
        """
        Resolve a logical path to the child client and physical path that serve it.

        :param path: The logical path to resolve through the metadata provider.
        :return: A ``(child client, physical path)`` pair.
        :raises FileNotFoundError: If the metadata provider reports the path as not existing.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        resolved = self._metadata_provider.realpath(path)
        if not resolved.exists:
            raise FileNotFoundError(f"Path '{path}' not found")

        # Child lookup happens before the physical path is read, so a routing
        # error surfaces first.
        return self._get_child_client(resolved.profile), resolved.physical_path

    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read an object's bytes via the child client that owns it.

        :param path: The logical path of the object.
        :param byte_range: Optional byte range, forwarded to the child client.
        :param check_source_version: Source version check mode, forwarded to the child client.
        :return: Whatever the child client's ``read`` returns for the physical path.
        :raises FileNotFoundError: If the metadata provider reports the path as not existing.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        child, physical_path = self._resolve_child(path)
        return child.read(physical_path, byte_range, check_source_version)

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading through the child client that owns it.

        Only the modes ``"r"`` and ``"rb"`` are accepted; any other mode is
        rejected before the path is resolved. All remaining arguments are
        forwarded unchanged to the child client's ``open``.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Must be ``"r"`` or ``"rb"``.
        :param buffering: The buffering mode, forwarded to the child client.
        :param encoding: The encoding to use for text files, forwarded to the child client.
        :param disable_read_cache: Forwarded to the child client.
        :param memory_load_limit: Forwarded to the child client. Defaults to ``MEMORY_LOAD_LIMIT``.
        :param atomic: Forwarded to the child client.
        :param check_source_version: Source version check mode, forwarded to the child client.
        :param attributes: Forwarded to the child client.
        :param prefetch_file: Forwarded to the child client.
        :return: The file-like object returned by the child client.
        :raises NotImplementedError: If ``mode`` is anything other than ``"r"`` or ``"rb"``.
        :raises FileNotFoundError: If the metadata provider reports the path as not existing.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        if mode not in ("r", "rb"):
            raise NotImplementedError(
                f"CompositeStorageClient only supports read mode (got '{mode}'). "
                "Write operations are not implemented for multi-location datasets."
            )

        child, physical_path = self._resolve_child(path)
        return child.open(
            physical_path,
            mode,
            buffering,
            encoding,
            disable_read_cache,
            memory_load_limit,
            atomic,
            check_source_version,
            attributes,
            prefetch_file,
        )

    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download an object through the child client that owns it.

        :param remote_path: The logical path of the object.
        :param local_path: Destination path or file object, forwarded to the child client.
        :raises FileNotFoundError: If the metadata provider reports the path as not existing.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        child, physical_path = self._resolve_child(remote_path)
        child.download_file(physical_path, local_path)

    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> list[str]:
        """
        Match logical paths against ``pattern`` using the metadata provider.

        :param pattern: The glob pattern, passed to the metadata provider.
        :param include_url_prefix: When ``True``, each result is joined onto
            ``MSC_PROTOCOL`` followed by this client's profile name.
        :param attribute_filter_expression: Optional filter, passed to the metadata provider.
        :return: The matches from the metadata provider, prefixed if requested.
        """
        matches = self._metadata_provider.glob(
            pattern,
            attribute_filter_expression=attribute_filter_expression,
        )
        if not include_url_prefix:
            return matches

        url_prefix = f"{MSC_PROTOCOL}{self._config.profile}"
        return [join_paths(url_prefix, match) for match in matches]

    def list_recursive(
        self,
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        max_workers: int = 32,
        look_ahead: int = 2,
        include_url_prefix: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        Yield object metadata under ``path`` by delegating to :py:meth:`list`.

        Directories are never included. ``max_workers`` and ``look_ahead`` are
        accepted but not used by this implementation.

        :param path: The logical path to list under.
        :param start_after: Forwarded to :py:meth:`list`.
        :param end_at: Forwarded to :py:meth:`list`.
        :param max_workers: Unused.
        :param look_ahead: Unused.
        :param include_url_prefix: Forwarded to :py:meth:`list`.
        :param follow_symlinks: Forwarded to :py:meth:`list`.
        :param patterns: Forwarded to :py:meth:`list`.
        :return: An iterator over the metadata objects produced by :py:meth:`list`.
        """
        listing = self.list(
            path=path,
            start_after=start_after,
            end_at=end_at,
            include_directories=False,
            include_url_prefix=include_url_prefix,
            follow_symlinks=follow_symlinks,
            patterns=patterns,
        )
        yield from listing

    def is_file(self, path: str) -> bool:
        """
        :param path: The logical path to check.
        :return: The ``exists`` flag of the path as resolved by the metadata provider.
        """
        return self._metadata_provider.realpath(path).exists

    def is_empty(self, path: str) -> bool:
        """
        Check whether the metadata provider lists anything under ``path``.

        :param path: The logical path to inspect.
        :return: ``True`` if the listing yields nothing (or its first item is ``None``),
            ``False`` otherwise.
        """
        exhausted = object()
        # Only the first entry matters; the rest of the listing is never consumed.
        first = next(self._metadata_provider.list_objects(path), exhausted)
        return first is exhausted or first is None

    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Fetch object metadata from the metadata provider.

        :param path: The logical path of the object.
        :param strict: The metadata provider is called with ``include_pending`` set to
            the negation of this flag.
        :return: The metadata returned by the metadata provider.
        """
        include_pending = not strict
        return self._metadata_provider.get_object_metadata(path, include_pending=include_pending)

    def write(
        self,
        path: str,
        body: bytes,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Unsupported: the composite client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. "
            "Write operations are not implemented for multi-location datasets. "
            "Use a single-location dataset for write operations."
        )
        raise NotImplementedError(message)

    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Unsupported: the composite client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets."
        )
        raise NotImplementedError(message)

    def delete_many(self, paths: list[str]) -> None:
        """
        Unsupported: the composite client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets."
        )
        raise NotImplementedError(message)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Unsupported: the composite client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. Copy operations are not implemented for multi-location datasets."
        )
        raise NotImplementedError(message)

    def upload_file(
        self,
        remote_path: str,
        local_path: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Unsupported: the composite client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets."
        )
        raise NotImplementedError(message)

    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Accepted for interface compatibility; does nothing on this read-only client.

        :param prefix: Ignored.
        """
        return None

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[list[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
        dryrun: bool = False,
        dryrun_output_path: Optional[str] = None,
    ) -> SyncResult:
        """
        Unsupported: syncing *into* the composite client would be a write.

        All arguments are ignored.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient cannot be used as sync target (write operation). "
            "Use CompositeStorageClient as source only, or use a single-location dataset as target."
        )
        raise NotImplementedError(message)

    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[list[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Accepted for interface compatibility; does nothing on this read-only client.

        All arguments are ignored.
        """
        return None

    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        Yield object metadata from the metadata provider, optionally filtered.

        Because this is a generator, argument validation runs when iteration
        starts, not when the method is called. ``follow_symlinks`` is accepted
        but not used by this implementation.

        :param prefix: Legacy alternative to ``path``; used only when ``path`` is empty.
        :param path: The logical path to list under.
        :param start_after: Passed to the metadata provider.
        :param end_at: Passed to the metadata provider.
        :param include_directories: Passed to the metadata provider.
        :param include_url_prefix: When ``True``, the ``key`` of each yielded object is
            rewritten in place to be joined onto ``MSC_PROTOCOL`` followed by this
            client's profile name.
        :param attribute_filter_expression: Passed to the metadata provider.
        :param show_attributes: Passed to the metadata provider.
        :param follow_symlinks: Unused.
        :param patterns: Optional include/exclude patterns; objects whose key is not
            accepted by the resulting ``PatternMatcher`` are skipped.
        :return: An iterator over the matching metadata objects.
        :raises ValueError: If both ``path`` and ``prefix`` are non-empty.
        """
        # 'path' and 'prefix' are mutually exclusive.
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        listing_root = path or prefix
        matcher = PatternMatcher(patterns) if patterns else None
        url_prefix = f"{MSC_PROTOCOL}{self._config.profile}" if include_url_prefix else None

        # The metadata provider is always present on a composite client.
        entries = self._metadata_provider.list_objects(
            listing_root,
            start_after=start_after,
            end_at=end_at,
            include_directories=include_directories,
            attribute_filter_expression=attribute_filter_expression,
            show_attributes=show_attributes,
        )
        for entry in entries:
            if not matcher or matcher.should_include_file(entry.key):
                if include_url_prefix:
                    entry.key = join_paths(url_prefix, entry.key)
                yield entry

    def generate_presigned_url(
        self,
        path: str,
        *,
        method: str = "GET",
        signer_type: Optional[SignerType] = None,
        signer_options: Optional[dict[str, Any]] = None,
    ) -> str:
        """
        Generate a presigned URL via the child client that owns the object.

        :param path: The logical path of the object.
        :param method: HTTP method, forwarded to the child client.
        :param signer_type: Signer type, forwarded to the child client.
        :param signer_options: Signer options, forwarded to the child client.
        :return: The URL returned by the child client.
        :raises FileNotFoundError: If the metadata provider reports the path as not existing.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        child, physical_path = self._resolve_child(path)
        return child.generate_presigned_url(
            physical_path,
            method=method,
            signer_type=signer_type,
            signer_options=signer_options,
        )

    def __getstate__(self) -> dict[str, Any]:
        """Pickle only the configuration; everything else is rebuilt on load."""
        return {"_config": self._config}

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Restore from a pickled state by re-running ``__init__`` with the saved configuration."""
        self.__init__(state["_config"])