Source code for multistorageclient.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import contextlib
 18import logging
 19import os
 20import threading
 21from collections.abc import Iterator
 22from pathlib import PurePosixPath
 23from typing import Any, List, Optional, Union, cast
 24
 25from .config import StorageClientConfig
 26from .constants import MEMORY_LOAD_LIMIT
 27from .file import ObjectFile, PosixFile
 28from .instrumentation.utils import instrumented
 29from .providers.posix_file import PosixFileStorageProvider
 30from .retry import retry
 31from .sync import SyncManager
 32from .types import MSC_PROTOCOL, ExecutionMode, ObjectMetadata, Range, Replica, SourceVersionCheckMode
 33from .utils import NullStorageClient, join_paths
 34
 35logger = logging.getLogger(__name__)
 36
 37
@instrumented
class StorageClient:
    """
    A client for interacting with different storage providers.
    """

    # Active configuration; also used to rebuild providers when unpickling.
    _config: StorageClientConfig
    # Guards metadata-provider mutations; created only when auto-commit is enabled
    # (or on unpickle), otherwise None and callers fall back to a nullcontext.
    _metadata_provider_lock: Optional[threading.Lock] = None
    # Signals the auto-commit thread to stop; None when no committer thread runs.
    _stop_event: Optional[threading.Event] = None

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
        """
        Background loop that periodically commits pending metadata updates.

        :param commit_interval_minutes: Minutes to sleep between commits.
        :param stop_event: Event used to request an early shutdown of the loop.
        """
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self):
        """atexit hook: commit pending metadata updates once at interpreter shutdown."""
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Cache the providers and sub-configs from ``config`` on the instance and,
        when auto-commit is configured together with a metadata provider, start
        the periodic committer thread and/or register the atexit commit hook.

        :param config: The configuration object for the storage client.
        """
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-commiter thread")

                # Periodic commit: a daemon thread that wakes up every
                # interval_minutes and commits (stoppable via _stop_event).
                if self._autocommit_config.interval_minutes:
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                # Final commit on interpreter shutdown.
                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def __del__(self):
        # Best-effort: stop the auto-commit thread when this client is garbage-collected.
        if self._stop_event:
            self._stop_event.set()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.
        """
        # Prefer the metadata provider when configured; it is the source of
        # truth for logical paths.
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        # Caching is pointless on top of a local POSIX provider, so it is
        # disabled there even if a cache manager exists.
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        # True when the backing store is the local filesystem provider.
        return isinstance(self._storage_provider, PosixFileStorageProvider)
[docs] 122 def is_default_profile(self) -> bool: 123 """ 124 Return True if the storage client is using the default profile. 125 """ 126 return self._config.profile == "default"
    @property
    def profile(self) -> str:
        # Name of the profile this client was built from.
        return self._config.profile

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances."""
        # Sort replicas by read_priority, the first one is the primary replica
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            # Each replica profile is materialized from the same config dict
            # this client was built from, so a dict must be available.
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClient(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients

    @property
    def replicas(self) -> list["StorageClient"]:
        """Return StorageClient instances for all replicas, sorted by read priority."""
        return self._replicas

    @retry
    def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Reads an object from the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read; range reads bypass the cache.
        :return: The content of the object.
        :raises FileNotFoundError: If a metadata provider is configured and the path does not exist.
        """
        if self._metadata_provider:
            # Resolve the logical path to the physical one before hitting storage.
            path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")

        # Never cache range-read requests
        if byte_range:
            return self._storage_provider.get_object(path, byte_range=byte_range)

        # Read from cache if the file exists
        if self._is_cache_enabled():
            assert self._cache_manager is not None
            # The source version (etag) keys the cache entry so stale data is not served.
            source_version = self._get_source_version(path)
            data = self._cache_manager.read(path, source_version)

            if data is None:
                # Cache miss: fetch from storage and populate the cache.
                data = self._storage_provider.get_object(path)
                self._cache_manager.set(path, data, source_version)

            return data

        return self._storage_provider.get_object(path, byte_range=byte_range)
