Source code for multistorageclient.types

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations
 17
 18from abc import ABC, abstractmethod
 19from collections.abc import Iterator
 20from dataclasses import asdict, dataclass, field
 21from datetime import datetime, timezone
 22from enum import Enum
 23from typing import IO, Any, NamedTuple, Optional, Tuple, Union
 24
 25from dateutil.parser import parse as dateutil_parser
 26
 27MSC_PROTOCOL_NAME = "msc"
 28MSC_PROTOCOL = MSC_PROTOCOL_NAME + "://"
 29
 30DEFAULT_RETRY_ATTEMPTS = 3
 31DEFAULT_RETRY_DELAY = 1.0
 32DEFAULT_RETRY_BACKOFF_MULTIPLIER = 2.0
 33
 34# datetime.min is a naive datetime.
 35#
 36# This creates issues when doing datetime.astimezone(timezone.utc) since it assumes the local timezone for the naive datetime.
 37# If the local timezone is offset behind UTC, it attempts to subtract off the offset which goes below the representable limit (i.e. an underflow).
 38# A `ValueError: year 0 is out of range` is thrown as a result.
 39AWARE_DATETIME_MIN = datetime.min.replace(tzinfo=timezone.utc)
 40
 41
@dataclass
class Credentials:
    """
    A data class representing the credentials needed to access a storage provider.
    """

    #: The access key for authentication.
    access_key: str
    #: The secret key for authentication.
    secret_key: str
    #: An optional security token for temporary credentials.
    token: Optional[str]
    #: The expiration time of the credentials in ISO 8601 format.
    expiration: Optional[str]
    #: A dictionary for storing custom key-value pairs.
    custom_fields: dict[str, Any] = field(default_factory=dict)

    def is_expired(self) -> bool:
        """
        Checks if the credentials are expired based on the expiration time.

        Credentials without an expiration (``None`` or an empty string) never expire.
        An expiration timestamp that carries no UTC offset is interpreted as UTC, so it
        can be compared against the current (timezone-aware) time.

        :return: ``True`` if the credentials are expired, ``False`` otherwise.
        """
        if not self.expiration:
            return False

        expiry = dateutil_parser(self.expiration)
        # Comparing a naive datetime with an aware one raises TypeError, so
        # normalize offset-less timestamps to UTC before comparing.
        if expiry.utcoffset() is None:
            expiry = expiry.replace(tzinfo=timezone.utc)
        return expiry <= datetime.now(tz=timezone.utc)

    def get_custom_field(self, key: str, default: Any = None) -> Any:
        """
        Retrieves a value from custom fields by its key.

        :param key: The key to look up in custom fields.
        :param default: The default value to return if the key is not found.
        :return: The value associated with the key, or the default value if not found.
        """
        return self.custom_fields.get(key, default)
