1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import logging
17from collections.abc import Iterator, Sequence
18from typing import IO, Any, Optional, Union
19
20from ..config import StorageClientConfig
21from ..constants import MEMORY_LOAD_LIMIT
22from ..file import ObjectFile, PosixFile
23from ..types import (
24 MSC_PROTOCOL,
25 ExecutionMode,
26 MetadataProvider,
27 ObjectMetadata,
28 PatternList,
29 Range,
30 SignerType,
31 SourceVersionCheckMode,
32 SyncResult,
33)
34from ..utils import PatternMatcher, join_paths
35from .single import SingleStorageClient
36from .types import AbstractStorageClient
37
38logger = logging.getLogger(__name__)
39
40
[docs]
class CompositeStorageClient(AbstractStorageClient):
    """
    READ-ONLY storage client for multi-backend configurations.

    Routes read operations to child SingleStorageClient instances based on
    the metadata provider's routing information (``ResolvedPath.profile``).
    Listing, globbing and metadata lookups are answered by the metadata
    provider itself rather than by a child client.

    Write operations raise NotImplementedError with clear error messages.
    """

    # Metadata provider used to resolve a logical path to a child profile and physical path.
    _metadata_provider: MetadataProvider
52
53 def __init__(self, config: StorageClientConfig):
54 """
55 Initialize a composite storage client.
56
57 :param config: Storage client configuration with storage_provider_profiles set
58 :raises ValueError: If config doesn't have storage_provider_profiles
59 """
60 if not config.storage_provider_profiles:
61 raise ValueError(
62 "CompositeStorageClient requires storage_provider_profiles. "
63 "Use SingleStorageClient for single-backend configurations."
64 )
65
66 if config.metadata_provider is None:
67 raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")
68
69 self._config = config
70 self._profile = config.profile
71 self._metadata_provider = config.metadata_provider
72 self._metadata_provider_lock = None
73 self._storage_provider = config.storage_provider
74 self._credentials_provider = config.credentials_provider
75 self._retry_config = config.retry_config
76 self._cache_manager = config.cache_manager
77 self._replica_manager = None
78
79 self._child_clients: dict[str, SingleStorageClient] = {}
80 self._child_profile_names = config.storage_provider_profiles
81
82 if config.child_configs:
83 # ProviderBundleV2 path: child configs are pre-built
84 for child_name, child_config in config.child_configs.items():
85 self._child_clients[child_name] = SingleStorageClient(child_config)
86 else:
87 # Config-dict path: child profiles have their own credentials defined
88 if not config._config_dict:
89 raise ValueError("CompositeStorageClient requires _config_dict to build child clients")
90
91 for child_profile in self._child_profile_names:
92 child_config = StorageClientConfig.from_dict(
93 config_dict=config._config_dict,
94 profile=child_profile,
95 )
96 self._child_clients[child_profile] = SingleStorageClient(child_config)
97
98 @property
99 def profile(self) -> str:
100 """
101 :return: The profile name of the storage client.
102 """
103 return self._profile
104
105 @property
106 def replicas(self) -> list[AbstractStorageClient]:
107 """
108 :return: List of replica storage clients (empty list for CompositeStorageClient).
109 """
110 return []
111
[docs]
112 def is_default_profile(self) -> bool:
113 return self._config.profile == "__filesystem__"
114
115 def _is_rust_client_enabled(self) -> bool:
116 """
117 Check if Rust client is enabled for all child storage clients.
118
119 When all child backends are Rust-enabled, MSC can use single-process
120 multi-threaded mode instead of multi-process mode, as Rust handles
121 parallelism internally via async I/O without Python GIL contention.
122
123 :return: True if all child clients are Rust-enabled, False otherwise.
124 """
125 return all(client._is_rust_client_enabled() for client in self._child_clients.values())
126
127 def _is_posix_file_storage_provider(self) -> bool:
128 """
129 Check if using POSIX file storage provider.
130
131 :return: False - composite client doesn't have a single storage provider.
132 """
133 return False
134
[docs]
135 def get_posix_path(self, path: str) -> Optional[str]:
136 """
137 Get the POSIX filesystem path for a given logical path.
138
139 :param path: The logical path to resolve.
140 :return: None - composite client doesn't support POSIX path resolution.
141 """
142 return None
143
144 def _get_child_client(self, profile: Optional[str]) -> SingleStorageClient:
145 """
146 Get the child client for the specified profile.
147
148 :param profile: Profile name from ResolvedPath
149 :return: Child SingleStorageClient instance
150 :raises ValueError: If profile is None or not found
151 """
152 if profile is None:
153 raise ValueError(
154 "CompositeStorageClient requires profile from ResolvedPath for routing. "
155 "Metadata provider must return ResolvedPath with profile set."
156 )
157
158 if profile not in self._child_clients:
159 raise ValueError(
160 f"Profile '{profile}' not found in composite client. "
161 f"Available profiles: {list(self._child_clients.keys())}"
162 )
163
164 return self._child_clients[profile]
165
[docs]
166 def read(
167 self,
168 path: str,
169 byte_range: Optional[Range] = None,
170 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
171 ) -> bytes:
172 resolved = self._metadata_provider.realpath(path)
173 if not resolved.exists:
174 raise FileNotFoundError(f"Path '{path}' not found")
175
176 child = self._get_child_client(resolved.profile)
177 return child.read(resolved.physical_path, byte_range, check_source_version)
178
[docs]
179 def open(
180 self,
181 path: str,
182 mode: str = "rb",
183 buffering: int = -1,
184 encoding: Optional[str] = None,
185 disable_read_cache: bool = False,
186 memory_load_limit: int = MEMORY_LOAD_LIMIT,
187 atomic: bool = True,
188 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
189 attributes: Optional[dict[str, str]] = None,
190 prefetch_file: bool = True,
191 ) -> Union[PosixFile, ObjectFile]:
192 """
193 Open a file for reading or writing.
194
195 :param path: The logical path of the object to open.
196 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
197 :param buffering: The buffering mode. Only applies to PosixFile.
198 :param encoding: The encoding to use for text files.
199 :param disable_read_cache: When set to ``True``, disables caching for file content.
200 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
201 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
202 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB.
203 :param atomic: When set to ``True``, file will be written atomically (rename upon close).
204 This parameter is only applicable to PosixFile in write mode.
205 :param check_source_version: Whether to check the source version of cached objects.
206 :param attributes: Attributes to add to the file.
207 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None.
208 :param prefetch_file: Whether to prefetch the file content.
209 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
210 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
211 :raises FileNotFoundError: If the file does not exist (read mode).
212 :raises NotImplementedError: If the operation is not supported (e.g., write on CompositeStorageClient).
213 """
214 if mode not in ["r", "rb"]:
215 raise NotImplementedError(
216 f"CompositeStorageClient only supports read mode (got '{mode}'). "
217 "Write operations are not implemented for multi-location datasets."
218 )
219
220 resolved = self._metadata_provider.realpath(path)
221 if not resolved.exists:
222 raise FileNotFoundError(f"Path '{path}' not found")
223
224 child = self._get_child_client(resolved.profile)
225 return child.open(
226 resolved.physical_path,
227 mode,
228 buffering,
229 encoding,
230 disable_read_cache,
231 memory_load_limit,
232 atomic,
233 check_source_version,
234 attributes,
235 prefetch_file,
236 )
237
[docs]
238 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
239 resolved = self._metadata_provider.realpath(remote_path)
240 if not resolved.exists:
241 raise FileNotFoundError(f"Path '{remote_path}' not found")
242
243 child = self._get_child_client(resolved.profile)
244 child.download_file(resolved.physical_path, local_path)
245
[docs]
246 def download_files(
247 self,
248 remote_paths: list[str],
249 local_paths: list[str],
250 metadata: Optional[Sequence[Optional[ObjectMetadata]]] = None,
251 max_workers: int = 16,
252 ) -> None:
253 if len(remote_paths) != len(local_paths):
254 raise ValueError("remote_paths and local_paths must have the same length")
255
256 groups: dict[str, tuple[list[str], list[str], list[Optional[ObjectMetadata]]]] = {}
257 for i, (remote_path, local_path) in enumerate(zip(remote_paths, local_paths)):
258 resolved = self._metadata_provider.realpath(remote_path)
259 if not resolved.exists:
260 raise FileNotFoundError(f"Path '{remote_path}' not found")
261
262 profile = resolved.profile
263 if profile is None:
264 raise ValueError(
265 "CompositeStorageClient requires profile from ResolvedPath for routing. "
266 "Metadata provider must return ResolvedPath with profile set."
267 )
268 if profile not in groups:
269 groups[profile] = ([], [], [])
270 groups[profile][0].append(resolved.physical_path)
271 groups[profile][1].append(local_path)
272 groups[profile][2].append(metadata[i] if metadata is not None else None)
273
274 for profile, (physical_paths, group_local_paths, group_metadata) in groups.items():
275 child = self._get_child_client(profile)
276 has_any_metadata = any(m is not None for m in group_metadata)
277 child.download_files(
278 physical_paths, group_local_paths, group_metadata if has_any_metadata else None, max_workers
279 )
280
[docs]
281 def glob(
282 self,
283 pattern: str,
284 include_url_prefix: bool = False,
285 attribute_filter_expression: Optional[str] = None,
286 ) -> list[str]:
287 results = self._metadata_provider.glob(
288 pattern,
289 attribute_filter_expression=attribute_filter_expression,
290 )
291
292 if include_url_prefix:
293 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
294
295 return results
296
[docs]
297 def list_recursive(
298 self,
299 path: str = "",
300 start_after: Optional[str] = None,
301 end_at: Optional[str] = None,
302 max_workers: int = 32,
303 look_ahead: int = 2,
304 include_url_prefix: bool = False,
305 follow_symlinks: bool = True,
306 patterns: Optional[PatternList] = None,
307 ) -> Iterator[ObjectMetadata]:
308
309 yield from self.list(
310 path=path,
311 start_after=start_after,
312 end_at=end_at,
313 include_directories=False,
314 include_url_prefix=include_url_prefix,
315 follow_symlinks=follow_symlinks,
316 patterns=patterns,
317 )
318
[docs]
319 def is_file(self, path: str) -> bool:
320 resolved = self._metadata_provider.realpath(path)
321 return resolved.exists
322
[docs]
323 def is_empty(self, path: str) -> bool:
324 objects = self._metadata_provider.list_objects(path)
325
326 try:
327 return next(objects) is None
328 except StopIteration:
329 pass
330
331 return True
332
[docs]
333 def info(self, path: str, strict: bool = True) -> ObjectMetadata:
334 return self._metadata_provider.get_object_metadata(path, include_pending=not strict)
335
[docs]
336 def write(
337 self,
338 path: str,
339 body: bytes,
340 attributes: Optional[dict[str, str]] = None,
341 ) -> None:
342 """Write operations not supported in read-only mode."""
343 raise NotImplementedError(
344 "CompositeStorageClient is read-only. "
345 "Write operations are not implemented for multi-location datasets. "
346 "Use a single-location dataset for write operations."
347 )
348
[docs]
349 def delete(self, path: str, recursive: bool = False) -> None:
350 """Delete operations not supported in read-only mode."""
351 raise NotImplementedError(
352 "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets."
353 )
354
[docs]
355 def delete_many(self, paths: list[str]) -> None:
356 """Delete operations not supported in read-only mode."""
357 raise NotImplementedError(
358 "CompositeStorageClient is read-only. Delete operations are not implemented for multi-location datasets."
359 )
360
[docs]
361 def copy(self, src_path: str, dest_path: str) -> None:
362 """Copy operations not supported in read-only mode."""
363 raise NotImplementedError(
364 "CompositeStorageClient is read-only. Copy operations are not implemented for multi-location datasets."
365 )
366
[docs]
367 def upload_file(
368 self,
369 remote_path: str,
370 local_path: Union[str, IO],
371 attributes: Optional[dict[str, str]] = None,
372 ) -> None:
373 """Upload operations not supported in read-only mode."""
374 raise NotImplementedError(
375 "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets."
376 )
377
[docs]
378 def upload_files(
379 self,
380 remote_paths: list[str],
381 local_paths: list[str],
382 attributes: Optional[Sequence[Optional[dict[str, str]]]] = None,
383 max_workers: int = 16,
384 ) -> None:
385 """Upload operations not supported in read-only mode."""
386 raise NotImplementedError(
387 "CompositeStorageClient is read-only. Upload operations are not implemented for multi-location datasets."
388 )
389
393
[docs]
394 def sync_from(
395 self,
396 source_client: AbstractStorageClient,
397 source_path: str = "",
398 target_path: str = "",
399 delete_unmatched_files: bool = False,
400 description: str = "Syncing",
401 num_worker_processes: Optional[int] = None,
402 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
403 patterns: Optional[PatternList] = None,
404 preserve_source_attributes: bool = False,
405 follow_symlinks: bool = True,
406 source_files: Optional[list[str]] = None,
407 ignore_hidden: bool = True,
408 commit_metadata: bool = True,
409 dryrun: bool = False,
410 dryrun_output_path: Optional[str] = None,
411 ) -> SyncResult:
412 raise NotImplementedError(
413 "CompositeStorageClient cannot be used as sync target (write operation). "
414 "Use CompositeStorageClient as source only, or use a single-location dataset as target."
415 )
416
[docs]
417 def sync_replicas(
418 self,
419 source_path: str,
420 replica_indices: Optional[list[int]] = None,
421 delete_unmatched_files: bool = False,
422 description: str = "Syncing replica",
423 num_worker_processes: Optional[int] = None,
424 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
425 patterns: Optional[PatternList] = None,
426 ignore_hidden: bool = True,
427 ) -> None:
428 """No-op for read-only client."""
429 pass
430
[docs]
431 def list(
432 self,
433 prefix: str = "",
434 path: str = "",
435 start_after: Optional[str] = None,
436 end_at: Optional[str] = None,
437 include_directories: bool = False,
438 include_url_prefix: bool = False,
439 attribute_filter_expression: Optional[str] = None,
440 show_attributes: bool = False,
441 follow_symlinks: bool = True,
442 patterns: Optional[PatternList] = None,
443 ) -> Iterator[ObjectMetadata]:
444 # Parameter validation - either path or prefix, not both
445 if path and prefix:
446 raise ValueError(
447 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
448 f"Please use only the 'path' parameter for new code. "
449 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
450 )
451
452 # Use path if provided, otherwise fall back to prefix
453 effective_path = path if path else prefix
454
455 # Apply patterns to the objects
456 pattern_matcher = PatternMatcher(patterns) if patterns else None
457
458 # Delegate to metadata provider (always present for CompositeStorageClient)
459 for obj in self._metadata_provider.list_objects(
460 effective_path,
461 start_after=start_after,
462 end_at=end_at,
463 include_directories=include_directories,
464 attribute_filter_expression=attribute_filter_expression,
465 show_attributes=show_attributes,
466 ):
467 # Skip objects that do not match the patterns
468 if pattern_matcher and not pattern_matcher.should_include_file(obj.key):
469 continue
470
471 if include_url_prefix:
472 obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)
473
474 yield obj
475
[docs]
476 def generate_presigned_url(
477 self,
478 path: str,
479 *,
480 method: str = "GET",
481 signer_type: Optional[SignerType] = None,
482 signer_options: Optional[dict[str, Any]] = None,
483 ) -> str:
484 resolved = self._metadata_provider.realpath(path)
485 if not resolved.exists:
486 raise FileNotFoundError(f"Path '{path}' not found")
487
488 child = self._get_child_client(resolved.profile)
489 return child.generate_presigned_url(
490 resolved.physical_path, method=method, signer_type=signer_type, signer_options=signer_options
491 )
492
493 def __getstate__(self) -> dict[str, Any]:
494 """Support for pickling."""
495 return {"_config": self._config}
496
497 def __setstate__(self, state: dict[str, Any]) -> None:
498 """Support for unpickling - reinitialize from config."""
499 config = state["_config"]
500 self.__init__(config)