1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import annotations
17
18import posixpath
19from abc import ABC, abstractmethod
20from collections.abc import Iterator, Sequence
21from dataclasses import asdict, dataclass, field, replace
22from datetime import datetime, timezone
23from enum import Enum
24from typing import IO, Any, NamedTuple, Optional, Tuple, Union
25
26from dateutil.parser import parse as dateutil_parser
27
# URL scheme used to address MSC-managed storage (e.g. "msc://profile/path").
MSC_PROTOCOL_NAME = "msc"
MSC_PROTOCOL = MSC_PROTOCOL_NAME + "://"

# Defaults for the retry strategy; see RetryConfig for the validation rules.
DEFAULT_RETRY_ATTEMPTS = 3
DEFAULT_RETRY_DELAY = 1.0
DEFAULT_RETRY_BACKOFF_MULTIPLIER = 2.0

# datetime.min is a naive datetime.
#
# This creates issues when doing datetime.astimezone(timezone.utc) since it assumes the local timezone for the naive datetime.
# If the local timezone is offset behind UTC, it attempts to subtract off the offset which goes below the representable limit (i.e. an underflow).
# A `ValueError: year 0 is out of range` is thrown as a result.
# Use this timezone-aware minimum instead of datetime.min wherever aware datetimes are compared.
AWARE_DATETIME_MIN = datetime.min.replace(tzinfo=timezone.utc)
41
42
class SymlinkHandling(str, Enum):
    """Policy for treating symbolic links during listing and sync operations.

    - ``FOLLOW`` (default, backward-compatible): symlinks are transparent --
      directory symlinks are recursed into and file symlinks show up as
      ordinary files.
    - ``SKIP``: every symlink is omitted from results.
    - ``PRESERVE``: symlinks are detected and surfaced as leaf entries with
      :py:attr:`ObjectMetadata.symlink_target` populated; directory symlinks
      are **not** recursed into.

    .. note::
        Only POSIX file storage providers, which have native symlink
        semantics, honor this option. Cloud storage providers (S3, GCS,
        Azure, OCI, AIS) ignore it: they always list whatever exists under
        the bucket/prefix and expose MSC's symlink convention (a zero-byte
        object whose user metadata records the target path) via
        :py:attr:`ObjectMetadata.symlink_target` -- object storage is not a
        filesystem, so there is nothing to "follow" while listing.
    """

    FOLLOW = "follow"
    SKIP = "skip"
    PRESERVE = "preserve"
66
67
# Maximum number of symlink hops allowed when resolving a symlink chain.
# Capping the depth guards against symlink cycles causing infinite resolution loops.
MAX_SYMLINK_DEPTH = 8
70
71
class SignerType(str, Enum):
    """Identifies which signing backend is used to generate presigned URLs."""

    S3 = "s3"
    CLOUDFRONT = "cloudfront"
    AZURE = "azure"
78
79
@dataclass
class Credentials:
    """
    A data class representing the credentials needed to access a storage provider.
    """

    #: The access key for authentication.
    access_key: str
    #: The secret key for authentication.
    secret_key: str
    #: An optional security token for temporary credentials.
    token: Optional[str]
    #: The expiration time of the credentials in ISO 8601 format.
    expiration: Optional[str]
    #: A dictionary for storing custom key-value pairs.
    custom_fields: dict[str, Any] = field(default_factory=dict)

    def is_expired(self) -> bool:
        """
        Checks if the credentials are expired based on the expiration time.

        Credentials without an expiration never expire. A naive (offset-less)
        expiration timestamp is assumed to be in UTC so it can be compared
        against the current UTC time.

        :return: ``True`` if the credentials are expired, ``False`` otherwise.
        """
        if not self.expiration:
            return False
        expiry = dateutil_parser(self.expiration)
        # dateutil returns a naive datetime when the string carries no UTC
        # offset; comparing naive and aware datetimes raises TypeError, so
        # normalize by assuming UTC.
        if expiry.tzinfo is None:
            expiry = expiry.replace(tzinfo=timezone.utc)
        return expiry <= datetime.now(tz=timezone.utc)

    def get_custom_field(self, key: str, default: Any = None) -> Any:
        """
        Retrieves a value from custom fields by its key.

        :param key: The key to look up in custom fields.
        :param default: The default value to return if the key is not found.
        :return: The value associated with the key, or the default value if not found.
        """
        return self.custom_fields.get(key, default)
