Source code for multistorageclient.client.composite

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17from collections.abc import Iterator
 18from typing import IO, Any, Optional, Union
 19
 20from ..config import StorageClientConfig
 21from ..constants import MEMORY_LOAD_LIMIT
 22from ..file import ObjectFile, PosixFile
 23from ..types import (
 24    MSC_PROTOCOL,
 25    ExecutionMode,
 26    MetadataProvider,
 27    ObjectMetadata,
 28    PatternList,
 29    Range,
 30    SourceVersionCheckMode,
 31    SyncResult,
 32)
 33from ..utils import PatternMatcher, join_paths
 34from .single import SingleStorageClient
 35from .types import AbstractStorageClient
 36
 37logger = logging.getLogger(__name__)
 38
 39
class CompositeStorageClient(AbstractStorageClient):
    """
    READ-ONLY storage client for multi-backend configurations.

    Routes read operations to child SingleStorageClient instances based on
    metadata provider's routing information (ResolvedPath.profile).

    Write operations raise NotImplementedError with clear error messages;
    ``commit_metadata`` and ``sync_replicas`` are no-ops.
    """

    _metadata_provider: MetadataProvider

    def __init__(self, config: StorageClientConfig):
        """
        Initialize a composite storage client.

        Child clients are built from ``config.child_configs`` when it is non-empty; otherwise one
        child config is built per name in ``config.storage_provider_profiles`` from ``config._config_dict``.

        :param config: Storage client configuration with storage_provider_profiles set
        :raises ValueError: If config doesn't have storage_provider_profiles, has no metadata_provider,
            or has neither child_configs nor _config_dict to build child clients from.
        """
        if not config.storage_provider_profiles:
            raise ValueError(
                "CompositeStorageClient requires storage_provider_profiles. "
                "Use SingleStorageClient for single-backend configurations."
            )

        if config.metadata_provider is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")

        self._config = config
        self._profile = config.profile
        self._metadata_provider = config.metadata_provider
        # Not referenced elsewhere in this class: it performs no metadata writes and exposes no replicas.
        self._metadata_provider_lock = None
        self._storage_provider = config.storage_provider
        self._credentials_provider = config.credentials_provider
        self._retry_config = config.retry_config
        self._cache_manager = config.cache_manager
        self._replica_manager = None

        # Child clients keyed by profile name; looked up via ResolvedPath.profile in _get_child_client.
        self._child_clients: dict[str, SingleStorageClient] = {}
        self._child_profile_names = config.storage_provider_profiles

        if config.child_configs:
            # ProviderBundleV2 path: child configs are pre-built
            for child_name, child_config in config.child_configs.items():
                self._child_clients[child_name] = SingleStorageClient(child_config)
        else:
            # Config-dict path: child profiles have their own credentials defined
            if not config._config_dict:
                raise ValueError("CompositeStorageClient requires _config_dict to build child clients")

            for child_profile in self._child_profile_names:
                child_config = StorageClientConfig.from_dict(
                    config_dict=config._config_dict,
                    profile=child_profile,
                )
                self._child_clients[child_profile] = SingleStorageClient(child_config)

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._profile

    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients (empty list for CompositeStorageClient).
        """
        return []

    def is_default_profile(self) -> bool:
        """
        :return: True if this client's profile is the ``__filesystem__`` profile.
        """
        return self._config.profile == "__filesystem__"

    def _is_rust_client_enabled(self) -> bool:
        """
        Check if Rust client is enabled for all child storage clients.

        When all child backends are Rust-enabled, MSC can use single-process
        multi-threaded mode instead of multi-process mode, as Rust handles
        parallelism internally via async I/O without Python GIL contention.

        :return: True if all child clients are Rust-enabled, False otherwise.
        """
        return all(client._is_rust_client_enabled() for client in self._child_clients.values())

    def _is_posix_file_storage_provider(self) -> bool:
        """
        Check if using POSIX file storage provider.

        :return: False - composite client doesn't have a single storage provider.
        """
        return False

    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Get the POSIX filesystem path for a given logical path.

        :param path: The logical path to resolve.
        :return: None - composite client doesn't support POSIX path resolution.
        """
        return None

    def _get_child_client(self, profile: Optional[str]) -> SingleStorageClient:
        """
        Get the child client for the specified profile.

        :param profile: Profile name from ResolvedPath
        :return: Child SingleStorageClient instance
        :raises ValueError: If profile is None or not found
        """
        if profile is None:
            raise ValueError(
                "CompositeStorageClient requires profile from ResolvedPath for routing. "
                "Metadata provider must return ResolvedPath with profile set."
            )

        if profile not in self._child_clients:
            raise ValueError(
                f"Profile '{profile}' not found in composite client. "
                f"Available profiles: {list(self._child_clients.keys())}"
            )

        return self._child_clients[profile]

    def _resolve_for_read(self, path: str) -> tuple[SingleStorageClient, str]:
        """
        Resolve a logical path to the child client that owns it and the physical path in that backend.

        :param path: The logical path to resolve.
        :return: A ``(child_client, physical_path)`` tuple.
        :raises FileNotFoundError: If the metadata provider reports that ``path`` does not exist.
        :raises ValueError: If the resolved profile is missing or not a known child profile.
        """
        resolved = self._metadata_provider.realpath(path)
        if not resolved.exists:
            raise FileNotFoundError(f"Path '{path}' not found")

        return self._get_child_client(resolved.profile), resolved.physical_path

    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read an object by routing the request to the child client that owns it.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read; forwarded to the child client.
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The object's content.
        :raises FileNotFoundError: If the metadata provider reports that ``path`` does not exist.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        child, physical_path = self._resolve_for_read(path)
        # Forward by keyword so a change in the child's parameter order cannot silently misroute arguments.
        return child.read(
            physical_path,
            byte_range=byte_range,
            check_source_version=check_source_version,
        )

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading via the child client that owns it.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Only "r" and "rb" are supported; any other mode raises NotImplementedError.
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to ``True``, disables caching for file content.
            This parameter is only applicable to ObjectFile.
        :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to ``MEMORY_LOAD_LIMIT``.
            This parameter is only applicable to ObjectFile.
        :param atomic: Forwarded to the child client; it only affects write modes, which this client rejects.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: Forwarded to the child client; it only affects write modes, which this client rejects.
        :param prefetch_file: Whether to prefetch the file content.
            This parameter is only applicable to ObjectFile. Defaults to True.
        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileNotFoundError: If the file does not exist.
        :raises NotImplementedError: If ``mode`` is not a read mode.
        """
        # Reject write modes before touching the metadata provider.
        if mode not in ["r", "rb"]:
            raise NotImplementedError(
                f"CompositeStorageClient only supports read mode (got '{mode}'). "
                "Write operations are not implemented for multi-location datasets."
            )

        child, physical_path = self._resolve_for_read(path)
        # Forward by keyword: ten positional arguments are easy to misalign if the child signature changes.
        return child.open(
            physical_path,
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            disable_read_cache=disable_read_cache,
            memory_load_limit=memory_load_limit,
            atomic=atomic,
            check_source_version=check_source_version,
            attributes=attributes,
            prefetch_file=prefetch_file,
        )

    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download an object to a local path or file-like object via the child client that owns it.

        :param remote_path: The logical path of the object to download.
        :param local_path: A local filesystem path or a writable file-like object.
        :raises FileNotFoundError: If the metadata provider reports that ``remote_path`` does not exist.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        child, physical_path = self._resolve_for_read(remote_path)
        child.download_file(physical_path, local_path)

    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> list[str]:
        """
        Match logical paths against a glob pattern using the metadata provider.

        :param pattern: The glob pattern to match.
        :param include_url_prefix: When True, each result is joined onto ``MSC_PROTOCOL`` + this client's profile.
        :param attribute_filter_expression: Optional attribute filter forwarded to the metadata provider.
        :return: The matching paths.
        """
        results = self._metadata_provider.glob(
            pattern,
            attribute_filter_expression=attribute_filter_expression,
        )

        if include_url_prefix:
            results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]

        return results

    def list_recursive(
        self,
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        max_workers: int = 32,
        look_ahead: int = 2,
        include_url_prefix: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List all objects (excluding directories) under a path by delegating to :meth:`list`.

        ``max_workers`` and ``look_ahead`` are accepted for interface compatibility and are ignored.

        :param path: The logical path to list under.
        :param start_after: Forwarded to :meth:`list`.
        :param end_at: Forwarded to :meth:`list`.
        :param max_workers: Ignored.
        :param look_ahead: Ignored.
        :param include_url_prefix: Forwarded to :meth:`list`.
        :param follow_symlinks: Forwarded to :meth:`list`.
        :param patterns: Forwarded to :meth:`list`.
        :return: An iterator of object metadata.
        """
        yield from self.list(
            path=path,
            start_after=start_after,
            end_at=end_at,
            include_directories=False,
            include_url_prefix=include_url_prefix,
            follow_symlinks=follow_symlinks,
            patterns=patterns,
        )

    def is_file(self, path: str) -> bool:
        """
        :param path: The logical path to check.
        :return: ``exists`` of the metadata provider's ``realpath`` resolution for ``path``.
        """
        resolved = self._metadata_provider.realpath(path)
        return resolved.exists

    def is_empty(self, path: str) -> bool:
        """
        Check whether the metadata provider lists no objects under a path.

        :param path: The logical path to check.
        :return: True if ``list_objects(path)`` yields nothing (or a first item of None).
        """
        # iter() makes this work whether list_objects returns an iterator or any other iterable.
        first = next(iter(self._metadata_provider.list_objects(path)), None)
        return first is None

    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get object metadata for a logical path from the metadata provider.

        :param path: The logical path to look up.
        :param strict: Passed to the metadata provider as ``include_pending=not strict``.
        :return: The object's metadata as returned by the metadata provider.
        """
        return self._metadata_provider.get_object_metadata(path, include_pending=not strict)

    def write(
        self,
        path: str,
        body: bytes,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """Write operations not supported in read-only mode."""
        raise NotImplementedError(
            "CompositeStorageClient is read-only. "
            "Write operations are not implemented for multi-location datasets. "
            "Use a single-location dataset for write operations."
        )

    def delete(self, path: str, recursive: bool = False) -> None:
        """Delete operations not supported in read-only mode."""
        raise NotImplementedError(
            "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets."
        )

    def delete_many(self, paths: list[str]) -> None:
        """Delete operations not supported in read-only mode."""
        raise NotImplementedError(
            "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets."
        )

    def copy(self, src_path: str, dest_path: str) -> None:
        """Copy operations not supported in read-only mode."""
        raise NotImplementedError(
            "CompositeStorageClient is read-only. Copy operations are not implemented for multi-location datasets."
        )

    def upload_file(
        self,
        remote_path: str,
        local_path: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """Upload operations not supported in read-only mode."""
        raise NotImplementedError(
            "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets."
        )

    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """No-op for read-only client."""
        pass

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[list[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
    ) -> SyncResult:
        """
        Syncing into this client is not supported: it would write to a read-only client.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError(
            "CompositeStorageClient cannot be used as sync target (write operation). "
            "Use CompositeStorageClient as source only, or use a single-location dataset as target."
        )

    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[list[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """No-op for read-only client."""
        pass

    # NOTE: ``list`` must stay after every method whose annotations use the builtin ``list[...]``:
    # once defined, this method shadows ``list`` in the class namespace during class-body execution.
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects under a path using the metadata provider.

        :param prefix: Deprecated alias for ``path``; may not be combined with ``path``.
        :param path: The logical path to list under.
        :param start_after: Forwarded to the metadata provider.
        :param end_at: Forwarded to the metadata provider.
        :param include_directories: Forwarded to the metadata provider.
        :param include_url_prefix: When True, each key is joined onto ``MSC_PROTOCOL`` + this client's profile.
        :param attribute_filter_expression: Forwarded to the metadata provider.
        :param show_attributes: Forwarded to the metadata provider.
        :param follow_symlinks: Accepted for interface compatibility; not used here.
        :param patterns: Include/exclude patterns applied to each object's key.
        :return: An iterator of object metadata.
        :raises ValueError: If both ``path`` and ``prefix`` are given.
        """
        # Parameter validation - either path or prefix, not both
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        # Use path if provided, otherwise fall back to prefix
        effective_path = path if path else prefix

        # Build the matcher once, outside the loop.
        pattern_matcher = PatternMatcher(patterns) if patterns else None

        # Delegate to metadata provider (always present for CompositeStorageClient)
        for obj in self._metadata_provider.list_objects(
            effective_path,
            start_after=start_after,
            end_at=end_at,
            include_directories=include_directories,
            attribute_filter_expression=attribute_filter_expression,
            show_attributes=show_attributes,
        ):
            # Patterns are matched against the provider's key, before any URL prefix is added.
            if pattern_matcher and not pattern_matcher.should_include_file(obj.key):
                continue

            if include_url_prefix:
                # NOTE(review): mutates the yielded object in place; if the metadata provider hands out
                # cached ObjectMetadata instances this would leak the prefix into its cache — confirm.
                obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)

            yield obj

    def __getstate__(self) -> dict[str, Any]:
        """Support for pickling."""
        return {"_config": self._config}

    def __setstate__(self, state: dict[str, Any]) -> None:
        """Support for unpickling - reinitialize from config."""
        config = state["_config"]
        self.__init__(config)