# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Any, Dict, Iterator, List, Optional, Union
from .config import StorageClientConfig
from .file import ObjectFile, PosixFile
from .providers.posix_file import PosixFileStorageProvider
from .retry import retry
from .types import MSC_PROTOCOL, ObjectMetadata, Range
from .utils import join_paths
from .instrumentation.utils import instrumented
@instrumented
class StorageClient:
    """
    A client for interacting with different storage providers.

    Object operations are delegated to the configured storage provider. When a metadata provider
    is configured, it is consulted first to resolve paths (via ``realpath``) and to answer
    metadata, listing and glob queries. When a cache manager is configured and the storage
    provider is not a :py:class:`PosixFileStorageProvider`, whole-object reads are cached.
    """

    _config: StorageClientConfig

    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        self._initialize_providers(config)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        # Copy every provider/manager off the config. Kept separate from ``__init__`` so that
        # ``__setstate__`` can rebuild the same attributes after unpickling.
        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = self._config.storage_provider
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager

    def _build_cache_path(self, path: str) -> str:
        """
        Build the cache key for the object at ``path``, with or without its etag.

        The key is ``"<path>:<etag>"`` when the cache manager requests etags (so the key changes
        whenever the object's etag changes) and ``"<path>:None"`` otherwise. The etag is looked
        up through the metadata provider if one is configured, else through the storage provider.

        :param path: The path of the object.
        :return: The cache key for the object.
        """
        if self._cache_manager is None or not self._cache_manager.use_etag():
            return f"{path}:{None}"
        provider = self._metadata_provider if self._metadata_provider else self._storage_provider
        metadata = provider.get_object_metadata(path)
        return f"{path}:{metadata.etag}"

    def _is_cache_enabled(self) -> bool:
        # Caching is skipped for the POSIX file provider, whose objects are already local files.
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    @retry
    def read(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Reads an object from the storage provider at the specified path.

        :param path: The path of the object to read.
        :param byte_range: Optional byte range to read. When omitted, the whole object is read.
        :return: The content of the object, or of the requested byte range.
        :raises FileNotFoundError: If a metadata provider is configured and reports that ``path``
            does not exist.
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(path)
            if not exists:
                # Report the path the caller asked for, not the resolved one.
                raise FileNotFoundError(f"The file at path '{path}' was not found.")
            path = real_path

        if self._is_cache_enabled():
            assert self._cache_manager is not None
            cache_path = self._build_cache_path(path)
            data = self._cache_manager.read(cache_path)
            # Compare with None rather than relying on truthiness, so an empty cached object (b"")
            # counts as a hit instead of being re-fetched on every read.
            # NOTE(review): assumes the cache manager returns None on a miss — confirm.
            if data is not None:
                if byte_range:
                    return data[byte_range.offset : byte_range.offset + byte_range.size]
                return data
            # Only whole objects are cached; a ranged read on a miss falls through to a direct
            # ranged fetch below.
            if byte_range is None:
                data = self._storage_provider.get_object(path)
                self._cache_manager.set(cache_path, data)
                return data

        return self._storage_provider.get_object(path, byte_range=byte_range)

    def info(self, path: str) -> ObjectMetadata:
        """
        Retrieves metadata about an object stored at the specified path.

        :param path: The path to the object for which metadata is being retrieved.
        :return: The object's metadata, from the metadata provider if one is configured,
            otherwise from the storage provider.
        """
        if self._metadata_provider:
            return self._metadata_provider.get_object_metadata(path)
        return self._storage_provider.get_object_metadata(path)

    @retry
    def download_file(self, remote_path: str, local_path: str) -> None:
        """
        Downloads a file from the storage provider to the local file system.

        :param remote_path: The path of the file in the storage provider.
        :param local_path: The local path where the file should be downloaded.
        :raises FileNotFoundError: If a metadata provider is configured and reports that
            ``remote_path`` does not exist.
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(remote_path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
            metadata = self._metadata_provider.get_object_metadata(remote_path)
            self._storage_provider.download_file(real_path, local_path, metadata)
        else:
            self._storage_provider.download_file(remote_path, local_path)

    def upload_file(self, remote_path: str, local_path: str) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the file should be stored in the storage provider.
        :param local_path: The local path of the file to upload.
        :raises FileExistsError: If a metadata provider is configured and ``remote_path`` already
            exists (overwriting is not supported in that mode).
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(remote_path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{remote_path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
            remote_path = real_path
        self._storage_provider.upload_file(remote_path, local_path)
        if self._metadata_provider:
            # Register the freshly uploaded object with the metadata provider.
            metadata = self._storage_provider.get_object_metadata(remote_path)
            self._metadata_provider.add_file(remote_path, metadata)

    def write(self, path: str, body: bytes) -> None:
        """
        Writes an object to the storage provider at the specified path.

        :param path: The path where the object should be written.
        :param body: The content to write to the object.
        :raises FileExistsError: If a metadata provider is configured and ``path`` already exists
            (overwriting is not supported in that mode).
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(path)
            if exists:
                raise FileExistsError(
                    f"The file at path '{path}' already exists; "
                    f"overwriting is not yet allowed when using a metadata provider."
                )
            path = real_path
        self._storage_provider.put_object(path, body)
        if self._metadata_provider:
            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            metadata = self._storage_provider.get_object_metadata(path)
            self._metadata_provider.add_file(path, metadata)

    def delete(self, path: str) -> None:
        """
        Deletes an object from the storage provider at the specified path.

        Any cached copy of the object is removed as well.

        :param path: The path of the object to delete.
        :raises FileNotFoundError: If a metadata provider is configured and reports that ``path``
            does not exist.
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(path)
            if not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")
            path = real_path

        # Build the cache key BEFORE deleting: with etag-based keys this performs a metadata
        # lookup, which cannot be expected to succeed once the object has been removed. The key is
        # built from the same (resolved) path that ``read`` uses, so it matches the cached entry.
        cache_path = self._build_cache_path(path) if self._is_cache_enabled() else None

        if self._metadata_provider:
            self._metadata_provider.remove_file(path)
        self._storage_provider.delete_object(path)

        if cache_path is not None:
            assert self._cache_manager is not None
            self._cache_manager.delete(cache_path)

    def glob(self, pattern: str, include_url_prefix: bool = False) -> List[str]:
        """
        Matches and retrieves a list of objects in the storage provider that
        match the specified pattern.

        :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :return: A list of object paths that match the pattern.
        """
        if self._metadata_provider:
            results = self._metadata_provider.glob(pattern)
        else:
            results = self._storage_provider.glob(pattern)
        if include_url_prefix:
            results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
        return results

    def list(
        self, prefix: str = "", start_after: Optional[str] = None, end_at: Optional[str] = None
    ) -> Iterator[ObjectMetadata]:
        """
        Lists objects in the storage provider under the specified prefix.

        :param prefix: The prefix to list objects under.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :return: An iterator over objects.
        """
        if self._metadata_provider:
            return self._metadata_provider.list_objects(prefix, start_after, end_at)
        return self._storage_provider.list_objects(prefix, start_after, end_at)

    def open(self, path: str, mode: str = "rb") -> Union[PosixFile, ObjectFile]:
        """
        Returns a file-like object from the storage provider at the specified path.

        :param path: The path of the object to open.
        :param mode: The file mode.
        :return: A :py:class:`PosixFile` for the POSIX file provider, otherwise an
            :py:class:`ObjectFile`.
        :raises FileExistsError: If a metadata provider is configured, ``mode`` contains ``"w"``
            and ``path`` already exists.
        :raises FileNotFoundError: If a metadata provider is configured, ``mode`` contains ``"r"``
            and ``path`` does not exist.
        """
        if self._metadata_provider:
            real_path, exists = self._metadata_provider.realpath(path)
            if "w" in mode and exists:
                raise FileExistsError(f"The file at path '{path}' already exists.")
            if "r" in mode and not exists:
                raise FileNotFoundError(f"The file at path '{path}' was not found.")
            path = real_path
        if self._is_posix_file_storage_provider():
            realpath = self._storage_provider._realpath(path)  # type: ignore
            return PosixFile(path=realpath, mode=mode)
        return ObjectFile(self._storage_provider, remote_path=path, mode=mode, cache_manager=self._cache_manager)

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a directory or folder).

        :param path: The path to check.
        :return: ``True`` if the path points to a file, ``False`` otherwise.
        """
        if self._metadata_provider:
            _, exists = self._metadata_provider.realpath(path)
            return exists
        return self._storage_provider.is_file(path)

    def commit_updates(self, prefix: Optional[str] = None) -> None:
        """
        Commits any pending updates to the metadata provider. No-op if not using a metadata provider.

        :param prefix: If provided, objects listed from the storage provider under this prefix are
            registered with the metadata provider before committing.
        """
        if not self._metadata_provider:
            return
        if prefix:
            for obj in self._storage_provider.list_objects(prefix=prefix):
                # NOTE(review): if ``obj.key`` already includes ``prefix`` this joins it twice;
                # ``os.path.join`` also uses the OS separator, not "/". Confirm the key format.
                fullpath = os.path.join(prefix, obj.key)
                self._metadata_provider.add_file(fullpath, obj)
        self._metadata_provider.commit_updates()

    def is_empty(self, path: str) -> bool:
        """
        Checks whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The path to check. This is typically a prefix representing a directory or folder.
        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        objects = self._storage_provider.list_objects(path)
        # Only the first listed object matters; stop as soon as one is seen.
        return next(iter(objects), None) is None

    def __getstate__(self) -> Dict[str, Any]:
        # Provider and cache-manager objects are excluded from the pickled state; they are
        # rebuilt from ``_config`` by ``__setstate__``.
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]
        return state

    def __setstate__(self, state: Dict[str, Any]) -> None:
        # Every attribute is derived from the config, so re-running the initializer restores them.
        config = state["_config"]
        self._initialize_providers(config)