117
118
218
219
class CredentialsProvider(ABC):
    """
    Abstract base class for supplying credentials used to access a storage provider.
    """

    @abstractmethod
    def get_credentials(self) -> Credentials:
        """
        Returns the credentials currently used for authentication.

        :return: The current credentials used for authentication.
        """
        ...

    @abstractmethod
    def refresh_credentials(self) -> None:
        """
        Renews the credentials when they are expired or close to expiring.
        """
        ...
240
241
@dataclass
class Range:
    """Byte range (offset plus length) used by ranged read operations."""

    #: The start offset in bytes.
    offset: int
    #: The number of bytes to read starting at ``offset``.
    size: int
252
253
class StorageProvider(ABC):
    """
    Abstract base class for interacting with a storage provider.

    Concrete subclasses implement object CRUD, symlink creation, flat and
    recursive listing, single and bulk file transfer, glob matching, and
    (optionally) presigned URL generation.
    """

    @abstractmethod
    def put_object(
        self,
        path: str,
        body: bytes,
        if_match: Optional[str] = None,
        if_none_match: Optional[str] = None,
        attributes: Optional[dict[str, Any]] = None,
    ) -> None:
        """
        Uploads an object to the storage provider.

        :param path: The path where the object will be stored.
        :param body: The content of the object to store.
        :param if_match: Optional If-Match value for conditional upload.
        :param if_none_match: Optional If-None-Match value for conditional upload.
        :param attributes: The attributes to add to the file.
        """
        pass

    @abstractmethod
    def get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Retrieves an object from the storage provider.

        :param path: The path where the object is stored.
        :param byte_range: Optional :py:class:`Range` (offset, size) limiting the bytes read.
        :return: The content of the retrieved object.
        """
        pass

    @abstractmethod
    def copy_object(self, src_path: str, dest_path: str) -> None:
        """
        Copies an object from source to destination in the storage provider.

        :param src_path: The path of the source object to copy.
        :param dest_path: The path of the destination.
        """
        pass

    @abstractmethod
    def delete_object(self, path: str, if_match: Optional[str] = None) -> None:
        """
        Deletes an object from the storage provider.

        :param path: The path of the object to delete.
        :param if_match: Optional if-match value to use for conditional deletion.
        """
        pass

    @abstractmethod
    def delete_objects(self, paths: list[str]) -> None:
        """
        Deletes multiple objects from the storage provider.

        :param paths: A list of paths of objects to delete.
        """
        pass

    @abstractmethod
    def make_symlink(self, path: str, target: str) -> None:
        """
        Creates a symbolic link at ``path`` pointing to ``target``.

        On POSIX backends this creates a native OS symlink with a relative target path.
        On object-store backends this creates a zero-byte marker object with the target
        stored in user metadata (``msc-symlink-target``).

        :param path: The path where the symlink will be created.
        :param target: The logical key that the symlink points to.
        """
        pass

    @abstractmethod
    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified path.

        :param path: The path to list objects under. The path must be a valid file or subdirectory path, cannot be partial or just "prefix".
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. There will be performance impact if this is True as now we need to get object metadata for each object.
        :param symlink_handling: How to handle symbolic links during listing; see :py:class:`SymlinkHandling`.

        :return: An iterator over objects metadata under the specified path.
        """
        pass

    @abstractmethod
    def list_objects_recursive(
        self,
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        max_workers: int = 32,
        look_ahead: int = 2,
        symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists files recursively in the storage provider under the specified path.

        :param path: The path to list objects under.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param max_workers: Maximum concurrent workers for provider-level recursive listing.
        :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing.
        :param symlink_handling: How to handle symbolic links during listing; see :py:class:`SymlinkHandling`.
        :return: An iterator over object metadata under the specified path.
        """
        pass

    @abstractmethod
    def upload_file(self, remote_path: str, f: Union[str, IO], attributes: Optional[dict[str, Any]] = None) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param f: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
        """
        pass

    @abstractmethod
    def download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> None:
        """
        Downloads a file from the storage provider to the local file system.

        :param remote_path: The path of the file to download.
        :param f: The destination for the downloaded file. This can either be a string representing
            the local file path where the file will be saved, or a file-like object to write the
            downloaded content into.
        :param metadata: Metadata about the object to download.
        """
        pass

    @abstractmethod
    def download_files(
        self,
        remote_paths: list[str],
        local_paths: list[str],
        metadata: Optional[Sequence[Optional[ObjectMetadata]]] = None,
        max_workers: int = 16,
    ) -> None:
        """
        Downloads multiple files from the storage provider to the local file system.

        ``remote_paths`` and ``local_paths`` are matched pairwise by index.

        :param remote_paths: List of remote paths of files to download.
        :param local_paths: List of local file paths to save the downloaded files to.
        :param metadata: Optional per-file metadata used to decide between regular and multipart download.
        :param max_workers: Maximum number of concurrent download workers (default: 16).
        :raises ValueError: If remote_paths and local_paths have different lengths.
        """
        pass

    @abstractmethod
    def upload_files(
        self,
        local_paths: list[str],
        remote_paths: list[str],
        attributes: Optional[Sequence[Optional[dict[str, Any]]]] = None,
        max_workers: int = 16,
    ) -> None:
        """
        Uploads multiple files from the local file system to the storage provider.

        ``local_paths`` and ``remote_paths`` are matched pairwise by index.

        :param local_paths: List of local file paths to upload.
        :param remote_paths: List of remote paths to upload the files to.
        :param attributes: Optional list of per-file attributes to add. When provided, must have the same length
            as local_paths/remote_paths. Each element may be ``None`` for files that need no attributes.
        :param max_workers: Maximum number of concurrent upload workers (default: 16).
        :raises ValueError: If local_paths and remote_paths have different lengths.
        :raises ValueError: If attributes is provided and has a different length than local_paths.
        """
        pass

    @abstractmethod
    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param attribute_filter_expression: The attribute filter expression to apply to the result.

        :return: A list of object keys that match the specified pattern.
        """
        pass

    @abstractmethod
    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified key in the storage provider points to a file (as opposed to a folder or directory).

        :param path: The path to check.

        :return: ``True`` if the key points to a file, ``False`` if it points to a directory or folder.
        """
        pass

    def generate_presigned_url(
        self,
        path: str,
        *,
        method: str = "GET",
        signer_type: Optional[SignerType] = None,
        signer_options: Optional[dict[str, Any]] = None,
    ) -> str:
        """
        Generate a pre-signed URL granting temporary access to the object at *path*.

        This base implementation always raises; providers that support presigning
        override it.

        :param path: The object path within the storage provider.
        :param method: The HTTP method the URL should authorise (e.g. ``"GET"``, ``"PUT"``).
        :param signer_type: The signing backend to use. ``None`` means the provider's native signer.
        :param signer_options: Backend-specific options forwarded to the signer.
        :return: A pre-signed URL string.
        :raises NotImplementedError: If this storage provider does not support presigned URLs.
        """
        raise NotImplementedError(f"{type(self).__name__} does not support presigned URL generation.")
501
502
class ResolvedPathState(str, Enum):
    """
    Enum describing the state of a resolved path.
    """

    #: File currently exists.
    EXISTS = "exists"
    #: File existed before but has been deleted.
    DELETED = "deleted"
    #: File never existed or was never tracked.
    UNTRACKED = "untracked"
511
512
class ResolvedPath(NamedTuple):
    """
    Outcome of mapping a virtual path onto a physical storage path.

    :param physical_path: The physical path in the storage backend.
    :param state: The state of the path (EXISTS, DELETED, or UNTRACKED).
    :param profile: Optional profile name used for routing in CompositeStorageClient.
        ``None`` means the current client's storage provider handles the path;
        a string routes to the named child StorageClient.

    State meanings:
        - EXISTS: the file currently exists in metadata
        - DELETED: the file existed before but has been deleted (soft delete)
        - UNTRACKED: the file never existed or was never tracked
    """

    physical_path: str
    state: ResolvedPathState
    profile: Optional[str] = None

    @property
    def exists(self) -> bool:
        """Backward-compatibility helper: ``True`` when ``state`` is EXISTS."""
        return self.state == ResolvedPathState.EXISTS
537
538
695
696
@dataclass
class StorageProviderConfig:
    """Configuration required to initialize a storage provider."""

    #: The name or type of the storage provider (e.g., ``s3``, ``gcs``, ``oci``, ``azure``).
    type: str
    #: Extra options for configuring the storage provider (endpoint URLs, region, etc.).
    options: Optional[dict[str, Any]] = None
707
708
@dataclass
class StorageBackend:
    """Configuration describing a single storage backend."""

    #: Provider configuration for this backend.
    storage_provider_config: StorageProviderConfig
    #: Optional credentials provider for this backend.
    credentials_provider: Optional[CredentialsProvider] = None
    #: Replica configuration for this backend (empty when unreplicated).
    replicas: list["Replica"] = field(default_factory=list)
718
719
class ProviderBundle(ABC):
    """
    Abstract container bundling the providers (storage, credentials, and metadata)
    that interact with a storage service. :py:class:`ProviderBundle` abstracts access
    to these providers, enabling flexible implementations of cloud storage solutions.
    """

    @property
    @abstractmethod
    def storage_provider_config(self) -> StorageProviderConfig:
        """
        :return: The configuration for the storage provider, which includes the provider
            name/type and additional options.
        """
        ...

    @property
    @abstractmethod
    def credentials_provider(self) -> Optional[CredentialsProvider]:
        """
        :return: The credentials provider responsible for managing authentication credentials
            required to access the storage service.
        """
        ...

    @property
    @abstractmethod
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider responsible for retrieving metadata about objects in the storage service.
        """
        ...

    @property
    @abstractmethod
    def replicas(self) -> list["Replica"]:
        """
        :return: The replicas configuration for this provider bundle, if any.
        """
        ...
760
761
class ProviderBundleV2(ABC):
    """
    Abstract container bundling the providers (storage, credentials, and metadata)
    that interact with one or more storage services. :py:class:`ProviderBundleV2`
    abstracts access to these providers, enabling flexible implementations of cloud
    storage solutions.
    """

    @property
    @abstractmethod
    def storage_backends(self) -> dict[str, StorageBackend]:
        """
        :return: Mapping of storage backend name -> StorageBackend. Must have at least one backend.
        """
        ...

    @property
    @abstractmethod
    def metadata_provider(self) -> Optional[MetadataProvider]:
        """
        :return: The metadata provider responsible for retrieving metadata about objects in the storage service. Required when there are multiple backends.
        """
        ...
785
786
@dataclass
class RetryConfig:
    """
    Configuration describing the retry strategy for storage operations.
    """

    #: Number of attempts before giving up; must be at least 1.
    attempts: int = DEFAULT_RETRY_ATTEMPTS
    #: Base delay (in seconds) for exponential backoff; must be non-negative.
    delay: float = DEFAULT_RETRY_DELAY
    #: Backoff multiplier for exponential backoff; must be at least 1.0.
    backoff_multiplier: float = DEFAULT_RETRY_BACKOFF_MULTIPLIER

    def __post_init__(self) -> None:
        # Validate eagerly so that a misconfigured retry policy fails at
        # construction time rather than mid-operation.
        if self.attempts < 1:
            raise ValueError("Attempts must be at least 1.")
        if self.delay < 0:
            raise ValueError("Delay must be a non-negative number.")
        if self.backoff_multiplier < 1.0:
            raise ValueError("Backoff multiplier must be at least 1.0.")
807
808
class RetryableError(Exception):
    """Raised when an operation fails in a way that warrants a retry."""
815
816
class PreconditionFailedError(Exception):
    """Raised when a request precondition (e.g. if-match, if-none-match) is not satisfied."""
823
824
class NotModifiedError(Exception):
    """
    Raised when a conditional operation fails because the resource has not been modified.

    Typically occurs when if-none-match is used with a specific generation/etag
    and the resource's current generation/etag matches the specified one.
    """
834
835
class SourceVersionCheckMode(Enum):
    """
    Enum for controlling source version checking behavior.
    """

    #: Inherit the behavior from configuration (cache config).
    INHERIT = "inherit"
    #: Always check the source version.
    ENABLE = "enable"
    #: Never check the source version.
    DISABLE = "disable"
844
845
@dataclass
class Replica:
    """
    A tier of storage that can be used to store data.
    """

    #: Profile name identifying the replica.
    replica_profile: str
    #: Read priority of this replica.
    read_priority: int
854
855
@dataclass
class AutoCommitConfig:
    """
    A data class that represents the configuration for auto commit.

    Converted to an actual ``@dataclass`` (the docstring already described it as
    one) for consistency with the other config classes in this module; the
    constructor signature is unchanged.
    """

    #: The interval in minutes for auto commit; ``None`` means no interval is configured.
    interval_minutes: Optional[float] = None
    #: If ``True``, commit on program exit.
    at_exit: bool = False
867
868
class ExecutionMode(Enum):
    """
    Enum for controlling execution mode in sync operations.
    """

    #: Run the sync work in the local process.
    LOCAL = "local"
    #: Distribute the sync work via Ray.
    RAY = "ray"
876
877
class PatternType(Enum):
    """
    Type of pattern operation for include/exclude filtering.
    """

    #: Pattern selects paths to keep.
    INCLUDE = "include"
    #: Pattern selects paths to drop.
    EXCLUDE = "exclude"
885
886
# Type alias for include/exclude pattern matching: an ordered list of
# (operation, glob-pattern) pairs. Uses builtin generics (PEP 585) instead of
# the deprecated typing.Tuple, consistent with the rest of this module.
PatternList = list[tuple[PatternType, str]]
889
890
@dataclass
class DryrunResult:
    """
    References to the JSONL files produced by a dryrun sync operation.

    Each file holds one JSON object per line in the :py:class:`ObjectMetadata`
    serialization format (see :py:meth:`ObjectMetadata.to_dict` / :py:meth:`ObjectMetadata.from_dict`).

    Cleaning up the files once they are no longer needed is the caller's responsibility.
    """

    #: Path to a JSONL file listing source objects that would be added to the target.
    files_to_add: str
    #: Path to a JSONL file listing target objects that would be deleted.
    files_to_delete: str
906
907
@dataclass
class SyncResult:
    """
    Summary statistics for a sync operation.
    """

    #: Total number of work units tracked for progress (files from both source and
    #: target after filtering); each work unit is one ADD or DELETE operation.
    total_work_units: int = 0
    #: Total number of files processed to the target.
    total_files_added: int = 0
    #: Total number of files deleted from the target.
    total_files_deleted: int = 0
    #: Total number of bytes transferred to the target.
    total_bytes_added: int = 0
    #: Total number of bytes deleted from the target.
    total_bytes_deleted: int = 0
    #: Wall-clock duration of the sync operation, in seconds.
    total_time_seconds: float = 0.0
    #: Dryrun details with paths to JSONL files; ``None`` for normal (non-dryrun) sync operations.
    dryrun: Optional[DryrunResult] = None

    def __str__(self) -> str:
        title = "Sync dryrun statistics:" if self.dryrun else "Sync statistics:"
        parts = [
            title,
            f" Work units: {self.total_work_units}",
            f" Files added: {self.total_files_added}",
            f" Files deleted: {self.total_files_deleted}",
            f" Bytes added: {self.total_bytes_added}",
            f" Bytes deleted: {self.total_bytes_deleted}",
            f" Time elapsed: {self.total_time_seconds:.2f}s",
        ]
        report = "\n".join(parts)
        if self.dryrun:
            report += f"\n Files to add: {self.dryrun.files_to_add}\n Files to delete: {self.dryrun.files_to_delete}"
        return report
943
944
class SyncError(RuntimeError):
    """
    Raised when errors occur during a sync operation.

    Carries the partial SyncResult describing what was accomplished before the
    failure, so callers can inspect the state the sync reached.

    :param message: The error message describing what went wrong.
    :param sync_result: The partial SyncResult with statistics from the failed sync operation.
    """

    def __init__(self, message: str, sync_result: SyncResult):
        super().__init__(message)
        # Keep the partial statistics accessible to callers catching this error.
        self.sync_result = sync_result

    def __str__(self) -> str:
        stats = str(self.sync_result).replace("Sync statistics:", "Partial sync statistics:")
        return f"{super().__str__()}\n\n{stats}"