1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from pathlib import PurePosixPath
23from typing import Any, List, Optional, Union, cast
24
25from .config import StorageClientConfig
26from .constants import MEMORY_LOAD_LIMIT
27from .file import ObjectFile, PosixFile
28from .instrumentation.utils import instrumented
29from .providers.posix_file import PosixFileStorageProvider
30from .retry import retry
31from .sync import SyncManager
32from .types import MSC_PROTOCOL, ExecutionMode, ObjectMetadata, Range, Replica, SourceVersionCheckMode
33from .utils import NullStorageClient, join_paths
34
35logger = logging.getLogger(__name__)
36
37
[docs]
38@instrumented
39class StorageClient:
40 """
41 A client for interacting with different storage providers.
42 """
43
44 _config: StorageClientConfig
45 _metadata_provider_lock: Optional[threading.Lock] = None
46 _stop_event: Optional[threading.Event] = None
47
def __init__(self, config: StorageClientConfig):
    """
    Creates a :py:class:`StorageClient` from the given configuration.

    Providers are bound first, then one replica client is created for every entry in
    ``config.replicas``.

    :param config: The configuration object for the storage client.
    """
    self._initialize_providers(config)
    replica_specs = config.replicas
    self._initialize_replicas(replica_specs)
56
def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
    """
    Body of the background auto-commit thread.

    Commits metadata every ``commit_interval_minutes`` until ``stop_event`` is set.

    :param commit_interval_minutes: Minutes to wait between two commits.
    :param stop_event: Event that ends the loop once it is set.

    :raises RuntimeError: If no stop event is supplied.
    """
    if not stop_event:
        raise RuntimeError("Stop event not set")

    interval_seconds = commit_interval_minutes * 60

    # ``wait`` returns True as soon as the event is set, so the loop can exit before the interval elapses.
    while not (stop_event.is_set() or stop_event.wait(timeout=interval_seconds)):
        logger.debug("Auto-committing to metadata provider")
        self.commit_metadata()
67
def _commit_on_exit(self):
    """
    Commits metadata one final time.

    Registered with :py:mod:`atexit` by ``_initialize_providers`` when the auto-commit
    configuration enables ``at_exit``.
    """
    logger.debug("Shutting down, committing metadata one last time...")
    self.commit_metadata()
71
def _initialize_providers(self, config: StorageClientConfig) -> None:
    """
    Binds the providers and settings carried by ``config`` to this client.

    When an auto-commit configuration is present together with a metadata provider, this also
    creates the metadata provider lock, optionally starts the periodic committer thread and
    optionally registers a final commit at interpreter exit.

    :param config: The configuration object whose providers and settings are bound to the client.
    """
    self._config = config
    self._credentials_provider = config.credentials_provider
    self._storage_provider = config.storage_provider
    self._metadata_provider = config.metadata_provider
    self._cache_config = config.cache_config
    self._retry_config = config.retry_config
    self._cache_manager = config.cache_manager
    self._autocommit_config = config.autocommit_config

    if not self._autocommit_config:
        return

    if not self._metadata_provider:
        logger.debug("No metadata provider configured, auto-commit will not be enabled")
        return

    logger.debug("Creating auto-commiter thread")

    # Publish the lock before the committer thread is started so the thread can never run
    # against a client whose lock is still unset.
    # NOTE(review): commit_metadata is outside this view; presumably it takes this lock - confirm.
    self._metadata_provider_lock = threading.Lock()

    if self._autocommit_config.interval_minutes:
        self._stop_event = threading.Event()
        self._commit_thread = threading.Thread(
            target=self._committer_thread,
            daemon=True,
            args=(self._autocommit_config.interval_minutes, self._stop_event),
        )
        self._commit_thread.start()

    if self._autocommit_config.at_exit:
        atexit.register(self._commit_on_exit)
101
def __del__(self):
    """Sets the stop event, if one exists, so the auto-commit thread loop can end."""
    stop_event = self._stop_event
    if stop_event:
        stop_event.set()
105
def _get_source_version(self, path: str) -> Optional[str]:
    """
    Returns the etag of the object at ``path``.

    The metadata provider is consulted when one is configured, otherwise the storage provider.

    :param path: The path of the object whose etag is requested.

    :return: The ``etag`` attribute of the object's metadata.
    """
    provider = self._metadata_provider if self._metadata_provider else self._storage_provider
    return provider.get_object_metadata(path).etag
115
def _is_cache_enabled(self) -> bool:
    """Returns True when a cache manager is set and the storage provider is not the POSIX file provider."""
    if self._cache_manager is None:
        return False
    return not self._is_posix_file_storage_provider()
