Source code for multistorageclient.client.composite

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17from collections.abc import Iterator
 18from typing import IO, Any, List, Optional, Union
 19
 20from ..config import StorageClientConfig
 21from ..constants import MEMORY_LOAD_LIMIT
 22from ..file import ObjectFile, PosixFile
 23from ..types import (
 24    MSC_PROTOCOL,
 25    ExecutionMode,
 26    MetadataProvider,
 27    ObjectMetadata,
 28    PatternList,
 29    Range,
 30    SourceVersionCheckMode,
 31)
 32from ..utils import join_paths
 33from .single import SingleStorageClient
 34from .types import AbstractStorageClient
 35
 36logger = logging.getLogger(__name__)
 37
 38
class CompositeStorageClient(AbstractStorageClient):
    """
    Read-only storage client that sits in front of several storage backends.

    Every read is first resolved through the metadata provider. The ``profile`` carried by the
    resolved path selects which child :py:class:`SingleStorageClient` serves the request.

    Mutating operations (write, delete, copy, upload, sync-as-target) raise ``NotImplementedError``.
    """

    _metadata_provider: MetadataProvider

    def __init__(self, config: StorageClientConfig):
        """
        Build the composite client and one child client per configured backend profile.

        :param config: Storage client configuration. It must carry ``storage_provider_profiles``,
            a ``metadata_provider`` and the raw ``_config_dict`` used to build the child configs.
        :raises ValueError: If ``storage_provider_profiles``, ``metadata_provider`` or
            ``_config_dict`` is missing or empty.
        """
        child_profiles = config.storage_provider_profiles
        if not child_profiles:
            raise ValueError(
                "CompositeStorageClient requires storage_provider_profiles. "
                "Use SingleStorageClient for single-backend configurations."
            )

        metadata_provider = config.metadata_provider
        if metadata_provider is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")

        self._config = config
        self._profile = config.profile
        self._metadata_provider = metadata_provider
        self._metadata_provider_lock = None
        self._storage_provider = config.storage_provider
        self._credentials_provider = config.credentials_provider
        self._retry_config = config.retry_config
        self._cache_manager = config.cache_manager
        self._replica_manager = None

        self._child_clients: dict[str, SingleStorageClient] = {}
        self._child_profile_names = child_profiles

        raw_config = config._config_dict
        if not raw_config:
            raise ValueError("CompositeStorageClient requires _config_dict to build child clients")

        # Each child is an independent single-backend client built from the same raw config.
        for name in self._child_profile_names:
            self._child_clients[name] = SingleStorageClient(
                StorageClientConfig.from_dict(config_dict=raw_config, profile=name)
            )

    @property
    def profile(self) -> str:
        """
        :return: The profile name this client was configured with.
        """
        return self._profile

    @property
    def replicas(self) -> List[AbstractStorageClient]:
        """
        :return: Always an empty list for CompositeStorageClient.
        """
        return []

    def is_default_profile(self) -> bool:
        """
        :return: ``True`` when the configured profile name is ``"default"``.
        """
        return self._config.profile == "default"

    def _is_rust_client_enabled(self) -> bool:
        """
        Report whether the Rust client is enabled.

        :return: Always ``False``.
        """
        return False

    def _is_posix_file_storage_provider(self) -> bool:
        """
        Report whether this client is backed by a POSIX file storage provider.

        :return: Always ``False``.
        """
        return False

    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Resolve a logical path to a POSIX filesystem path.

        :param path: The logical path to resolve (unused).
        :return: Always ``None``.
        """
        return None

    def _get_child_client(self, profile: Optional[str]) -> SingleStorageClient:
        """
        Look up the child client registered under ``profile``.

        :param profile: Profile name taken from a resolved path.
        :return: The matching child SingleStorageClient.
        :raises ValueError: If ``profile`` is ``None`` or no child is registered under that name.
        """
        if profile is None:
            raise ValueError(
                "CompositeStorageClient requires profile from ResolvedPath for routing. "
                "Metadata provider must return ResolvedPath with profile set."
            )

        child = self._child_clients.get(profile)
        if child is None:
            raise ValueError(
                f"Profile '{profile}' not found in composite client. "
                f"Available profiles: {list(self._child_clients.keys())}"
            )
        return child

    def _locate(self, path: str) -> tuple[SingleStorageClient, str]:
        """
        Resolve a logical path to the child client that serves it and its physical path.

        :param path: The logical path to resolve through the metadata provider.
        :return: ``(child_client, physical_path)`` for the resolved object.
        :raises FileNotFoundError: If the metadata provider reports that the path does not exist.
        :raises ValueError: If the resolved profile is missing or unknown.
        """
        resolved = self._metadata_provider.realpath(path)
        if not resolved.exists:
            raise FileNotFoundError(f"Path '{path}' not found")
        return self._get_child_client(resolved.profile), resolved.physical_path

    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read an object's bytes from whichever child client the path resolves to.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range, forwarded to the child client.
        :param check_source_version: Source version check mode, forwarded to the child client.
        :return: The bytes returned by the child client.
        :raises FileNotFoundError: If the path does not exist according to the metadata provider.
        """
        child, physical_path = self._locate(path)
        return child.read(physical_path, byte_range, check_source_version)

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading through the child client the path resolves to.

        Only the modes ``"r"`` and ``"rb"`` are accepted; any other mode is rejected before the
        path is resolved. All arguments are forwarded unchanged to the child client's ``open``.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Must be ``"r"`` or ``"rb"``.
        :param buffering: The buffering mode, forwarded to the child client.
        :param encoding: The encoding to use for text files, forwarded to the child client.
        :param disable_read_cache: Forwarded to the child client.
        :param memory_load_limit: Forwarded to the child client. Defaults to ``MEMORY_LOAD_LIMIT``.
        :param atomic: Forwarded to the child client.
        :param check_source_version: Source version check mode, forwarded to the child client.
        :param attributes: Forwarded to the child client.
        :param prefetch_file: Forwarded to the child client.
        :return: The file-like object (PosixFile or ObjectFile) returned by the child client.
        :raises FileNotFoundError: If the path does not exist according to the metadata provider.
        :raises NotImplementedError: If ``mode`` is anything other than ``"r"`` or ``"rb"``.
        """
        if mode not in ("r", "rb"):
            raise NotImplementedError(
                f"CompositeStorageClient only supports read mode (got '{mode}'). "
                "Write operations are not implemented for multi-location datasets."
            )

        child, physical_path = self._locate(path)
        return child.open(
            physical_path,
            mode,
            buffering,
            encoding,
            disable_read_cache,
            memory_load_limit,
            atomic,
            check_source_version,
            attributes,
            prefetch_file,
        )

    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download an object via the child client the path resolves to.

        :param remote_path: The logical path of the object to download.
        :param local_path: Destination path or file object, forwarded to the child client.
        :raises FileNotFoundError: If the path does not exist according to the metadata provider.
        """
        child, physical_path = self._locate(remote_path)
        child.download_file(physical_path, local_path)

    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> List[str]:
        """
        Match logical paths against a pattern using the metadata provider.

        :param pattern: The glob pattern, forwarded to the metadata provider.
        :param include_url_prefix: When ``True``, prefix each result with ``MSC_PROTOCOL`` followed
            by this client's profile name.
        :param attribute_filter_expression: Optional filter, forwarded to the metadata provider.
        :return: The matching paths.
        """
        matches = self._metadata_provider.glob(
            pattern,
            attribute_filter_expression=attribute_filter_expression,
        )
        if not include_url_prefix:
            return matches

        url_prefix = f"{MSC_PROTOCOL}{self._config.profile}"
        return [join_paths(url_prefix, match) for match in matches]

    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
    ) -> Iterator[ObjectMetadata]:
        """
        Lazily list objects known to the metadata provider.

        This is a generator: nothing runs, including argument validation, until the first item
        is requested.

        :param prefix: Path to list under. Mutually exclusive with ``path``.
        :param path: Path to list under. Takes the place of ``prefix`` when given.
        :param start_after: Forwarded to the metadata provider.
        :param end_at: Forwarded to the metadata provider.
        :param include_directories: Forwarded to the metadata provider.
        :param include_url_prefix: When ``True``, each yielded object's ``key`` is rewritten to
            start with ``MSC_PROTOCOL`` followed by this client's profile name.
        :param attribute_filter_expression: Forwarded to the metadata provider.
        :param show_attributes: Forwarded to the metadata provider.
        :param follow_symlinks: Accepted but not used by this implementation.
        :return: An iterator over object metadata.
        :raises ValueError: If both ``path`` and ``prefix`` are given (raised on first iteration).
        """
        # The two arguments are alternatives; reject calls that supply both.
        if path and prefix:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )

        # ``path`` wins when present; otherwise fall back to ``prefix``.
        effective_path = path or prefix

        listing = self._metadata_provider.list_objects(
            effective_path,
            start_after=start_after,
            end_at=end_at,
            include_directories=include_directories,
            attribute_filter_expression=attribute_filter_expression,
            show_attributes=show_attributes,
        )

        url_prefix = f"{MSC_PROTOCOL}{self._config.profile}" if include_url_prefix else None
        for entry in listing:
            if url_prefix is not None:
                # The key is rewritten in place on the object handed back by the provider.
                entry.key = join_paths(url_prefix, entry.key)
            yield entry

    def is_file(self, path: str) -> bool:
        """
        :param path: The logical path to check.
        :return: The ``exists`` flag of the path as resolved by the metadata provider.
        """
        return self._metadata_provider.realpath(path).exists

    def is_empty(self, path: str) -> bool:
        """
        :param path: The logical path to check.
        :return: ``True`` when the metadata provider lists no objects under ``path``.
        """
        # Only the first listing entry is pulled; an exhausted iterator means "empty".
        return next(self._metadata_provider.list_objects(path), None) is None

    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Fetch object metadata from the metadata provider.

        :param path: The logical path of the object.
        :param strict: When ``False``, the provider is asked to include pending objects.
        :return: The object's metadata.
        """
        include_pending = not strict
        return self._metadata_provider.get_object_metadata(path, include_pending=include_pending)

    def write(
        self,
        path: str,
        body: bytes,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Unsupported: this client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. "
            "Write operations are not implemented for multi-location datasets. "
            "Use a single-location dataset for write operations."
        )
        raise NotImplementedError(message)

    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Unsupported: this client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets."
        )
        raise NotImplementedError(message)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Unsupported: this client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. Copy operations are not implemented for multi-location datasets."
        )
        raise NotImplementedError(message)

    def upload_file(
        self,
        remote_path: str,
        local_path: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Unsupported: this client is read-only.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets."
        )
        raise NotImplementedError(message)

    def commit_metadata(self, prefix: Optional[str] = None) -> None:
        """
        Do nothing; all arguments are ignored.
        """
        return None

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[List[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
    ) -> None:
        """
        Unsupported: this client cannot be the target of a sync.

        :raises NotImplementedError: Always.
        """
        message = (
            "CompositeStorageClient cannot be used as sync target (write operation). "
            "Use CompositeStorageClient as source only, or use a single-location dataset as target."
        )
        raise NotImplementedError(message)

    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[List[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Do nothing; all arguments are ignored.
        """
        return None

    def __getstate__(self) -> dict[str, Any]:
        """
        Pickle support: only the configuration is serialized.

        :return: A state dict holding the config under the ``"_config"`` key.
        """
        return dict(_config=self._config)

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Unpickle support: rebuild the client by re-running ``__init__`` with the saved config.

        :param state: The state dict produced by ``__getstate__``.
        """
        self.__init__(state["_config"])