Source code for multistorageclient.client

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17import multiprocessing
 18import os
 19import queue
 20import tempfile
 21import threading
 22from collections.abc import Iterator
 23from concurrent.futures import ThreadPoolExecutor
 24from enum import Enum
 25from typing import Any, Optional, Union, cast
 26
 27from .config import StorageClientConfig
 28from .constants import MEMORY_LOAD_LIMIT
 29from .file import ObjectFile, PosixFile
 30from .instrumentation.utils import instrumented
 31from .progress_bar import ProgressBar
 32from .providers.posix_file import PosixFileStorageProvider
 33from .retry import retry
 34from .types import MSC_PROTOCOL, ObjectMetadata, Range, SourceVersionCheckMode
 35from .utils import NullStorageClient, calculate_worker_processes_and_threads, join_paths
 36
 37logger = logging.Logger(__name__)
 38
 39
class _SyncOp(Enum):
    """Operation tags passed through the work/result queues of ``sync_from``."""

    ADD = "add"  # copy a source file to the target
    DELETE = "delete"  # remove an unmatched file from the target
    STOP = "stop"  # sentinel telling consumer threads to shut down
 44
 45
@instrumented
class StorageClient:
    """
    A client for interacting with different storage providers.
    """

    _config: StorageClientConfig

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        """
        Cache the providers and configs from ``config`` as attributes.

        Also used by ``__setstate__`` to rebuild the non-picklable providers
        after unpickling.
        """
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager

    def _build_cache_path(self, path: str) -> str:
        """
        Build the cache key for ``path``.

        Returns ``"{path}:{etag}"`` when the cache manager validates entries by
        etag, otherwise ``"{path}:None"``.
        """
        if self._cache_manager and self._cache_manager.use_etag():
            # Resolve the etag from the metadata provider when one is
            # configured, falling back to the storage provider otherwise.
            # (Both branches of the original performed this identical lookup.)
            provider = self._metadata_provider or self._storage_provider
            metadata = provider.get_object_metadata(path)
            return f"{path}:{metadata.etag}"
        return f"{path}:{None}"

    def _is_cache_enabled(self) -> bool:
        # Caching is skipped for POSIX-backed providers: the data is already
        # on the local filesystem.
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        return isinstance(self._storage_provider, PosixFileStorageProvider)
[docs] 94 def is_default_profile(self) -> bool: 95 """ 96 Return True if the storage client is using the default profile. 97 """ 98 return self._config.profile == "default"
99 100 @property 101 def profile(self) -> str: 102 return self._config.profile 103 104 @retry 105 def read(self, path: str, byte_range: Optional[Range] = None) -> bytes: 106 """ 107 Reads an object from the storage provider at the specified path. 108 109 :param path: The path of the object to read. 110 :return: The content of the object. 111 """ 112 if self._metadata_provider: 113 path, exists = self._metadata_provider.realpath(path) 114 if not exists: 115 raise FileNotFoundError(f"The file at path '{path}' was not found.") 116 117 # Never cache range-read requests 118 if byte_range: 119 return self._storage_provider.get_object(path, byte_range=byte_range) 120 121 # Read from cache if the file exists 122 if self._is_cache_enabled(): 123 assert self._cache_manager is not None 124 cache_path = self._build_cache_path(path) 125 data = self._cache_manager.read(cache_path) 126 127 if data is None: 128 data = self._storage_provider.get_object(path) 129 self._cache_manager.set(cache_path, data) 130 131 return data 132 133 return self._storage_provider.get_object(path, byte_range=byte_range) 134
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The path to the object for which metadata or information is being retrieved.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: A dictionary containing metadata about the object.
        """
        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Best-effort directory probe: any listing failure falls
                # through to re-raising the original FileNotFoundError below.
                pass
            raise  # Raise original FileNotFoundError
164 165 @retry 166 def download_file(self, remote_path: str, local_path: str) -> None: 167 """ 168 Downloads a file from the storage provider to the local file system. 169 170 :param remote_path: The path of the file in the storage provider. 171 :param local_path: The local path where the file should be downloaded. 172 """ 173 174 if self._metadata_provider: 175 real_path, exists = self._metadata_provider.realpath(remote_path) 176 if not exists: 177 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.") 178 metadata = self._metadata_provider.get_object_metadata(remote_path) 179 self._storage_provider.download_file(real_path, local_path, metadata) 180 else: 181 self._storage_provider.download_file(remote_path, local_path) 182 183 @retry 184 def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None: 185 """ 186 Uploads a file from the local file system to the storage provider. 187 188 :param remote_path: The path where the file should be stored in the storage provider. 189 :param local_path: The local path of the file to upload. 190 :param attributes: The attributes to add to the file. 191 """ 192 virtual_path = remote_path 193 if self._metadata_provider: 194 remote_path, exists = self._metadata_provider.realpath(remote_path) 195 if exists: 196 raise FileExistsError( 197 f"The file at path '{virtual_path}' already exists; " 198 f"overwriting is not yet allowed when using a metadata provider." 199 ) 200 self._storage_provider.upload_file(remote_path, local_path, attributes) 201 if self._metadata_provider: 202 metadata = self._storage_provider.get_object_metadata(remote_path) 203 self._metadata_provider.add_file(virtual_path, metadata) 204 205 @retry 206 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None: 207 """ 208 Writes an object to the storage provider at the specified path. 
209 210 :param path: The path where the object should be written. 211 :param body: The content to write to the object. 212 :param attributes: The attributes to add to the file. 213 """ 214 virtual_path = path 215 if self._metadata_provider: 216 path, exists = self._metadata_provider.realpath(path) 217 if exists: 218 raise FileExistsError( 219 f"The file at path '{virtual_path}' already exists; " 220 f"overwriting is not yet allowed when using a metadata provider." 221 ) 222 self._storage_provider.put_object(path, body, attributes=attributes) 223 if self._metadata_provider: 224 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait. 225 metadata = self._storage_provider.get_object_metadata(path) 226 self._metadata_provider.add_file(virtual_path, metadata) 227
[docs] 228 def copy(self, src_path: str, dest_path: str) -> None: 229 """ 230 Copies an object from source to destination in the storage provider. 231 232 :param src_path: The virtual path of the source object to copy. 233 :param dest_path: The virtual path of the destination. 234 """ 235 virtual_dest_path = dest_path 236 if self._metadata_provider: 237 src_path, exists = self._metadata_provider.realpath(src_path) 238 if not exists: 239 raise FileNotFoundError(f"The file at path '{src_path}' was not found.") 240 241 dest_path, exists = self._metadata_provider.realpath(dest_path) 242 if exists: 243 raise FileExistsError( 244 f"The file at path '{virtual_dest_path}' already exists; " 245 f"overwriting is not yet allowed when using a metadata provider." 246 ) 247 248 self._storage_provider.copy_object(src_path, dest_path) 249 if self._metadata_provider: 250 metadata = self._storage_provider.get_object_metadata(dest_path) 251 self._metadata_provider.add_file(virtual_dest_path, metadata)
252
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object from the storage provider at the specified path.

        :param path: The virtual path of the object to delete.
        :param recursive: Whether to delete objects in the path recursively.
        """
        if recursive:
            # Recursive delete is implemented as a sync from an empty source
            # (NullStorageClient) with delete_unmatched_files=True: every
            # object under the prefix is "unmatched" and gets deleted.
            self.sync_from(
                NullStorageClient(),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return

        virtual_path = path
        if self._metadata_provider:
            path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")
            self._metadata_provider.remove_file(virtual_path)

        # Delete the cached file if it exists
        # NOTE(review): cache entries are keyed by _build_cache_path()
        # ("path:etag"), but this deletes by the bare virtual path — confirm
        # CacheManager.delete also matches etag-suffixed keys.
        if self._is_cache_enabled():
            assert self._cache_manager is not None
            self._cache_manager.delete(virtual_path)

        self._storage_provider.delete_object(path)
288
[docs] 289 def glob( 290 self, 291 pattern: str, 292 include_url_prefix: bool = False, 293 ) -> list[str]: 294 """ 295 Matches and retrieves a list of objects in the storage provider that 296 match the specified pattern. 297 298 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``). 299 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 300 301 :return: A list of object paths that match the pattern. 302 """ 303 if self._metadata_provider: 304 results = self._metadata_provider.glob(pattern) 305 else: 306 results = self._storage_provider.glob(pattern) 307 308 if include_url_prefix: 309 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results] 310 311 return results
312
[docs] 313 def list( 314 self, 315 prefix: str = "", 316 start_after: Optional[str] = None, 317 end_at: Optional[str] = None, 318 include_directories: bool = False, 319 include_url_prefix: bool = False, 320 ) -> Iterator[ObjectMetadata]: 321 """ 322 Lists objects in the storage provider under the specified prefix. 323 324 :param prefix: The prefix to list objects under. 325 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist. 326 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist. 327 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects. 328 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result. 329 330 :return: An iterator over objects. 331 """ 332 if self._metadata_provider: 333 objects = self._metadata_provider.list_objects(prefix, start_after, end_at, include_directories) 334 else: 335 objects = self._storage_provider.list_objects(prefix, start_after, end_at, include_directories) 336 337 for object in objects: 338 if include_url_prefix: 339 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key) 340 yield object
341
[docs] 342 def open( 343 self, 344 path: str, 345 mode: str = "rb", 346 buffering: int = -1, 347 encoding: Optional[str] = None, 348 disable_read_cache: bool = False, 349 memory_load_limit: int = MEMORY_LOAD_LIMIT, 350 atomic: bool = True, 351 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT, 352 attributes: Optional[dict[str, str]] = None, 353 ) -> Union[PosixFile, ObjectFile]: 354 """ 355 Returns a file-like object from the storage provider at the specified path. 356 357 :param path: The path of the object to read. 358 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported. 359 :param buffering: The buffering mode. Only applies to PosixFile. 360 :param encoding: The encoding to use for text files. 361 :param disable_read_cache: When set to True, disables caching for the file content. 362 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 363 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB. 364 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". 365 :param atomic: When set to True, the file will be written atomically (rename upon close). 366 This parameter is only applicable to PosixFile in write mode. 367 :param check_source_version: Whether to check the source version of cached objects. 368 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". 369 :return: A file-like object (PosixFile or ObjectFile) for the specified path. 
370 """ 371 if self._is_posix_file_storage_provider(): 372 return PosixFile( 373 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes 374 ) 375 else: 376 if atomic is False: 377 logger.warning("Non-atomic writes are not supported for object storage providers.") 378 379 return ObjectFile( 380 self, 381 remote_path=path, 382 mode=mode, 383 encoding=encoding, 384 disable_read_cache=disable_read_cache, 385 memory_load_limit=memory_load_limit, 386 check_source_version=check_source_version, 387 attributes=attributes, 388 )
389
[docs] 390 def is_file(self, path: str) -> bool: 391 """ 392 Checks whether the specified path points to a file (rather than a directory or folder). 393 394 :param path: The path to check. 395 396 :return: ``True`` if the path points to a file, ``False`` otherwise. 397 """ 398 if self._metadata_provider: 399 _, exists = self._metadata_provider.realpath(path) 400 return exists 401 return self._storage_provider.is_file(path)
402
[docs] 403 def commit_metadata(self, prefix: Optional[str] = None) -> None: 404 """ 405 Commits any pending updates to the metadata provider. No-op if not using a metadata provider. 406 407 :param prefix: If provided, scans the prefix to find files to commit. 408 """ 409 if self._metadata_provider: 410 if prefix: 411 # The virtual path for each item will be the physical path with 412 # the base physical path removed from the beginning. 413 physical_base, _ = self._metadata_provider.realpath("") 414 physical_prefix, _ = self._metadata_provider.realpath(prefix) 415 for obj in self._storage_provider.list_objects(prefix=physical_prefix): 416 virtual_path = obj.key[len(physical_base) :].lstrip("/") 417 self._metadata_provider.add_file(virtual_path, obj) 418 self._metadata_provider.commit_updates()
419
[docs] 420 def is_empty(self, path: str) -> bool: 421 """ 422 Checks whether the specified path is empty. A path is considered empty if there are no 423 objects whose keys start with the given path as a prefix. 424 425 :param path: The path to check. This is typically a prefix representing a directory or folder. 426 427 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise. 428 """ 429 if self._metadata_provider: 430 objects = self._metadata_provider.list_objects(path) 431 else: 432 objects = self._storage_provider.list_objects(path) 433 434 try: 435 return next(objects) is None 436 except StopIteration: 437 pass 438 return True
439 440 def __getstate__(self) -> dict[str, Any]: 441 state = self.__dict__.copy() 442 del state["_credentials_provider"] 443 del state["_storage_provider"] 444 del state["_metadata_provider"] 445 del state["_cache_manager"] 446 return state 447 448 def __setstate__(self, state: dict[str, Any]) -> None: 449 config = state["_config"] 450 self._initialize_providers(config) 451
    def sync_from(
        self,
        source_client: "StorageClient",
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
    ) -> None:
        """
        Syncs files from the source storage client to "path/".

        :param source_client: The source storage client.
        :param source_path: The path to sync from.
        :param target_path: The path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        """
        source_path = source_path.lstrip("/")
        target_path = target_path.lstrip("/")

        if source_client == self and (source_path.startswith(target_path) or target_path.startswith(source_path)):
            raise ValueError("Source and target paths cannot overlap on same StorageClient.")

        logger.debug(f"Starting sync operation {description}")

        # Attempt to balance the number of worker processes and threads.
        num_worker_processes, num_worker_threads = calculate_worker_processes_and_threads(num_worker_processes)

        # In-process queues suffice for a single worker process; a
        # multiprocessing.Manager is required to share queues across processes.
        if num_worker_processes == 1:
            file_queue = queue.Queue(maxsize=100000)
            result_queue = queue.Queue()
        else:
            manager = multiprocessing.Manager()
            file_queue = manager.Queue(maxsize=100000)
            result_queue = manager.Queue()

        progress = ProgressBar(desc=description, show_progress=True, total_items=0)

        def match_file_metadata(source_info: ObjectMetadata, target_info: ObjectMetadata) -> bool:
            # If target and source have valid etags defined, use etag and file size to compare.
            if source_info.etag and target_info.etag:
                return source_info.etag == target_info.etag and source_info.content_length == target_info.content_length
            # Else, check file size is the same and the target's last_modified is newer than the source.
            return (
                source_info.content_length == target_info.content_length
                and source_info.last_modified <= target_info.last_modified
            )

        def producer():
            """Lists source files and adds them to the queue."""
            # Merge-walk the two (key-ordered) listings, deciding per key
            # whether to ADD (copy), DELETE (unmatched target), or skip.
            source_iter = iter(source_client.list(prefix=source_path))
            target_iter = iter(self.list(prefix=target_path))
            total_count = 0

            source_file = next(source_iter, None)
            target_file = next(target_iter, None)

            while source_file or target_file:
                # Update progress and count each pair (or single) considered for syncing
                if total_count % 1000 == 0:
                    progress.update_total(total_count)
                total_count += 1

                if source_file and target_file:
                    # Compare keys relative to their respective sync roots.
                    source_key = source_file.key[len(source_path) :].lstrip("/")
                    target_key = target_file.key[len(target_path) :].lstrip("/")

                    if source_key < target_key:
                        file_queue.put((_SyncOp.ADD, source_file))
                        source_file = next(source_iter, None)
                    elif source_key > target_key:
                        if delete_unmatched_files:
                            file_queue.put((_SyncOp.DELETE, target_file))
                        else:
                            progress.update_progress()
                        target_file = next(target_iter, None)  # Skip unmatched target file
                    else:
                        # Both exist, compare metadata
                        if not match_file_metadata(source_file, target_file):
                            file_queue.put((_SyncOp.ADD, source_file))
                        else:
                            progress.update_progress()

                        source_file = next(source_iter, None)
                        target_file = next(target_iter, None)
                elif source_file:
                    file_queue.put((_SyncOp.ADD, source_file))
                    source_file = next(source_iter, None)
                else:
                    if delete_unmatched_files:
                        file_queue.put((_SyncOp.DELETE, target_file))
                    else:
                        progress.update_progress()
                    target_file = next(target_iter, None)

            progress.update_total(total_count)

            # One STOP sentinel per consumer thread across all worker processes.
            for _ in range(num_worker_threads * num_worker_processes):
                file_queue.put((_SyncOp.STOP, None))  # Signal consumers to stop

        producer_thread = threading.Thread(target=producer, daemon=True)
        producer_thread.start()

        def _result_consumer():
            # Pull from result_queue to collect pending updates from each multiprocessing worker.
            while True:
                op, target_file_path, physical_metadata = result_queue.get()
                if op == _SyncOp.STOP:
                    break

                if self._metadata_provider:
                    if op == _SyncOp.ADD:
                        # Use realpath() to get physical path so metadata provider can
                        # track the logical/physical mapping.
                        phys_path, _ = self._metadata_provider.realpath(target_file_path)
                        physical_metadata.key = phys_path
                        self._metadata_provider.add_file(target_file_path, physical_metadata)
                    elif op == _SyncOp.DELETE:
                        self._metadata_provider.remove_file(target_file_path)
                    else:
                        raise RuntimeError(f"Unknown operation: {op}")
                progress.update_progress()

        result_consumer_thread = threading.Thread(target=_result_consumer, daemon=True)
        result_consumer_thread.start()

        if num_worker_processes == 1:
            # Single process does not require multiprocessing.
            _sync_worker_process(
                source_client, source_path, self, target_path, num_worker_threads, file_queue, result_queue
            )
        else:
            with multiprocessing.Pool(processes=num_worker_processes) as pool:
                pool.apply(
                    _sync_worker_process,
                    args=(source_client, source_path, self, target_path, num_worker_threads, file_queue, result_queue),
                )

        # Workers have drained the queue by now; wait for the producer, then
        # stop and join the result consumer before committing metadata.
        producer_thread.join()

        result_queue.put((_SyncOp.STOP, None, None))
        result_consumer_thread.join()

        self.commit_metadata()
        progress.close()
        logger.debug(f"Completed sync operation {description}")
