Source code for multistorageclient.client

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
 15
 16import os
 17from typing import Any, Dict, Iterator, List, Optional, Union
 18
 19from .config import StorageClientConfig
 20from .file import ObjectFile, PosixFile
 21from .instrumentation.utils import instrumented
 22from .providers.posix_file import PosixFileStorageProvider
 23from .retry import retry
 24from .types import MSC_PROTOCOL, ObjectMetadata, Range
 25from .utils import join_paths
 26
 27
@instrumented
class StorageClient:
    """
    A client for interacting with different storage providers.

    Every operation is delegated to the storage provider taken from the
    :py:class:`StorageClientConfig`. When the configuration also supplies a
    metadata provider, paths are resolved through it first, and when it supplies
    a cache manager (and the storage provider is not POSIX-backed), whole-object
    reads are served from and stored into the cache.
    """

    _config: StorageClientConfig

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Binds the configuration and everything it exposes onto the client.

        Shared by ``__init__`` and ``__setstate__`` so that a client rebuilt from
        pickled state carries the same attributes as a freshly constructed one.

        :param config: The configuration object for the storage client.
        """
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager

    def _build_cache_path(self, path: str) -> str:
        """
        Builds the cache key for ``path``, with or without an etag.

        :param path: The object path the key is built for.

        :return: ``"<path>:<etag>"`` when a cache manager is configured and reports
            ``use_etag()``, otherwise ``"<path>:None"``.
        """
        if self._cache_manager and self._cache_manager.use_etag():
            # The metadata provider, when configured, is the source of the etag;
            # otherwise the storage provider is asked directly.
            provider = self._metadata_provider or self._storage_provider
            metadata = provider.get_object_metadata(path)
            return f"{path}:{metadata.etag}"

        return f"{path}:{None}"

    def _is_cache_enabled(self) -> bool:
        """
        Tells whether reads and deletes should go through the cache manager.

        :return: ``True`` when a cache manager is configured and the storage provider
            is not a :py:class:`PosixFileStorageProvider`.
        """
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` when the storage provider is a :py:class:`PosixFileStorageProvider`.
        """
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    @property
    def profile(self) -> str:
        """
        :return: The profile name taken from the client configuration.
        """
        return self._config.profile

    @retry
    def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Reads an object from the storage provider at the specified path.

        :param path: The path of the object to read.
        :param byte_range: Optional range (``offset``/``size``) restricting the read to a
            slice of the object. Ranged reads are served from the cache when the whole
            object is already cached, but never populate it.

        :return: The content of the object (or of the requested range).

        :raises FileNotFoundError: If a metadata provider is configured and does not know the path.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        if self._is_cache_enabled():
            assert self._cache_manager is not None
            cache_path = self._build_cache_path(path)
            data = self._cache_manager.read(cache_path)

            # Compare against None rather than truthiness so that a cached empty
            # object (b"") counts as a hit instead of being re-fetched every time.
            # NOTE(review): assumes the cache manager signals a miss with None.
            if data is not None:
                if byte_range:
                    return data[byte_range.offset : byte_range.offset + byte_range.size]
                return data

            # Only cache the entire file
            if byte_range is None:
                data = self._storage_provider.get_object(path)
                self._cache_manager.set(cache_path, data)
                return data

        return self._storage_provider.get_object(path, byte_range=byte_range)

    def info(self, path: str) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The path to the object for which metadata or information is being retrieved.

        :return: The metadata of the object, taken from the metadata provider when one
            is configured and from the storage provider otherwise.
        """
        if self._metadata_provider:
            return self._metadata_provider.get_object_metadata(path)
        return self._storage_provider.get_object_metadata(path)

    @retry
    def download_file(self, remote_path: str, local_path: str) -> None:
        """
        Downloads a file from the storage provider to the local file system.

        :param remote_path: The path of the file in the storage provider.
        :param local_path: The local path where the file should be downloaded.

        :raises FileNotFoundError: If a metadata provider is configured and does not know ``remote_path``.
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(remote_path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(real_path, local_path, metadata)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(self, remote_path: str, local_path: str) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the file should be stored in the storage provider.
        :param local_path: The local path of the file to upload.

        :raises FileExistsError: If a metadata provider is configured and already knows ``remote_path``.
        """
        if self._metadata_provider:
            remote_path, exists = self._metadata_provider.realpath(remote_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{remote_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        self._storage_provider.upload_file(remote_path, local_path)
        if self._metadata_provider:
            # Register the freshly uploaded object with the metadata provider.
            metadata = self._storage_provider.get_object_metadata(remote_path)
            self._metadata_provider.add_file(remote_path, metadata)

    @retry
    def write(self, path: str, body: bytes) -> None:
        """
        Writes an object to the storage provider at the specified path.

        :param path: The path where the object should be written.
        :param body: The content to write to the object.

        :raises FileExistsError: If a metadata provider is configured and already knows ``path``.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        self._storage_provider.put_object(path, body)
        if self._metadata_provider:
            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            metadata = self._storage_provider.get_object_metadata(path)
            self._metadata_provider.add_file(path, metadata)

    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copies an object from source to destination in the storage provider.

        :param src_path: The path of the source object to copy.
        :param dest_path: The path of the destination.

        :raises FileNotFoundError: If a metadata provider is configured and does not know ``src_path``.
        :raises FileExistsError: If a metadata provider is configured and already knows ``dest_path``.
        """
        real_src_path = src_path
        if self._metadata_provider:
            # Resolve the source through the metadata provider as well, the same way
            # read/download_file/open do, so the storage provider gets the real path.
            real_src_path, exists = self._metadata_provider.realpath(src_path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{src_path}' was not found.")

            dest_path, exists = self._metadata_provider.realpath(dest_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{dest_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )

        self._storage_provider.copy_object(real_src_path, dest_path)
        if self._metadata_provider:
            metadata = self._storage_provider.get_object_metadata(dest_path)
            self._metadata_provider.add_file(dest_path, metadata)

    def delete(self, path: str) -> None:
        """
        Deletes an object from the storage provider at the specified path.

        :param path: The path of the object to delete.

        :raises FileNotFoundError: If a metadata provider is configured and does not know ``path``.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        # Build the cache key while the object still exists: with etag-based keys
        # the key is derived from the object's metadata, which can no longer be
        # fetched once the object has been removed.
        cache_path: Optional[str] = None
        if self._is_cache_enabled():
            cache_path = self._build_cache_path(path)

        if self._metadata_provider:
            self._metadata_provider.remove_file(path)

        self._storage_provider.delete_object(path)

        # Delete cached files
        if cache_path is not None:
            assert self._cache_manager is not None
            self._cache_manager.delete(cache_path)

    def glob(self, pattern: str, include_url_prefix: bool = False) -> List[str]:
        """
        Matches and retrieves a list of objects in the storage provider that
        match the specified pattern.

        :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.

        :return: A list of object paths that match the pattern.
        """
        if self._metadata_provider:
            results = self._metadata_provider.glob(pattern)
        else:
            results = self._storage_provider.glob(pattern)

        if include_url_prefix:
            results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]

        return results

    def list(
        self,
        prefix: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified prefix.

        :param prefix: The prefix to list objects under.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are
            returned alongside objects. Only forwarded to the storage provider; it is not passed on when a
            metadata provider is configured.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.

        :return: An iterator over objects.
        """
        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(prefix, start_after, end_at)
        else:
            objects = self._storage_provider.list_objects(prefix, start_after, end_at, include_directories)

        # "obj" rather than "object" so the builtin is not shadowed.
        for obj in objects:
            if include_url_prefix:
                # The key is rewritten in place on the yielded metadata object.
                obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)
            yield obj

    def open(
        self, path: str, mode: str = "rb", encoding: Optional[str] = None, disable_read_cache: bool = False
    ) -> Union[PosixFile, ObjectFile]:
        """
        Returns a file-like object from the storage provider at the specified path.

        :param path: The path of the object to read.
        :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to True, disables caching for the file content.
            This parameter is only applicable when the mode is "r" or "rb".

        :return: A file-like object (PosixFile or ObjectFile) for the specified path.

        :raises FileExistsError: If a metadata provider is configured, the mode contains "w"
            and the path already exists.
        :raises FileNotFoundError: If a metadata provider is configured, the mode contains "r"
            and the path does not exist.
        """
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if "w" in mode and exists:
                raise FileExistsError(f"The file at path '{path}' already exists.")
            if "r" in mode and not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        if self._is_posix_file_storage_provider():
            realpath = self._storage_provider._realpath(path)  # type: ignore
            return PosixFile(path=realpath, mode=mode, encoding=encoding)

        # Passing no cache manager is how the read cache is switched off for this file.
        cache_manager = None if disable_read_cache else self._cache_manager
        return ObjectFile(
            self._storage_provider,
            remote_path=path,
            mode=mode,
            encoding=encoding,
            cache_manager=cache_manager,
            metadata_provider=self._metadata_provider,
        )

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a directory or folder).

        :param path: The path to check.

        :return: ``True`` if the path points to a file, ``False`` otherwise.
        """
        if self._metadata_provider:
            _, exists = self._metadata_provider.realpath(path)
            return exists
        return self._storage_provider.is_file(path)

    def commit_updates(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, scans the prefix to find files to commit.
        """
        if self._metadata_provider:
            if prefix:
                for obj in self._storage_provider.list_objects(prefix=prefix):
                    # NOTE(review): os.path.join uses the host OS separator and discards
                    # ``prefix`` when ``obj.key`` is absolute; confirm whether ``join_paths``
                    # is the intended helper here.
                    fullpath = os.path.join(prefix, obj.key)
                    self._metadata_provider.add_file(fullpath, obj)
            self._metadata_provider.commit_updates()

    def is_empty(self, path: str) -> bool:
        """
        Checks whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        The check always queries the storage provider, even when a metadata provider is configured.

        :param path: The path to check. This is typically a prefix representing a directory or folder.

        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        objects = self._storage_provider.list_objects(path)
        # Probing for a single entry is enough; the listing is never consumed in full.
        return next(iter(objects), None) is None

    def __getstate__(self) -> Dict[str, Any]:
        """
        Returns the picklable state of the client.

        The credentials, storage and metadata providers and the cache manager are
        dropped from the state; ``__setstate__`` rebuilds them from ``_config``.
        """
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        """
        Restores the client from pickled state by re-running provider initialization
        with the stored configuration.
        """
        config = state["_config"]
        self._initialize_providers(config)