# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15
16import os
17from typing import Any, Dict, Iterator, List, Optional, Union
18
19from .config import StorageClientConfig
20from .file import ObjectFile, PosixFile
21from .instrumentation.utils import instrumented
22from .providers.posix_file import PosixFileStorageProvider
23from .retry import retry
24from .types import MSC_PROTOCOL, ObjectMetadata, Range
25from .utils import join_paths
26
27
@instrumented
class StorageClient:
    """
    A client for interacting with different storage providers.

    Object operations are forwarded to the configured storage provider. When a metadata
    provider is configured, paths are first resolved through it, and objects created by
    :py:meth:`write`, :py:meth:`upload_file` and :py:meth:`copy` are registered with it.
    When a cache manager is configured and the storage provider is not a POSIX file
    provider, whole-object reads are cached.
    """

    _config: StorageClientConfig

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        # Copies the provider/config pieces out of ``config``. Also used by ``__setstate__``
        # to rebuild the attributes that ``__getstate__`` drops before pickling.
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager

    def _build_cache_path(self, path: str) -> str:
        """
        Build the cache key for ``path``.

        The key has the form ``"<path>:<etag>"`` when the cache manager is configured to
        use etags, and ``"<path>:None"`` otherwise. The etag is taken from the metadata
        provider when one is configured, otherwise from the storage provider.

        :param path: The (already resolved) object path.
        :return: The cache key.
        """
        etag = None
        if self._cache_manager and self._cache_manager.use_etag():
            provider = self._metadata_provider if self._metadata_provider else self._storage_provider
            etag = provider.get_object_metadata(path).etag
        return f"{path}:{etag}"

    def _is_cache_enabled(self) -> bool:
        # POSIX-backed storage is already local, so it is never cached.
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    @property
    def profile(self) -> str:
        """The name of the profile this client was configured with."""
        return self._config.profile

    @retry
    def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Reads an object from the storage provider at the specified path.

        :param path: The path of the object to read.
        :param byte_range: Optional byte range to read. When omitted, the whole object is read.
        :return: The content of the object (or of the requested byte range).
        :raises FileNotFoundError: If a metadata provider is configured and reports that the path does not exist.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        if self._is_cache_enabled():
            assert self._cache_manager is not None
            cache_path = self._build_cache_path(path)
            data = self._cache_manager.read(cache_path)

            # Compare against None so that a cached empty object counts as a hit.
            if data is not None:
                if byte_range is not None:
                    return data[byte_range.offset : byte_range.offset + byte_range.size]
                return data

            # Only whole-object reads populate the cache; ranged misses fall through
            # to a ranged request against the storage provider.
            if byte_range is None:
                data = self._storage_provider.get_object(path)
                self._cache_manager.set(cache_path, data)
                return data

        return self._storage_provider.get_object(path, byte_range=byte_range)

    def info(self, path: str) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The path to the object for which metadata or information is being retrieved.

        :return: The :py:class:`ObjectMetadata` of the object, taken from the metadata provider
            when one is configured, otherwise from the storage provider.
        """
        if self._metadata_provider:
            return self._metadata_provider.get_object_metadata(path)
        return self._storage_provider.get_object_metadata(path)

    @retry
    def download_file(self, remote_path: str, local_path: str) -> None:
        """
        Downloads a file from the storage provider to the local file system.

        :param remote_path: The path of the file in the storage provider.
        :param local_path: The local path where the file should be downloaded.
        :raises FileNotFoundError: If a metadata provider is configured and reports that the path does not exist.
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(remote_path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(real_path, local_path, metadata)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(self, remote_path: str, local_path: str) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the file should be stored in the storage provider.
        :param local_path: The local path of the file to upload.
        :raises FileExistsError: If a metadata provider is configured and the path already exists.
        """
        if self._metadata_provider:
            remote_path, exists = self._metadata_provider.realpath(remote_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{remote_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        self._storage_provider.upload_file(remote_path, local_path)
        if self._metadata_provider:
            # Register the new object using the metadata reported by the storage provider.
            metadata = self._storage_provider.get_object_metadata(remote_path)
            self._metadata_provider.add_file(remote_path, metadata)

    @retry
    def write(self, path: str, body: bytes) -> None:
        """
        Writes an object to the storage provider at the specified path.

        :param path: The path where the object should be written.
        :param body: The content to write to the object.
        :raises FileExistsError: If a metadata provider is configured and the path already exists.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        self._storage_provider.put_object(path, body)
        if self._metadata_provider:
            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            metadata = self._storage_provider.get_object_metadata(path)
            self._metadata_provider.add_file(path, metadata)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copies an object from source to destination in the storage provider.

        :param src_path: The path of the source object to copy.
        :param dest_path: The path of the destination.
        :raises FileExistsError: If a metadata provider is configured and the destination already exists.
        """
        if self._metadata_provider:
            # NOTE(review): only dest_path is resolved through the metadata provider;
            # src_path is passed to the storage provider as given — confirm this is intended.
            dest_path, exists = self._metadata_provider.realpath(dest_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{dest_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )

        self._storage_provider.copy_object(src_path, dest_path)
        if self._metadata_provider:
            metadata = self._storage_provider.get_object_metadata(dest_path)
            self._metadata_provider.add_file(dest_path, metadata)

    def delete(self, path: str) -> None:
        """
        Deletes an object from the storage provider at the specified path.

        Any cached copy of the object is removed as well.

        :param path: The path of the object to delete.
        :raises FileNotFoundError: If a metadata provider is configured and reports that the path does not exist.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        # Build the cache key before removing anything: with etag-based keys it needs
        # the object's metadata, which is no longer available after deletion.
        cache_path: Optional[str] = None
        if self._is_cache_enabled():
            cache_path = self._build_cache_path(path)

        if self._metadata_provider:
            self._metadata_provider.remove_file(path)

        self._storage_provider.delete_object(path)

        if cache_path is not None:
            assert self._cache_manager is not None
            self._cache_manager.delete(cache_path)

    def glob(self, pattern: str, include_url_prefix: bool = False) -> List[str]:
        """
        Matches and retrieves a list of objects in the storage provider that
        match the specified pattern.

        :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.

        :return: A list of object paths that match the pattern.
        """
        if self._metadata_provider:
            results = self._metadata_provider.glob(pattern)
        else:
            results = self._storage_provider.glob(pattern)

        if include_url_prefix:
            results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]

        return results

    def list(
        self,
        prefix: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified prefix.

        :param prefix: The prefix to list objects under.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.

        :return: An iterator over objects.
        """
        if self._metadata_provider:
            # NOTE(review): include_directories is not forwarded to the metadata provider.
            objects = self._metadata_provider.list_objects(prefix, start_after, end_at)
        else:
            objects = self._storage_provider.list_objects(prefix, start_after, end_at, include_directories)

        for obj in objects:
            if include_url_prefix:
                obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)
            yield obj

    def open(
        self, path: str, mode: str = "rb", encoding: Optional[str] = None, disable_read_cache: bool = False
    ) -> Union[PosixFile, ObjectFile]:
        """
        Returns a file-like object from the storage provider at the specified path.

        :param path: The path of the object to read.
        :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to True, disables caching for the file content. This parameter is only applicable when the mode is "r" or "rb".

        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileExistsError: If a metadata provider is configured, ``mode`` writes, and the path exists.
        :raises FileNotFoundError: If a metadata provider is configured, ``mode`` reads, and the path does not exist.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if "w" in mode and exists:
                raise FileExistsError(f"The file at path '{path}' already exists.")
            if "r" in mode and not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        if self._is_posix_file_storage_provider():
            realpath = self._storage_provider._realpath(path)  # type: ignore
            return PosixFile(path=realpath, mode=mode, encoding=encoding)

        cache_manager = None if disable_read_cache else self._cache_manager
        return ObjectFile(
            self._storage_provider,
            remote_path=path,
            mode=mode,
            encoding=encoding,
            cache_manager=cache_manager,
            metadata_provider=self._metadata_provider,
        )

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a directory or folder).

        :param path: The path to check.

        :return: ``True`` if the path points to a file, ``False`` otherwise.
        """
        if self._metadata_provider:
            _, exists = self._metadata_provider.realpath(path)
            return exists
        return self._storage_provider.is_file(path)

    def commit_updates(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        if self._metadata_provider:
            if prefix:
                for obj in self._storage_provider.list_objects(prefix=prefix):
                    fullpath = os.path.join(prefix, obj.key)
                    self._metadata_provider.add_file(fullpath, obj)
            self._metadata_provider.commit_updates()

    def is_empty(self, path: str) -> bool:
        """
        Checks whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The path to check. This is typically a prefix representing a directory or folder.

        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        objects = self._storage_provider.list_objects(path)
        # Only the first element is needed; an exhausted listing means the prefix is empty.
        return next(iter(objects), None) is None

    def __getstate__(self) -> Dict[str, Any]:
        # Drop provider objects before pickling; they are rebuilt from ``_config`` in ``__setstate__``.
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # Every other attribute is derived from the config, so re-initializing from it
        # restores the full client.
        config = state["_config"]
        self._initialize_providers(config)