def _sync_worker_process(
    source_client: StorageClient,
    source_path: str,
    target_client: StorageClient,
    target_path: str,
    num_worker_threads: int,
    file_queue: queue.Queue,
    result_queue: Optional[queue.Queue],
):
    """Helper function for sync_from, defined at top-level for multiprocessing."""

    def _sync_consumer() -> None:
        """Processes files from the queue and copies them."""
        while True:
            op, file_metadata = file_queue.get()
            if op == _SyncOp.STOP:
                break

            # NOTE(review): for DELETE ops, file_metadata comes from the
            # *target* listing, yet its key is stripped by len(source_path)
            # here — confirm this is correct when source_path and target_path
            # have different lengths.
            source_key = file_metadata.key[len(source_path) :].lstrip("/")
            target_file_path = os.path.join(target_path, source_key)

            if op == _SyncOp.ADD:
                logger.debug(f"sync {file_metadata.key} -> {target_file_path}")
                if file_metadata.content_length < MEMORY_LOAD_LIMIT:
                    # Small object: copy through memory.
                    file_content = source_client.read(file_metadata.key)
                    target_client.write(target_file_path, file_content)
                else:
                    # Large object: stage through a temporary file on disk.
                    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                        temp_filename = temp_file.name

                    try:
                        source_client.download_file(file_metadata.key, temp_filename)
                        target_client.upload_file(target_file_path, temp_filename)
                    finally:
                        os.remove(temp_filename)  # Ensure the temporary file is removed
            elif op == _SyncOp.DELETE:
                logger.debug(f"rm {file_metadata.key}")
                target_client.delete(file_metadata.key)
            else:
                raise ValueError(f"Unknown operation: {op}")

            if result_queue:
                if op == _SyncOp.ADD:
                    # add tuple of (virtual_path, physical_metadata) to result_queue
                    if target_client._metadata_provider:
                        physical_metadata = target_client._metadata_provider.get_object_metadata(
                            target_file_path, include_pending=True
                        )
                    else:
                        physical_metadata = None
                    result_queue.put((op, target_file_path, physical_metadata))
                elif op == _SyncOp.DELETE:
                    result_queue.put((op, target_file_path, None))
                else:
                    raise RuntimeError(f"Unknown operation: {op}")

    # Worker process that spawns threads to handle syncing.
    with ThreadPoolExecutor(max_workers=num_worker_threads) as executor:
        futures = [executor.submit(_sync_consumer) for _ in range(num_worker_threads)]
        for future in futures:
            future.result()  # Ensure all threads complete