Source code for multistorageclient.types

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import IO, Any, NamedTuple, Optional, Tuple, Union

from dateutil.parser import parse as dateutil_parser

MSC_PROTOCOL_NAME = "msc"
MSC_PROTOCOL = MSC_PROTOCOL_NAME + "://"

DEFAULT_RETRY_ATTEMPTS = 3
DEFAULT_RETRY_DELAY = 1.0
DEFAULT_RETRY_BACKOFF_MULTIPLIER = 2.0

# datetime.min is a naive datetime.
#
# This creates issues when doing datetime.astimezone(timezone.utc) since it assumes the local timezone for the naive datetime.
# If the local timezone is offset behind UTC, it attempts to subtract off the offset, which goes below the representable limit (i.e. an underflow).
# A `ValueError: year 0 is out of range` is thrown as a result.
AWARE_DATETIME_MIN = datetime.min.replace(tzinfo=timezone.utc)

@dataclass
class Credentials:
    """
    A data class representing the credentials needed to access a storage provider.
    """

    #: The access key for authentication.
    access_key: str
    #: The secret key for authentication.
    secret_key: str
    #: An optional security token for temporary credentials.
    token: Optional[str]
    #: The expiration time of the credentials in ISO 8601 format.
    expiration: Optional[str]
    #: A dictionary for storing custom key-value pairs.
    custom_fields: dict[str, Any] = field(default_factory=dict)

    def is_expired(self) -> bool:
        """
        Checks if the credentials are expired based on the expiration time.

        :return: ``True`` if the credentials are expired, ``False`` otherwise.
        """
        expiry = dateutil_parser(self.expiration) if self.expiration else None
        if expiry is None:
            return False
        return expiry <= datetime.now(tz=timezone.utc)

    def get_custom_field(self, key: str, default: Any = None) -> Any:
        """
        Retrieves a value from custom fields by its key.

        :param key: The key to look up in custom fields.
        :param default: The default value to return if the key is not found.
        :return: The value associated with the key, or the default value if not found.
        """
        return self.custom_fields.get(key, default)

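# Illustrative sketch (not part of this module): constructing temporary
# credentials and checking expiry. All values below are hypothetical.
def _example_credentials_expiry() -> None:
    creds = Credentials(
        access_key="EXAMPLE_ACCESS_KEY",  # hypothetical value
        secret_key="EXAMPLE_SECRET_KEY",  # hypothetical value
        token="EXAMPLE_SESSION_TOKEN",  # hypothetical value
        expiration="2000-01-01T00:00:00Z",  # ISO 8601; safely in the past
        custom_fields={"region": "us-east-1"},
    )
    assert creds.is_expired()  # the expiration above is in the past
    assert creds.get_custom_field("region") == "us-east-1"
    assert creds.get_custom_field("missing", default="n/a") == "n/a"
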
@dataclass
class ObjectMetadata:
    """
    A data class that represents the metadata associated with an object stored in a cloud storage service. This
    metadata includes both required and optional information about the object.
    """

    #: Relative path of the object.
    key: str
    #: The size of the object in bytes.
    content_length: int
    #: The timestamp indicating when the object was last modified.
    last_modified: datetime
    #: The type of the object (e.g., ``file`` or ``directory``).
    type: str = "file"
    #: The MIME type of the object.
    content_type: Optional[str] = field(default=None)
    #: The entity tag (ETag) of the object.
    etag: Optional[str] = field(default=None)
    #: The storage class of the object.
    storage_class: Optional[str] = field(default=None)
    #: Arbitrary additional metadata associated with the object.
    metadata: Optional[dict[str, Any]] = field(default=None)

    @staticmethod
    def from_dict(data: dict) -> "ObjectMetadata":
        """
        Creates an ObjectMetadata instance from a dictionary (parsed from JSON).
        """
        try:
            last_modified = dateutil_parser(data["last_modified"])
            key = data.get("key")
            if key is None:
                raise ValueError("Missing required field: 'key'")
            return ObjectMetadata(
                key=key,
                content_length=data["content_length"],
                last_modified=last_modified,
                type=data.get("type", "file"),  # default to file
                content_type=data.get("content_type"),
                etag=data.get("etag"),
                storage_class=data.get("storage_class"),
                metadata=data.get("metadata"),
            )
        except KeyError as e:
            raise ValueError(f"Missing required field: {e}") from e

    def to_dict(self) -> dict:
        data = asdict(self)
        data["last_modified"] = self.last_modified.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        return {k: v for k, v in data.items() if v is not None}

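# Illustrative sketch (not part of this module): a to_dict()/from_dict()
# round trip. The key, size, and timestamp below are hypothetical.
def _example_object_metadata_round_trip() -> None:
    original = ObjectMetadata(
        key="datasets/train/part-00000.parquet",  # hypothetical key
        content_length=1024,
        last_modified=datetime(2024, 1, 1, tzinfo=timezone.utc),
        etag="abc123",  # hypothetical etag
    )
    # to_dict() serializes last_modified as a UTC ISO 8601 string and drops
    # None-valued fields; from_dict() parses the string back to a datetime.
    restored = ObjectMetadata.from_dict(original.to_dict())
    assert restored.key == original.key
    assert restored.content_length == original.content_length
    assert restored.last_modified == original.last_modified
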
class CredentialsProvider(ABC):
    """
    Abstract base class for providing credentials to access a storage provider.
    """

    @abstractmethod
    def get_credentials(self) -> Credentials:
        """
        Retrieves the current credentials.

        :return: The current credentials used for authentication.
        """
        pass

    @abstractmethod
    def refresh_credentials(self) -> None:
        """
        Refreshes the credentials if they are expired or about to expire.
        """
        pass

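# Illustrative sketch (not part of this module): a minimal CredentialsProvider
# that serves fixed, non-expiring credentials. The class name is hypothetical;
# real implementations would typically fetch and refresh short-lived tokens
# from an identity service.
class _StaticCredentialsProvider(CredentialsProvider):
    def __init__(self, credentials: Credentials) -> None:
        self._credentials = credentials

    def get_credentials(self) -> Credentials:
        return self._credentials

    def refresh_credentials(self) -> None:
        # Static credentials never expire, so there is nothing to refresh.
        pass
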
@dataclass
class Range:
    """
    A byte range for a partial (byte-range) read.
    """

    #: The starting byte offset.
    offset: int
    #: The number of bytes to read.
    size: int

class StorageProvider(ABC):
    """
    Abstract base class for interacting with a storage provider.
    """

    @abstractmethod
    def put_object(
        self,
        path: str,
        body: bytes,
        if_match: Optional[str] = None,
        if_none_match: Optional[str] = None,
        attributes: Optional[dict[str, str]] = None,
    ) -> None:
        """
        Uploads an object to the storage provider.

        :param path: The path where the object will be stored.
        :param body: The content of the object to store.
        :param if_match: Optional if-match value to use for conditional writes.
        :param if_none_match: Optional if-none-match value to use for conditional writes.
        :param attributes: The attributes to add to the file.
        """
        pass

    @abstractmethod
    def get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Retrieves an object from the storage provider.

        :param path: The path where the object is stored.
        :param byte_range: An optional byte range to read; if omitted, the whole object is returned.

        :return: The content of the retrieved object.
        """
        pass

    @abstractmethod
    def copy_object(self, src_path: str, dest_path: str) -> None:
        """
        Copies an object from source to destination in the storage provider.

        :param src_path: The path of the source object to copy.
        :param dest_path: The path of the destination.
        """
        pass

    @abstractmethod
    def delete_object(self, path: str, if_match: Optional[str] = None) -> None:
        """
        Deletes an object from the storage provider.

        :param path: The path of the object to delete.
        :param if_match: Optional if-match value to use for conditional deletion.
        """
        pass

    @abstractmethod
    def get_object_metadata(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored in the provider.

        :param path: The path of the object.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: A metadata object containing the information about the object.
        """
        pass

    @abstractmethod
    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified path.

        :param path: The path to list objects under. The path must be a valid file or subdirectory path; it cannot be a partial path or a bare prefix.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. Setting this to True has a performance impact, since the object metadata must be fetched for each object.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers.

        :return: An iterator over object metadata under the specified path.
        """
        pass

    @abstractmethod
    def upload_file(self, remote_path: str, f: Union[str, IO], attributes: Optional[dict[str, str]] = None) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param f: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
        """
        pass

    @abstractmethod
    def download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> None:
        """
        Downloads a file from the storage provider to the local file system.

        :param remote_path: The path of the file to download.
        :param f: The destination for the downloaded file. This can either be a string representing
            the local file path where the file will be saved, or a file-like object to write the
            downloaded content into.
        :param metadata: Metadata about the object to download.
        """
        pass

    @abstractmethod
    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param attribute_filter_expression: The attribute filter expression to apply to the result.

        :return: A list of object keys that match the specified pattern.
        """
        pass

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified key in the storage provider points to a file (as opposed to a folder or directory).

        :param path: The path to check.

        :return: ``True`` if the key points to a file, ``False`` if it points to a directory or folder.
        """
        pass

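# Illustrative sketch (not part of this module): reading a large object in
# fixed-size chunks via byte-range reads against any StorageProvider. The
# helper name and chunk size are hypothetical.
def _read_in_chunks(provider: StorageProvider, path: str, chunk_size: int = 8 * 1024 * 1024) -> bytes:
    total = provider.get_object_metadata(path).content_length
    chunks = []
    offset = 0
    while offset < total:
        # Never request bytes past the end of the object.
        size = min(chunk_size, total - offset)
        chunks.append(provider.get_object(path, byte_range=Range(offset=offset, size=size)))
        offset += size
    return b"".join(chunks)
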
class ResolvedPath(NamedTuple):
    """
    Result of resolving a virtual path to a physical path.

    :param physical_path: The physical path in the storage backend.
    :param exists: Whether the path exists in metadata.
    :param profile: Optional profile name for routing in CompositeStorageClient.
        None means use the current client's storage provider.
        A string means route to the named child StorageClient.
    """

    physical_path: str
    exists: bool
    profile: Optional[str] = None

class MetadataProvider(ABC):
    """
    Abstract base class for accessing file metadata.
    """

    @abstractmethod
    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the metadata provider under the specified path.

        :param path: The path to list objects under. The path must be a valid file or subdirectory path; it cannot be a partial path or a bare prefix.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. Depending on the implementation, there may be a performance impact when this is set to True.

        :return: An iterator over object metadata under the specified path.
        """
        pass

    @abstractmethod
    def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored in the provider.

        :param path: The path of the object.
        :param include_pending: Whether to include metadata that is not yet committed.

        :return: A metadata object containing the information about the object.
        """
        pass

    @abstractmethod
    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param attribute_filter_expression: The attribute filter expression to apply to the result.

        :return: A list of object keys that match the specified pattern.
        """
        pass

    @abstractmethod
    def realpath(self, logical_path: str) -> ResolvedPath:
        """
        Resolves a logical path to its physical storage path.

        This method checks if the object exists in the committed state and returns
        the appropriate physical path. The exists flag indicates whether the path
        was found.

        :param logical_path: The user-facing logical path.

        :return: ResolvedPath with physical_path and an exists flag indicating whether the path was found.
            If exists=True, physical_path is the committed storage path.
            If exists=False, physical_path is typically the logical_path as a fallback.
        """
        pass

    @abstractmethod
    def generate_physical_path(self, logical_path: str, for_overwrite: bool = False) -> ResolvedPath:
        """
        Generates a physical storage path for writing a new or overwritten object.

        This method is used for write operations to determine where the object should
        be physically stored. Implementations can use this to:

        - Generate UUID-based paths for deduplication
        - Create versioned paths (file-v1.txt, file-v2.txt) for time travel
        - Implement path rewriting strategies

        :param logical_path: The user-facing logical path.
        :param for_overwrite: If True, indicates the path is for overwriting an existing object.
            Implementations may generate unique paths for overwrites to support versioning.

        :return: ResolvedPath with physical_path for writing. The exists flag indicates
            whether the logical path currently exists in the committed state (for overwrite scenarios).
        """
        pass

    @abstractmethod
    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Add a file to be tracked by the :py:class:`MetadataProvider`. Does not have to be
        reflected in listings until a :py:meth:`MetadataProvider.commit_updates` forces a persist.
        This function must tolerate duplicate calls (idempotent behavior).

        :param path: User-supplied virtual path.
        :param metadata: Physical file metadata from the StorageProvider.
        """
        pass

    @abstractmethod
    def remove_file(self, path: str) -> None:
        """
        Remove a file tracked by the :py:class:`MetadataProvider`. Does not have to be
        reflected in listings until a :py:meth:`MetadataProvider.commit_updates` forces a persist.
        This function must tolerate duplicate calls (idempotent behavior).

        :param path: User-supplied virtual path.
        """
        pass

    @abstractmethod
    def commit_updates(self) -> None:
        """
        Commit any newly added files, used in conjunction with :py:meth:`MetadataProvider.add_file`.
        The :py:class:`MetadataProvider` will persistently record any metadata changes.
        """
        pass

    @abstractmethod
    def is_writable(self) -> bool:
        """
        Returns ``True`` if the :py:class:`MetadataProvider` supports writes, else ``False``.
        """
        pass

    @abstractmethod
    def allow_overwrites(self) -> bool:
        """
        Returns ``True`` if the :py:class:`MetadataProvider` allows overwriting existing files, else ``False``.
        When ``True``, :py:meth:`add_file` will not raise an error if the file already exists.
        """
        pass

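# Illustrative sketch (not part of this module): the typical write flow that
# combines a MetadataProvider with a StorageProvider. The function name is
# hypothetical; error handling and retries are omitted for brevity.
def _write_object(
    storage: StorageProvider,
    metadata_provider: MetadataProvider,
    logical_path: str,
    body: bytes,
) -> None:
    # Ask the metadata provider where the bytes should physically live.
    resolved = metadata_provider.generate_physical_path(logical_path)
    # Write the bytes to the physical path, then record the logical-to-physical mapping.
    storage.put_object(resolved.physical_path, body)
    metadata_provider.add_file(logical_path, storage.get_object_metadata(resolved.physical_path))
    # Changes become visible in listings once they are committed.
    metadata_provider.commit_updates()
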
@dataclass
class StorageProviderConfig:
    """
    A data class that represents the configuration needed to initialize a storage provider.
    """

    #: The name or type of the storage provider (e.g., ``s3``, ``gcs``, ``oci``, ``azure``).
    type: str
    #: Additional options required to configure the storage provider (e.g., endpoint URLs, region, etc.).
    options: Optional[dict[str, Any]] = None

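# Illustrative sketch (not part of this module): a provider configuration.
# The option keys below are hypothetical; the options a provider accepts
# depend on its implementation.
_example_s3_config = StorageProviderConfig(
    type="s3",
    options={"region_name": "us-east-1", "endpoint_url": "https://s3.example.com"},
)
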
class ProviderBundle(ABC):
    """
    Abstract base class that serves as a container for various providers (storage, credentials, and metadata)
    that interact with a storage service. The :py:class:`ProviderBundle` abstracts access to these providers,
    allowing for flexible implementations of cloud storage solutions.
    """

    @property
    @abstractmethod
    def storage_provider_config(self) -> StorageProviderConfig:
        """
        :return: The configuration for the storage provider, which includes the provider
            name/type and additional options.
        """
        pass

    @property
    @abstractmethod
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        """
        :return: The credentials provider responsible for managing authentication credentials
            required to access the storage service.
        """
        pass

    @property
    @abstractmethod
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider responsible for retrieving metadata about objects in the storage service.
        """
        pass

    @property
    @abstractmethod
    def replicas(self) -> list["Replica"]:
        """
        :return: The replicas configuration for this provider bundle, if any.
        """
        pass

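# Illustrative sketch (not part of this module): a minimal ProviderBundle with
# no credentials provider, no metadata provider, and no replicas. The class
# name is hypothetical.
class _SimpleProviderBundle(ProviderBundle):
    def __init__(self, config: StorageProviderConfig) -> None:
        self._config = config

    @property
    def storage_provider_config(self) -> StorageProviderConfig:
        return self._config

    @property
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        return None  # e.g., rely on ambient/environment credentials

    @property
    def metadata_provider(self) -> Optional[MetadataProvider]:
        return None  # no separate metadata tracking

    @property
    def replicas(self) -> list["Replica"]:
        return []  # no replicas configured
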
@dataclass
class RetryConfig:
    """
    A data class that represents the configuration for the retry strategy.
    """

    #: The number of attempts before giving up. Must be at least 1.
    attempts: int = DEFAULT_RETRY_ATTEMPTS
    #: The base delay (in seconds) for exponential backoff. Must be a non-negative value.
    delay: float = DEFAULT_RETRY_DELAY
    #: The backoff multiplier for exponential backoff. Must be at least 1.0.
    backoff_multiplier: float = DEFAULT_RETRY_BACKOFF_MULTIPLIER

    def __post_init__(self) -> None:
        if self.attempts < 1:
            raise ValueError("Attempts must be at least 1.")
        if self.delay < 0:
            raise ValueError("Delay must be a non-negative number.")
        if self.backoff_multiplier < 1.0:
            raise ValueError("Backoff multiplier must be at least 1.0.")

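# Illustrative sketch (not part of this module): the delay sequence implied by
# an exponential-backoff configuration, assuming delay * backoff_multiplier**n
# before retry n. How delays are actually scheduled is up to the retry logic.
def _backoff_delays(config: RetryConfig) -> list[float]:
    # With the defaults (3 attempts, 1.0s base delay, 2.0x multiplier) this
    # yields [1.0, 2.0] -- one delay between each pair of attempts.
    return [config.delay * config.backoff_multiplier**n for n in range(config.attempts - 1)]
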
class RetryableError(Exception):
    """
    Exception raised for errors that should trigger a retry.
    """

    pass

class PreconditionFailedError(Exception):
    """
    Exception raised when a precondition fails, e.g. if-match, if-none-match, etc.
    """

    pass

class NotModifiedError(Exception):
    """
    Raised when a conditional operation fails because the resource has not been modified.

    This typically occurs when using if-none-match with a specific generation/etag
    and the resource's current generation/etag matches the specified one.
    """

    pass

class SourceVersionCheckMode(Enum):
    """
    Enum for controlling source version checking behavior.
    """

    INHERIT = "inherit"  # Inherit from configuration (cache config)
    ENABLE = "enable"  # Always check source version
    DISABLE = "disable"  # Never check source version

@dataclass
class Replica:
    """
    A tier of storage that can be used to store data.
    """

    #: The profile name of the replica.
    replica_profile: str
    #: The priority of this replica when selecting a replica to read from.
    read_priority: int

class AutoCommitConfig:
    """
    A class that represents the configuration for auto commit.
    """

    #: The interval in minutes for auto commit.
    interval_minutes: Optional[float]
    #: If True, commit on program exit.
    at_exit: bool = False

    def __init__(self, interval_minutes: Optional[float] = None, at_exit: bool = False) -> None:
        self.interval_minutes = interval_minutes
        self.at_exit = at_exit

class ExecutionMode(Enum):
    """
    Enum for controlling execution mode in sync operations.
    """

    LOCAL = "local"
    RAY = "ray"

class PatternType(Enum):
    """
    Type of pattern operation for include/exclude filtering.
    """

    INCLUDE = "include"
    EXCLUDE = "exclude"


# Type alias for pattern matching
PatternList = list[Tuple[PatternType, str]]
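

# Illustrative sketch (not part of this module): applying an ordered
# include/exclude PatternList with fnmatch-style wildcards. The helper and the
# "last matching pattern wins" semantics are assumptions for illustration.
from fnmatch import fnmatch  # placed here only for the sketch


def _matches(patterns: PatternList, key: str) -> bool:
    included = False  # assumed default: keys match no pattern are excluded
    for pattern_type, pattern in patterns:
        if fnmatch(key, pattern):
            included = pattern_type is PatternType.INCLUDE
    return included


# Example: include all .txt files except those under tmp/.
_example_patterns: PatternList = [
    (PatternType.INCLUDE, "*.txt"),
    (PatternType.EXCLUDE, "tmp/*"),
]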