118
def _is_posix_file_storage_provider(self) -> bool:
    """Returns True when the storage provider is a :py:class:`PosixFileStorageProvider` instance."""
    provider = self._storage_provider
    return isinstance(provider, PosixFileStorageProvider)
121
[docs]
def is_default_profile(self) -> bool:
    """
    Tells whether this client was configured with the profile named ``default``.

    :return: ``True`` if the configured profile is ``"default"``, ``False`` otherwise.
    """
    profile_name = self._config.profile
    return profile_name == "default"
127
@property
def profile(self) -> str:
    """The profile name stored in this client's configuration."""
    config = self._config
    return config.profile
131
def _initialize_replicas(self, replicas: list[Replica]) -> None:
    """
    Creates one :py:class:`StorageClient` per replica and stores them on the client.

    Replicas are processed in ascending ``read_priority`` order, so the first stored client is the
    primary replica. Each replica client is configured from this client's config dictionary using
    the replica's profile name.

    :param replicas: The replica definitions to create clients for.

    :raises ValueError: If a replica has to be created but this client has no config dictionary.
    """
    clients = []
    for spec in sorted(replicas, key=lambda r: r.read_priority):
        config_dict = self._config._config_dict
        if config_dict is None:
            raise ValueError(f"Cannot initialize replica '{spec.replica_profile}' without a config")

        replica_config = StorageClientConfig.from_dict(config_dict=config_dict, profile=spec.replica_profile)
        clients.append(StorageClient(config=replica_config))

    self._replicas = clients
149
@property
def replicas(self) -> list["StorageClient"]:
    """The replica :py:class:`StorageClient` instances, in the read-priority order they were created in."""
    replica_clients = self._replicas
    return replica_clients
154
@retry
def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
    """
    Reads an object from the specified logical path.

    :param path: The logical path of the object to read.
    :param byte_range: Optional range of the object to read. Range reads always go to the storage
        provider and are never cached.

    :return: The content of the object.

    :raises FileNotFoundError: If a metadata provider is configured and does not know the path.
    """
    logical_path = path
    if self._metadata_provider:
        path, exists = self._metadata_provider.realpath(logical_path)
        if not exists:
            # Report the path the caller asked for, not the resolved one returned by realpath.
            raise FileNotFoundError(f"The file at path '{logical_path}' was not found.")

    # Never cache range-read requests
    if byte_range:
        return self._storage_provider.get_object(path, byte_range=byte_range)

    if not self._is_cache_enabled():
        return self._storage_provider.get_object(path, byte_range=byte_range)

    # Serve from the cache when possible, filling it from the storage provider on a miss.
    assert self._cache_manager is not None
    source_version = self._get_source_version(path)
    data = self._cache_manager.read(path, source_version)

    if data is None:
        data = self._storage_provider.get_object(path)
        self._cache_manager.set(path, data, source_version)

    return data
185
[docs]
def info(self, path: str, strict: bool = True) -> ObjectMetadata:
    """
    Retrieves metadata or information about an object stored at the specified path.

    Without a metadata provider the request is forwarded to the storage provider. With a metadata
    provider, the path is first looked up as a file; if that fails, the parent is listed to find a
    directory entry matching the path.

    :param path: The logical path to the object for which metadata or information is being retrieved.
    :param strict: If True, performs additional validation to determine whether the path refers to a directory.
        Only forwarded to the storage provider; not used when a metadata provider is configured.

    :return: The metadata of the object or directory.

    :raises FileNotFoundError: If the metadata provider knows neither a file nor a directory at the path.
    """
    if not self._metadata_provider:
        return self._storage_provider.get_object_metadata(path, strict=strict)

    # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
    try:
        return self._metadata_provider.get_object_metadata(path)
    except FileNotFoundError:
        # Try listing from the parent to determine if path is a valid directory
        stripped = path.rstrip("/")
        parent = os.path.dirname(stripped) + "/"
        if parent == "/":
            parent = ""
        target = stripped + "/"

        try:
            for entry in self._metadata_provider.list_objects(parent, include_directories=True):
                if entry.key == target and entry.type == "directory":
                    return entry
        except Exception:
            # Best-effort lookup: keep the original error, but leave a trace of why the fallback failed.
            logger.debug("Directory lookup for '%s' under '%s' failed", path, parent, exc_info=True)
        raise  # Raise original FileNotFoundError
