1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import logging
17from collections.abc import Iterator
18from typing import IO, Any, List, Optional, Union
19
20from ..config import StorageClientConfig
21from ..constants import MEMORY_LOAD_LIMIT
22from ..file import ObjectFile, PosixFile
23from ..types import (
24 ExecutionMode,
25 MetadataProvider,
26 ObjectMetadata,
27 PatternList,
28 Range,
29 SourceVersionCheckMode,
30 StorageProvider,
31)
32from .composite import CompositeStorageClient
33from .single import SingleStorageClient
34from .types import AbstractStorageClient
35
# Module-scoped logger, named after this module so its output can be filtered per module.
logger = logging.getLogger(name=__name__)
37
38
[docs]
class StorageClient(AbstractStorageClient):
    """
    Unified storage client facade.

    Automatically delegates to:
    - SingleStorageClient: For single-backend configurations (full read/write)
    - CompositeStorageClient: For multi-backend configurations (read-only)

    The delegate is selected from ``config.storage_provider_profiles``: a truthy value selects
    :py:class:`CompositeStorageClient`, otherwise :py:class:`SingleStorageClient` is used.
    Every operation below is forwarded to that delegate.
    """

    _delegate: Union[SingleStorageClient, CompositeStorageClient]

    def __init__(self, config: StorageClientConfig):
        """
        Create the facade and its delegate.

        :param config: The storage client configuration. When ``config.storage_provider_profiles``
            is set, a read-only :py:class:`CompositeStorageClient` is created; otherwise a
            :py:class:`SingleStorageClient` is created.
        """
        if config.storage_provider_profiles:
            self._delegate = CompositeStorageClient(config)
            # Lazy %-style arguments: the message is only formatted when DEBUG logging is enabled.
            logger.debug("StorageClient '%s' using CompositeStorageClient (read-only)", config.profile)
        else:
            self._delegate = SingleStorageClient(config)
            logger.debug("StorageClient '%s' using SingleStorageClient", config.profile)

    @property
    def delegate(self) -> Union[SingleStorageClient, CompositeStorageClient]:
        """
        Access to underlying delegate storage client.

        :return: SingleStorageClient or CompositeStorageClient.
        """
        return self._delegate

    @property
    def _config(self) -> StorageClientConfig:
        """
        :return: The configuration for the underlying storage client.
        """
        return self._delegate._config

    @property
    def _storage_provider(self) -> Optional[StorageProvider]:
        """
        :return: The storage provider for the underlying storage client. None for CompositeStorageClient.
        """
        return self._delegate._storage_provider

    @_storage_provider.setter
    def _storage_provider(self, value: StorageProvider) -> None:
        """
        Allow mutation of storage provider for testing purposes.

        The new value is only applied when the delegate is a :py:class:`SingleStorageClient`;
        for a :py:class:`CompositeStorageClient` the assignment is silently ignored.

        :param value: The new storage provider.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._storage_provider = value

    @property
    def _metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider for the underlying storage client.
        """
        return self._delegate._metadata_provider

    @_metadata_provider.setter
    def _metadata_provider(self, value: Optional[MetadataProvider]) -> None:
        """
        Allow mutation of metadata provider for DSS compatibility.

        :param value: The new metadata provider, or ``None`` to clear it.
        :raises ValueError: If ``value`` is ``None`` and the delegate is a :py:class:`CompositeStorageClient`,
            which needs a metadata provider for routing decisions.
        """
        if isinstance(self._delegate, CompositeStorageClient) and value is None:
            raise ValueError("CompositeStorageClient requires a metadata_provider for routing decisions.")
        self._delegate._metadata_provider = value  # type: ignore[assignment]

    @property
    def _metadata_provider_lock(self):
        """
        Access to metadata provider lock for DSS compatibility.

        :return: The lock for the metadata provider.
        """
        return self._delegate._metadata_provider_lock

    @_metadata_provider_lock.setter
    def _metadata_provider_lock(self, value):
        """
        Allow mutation of metadata provider lock for DSS compatibility.

        :param value: The new lock; assigned to the delegate regardless of its type.
        """
        self._delegate._metadata_provider_lock = value

    @property
    def _credentials_provider(self):
        """
        :return: The credentials provider for the underlying storage client.
        """
        return self._delegate._credentials_provider

    @property
    def _retry_config(self):
        """
        :return: The retry configuration for the underlying storage client.
        """
        return self._delegate._retry_config

    @property
    def _cache_manager(self):
        """
        :return: The cache manager for the underlying storage client.
        """
        return self._delegate._cache_manager

    @property
    def _replica_manager(self):
        """
        :return: The replica manager for the underlying storage client.
        """
        return self._delegate._replica_manager

    @_replica_manager.setter
    def _replica_manager(self, value):
        """
        Allow mutation of replica manager for testing purposes.

        The new value is only applied when the delegate is a :py:class:`SingleStorageClient`;
        for a :py:class:`CompositeStorageClient` the assignment is silently ignored.

        :param value: The new replica manager.
        """
        if isinstance(self._delegate, SingleStorageClient):
            self._delegate._replica_manager = value

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._delegate.profile

    @property
    def replicas(self) -> List[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._delegate.replicas

    def is_default_profile(self) -> bool:
        """
        :return: ``True`` if the storage client is using the default profile, ``False`` otherwise.
        """
        return self._delegate.is_default_profile()

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return self._delegate._is_rust_client_enabled()

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return self._delegate._is_posix_file_storage_provider()

    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Returns the physical POSIX filesystem path for POSIX storage providers.

        :param path: The path to resolve (may be a symlink or virtual path).
        :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
        """
        return self._delegate.get_posix_path(path)

    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.read(path, byte_range, check_source_version)

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading or writing.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to ``True``, disables caching for file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
            Defaults to ``MEMORY_LOAD_LIMIT``.
        :param atomic: When set to ``True``, file will be written atomically (rename upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: Attributes to add to the file.
            This parameter is only applicable when the mode is "w", "wb", "a" or "ab". Defaults to None.
        :param prefetch_file: Whether to prefetch the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileNotFoundError: If the file does not exist (read mode).
        :raises NotImplementedError: If the operation is not supported (e.g., write on CompositeStorageClient).
        """
        # Arguments are forwarded positionally, so their order must match the delegate's signature.
        return self._delegate.open(
            path,
            mode,
            buffering,
            encoding,
            disable_read_cache,
            memory_load_limit,
            atomic,
            check_source_version,
            attributes,
            prefetch_file,
        )

    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
        """
        return self._delegate.download_file(remote_path, local_path)

    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> List[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :return: A list of object paths that match the specified pattern.
        """
        return self._delegate.glob(pattern, include_url_prefix, attribute_filter_expression)

    # NOTE: this method shadows the builtin ``list`` inside the class body, so annotations that
    # are evaluated after this definition must keep using ``typing.List`` rather than ``list[...]``.
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories are
            returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on the
            implementation, there may be a performance impact when this is set to ``True``.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers.
            When ``False``, symlinks are skipped during listing.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
        """
        # Arguments are forwarded positionally, so their order must match the delegate's signature.
        return self._delegate.list(
            prefix,
            path,
            start_after,
            end_at,
            include_directories,
            include_url_prefix,
            attribute_filter_expression,
            show_attributes,
            follow_symlinks,
        )

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a folder or directory).

        :param path: The logical path to check.
        :return: ``True`` if the key points to a file, ``False`` otherwise.
        """
        return self._delegate.is_file(path)

    def is_empty(self, path: str) -> bool:
        """
        Check whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The logical path to check (typically a directory or folder prefix).
        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        return self._delegate.is_empty(path)

    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        return self._delegate.info(path, strict)

    def write(
        self,
        path: str,
        body: bytes,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        :raises NotImplementedError: If write operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.write(path, body, attributes)

    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Delete a file or directory at the specified path.

        :param path: The logical path of the object to delete.
        :param recursive: When ``True``, delete directory and all its contents recursively.
        :raises FileNotFoundError: If the file or directory does not exist.
        :raises NotImplementedError: If delete operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.delete(path, recursive)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        :raises NotImplementedError: If copy operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.copy(src_path, dest_path)

    def upload_file(
        self,
        remote_path: str,
        local_path: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Upload a local file to remote storage.

        :param remote_path: The logical path where the file will be uploaded.
        :param local_path: The local file path or file-like object to upload.
        :param attributes: Optional attributes to add to the file.
        :raises FileNotFoundError: If the local file does not exist.
        :raises NotImplementedError: If upload operations are not supported (e.g., CompositeStorageClient).
        """
        return self._delegate.upload_file(remote_path, local_path, attributes)

    # NOTE(review): the listing this class was recovered from skipped original lines 414-420 here.
    # That span possibly held a ``commit_metadata`` method, which the ``sync_from`` docstring
    # references. Confirm against the upstream source.

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[List[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
    ) -> None:
        """
        Syncs files from the source storage client to ``target_path`` on this storage client.

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
        :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes.
            Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
        :raises ValueError: If both source_files and patterns are provided.
        :raises NotImplementedError: If sync operations are not supported (e.g., CompositeStorageClient as target).
        """
        # Arguments are forwarded positionally, so their order must match the delegate's signature.
        return self._delegate.sync_from(
            source_client,
            source_path,
            target_path,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            preserve_source_attributes,
            follow_symlinks,
            source_files,
            ignore_hidden,
            commit_metadata,
        )

    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[List[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Sync files from this client to its replica storage clients.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
        :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: Number of worker processes for parallel sync.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
        """
        # Arguments are forwarded positionally, so their order must match the delegate's signature.
        return self._delegate.sync_replicas(
            source_path,
            replica_indices,
            delete_unmatched_files,
            description,
            num_worker_processes,
            execution_mode,
            patterns,
            ignore_hidden,
        )

    def __getstate__(self) -> dict[str, Any]:
        """
        Support for pickling (forward to delegate).

        The facade holds no state of its own, so the delegate's state is returned as-is.

        :return: The delegate's pickle state.
        """
        return self._delegate.__getstate__()

    def __setstate__(self, state: dict[str, Any]) -> None:
        """
        Support for unpickling - reconstruct the delegate.

        The delegate class is chosen with the same rule as :py:meth:`__init__` (a truthy
        ``storage_provider_profiles`` selects :py:class:`CompositeStorageClient`). The delegate is
        allocated without running its ``__init__`` and then restored from ``state``.

        :param state: The state produced by :py:meth:`__getstate__`; it must contain a ``"_config"`` entry.
        """
        config = state["_config"]
        delegate_cls = CompositeStorageClient if config.storage_provider_profiles else SingleStorageClient
        self._delegate = delegate_cls.__new__(delegate_cls)
        self._delegate.__setstate__(state)