1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import logging
17from collections.abc import Iterator
18from typing import IO, Any, Optional, Union
19
20from ..config import StorageClientConfig
21from ..constants import MEMORY_LOAD_LIMIT
22from ..file import ObjectFile, PosixFile
23from ..types import (
24 ExecutionMode,
25 MetadataProvider,
26 ObjectMetadata,
27 PatternList,
28 Range,
29 SignerType,
30 SourceVersionCheckMode,
31 StorageProvider,
32 SyncResult,
33)
34from .composite import CompositeStorageClient
35from .single import SingleStorageClient
36from .types import AbstractStorageClient
37
38logger = logging.getLogger(__name__)
39
40
[docs]
class StorageClient(AbstractStorageClient):
    """
    Unified storage client facade.

    Automatically delegates to:
    - SingleStorageClient: For single-backend configurations (full read/write)
    - CompositeStorageClient: For multi-backend configurations (read-only)

    Every method and property on this class forwards to the delegate chosen at
    construction time; no storage logic is implemented here.
    """

    _delegate: Union[SingleStorageClient, CompositeStorageClient]

    def __init__(self, config: StorageClientConfig):
        """
        Build the delegate that matches ``config``.

        :param config: The storage client configuration. When ``config.storage_provider_profiles``
            is truthy a CompositeStorageClient is created, otherwise a SingleStorageClient.
        """
        if config.storage_provider_profiles:
            self._delegate = CompositeStorageClient(config)
            # Lazy %-style arguments: the message is only formatted when DEBUG is enabled.
            logger.debug("StorageClient '%s' using CompositeStorageClient (read-only)", config.profile)
        else:
            self._delegate = SingleStorageClient(config)
            logger.debug("StorageClient '%s' using SingleStorageClient", config.profile)

    @property
    def delegate(self) -> Union[SingleStorageClient, CompositeStorageClient]:
        """
        Access to underlying delegate storage client.

        :return: SingleStorageClient or CompositeStorageClient.
        """
        return self._delegate

    @property
    def _config(self) -> StorageClientConfig:
        """
        :return: The configuration for the underlying storage client.
        """
        return self._delegate._config

    @property
    def _storage_provider(self) -> Optional[StorageProvider]:
        """
        :return: The storage provider for the underlying storage client. None for CompositeStorageClient.
        """
        return self._delegate._storage_provider

    @_storage_provider.setter
    def _storage_provider(self, value: StorageProvider) -> None:
        """
        Allow mutation of storage provider for testing purposes.

        The assignment is applied only when the delegate is a SingleStorageClient;
        for any other delegate it is silently ignored.

        :param value: The new storage provider.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._storage_provider = value

    @property
    def _metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider for the underlying storage client.
        """
        return self._delegate._metadata_provider

    @_metadata_provider.setter
    def _metadata_provider(self, value: Optional[MetadataProvider]) -> None:
        """
        Allow mutation of metadata provider for DSS compatibility.

        :param value: The new metadata provider.
        :raises ValueError: If ``value`` is ``None`` while the delegate is a CompositeStorageClient.
        """
        if isinstance(self._delegate, CompositeStorageClient) and value is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")
        self._delegate._metadata_provider = value  # type: ignore[assignment]

    @property
    def _metadata_provider_lock(self):
        """
        Access to metadata provider lock for DSS compatibility.

        :return: The lock for the metadata provider.
        """
        return self._delegate._metadata_provider_lock

    @_metadata_provider_lock.setter
    def _metadata_provider_lock(self, value):
        """
        Allow mutation of metadata provider lock for DSS compatibility.

        :param value: The new lock object; it is stored on the delegate as-is.
        """
        self._delegate._metadata_provider_lock = value

    @property
    def _credentials_provider(self):
        """
        :return: The credentials provider for the underlying storage client.
        """
        return self._delegate._credentials_provider

    @property
    def _retry_config(self):
        """
        :return: The retry configuration for the underlying storage client.
        """
        return self._delegate._retry_config

    @property
    def _cache_manager(self):
        """
        :return: The cache manager for the underlying storage client.
        """
        return self._delegate._cache_manager

    @property
    def _replica_manager(self):
        """
        :return: The replica manager for the underlying storage client.
        """
        return self._delegate._replica_manager

    @_replica_manager.setter
    def _replica_manager(self, value):
        """
        Allow mutation of replica manager for testing purposes.

        The assignment is applied only when the delegate is a SingleStorageClient;
        for any other delegate it is silently ignored.

        :param value: The new replica manager.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._replica_manager = value

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._delegate.profile

    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._delegate.replicas

    def is_default_profile(self) -> bool:
        """
        :return: ``True`` if the storage client is using the default profile, ``False`` otherwise.
        """
        return self._delegate.is_default_profile()

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return self._delegate._is_rust_client_enabled()

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return self._delegate._is_posix_file_storage_provider()

    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Returns the physical POSIX filesystem path for POSIX storage providers.

        :param path: The path to resolve (may be a symlink or virtual path).
        :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
        """
        return self._delegate.get_posix_path(path)

    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.read(path, byte_range, check_source_version)

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading or writing.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to ``True``, disables caching for file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB.
        :param atomic: When set to ``True``, file will be written atomically (rename upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: Attributes to add to the file.
            This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None.
        :param prefetch_file: Whether to prefetch the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileNotFoundError: If the file does not exist (read mode).
        :raises NotImplementedError: If the operation is not supported (e.g., write on CompositeStorageClient).
        """
        # Forwarded positionally: the delegate's parameter order must match this signature.
        return self._delegate.open(
            path,
            mode,
            buffering,
            encoding,
            disable_read_cache,
            memory_load_limit,
            atomic,
            check_source_version,
            attributes,
            prefetch_file,
        )

    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
        """
        return self._delegate.download_file(remote_path, local_path)

    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :return: A list of object paths that match the specified pattern.
        """
        return self._delegate.glob(pattern, include_url_prefix, attribute_filter_expression)

    def list_recursive(
        self,
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        max_workers: int = 32,
        look_ahead: int = 2,
        include_url_prefix: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List files recursively in the storage provider under the specified path.

        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param max_workers: Maximum concurrent workers for provider-level recursive listing.
        :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching files.
        """
        return self._delegate.list_recursive(
            path=path,
            start_after=start_after,
            end_at=end_at,
            max_workers=max_workers,
            look_ahead=look_ahead,
            include_url_prefix=include_url_prefix,
            follow_symlinks=follow_symlinks,
            patterns=patterns,
        )

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a folder or directory).

        :param path: The logical path to check.
        :return: ``True`` if the key points to a file, ``False`` otherwise.
        """
        return self._delegate.is_file(path)

    def is_empty(self, path: str) -> bool:
        """
        Check whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The logical path to check (typically a directory or folder prefix).
        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        return self._delegate.is_empty(path)

    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.info(path, strict)

    def write(
        self,
        path: str,
        body: bytes,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        :raises NotImplementedError: If write operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.write(path, body, attributes)

    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Delete a file or directory at the specified path.

        :param path: The logical path of the object to delete.
        :param recursive: When True, delete directory and all its contents recursively.
        :raises FileNotFoundError: If the file or directory does not exist.
        :raises NotImplementedError: If delete operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.delete(path, recursive)

    def delete_many(self, paths: list[str]) -> None:
        """
        Delete multiple files at the specified paths. Only files are supported; directories are not deleted.

        :param paths: List of logical paths of the files to delete.
        :raises NotImplementedError: If delete operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.delete_many(paths)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        :raises NotImplementedError: If copy operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.copy(src_path, dest_path)

    def upload_file(
        self,
        remote_path: str,
        local_path: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Upload a local file to remote storage.

        :param remote_path: The logical path where the file will be uploaded.
        :param local_path: The local file path or file-like object to upload.
        :param attributes: Optional attributes to add to the file.
        :raises FileNotFoundError: If the local file does not exist.
        :raises NotImplementedError: If upload operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.upload_file(remote_path, local_path, attributes)

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[list[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
        dryrun: bool = False,
        dryrun_output_path: Optional[str] = None,
    ) -> SyncResult:
        """
        Syncs files from the source storage client to ``target_path`` on this client.

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
        :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes.
            Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
        :param dryrun: If ``True``, only enumerate and compare objects without performing any copy/delete operations.
            The returned :py:class:`SyncResult` will include a :py:class:`DryrunResult` with paths to JSONL files.
        :param dryrun_output_path: Directory to write dryrun JSONL files into. If ``None`` (default), a temporary
            directory is created automatically. Ignored when ``dryrun`` is ``False``.
        :return: The :py:class:`SyncResult` produced by the underlying storage client.
        :raises ValueError: If both source_files and patterns are provided.
        :raises NotImplementedError: If sync operations are not supported (e.g., CompositeStorageClient as target).
        """
        # Forwarded positionally: the delegate's parameter order must match this signature.
        return self._delegate.sync_from(
            source_client,
            source_path,
            target_path,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            preserve_source_attributes,
            follow_symlinks,
            source_files,
            ignore_hidden,
            commit_metadata,
            dryrun,
            dryrun_output_path,
        )

    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[list[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Sync files from this client to its replica storage clients.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
        :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: Number of worker processes for parallel sync.
        :param execution_mode: The execution mode to use (an ``ExecutionMode`` value). Defaults to ``ExecutionMode.LOCAL``.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
        """
        # Forwarded positionally: the delegate's parameter order must match this signature.
        return self._delegate.sync_replicas(
            source_path,
            replica_indices,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            ignore_hidden,
        )

    # NOTE(review): keep this method below every signature that uses a ``list[...]``
    # annotation. Once ``list`` is bound in the class body, later annotations evaluated
    # in that scope would resolve ``list`` to this method instead of the builtin.
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation, there may be a performance impact if this is set to ``True``.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Forwarded positionally: the delegate's parameter order must match this signature.
        return self._delegate.list(
            prefix,
            path,
            start_after,
            end_at,
            include_directories,
            include_url_prefix,
            attribute_filter_expression,
            show_attributes,
            follow_symlinks,
            patterns,
        )

    def generate_presigned_url(
        self,
        path: str,
        *,
        method: str = "GET",
        signer_type: Optional[SignerType] = None,
        signer_options: Optional[dict[str, Any]] = None,
    ) -> str:
        """
        Generate a pre-signed URL granting temporary access to the object at *path*.

        :param path: The logical path of the object.
        :param method: The HTTP method the URL should authorise (e.g. ``"GET"``, ``"PUT"``).
        :param signer_type: The signing backend to use. ``None`` means the provider's native signer.
        :param signer_options: Backend-specific options forwarded to the signer.
        :return: A pre-signed URL string.
        :raises NotImplementedError: If the underlying storage provider does not support presigned URLs.
        """
        return self._delegate.generate_presigned_url(
            path, method=method, signer_type=signer_type, signer_options=signer_options
        )

    def __getstate__(self) -> dict[str, Any]:
        """
        Support for pickling (forward to delegate).

        :return: The state dictionary produced by the delegate's ``__getstate__``.
        """
        return self._delegate.__getstate__()

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Support for unpickling - reconstruct the delegate.

        The delegate type is chosen from ``state["_config"]`` with the same rule as
        ``__init__``: a truthy ``storage_provider_profiles`` selects CompositeStorageClient,
        otherwise SingleStorageClient.

        :param state: The state dictionary; it must contain a ``"_config"`` entry.
        :raises KeyError: If ``state`` has no ``"_config"`` entry.
        """
        config = state["_config"]

        # ``__new__`` creates the delegate without running its ``__init__``; the
        # delegate's own ``__setstate__`` then restores it from ``state``.
        delegate: Union[SingleStorageClient, CompositeStorageClient]
        if config.storage_provider_profiles:
            delegate = CompositeStorageClient.__new__(CompositeStorageClient)
        else:
            delegate = SingleStorageClient.__new__(SingleStorageClient)

        delegate.__setstate__(state)
        # Publish the delegate only after its state has been restored successfully.
        self._delegate = delegate