[docs] 186 def info(self, path: str, strict: bool = True) -> ObjectMetadata: 187 """ 188 Retrieves metadata or information about an object stored at the specified path. 189 190 :param path: The logical path to the object for which metadata or information is being retrieved. 191 :param strict: If True, performs additional validation to determine whether the path refers to a directory. 192 193 :return: A dictionary containing metadata about the object. 194 """ 195 if not self._metadata_provider: 196 return self._storage_provider.get_object_metadata(path, strict=strict) 197 198 # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory. 199 try: 200 return self._metadata_provider.get_object_metadata(path) 201 except FileNotFoundError: 202 # Try listing from the parent to determine if path is a valid directory 203 parent = os.path.dirname(path.rstrip("/")) + "/" 204 parent = "" if parent == "/" else parent 205 target = path.rstrip("/") + "/" 206 207 try: 208 entries = self._metadata_provider.list_objects(parent, include_directories=True) 209 for entry in entries: 210 if entry.key == target and entry.type == "directory": 211 return entry 212 except Exception: 213 pass 214 raise # Raise original FileNotFoundError
    @retry
    def download_file(self, remote_path: str, local_path: str) -> None:
        """
        Downloads a file to the local file system.

        :param remote_path: The logical path of the file in the storage provider.
        :param local_path: The local path where the file should be downloaded.
        :raises FileNotFoundError: If a metadata provider is configured and the path does not exist.
        """

        if self._metadata_provider:
            # Resolve the logical path; metadata is passed through so the
            # provider can validate/track the download.
            real_path, exists = self._metadata_provider.realpath(remote_path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(real_path, local_path, metadata)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    @retry
    def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Uploads a file from the local file system.

        :param remote_path: The logical path where the file should be stored.
        :param local_path: The local path of the file to upload.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If a metadata provider is configured and the path already exists.
        """
        virtual_path = remote_path
        if self._metadata_provider:
            remote_path, exists = self._metadata_provider.realpath(remote_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{virtual_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        if self._metadata_provider:
            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)
            # Merge the provider-reported metadata with the caller-supplied attributes.
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)

    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Writes an object at the specified path.

        :param path: The logical path where the object should be written.
        :param body: The content to write to the object.
        :param attributes: The attributes to add to the file.
        :raises FileExistsError: If a metadata provider is configured and the path already exists.
        """
        virtual_path = path
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{virtual_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
        if self._metadata_provider:
            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)
            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)