215
@retry
def download_file(self, remote_path: str, local_path: str) -> None:
    """
    Downloads a file to the local file system.

    With a metadata provider, the logical path is resolved to its real path and the object's
    metadata is handed to the storage provider together with the download request.

    :param remote_path: The logical path of the file in the storage provider.
    :param local_path: The local path where the file should be downloaded.

    :raises FileNotFoundError: If a metadata provider is configured and does not know the path.
    """
    if not self._metadata_provider:
        self._storage_provider.download_file(remote_path, local_path)
        return

    physical_path, found = self._metadata_provider.realpath(remote_path)
    if not found:
        raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")

    file_metadata = self._metadata_provider.get_object_metadata(remote_path)
    self._storage_provider.download_file(physical_path, local_path, file_metadata)
233
@retry
def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Uploads a file from the local file system.

    :param remote_path: The logical path where the file should be stored.
    :param local_path: The local path of the file to upload.
    :param attributes: The attributes to add to the file.

    :raises FileExistsError: If a metadata provider is configured and already knows the path.
    """
    if not self._metadata_provider:
        self._storage_provider.upload_file(remote_path, local_path, attributes)
        return

    physical_path, already_exists = self._metadata_provider.realpath(remote_path)
    if already_exists:
        raise FileExistsError(
            f"The file at path '{remote_path}' already exists; "
            f"overwriting is not yet allowed when using a metadata provider."
        )

    # With a metadata provider, attributes are recorded only in the metadata provider.
    self._storage_provider.upload_file(physical_path, local_path, attributes=None)
    stored = self._storage_provider.get_object_metadata(physical_path)
    stored.metadata = (stored.metadata or {}) | (attributes or {})
    with self._metadata_provider_lock or contextlib.nullcontext():
        self._metadata_provider.add_file(remote_path, stored)
260
@retry
def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
    """
    Writes an object at the specified path.

    :param path: The logical path where the object should be written.
    :param body: The content to write to the object.
    :param attributes: The attributes to add to the file.

    :raises FileExistsError: If a metadata provider is configured and already knows the path.
    """
    if not self._metadata_provider:
        self._storage_provider.put_object(path, body, attributes=attributes)
        return

    physical_path, already_exists = self._metadata_provider.realpath(path)
    if already_exists:
        raise FileExistsError(
            f"The file at path '{path}' already exists; "
            f"overwriting is not yet allowed when using a metadata provider."
        )

    # With a metadata provider, attributes are recorded only in the metadata provider.
    self._storage_provider.put_object(physical_path, body, attributes=None)
    # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
    stored = self._storage_provider.get_object_metadata(physical_path)
    stored.metadata = (stored.metadata or {}) | (attributes or {})
    with self._metadata_provider_lock or contextlib.nullcontext():
        self._metadata_provider.add_file(path, stored)
288
[docs]
def copy(self, src_path: str, dest_path: str) -> None:
    """
    Copies an object from source to destination path.

    With a metadata provider, both logical paths are resolved to real paths before the copy and the
    destination is registered with the metadata provider afterwards.

    :param src_path: The logical path of the source object to copy.
    :param dest_path: The logical path of the destination.

    :raises FileNotFoundError: If a metadata provider is configured and does not know the source path.
    :raises FileExistsError: If a metadata provider is configured and already knows the destination path.
    """
    virtual_src_path = src_path
    virtual_dest_path = dest_path
    if self._metadata_provider:
        src_path, exists = self._metadata_provider.realpath(virtual_src_path)
        if not exists:
            # Report the logical path the caller passed, not the resolved one returned by realpath.
            raise FileNotFoundError(f"The file at path '{virtual_src_path}' was not found.")

        dest_path, exists = self._metadata_provider.realpath(virtual_dest_path)
        if exists:
            raise FileExistsError(
                f"The file at path '{virtual_dest_path}' already exists; "
                f"overwriting is not yet allowed when using a metadata provider."
            )

    self._storage_provider.copy_object(src_path, dest_path)
    if self._metadata_provider:
        metadata = self._storage_provider.get_object_metadata(dest_path)
        with self._metadata_provider_lock or contextlib.nullcontext():
            self._metadata_provider.add_file(virtual_dest_path, metadata)
314
[docs]
def delete(self, path: str, recursive: bool = False) -> None:
    """
    Deletes an object at the specified path.

    A recursive delete is performed as a sync from an empty source with unmatched files removed.

    :param path: The logical path of the object to delete.
    :param recursive: Whether to delete objects in the path recursively.

    :raises FileNotFoundError: If a metadata provider is configured and does not know the path
        (non-recursive delete only).
    """
    if recursive:
        self.sync_from(
            NullStorageClient(),
            path,
            path,
            delete_unmatched_files=True,
            num_worker_processes=1,
            description="Deleting",
        )
        # If this is a posix storage provider, we need to also delete remaining directory stubs.
        if self._is_posix_file_storage_provider():
            cast(PosixFileStorageProvider, self._storage_provider).rmtree(path)
        return

    physical_path = path
    if self._metadata_provider:
        physical_path, found = self._metadata_provider.realpath(path)
        if not found:
            raise FileNotFoundError(f"The file at path '{path}' was not found.")
        with self._metadata_provider_lock or contextlib.nullcontext():
            self._metadata_provider.remove_file(path)

    # Delete the cached file if it exists
    if self._is_cache_enabled():
        assert self._cache_manager is not None
        self._cache_manager.delete(path)

    self._storage_provider.delete_object(physical_path)
351
[docs]
def glob(
    self,
    pattern: str,
    include_url_prefix: bool = False,
    attribute_filter_expression: Optional[str] = None,
) -> list[str]:
    """
    Returns the object paths that match the given pattern.

    The metadata provider answers the query when one is configured, otherwise the storage provider.

    :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
    :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.

    :return: A list of object paths that match the pattern.
    """
    provider = self._metadata_provider if self._metadata_provider else self._storage_provider
    matches = provider.glob(pattern, attribute_filter_expression)

    if not include_url_prefix:
        return matches

    return [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", match) for match in matches]
377
[docs]
def list(
    self,
    prefix: str = "",
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
    include_url_prefix: bool = False,
    attribute_filter_expression: Optional[str] = None,
) -> Iterator[ObjectMetadata]:
    """
    Yields the objects found under the specified prefix.

    The metadata provider is listed when one is configured, otherwise the storage provider. When
    ``include_url_prefix`` is set, each yielded object's ``key`` is rewritten in place: to an
    absolute POSIX path for the default profile, or to an ``msc://profile`` URL otherwise.

    :param prefix: The prefix to list objects under.
    :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
    :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
    :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.

    :return: An iterator over objects.
    """
    source = self._metadata_provider if self._metadata_provider else self._storage_provider
    entries = source.list_objects(prefix, start_after, end_at, include_directories, attribute_filter_expression)

    for entry in entries:
        if include_url_prefix:
            entry.key = (
                str(PurePosixPath("/") / entry.key)
                if self.is_default_profile()
                else join_paths(f"{MSC_PROTOCOL}{self._config.profile}", entry.key)
            )
        yield entry
415
[docs]
def open(
    self,
    path: str,
    mode: str = "rb",
    buffering: int = -1,
    encoding: Optional[str] = None,
    disable_read_cache: bool = False,
    memory_load_limit: int = MEMORY_LOAD_LIMIT,
    atomic: bool = True,
    check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    attributes: Optional[dict[str, str]] = None,
) -> Union[PosixFile, ObjectFile]:
    """
    Opens the specified path and returns a file-like object for it.

    A :py:class:`PosixFile` is returned when the storage provider is the POSIX file provider,
    otherwise an :py:class:`ObjectFile`.

    :param path: The logical path of the object to read.
    :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
    :param buffering: The buffering mode. Only applies to PosixFile.
    :param encoding: The encoding to use for text files.
    :param disable_read_cache: When set to True, disables caching for the file content.
        This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
    :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to ``MEMORY_LOAD_LIMIT``.
        This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
    :param atomic: When set to True, the file will be written atomically (rename upon close).
        This parameter is only applicable to PosixFile in write mode.
    :param check_source_version: Whether to check the source version of cached objects.
    :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
    :return: A file-like object (PosixFile or ObjectFile) for the specified path.
    """
    if self._is_posix_file_storage_provider():
        posix_options = dict(
            path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
        )
        return PosixFile(self, **posix_options)

    # Object storage ignores ``atomic``; warn only when the caller explicitly asked for a non-atomic write.
    if atomic is False:
        logger.warning("Non-atomic writes are not supported for object storage providers.")

    return ObjectFile(
        self,
        remote_path=path,
        mode=mode,
        encoding=encoding,
        disable_read_cache=disable_read_cache,
        memory_load_limit=memory_load_limit,
        check_source_version=check_source_version,
        attributes=attributes,
    )
463
[docs]
def is_file(self, path: str) -> bool:
    """
    Tells whether the specified path refers to a file (rather than a directory or folder).

    With a metadata provider, the answer is the existence flag returned by its ``realpath``;
    otherwise the storage provider is asked.

    :param path: The logical path to check.

    :return: ``True`` if the path points to a file, ``False`` otherwise.
    """
    if not self._metadata_provider:
        return self._storage_provider.is_file(path)

    _resolved, found = self._metadata_provider.realpath(path)
    return found
476
494
[docs]
def is_empty(self, path: str) -> bool:
    """
    Tells whether the specified path is empty, i.e. whether listing it yields no object.

    The metadata provider is listed when one is configured, otherwise the storage provider.
    Only the first listed entry is fetched.

    :param path: The logical path to check. This is typically a prefix representing a directory or folder.

    :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
    """
    provider = self._metadata_provider if self._metadata_provider else self._storage_provider
    entries = provider.list_objects(path)

    # An exhausted listing falls back to the ``None`` default, which is what marks the path as empty.
    first_entry = next(entries, None)
    return first_entry is None
514
def __getstate__(self) -> dict[str, Any]:
    """
    Returns the picklable part of the client's state.

    Providers, the cache manager, replica clients and all threading primitives are left out;
    ``__setstate__`` rebuilds them from the retained ``_config``.

    :return: A copy of the instance dictionary without the attributes listed above.
    """
    state = self.__dict__.copy()

    # ``_stop_event`` and ``_commit_thread`` exist only when interval auto-commit is enabled. They wrap
    # thread locks, which cannot be pickled, so they are dropped along with the providers.
    transient_attributes = (
        "_credentials_provider",
        "_storage_provider",
        "_metadata_provider",
        "_cache_manager",
        "_metadata_provider_lock",
        "_replicas",
        "_stop_event",
        "_commit_thread",
    )
    for name in transient_attributes:
        # ``pop`` with a default tolerates attributes that were never set on this instance.
        state.pop(name, None)
    return state
526
def __setstate__(self, state: dict[str, Any]) -> None:
    """
    Rebuilds the client from pickled state.

    Only ``state["_config"]`` is used: providers and replicas are re-created from it, and a fresh
    metadata provider lock is created whenever a metadata provider is configured.

    :param state: The state dictionary produced by ``__getstate__``.
    """
    restored_config = state["_config"]
    self._initialize_providers(restored_config)
    self._initialize_replicas(restored_config.replicas)

    if not self._metadata_provider:
        return
    self._metadata_provider_lock = threading.Lock()
533
[docs]
def sync_from(
    self,
    source_client: "StorageClient",
    source_path: str = "",
    target_path: str = "",
    delete_unmatched_files: bool = False,
    description: str = "Syncing",
    num_worker_processes: Optional[int] = None,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
) -> None:
    """
    Syncs files from ``source_path`` on the source storage client to ``target_path`` on this client.

    The work is delegated to a :py:class:`SyncManager`.

    :param source_client: The source storage client.
    :param source_path: The logical path to sync from.
    :param target_path: The logical path to sync to.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    :param description: Description of sync process for logging purposes.
    :param num_worker_processes: The number of worker processes to use.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    """
    sync_options = dict(
        execution_mode=execution_mode,
        description=description,
        num_worker_processes=num_worker_processes,
        delete_unmatched_files=delete_unmatched_files,
    )
    SyncManager(source_client, source_path, self, target_path).sync_objects(**sync_options)
562
[docs]
def sync_replicas(
    self,
    source_path: str,
    replica_indices: Optional[List[int]] = None,
    delete_unmatched_files: bool = False,
    description: str = "Syncing replicas",
    num_worker_processes: Optional[int] = None,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
) -> None:
    """
    Syncs files from this storage client to its replicas, using the same path on both sides.

    When the client has no replicas, a warning is logged and nothing is synced.

    :param source_path: The logical path to sync from.
    :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    :param description: Description of sync process for logging purposes.
    :param num_worker_processes: The number of worker processes to use.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".

    :raises ValueError: If one of ``replica_indices`` does not refer to an existing replica.
    """
    if not self._replicas:
        logger.warning(
            "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
            "secondary storage locations for redundancy and performance.",
            self._config.profile,
        )
        return None

    # Default to every replica; a non-empty index list narrows the selection.
    targets = self._replicas
    if replica_indices:
        try:
            targets = [self._replicas[index] for index in replica_indices]
        except IndexError as e:
            raise ValueError(f"Replica index out of range: {replica_indices}") from e

    for target in targets:
        target.sync_from(
            self,
            source_path,
            source_path,
            delete_unmatched_files=delete_unmatched_files,
            description=description,
            num_worker_processes=num_worker_processes,
            execution_mode=execution_mode,
        )