79 80
[docs] 81@dataclass 82class ObjectMetadata: 83 """ 84 A data class that represents the metadata associated with an object stored in a cloud storage service. This metadata 85 includes both required and optional information about the object. 86 """ 87 88 #: Relative path of the object. 89 key: str 90 #: The size of the object in bytes. 91 content_length: int 92 #: The timestamp indicating when the object was last modified. 93 last_modified: datetime 94 type: str = "file" 95 #: The MIME type of the object. 96 content_type: Optional[str] = field(default=None) 97 #: The entity tag (ETag) of the object. 98 etag: Optional[str] = field(default=None) 99 #: The storage class of the object. 100 storage_class: Optional[str] = field(default=None) 101 102 metadata: Optional[dict[str, Any]] = field(default=None) 103
[docs] 104 @staticmethod 105 def from_dict(data: dict) -> "ObjectMetadata": 106 """ 107 Creates an ObjectMetadata instance from a dictionary (parsed from JSON). 108 """ 109 try: 110 last_modified = dateutil_parser(data["last_modified"]) 111 key = data.get("key") 112 if key is None: 113 raise ValueError("Missing required field: 'key'") 114 return ObjectMetadata( 115 key=key, 116 content_length=data["content_length"], 117 last_modified=last_modified, 118 type=data.get("type", "file"), # default to file 119 content_type=data.get("content_type"), 120 etag=data.get("etag"), 121 storage_class=data.get("storage_class"), 122 metadata=data.get("metadata"), 123 ) 124 except KeyError as e: 125 raise ValueError("Missing required field.") from e
126
[docs] 127 def to_dict(self) -> dict: 128 data = asdict(self) 129 data["last_modified"] = self.last_modified.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") 130 return {k: v for k, v in data.items() if v is not None}
131 132
class CredentialsProvider(ABC):
    """
    Abstract interface for objects that supply credentials for a storage provider.
    """

    @abstractmethod
    def get_credentials(self) -> Credentials:
        """
        Retrieves the current credentials.

        :return: The credentials currently used for authentication.
        """

    @abstractmethod
    def refresh_credentials(self) -> None:
        """
        Refreshes the credentials if they are expired or about to expire.
        """
153 154
@dataclass
class Range:
    """
    Byte range used by read operations.
    """

    #: Offset, in bytes, at which the range starts.
    offset: int
    #: Number of bytes to read.
    size: int
165 166
class StorageProvider(ABC):
    """
    Abstract interface that every storage provider implementation must satisfy.
    """

    @abstractmethod
    def put_object(
        self,
        path: str,
        body: bytes,
        if_match: Optional[str] = None,
        if_none_match: Optional[str] = None,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Uploads an object to the storage provider.

        :param path: The path where the object will be stored.
        :param body: The content of the object to store.
        :param if_match: Optional if-match value for a conditional upload; exact semantics are left to the implementation.
        :param if_none_match: Optional if-none-match value for a conditional upload; exact semantics are left to the implementation.
        :param attributes: The attributes to add to the file.
        """

    @abstractmethod
    def get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Retrieves an object from the storage provider.

        :param path: The path where the object is stored.
        :param byte_range: Optional byte range to read (presumably the whole object is read when omitted).

        :return: The content of the retrieved object.
        """

    @abstractmethod
    def copy_object(self, src_path: str, dest_path: str) -> None:
        """
        Copies an object from source to destination in the storage provider.

        :param src_path: The path of the source object to copy.
        :param dest_path: The path of the destination.
        """

    @abstractmethod
    def delete_object(self, path: str, if_match: Optional[str] = None) -> None:
        """
        Deletes an object from the storage provider.

        :param path: The path of the object to delete.
        :param if_match: Optional if-match value to use for conditional deletion.
        """

    @abstractmethod
    def get_object_metadata(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored in the provider.

        :param path: The path of the object.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: A metadata object containing the information about the object.
        """

    @abstractmethod
    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified path.

        :param path: The path to list objects under. It must be a valid file or subdirectory path; a partial path or bare "prefix" is not accepted.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. Enabling this has a performance cost because object metadata must be fetched for each object.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers.

        :return: An iterator over the metadata of the objects under the specified path.
        """

    @abstractmethod
    def upload_file(self, remote_path: str, f: Union[str, IO], attributes: Optional[dict[str, str]] = None) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param f: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
        """

    @abstractmethod
    def download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> None:
        """
        Downloads a file from the storage provider to the local file system.

        :param remote_path: The path of the file to download.
        :param f: The destination for the downloaded file. This can either be a string representing
            the local file path where the file will be saved, or a file-like object to write the
            downloaded content into.
        :param metadata: Metadata about the object to download.
        """

    @abstractmethod
    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param attribute_filter_expression: The attribute filter expression to apply to the result.

        :return: A list of object keys that match the specified pattern.
        """

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified key in the storage provider points to a file (as opposed to a folder or directory).

        :param path: The path to check.

        :return: ``True`` if the key points to a file, ``False`` if it points to a directory or folder.
        """
306 307
class ResolvedPathState(str, Enum):
    """
    The possible states of a resolved path.
    """

    #: File currently exists.
    EXISTS = "exists"
    #: File existed before but has been deleted.
    DELETED = "deleted"
    #: File never existed or was never tracked.
    UNTRACKED = "untracked"
316 317
class ResolvedPath(NamedTuple):
    """
    Outcome of resolving a virtual path to a physical path.

    :param physical_path: The physical path in the storage backend.
    :param state: The state of the path (EXISTS, DELETED, or UNTRACKED).
    :param profile: Optional profile name for routing in CompositeStorageClient.
        ``None`` means use the current client's storage provider;
        a string means route to the named child StorageClient.

    State meanings:

    - EXISTS: File currently exists in metadata
    - DELETED: File existed before but has been deleted (soft delete)
    - UNTRACKED: File never existed or was never tracked
    """

    physical_path: str
    state: ResolvedPathState
    profile: Optional[str] = None

    @property
    def exists(self) -> bool:
        """Backward compatibility property: True if state is EXISTS."""
        path_state = self.state
        return path_state == ResolvedPathState.EXISTS
342 343
class MetadataProvider(ABC):
    """
    Abstract interface for accessing file metadata.
    """

    @abstractmethod
    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the metadata provider under the specified path.

        :param path: The path to list objects under. It must be a valid file or subdirectory path; a partial path or bare "prefix" is not accepted.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. Depending on the implementation, setting this to True may have a performance impact.

        :return: An iterator over the metadata of the objects under the specified path.
        """

    @abstractmethod
    def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored in the provider.

        :param path: The path of the object.
        :param include_pending: Whether to include metadata that is not yet committed.

        :return: A metadata object containing the information about the object.
        """

    @abstractmethod
    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param attribute_filter_expression: The attribute filter expression to apply to the result.

        :return: A list of object keys that match the specified pattern.
        """

    @abstractmethod
    def realpath(self, logical_path: str) -> ResolvedPath:
        """
        Resolves a logical path to its physical storage path.

        This method checks if the object exists in the committed state and returns
        the appropriate physical path with the current state of the path.

        :param logical_path: The user-facing logical path

        :return: ResolvedPath with physical_path and state:

            - ResolvedPathState.EXISTS: File currently exists
            - ResolvedPathState.UNTRACKED: File never existed
            - ResolvedPathState.DELETED: File was deleted

            If state is EXISTS, physical_path is the committed storage path.
            Otherwise, physical_path is typically the logical_path as fallback.
        """

    @abstractmethod
    def generate_physical_path(self, logical_path: str, for_overwrite: bool = False) -> ResolvedPath:
        """
        Generates a physical storage path for writing a new or overwritten object.

        This method is used for write operations to determine where the object should
        be physically stored. Implementations can use this to:

        - Generate UUID-based paths for deduplication
        - Create versioned paths (file-v1.txt, file-v2.txt) for time travel
        - Implement path rewriting strategies

        :param logical_path: The user-facing logical path
        :param for_overwrite: If True, indicates the path is for overwriting an existing object.
            Implementations may generate unique paths for overwrites to support versioning.

        :return: ResolvedPath with physical_path for writing. The exists flag indicates
            whether the logical path currently exists in committed state (for overwrite scenarios).
        """

    @abstractmethod
    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Add a file to be tracked by the :py:class:`MetadataProvider`. Does not have to be
        reflected in listing until a :py:meth:`MetadataProvider.commit_updates` forces a persist.
        This function must tolerate duplicate calls (idempotent behavior).

        :param path: User-supplied virtual path
        :param metadata: physical file metadata from StorageProvider
        """

    @abstractmethod
    def remove_file(self, path: str) -> None:
        """
        Remove a file tracked by the :py:class:`MetadataProvider`. Does not have to be
        reflected in listing until a :py:meth:`MetadataProvider.commit_updates` forces a persist.
        This function must tolerate duplicate calls (idempotent behavior).

        :param path: User-supplied virtual path
        """

    @abstractmethod
    def commit_updates(self) -> None:
        """
        Commit any newly added files, used in conjunction with :py:meth:`MetadataProvider.add_file`.
        :py:class:`MetadataProvider` will persistently record any metadata changes.
        """

    @abstractmethod
    def is_writable(self) -> bool:
        """
        Returns ``True`` if the :py:class:`MetadataProvider` supports writes else ``False``.
        """

    @abstractmethod
    def allow_overwrites(self) -> bool:
        """
        Returns ``True`` if the :py:class:`MetadataProvider` allows overwriting existing files else ``False``.
        When ``True``, :py:meth:`add_file` will not raise an error if the file already exists.
        """
481 482
@dataclass
class StorageProviderConfig:
    """
    Configuration needed to initialize a storage provider.
    """

    #: Name or type of the storage provider (e.g., ``s3``, ``gcs``, ``oci``, ``azure``).
    type: str
    #: Extra options required to configure the storage provider (e.g., endpoint URLs, region, etc.).
    options: Optional[dict[str, Any]] = None
493 494
@dataclass
class StorageBackend:
    """
    Configuration of a single storage backend.
    """

    #: Configuration of the backend's storage provider.
    storage_provider_config: StorageProviderConfig
    #: Optional provider of credentials for this backend.
    credentials_provider: Optional[CredentialsProvider] = None
    #: Replicas configured for this backend; each instance gets its own empty list by default.
    replicas: list["Replica"] = field(default_factory=list)
504 505
class ProviderBundle(ABC):
    """
    Abstract container for the providers (storage, credentials, and metadata) that interact
    with a storage service. The :py:class:`ProviderBundle` abstracts access to these providers,
    allowing for flexible implementations of cloud storage solutions.
    """

    @property
    @abstractmethod
    def storage_provider_config(self) -> StorageProviderConfig:
        """
        :return: The configuration for the storage provider, which includes the provider
            name/type and additional options.
        """

    @property
    @abstractmethod
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        """
        :return: The credentials provider responsible for managing authentication credentials
            required to access the storage service.
        """

    @property
    @abstractmethod
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider responsible for retrieving metadata about objects in the storage service.
        """

    @property
    @abstractmethod
    def replicas(self) -> list["Replica"]:
        """
        :return: The replicas configuration for this provider bundle, if any.
        """
546 547
class ProviderBundleV2(ABC):
    """
    Abstract container for the providers (storage, credentials, and metadata) that interact
    with one or multiple storage services. The :py:class:`ProviderBundleV2` abstracts access to
    these providers, allowing for flexible implementations of cloud storage solutions.
    """

    @property
    @abstractmethod
    def storage_backends(self) -> dict[str, StorageBackend]:
        """
        :return: Mapping of storage backend name -> StorageBackend. Must have at least one backend.
        """

    @property
    @abstractmethod
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider responsible for retrieving metadata about objects in the
            storage service. If there are multiple backends, this is required.
        """
571 572
@dataclass
class RetryConfig:
    """
    Configuration of the retry strategy.
    """

    #: Number of attempts before giving up. Must be at least 1.
    attempts: int = DEFAULT_RETRY_ATTEMPTS
    #: Base delay (in seconds) for exponential backoff. Must be a non-negative value.
    delay: float = DEFAULT_RETRY_DELAY
    #: Backoff multiplier for exponential backoff. Must be at least 1.0.
    backoff_multiplier: float = DEFAULT_RETRY_BACKOFF_MULTIPLIER

    def __post_init__(self) -> None:
        """
        Validates the configured values.

        :raises ValueError: If ``attempts`` is below 1, ``delay`` is negative, or
            ``backoff_multiplier`` is below 1.0. Checks run in that order.
        """
        # Checks are kept sequential so the first invalid field decides the error.
        if self.attempts < 1:
            raise ValueError("Attempts must be at least 1.")

        if self.delay < 0:
            raise ValueError("Delay must be a non-negative number.")

        if self.backoff_multiplier < 1.0:
            raise ValueError("Backoff multiplier must be at least 1.0.")
593 594
class RetryableError(Exception):
    """
    Raised for errors that should trigger a retry.
    """
601 602
class PreconditionFailedError(Exception):
    """
    Raised when a precondition (e.g. if-match, if-none-match) fails.
    """
609 610
class NotModifiedError(Exception):
    """
    Raised when a conditional operation fails because the resource has not been modified.

    Typically seen when if-none-match is used with a specific generation/etag and the
    resource's current generation/etag matches the specified one.
    """
620 621
class SourceVersionCheckMode(Enum):
    """
    Controls source version checking behavior.
    """

    #: Inherit from configuration (cache config).
    INHERIT = "inherit"
    #: Always check source version.
    ENABLE = "enable"
    #: Never check source version.
    DISABLE = "disable"
630 631
@dataclass
class Replica:
    """
    A tier of storage that can be used to store data.
    """

    #: Name of the profile backing this replica.
    replica_profile: str
    #: Read priority of this replica (ordering semantics are defined by the consumer).
    read_priority: int
640 641
# eq=False keeps the identity-based equality and hashing that the hand-written
# class had; the dataclass only contributes __init__ (same signature) and __repr__.
@dataclass(eq=False)
class AutoCommitConfig:
    """
    A data class that represents the configuration for auto commit.
    """

    #: The interval in minutes for auto commit.
    interval_minutes: Optional[float] = None
    #: If ``True``, commit on program exit.
    at_exit: bool = False
653 654
class ExecutionMode(Enum):
    """
    Controls the execution mode in sync operations.
    """

    #: Value ``"local"``.
    LOCAL = "local"
    #: Value ``"ray"``.
    RAY = "ray"
662 663
class PatternType(Enum):
    """
    Kind of pattern operation used for include/exclude filtering.
    """

    #: Value ``"include"``.
    INCLUDE = "include"
    #: Value ``"exclude"``.
    EXCLUDE = "exclude"


# Type alias for pattern matching: a list of (pattern type, pattern string) pairs.
PatternList = list[Tuple[PatternType, str]]