[docs] 289 def copy(self, src_path: str, dest_path: str) -> None: 290 """ 291 Copies an object from source to destination path. 292 293 :param src_path: The logical path of the source object to copy. 294 :param dest_path: The logical path of the destination. 295 """ 296 virtual_dest_path = dest_path 297 if self._metadata_provider: 298 src_path, exists = self._metadata_provider.realpath(src_path) 299 if not exists: 300 raise FileNotFoundError(f"The file at path '{src_path}' was not found.") 301 302 dest_path, exists = self._metadata_provider.realpath(dest_path) 303 if exists: 304 raise FileExistsError( 305 f"The file at path '{virtual_dest_path}' already exists; " 306 f"overwriting is not yet allowed when using a metadata provider." 307 ) 308 309 self._storage_provider.copy_object(src_path, dest_path) 310 if self._metadata_provider: 311 metadata = self._storage_provider.get_object_metadata(dest_path) 312 with self._metadata_provider_lock or contextlib.nullcontext(): 313 self._metadata_provider.add_file(virtual_dest_path, metadata)
314
[docs] 315 def delete(self, path: str, recursive: bool = False) -> None: 316 """ 317 Deletes an object at the specified path. 318 319 :param path: The logical path of the object to delete. 320 :param recursive: Whether to delete objects in the path recursively. 321 """ 322 if recursive: 323 self.sync_from( 324 NullStorageClient(), 325 path, 326 path, 327 delete_unmatched_files=True, 328 num_worker_processes=1, 329 description="Deleting", 330 ) 331 # If this is a posix storage provider, we need to also delete remaining directory stubs. 332 if self._is_posix_file_storage_provider(): 333 posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider) 334 posix_storage_provider.rmtree(path) 335 return 336 337 virtual_path = path 338 if self._metadata_provider: 339 path, exists = self._metadata_provider.realpath(path) 340 if not exists: 341 raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.") 342 with self._metadata_provider_lock or contextlib.nullcontext(): 343 self._metadata_provider.remove_file(virtual_path) 344 345 # Delete the cached file if it exists 346 if self._is_cache_enabled(): 347 assert self._cache_manager is not None 348 self._cache_manager.delete(virtual_path) 349 350 self._storage_provider.delete_object(path)
351
[docs] 352 def glob( 353 self, 354 pattern: str, 355 include_url_prefix: bool = False, 356 attribute_filter_expression: Optional[str] = None, 357 ) -> list[str]: 358 """ 359 Matches and retrieves a list of objects in the storage provider that 360 match the specified pattern. 361 362 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``). 363 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 364 :param attribute_filter_expression: The attribute filter expression to apply to the result. 365 366 :return: A list of object paths that match the pattern. 367 """ 368 if self._metadata_provider: 369 results = self._metadata_provider.glob(pattern, attribute_filter_expression) 370 else: 371 results = self._storage_provider.glob(pattern, attribute_filter_expression) 372 373 if include_url_prefix: 374 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 375 376 return results
377
[docs] 378 def list( 379 self, 380 prefix: str = "", 381 start_after: Optional[str] = None, 382 end_at: Optional[str] = None, 383 include_directories: bool = False, 384 include_url_prefix: bool = False, 385 attribute_filter_expression: Optional[str] = None, 386 ) -> Iterator[ObjectMetadata]: 387 """ 388 Lists objects in the storage provider under the specified prefix. 389 390 :param prefix: The prefix to list objects under. 391 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 392 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 393 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects. 394 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 395 :param attribute_filter_expression: The attribute filter expression to apply to the result. 396 397 :return: An iterator over objects. 398 """ 399 if self._metadata_provider: 400 objects = self._metadata_provider.list_objects( 401 prefix, start_after, end_at, include_directories, attribute_filter_expression 402 ) 403 else: 404 objects = self._storage_provider.list_objects( 405 prefix, start_after, end_at, include_directories, attribute_filter_expression 406 ) 407 408 for object in objects: 409 if include_url_prefix: 410 if self.is_default_profile(): 411 object.key = str(PurePosixPath("/") / object.key) 412 else: 413 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key) 414 yield object
415
[docs] 416 def open( 417 self, 418 path: str, 419 mode: str = "rb", 420 buffering: int = -1, 421 encoding: Optional[str] = None, 422 disable_read_cache: bool = False, 423 memory_load_limit: int = MEMORY_LOAD_LIMIT, 424 atomic: bool = True, 425 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 426 attributes: Optional[dict[str, str]] = None, 427 ) -> Union[PosixFile, ObjectFile]: 428 """ 429 Returns a file-like object from the specified path. 430 431 :param path: The logical path of the object to read. 432 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported. 433 :param buffering: The buffering mode. Only applies to PosixFile. 434 :param encoding: The encoding to use for text files. 435 :param disable_read_cache: When set to True, disables caching for the file content. 436 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 437 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 438 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 439 :param atomic: When set to True, the file will be written atomically (rename upon close). 440 This parameter is only applicable to PosixFile in write mode. 441 :param check_source_version: Whether to check the source version of cached objects. 442 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". 443 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 
444 """ 445 if self._is_posix_file_storage_provider(): 446 return PosixFile( 447 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 448 ) 449 else: 450 if atomic is False: 451 logger.warning("Non-atomic writes are not supported for object storage providers.") 452 453 return ObjectFile( 454 self, 455 remote_path=path, 456 mode=mode, 457 encoding=encoding, 458 disable_read_cache=disable_read_cache, 459 memory_load_limit=memory_load_limit, 460 check_source_version=check_source_version, 461 attributes=attributes, 462 )
463
[docs] 464 def is_file(self, path: str) -> bool: 465 """ 466 Checks whether the specified path points to a file (rather than a directory or folder). 467 468 :param path: The logical path to check. 469 470 :return: ``True`` if the path points to a file, ``False`` otherwise. 471 """ 472 if self._metadata_provider: 473 _, exists = self._metadata_provider.realpath(path) 474 return exists 475 return self._storage_provider.is_file(path)
476
[docs] 477 def commit_metadata(self, prefix: Optional[str] = None) -> None: 478 """ 479 Commits any pending updates to the metadata provider. No-op if not using a metadata provider. 480 481 :param prefix: If provided, scans the prefix to find files to commit. 482 """ 483 if self._metadata_provider: 484 with self._metadata_provider_lock or contextlib.nullcontext(): 485 if prefix: 486 # The logical path for each item will be the physical path with 487 # the base physical path removed from the beginning. 488 physical_base, _ = self._metadata_provider.realpath("") 489 physical_prefix, _ = self._metadata_provider.realpath(prefix) 490 for obj in self._storage_provider.list_objects(prefix=physical_prefix): 491 virtual_path = obj.key[len(physical_base) :].lstrip("/") 492 self._metadata_provider.add_file(virtual_path, obj) 493 self._metadata_provider.commit_updates()
494
[docs] 495 def is_empty(self, path: str) -> bool: 496 """ 497 Checks whether the specified path is empty. A path is considered empty if there are no 498 objects whose keys start with the given path as a prefix. 499 500 :param path: The logical path to check. This is typically a prefix representing a directory or folder. 501 502 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 503 """ 504 if self._metadata_provider: 505 objects = self._metadata_provider.list_objects(path) 506 else: 507 objects = self._storage_provider.list_objects(path) 508 509 try: 510 return next(objects) is None 511 except StopIteration: 512 pass 513 return True
514 515 def __getstate__(self) -> dict[str, Any]: 516 state = self.__dict__.copy() 517 del state["_credentials_provider"] 518 del state["_storage_provider"] 519 del state["_metadata_provider"] 520 del state["_cache_manager"] 521 if "_metadata_provider_lock" in state: 522 del state["_metadata_provider_lock"] 523 if "_replicas" in state: 524 del state["_replicas"] 525 return state 526 527 def __setstate__(self, state: dict[str, Any]) -> None: 528 config = state["_config"] 529 self._initialize_providers(config) 530 self._initialize_replicas(config.replicas) 531 if self._metadata_provider: 532 self._metadata_provider_lock = threading.Lock() 533
[docs] 534 def sync_from( 535 self, 536 source_client: "StorageClient", 537 source_path: str = "", 538 target_path: str = "", 539 delete_unmatched_files: bool = False, 540 description: str = "Syncing", 541 num_worker_processes: Optional[int] = None, 542 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 543 ) -> None: 544 """ 545 Syncs files from the source storage client to "path/". 546 547 :param source_client: The source storage client. 548 :param source_path: The logical path to sync from. 549 :param target_path: The logical path to sync to. 550 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 551 :param description: Description of sync process for logging purposes. 552 :param num_worker_processes: The number of worker processes to use. 553 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 554 """ 555 m = SyncManager(source_client, source_path, self, target_path) 556 m.sync_objects( 557 execution_mode=execution_mode, 558 description=description, 559 num_worker_processes=num_worker_processes, 560 delete_unmatched_files=delete_unmatched_files, 561 )
562
[docs] 563 def sync_replicas( 564 self, 565 source_path: str, 566 replica_indices: Optional[List[int]] = None, 567 delete_unmatched_files: bool = False, 568 description: str = "Syncing replicas", 569 num_worker_processes: Optional[int] = None, 570 execution_mode: ExecutionMode = ExecutionMode.LOCAL, 571 ) -> None: 572 """ 573 Sync files from the source storage client to target replicas. 574 575 :param source_path: The logical path to sync from. 576 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0. 577 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source. 578 :param description: Description of sync process for logging purposes. 579 :param num_worker_processes: The number of worker processes to use. 580 :param execution_mode: The execution mode to use. Currently supports "local" and "ray". 581 """ 582 if not self._replicas: 583 logger.warning( 584 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable " 585 "secondary storage locations for redundancy and performance.", 586 self._config.profile, 587 ) 588 return None 589 590 if replica_indices: 591 try: 592 replicas = [self._replicas[i] for i in replica_indices] 593 except IndexError as e: 594 raise ValueError(f"Replica index out of range: {replica_indices}") from e 595 else: 596 replicas = self._replicas 597 598 for replica in replicas: 599 replica.sync_from( 600 self, 601 source_path, 602 source_path, 603 delete_unmatched_files=delete_unmatched_files, 604 description=description, 605 num_worker_processes=num_worker_processes, 606 execution_mode=execution_mode, 607 )