Source code for multistorageclient.providers.ais

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import io
 17import os
 18from collections.abc import Callable, Iterator
 19from typing import IO, Any, Optional, TypeVar, Union
 20
 21from aistore.sdk import Client
 22from aistore.sdk.authn import AuthNClient
 23from aistore.sdk.errors import AISError
 24from aistore.sdk.obj.object_props import ObjectProps
 25from requests.exceptions import HTTPError
 26from urllib3.util import Retry
 27
 28from ..telemetry import Telemetry
 29from ..types import (
 30    AWARE_DATETIME_MIN,
 31    Credentials,
 32    CredentialsProvider,
 33    ObjectMetadata,
 34    Range,
 35)
 36from ..utils import split_path, validate_attributes
 37from .base import BaseStorageProvider
 38
 39_T = TypeVar("_T")
 40
 41PROVIDER = "ais"
 42DEFAULT_PAGE_SIZE = 1000
 43
 44
class StaticAISCredentialProvider(CredentialsProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.CredentialsProvider` that provides
    AIStore credentials.

    Credentials come from one of two sources:

    * ``username``, ``password`` and ``authn_endpoint``, when all three are given. These are exchanged for a
      token by logging in to the AIStore AuthN endpoint on every :py:meth:`get_credentials` call.
    * Otherwise, the static ``token`` passed at construction (which may be ``None``).
    """

    _username: Optional[str]
    _password: Optional[str]
    _authn_endpoint: Optional[str]
    _token: Optional[str]
    _skip_verify: bool
    _ca_cert: Optional[str]

    def __init__(
        self,
        username: Optional[str] = None,
        password: Optional[str] = None,
        authn_endpoint: Optional[str] = None,
        token: Optional[str] = None,
        skip_verify: bool = True,
        ca_cert: Optional[str] = None,
    ):
        """
        Initializes the :py:class:`StaticAISCredentialProvider` with the given credentials.

        :param username: The username for the AIStore authentication.
        :param password: The password for the AIStore authentication.
        :param authn_endpoint: The AIStore authentication endpoint.
        :param token: The AIStore authentication token. This is used for authentication if username,
            password and authn_endpoint are not provided.
        :param skip_verify: If true, skip SSL certificate verification when talking to the AuthN endpoint.
        :param ca_cert: Path to a CA certificate file for SSL verification against the AuthN endpoint.
        """
        self._username = username
        self._password = password
        self._authn_endpoint = authn_endpoint
        self._token = token
        self._skip_verify = skip_verify
        self._ca_cert = ca_cert

    def get_credentials(self) -> Credentials:
        """
        Returns the AIStore credentials.

        When username, password and AuthN endpoint are all configured, this performs a fresh login on every
        call and stores the returned token, replacing any previously held token. Otherwise the static token
        is returned unchanged.

        :return: :py:class:`Credentials` carrying the AIStore token in ``token``. The access/secret keys are
            empty and there is no expiration.
        """
        if self._username and self._password and self._authn_endpoint:
            authn_client = AuthNClient(self._authn_endpoint, self._skip_verify, self._ca_cert)
            self._token = authn_client.login(self._username, self._password)
        return Credentials(token=self._token, access_key="", secret_key="", expiration=None)

    def refresh_credentials(self) -> None:
        """
        No-op.

        :py:meth:`get_credentials` already logs in again on each call when username/password are configured.
        A static token has nothing to refresh.
        """
        pass

class AIStoreStorageProvider(BaseStorageProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.StorageProvider` for interacting with NVIDIA AIStore.
    """

    def __init__(
        self,
        endpoint: Optional[str] = None,
        provider: str = PROVIDER,
        skip_verify: bool = True,
        ca_cert: Optional[str] = None,
        timeout: Optional[Union[float, tuple[float, float]]] = None,
        retry: Optional[dict[str, Any]] = None,
        base_path: str = "",
        credentials_provider: Optional[CredentialsProvider] = None,
        config_dict: Optional[dict[str, Any]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        **kwargs: Any,
    ) -> None:
        """
        AIStore client for managing buckets, objects, and ETL jobs.

        :param endpoint: The AIStore endpoint. If not provided, the ``AIS_ENDPOINT`` environment variable is read
            when the provider is constructed (empty string if unset).
        :param provider: The bucket provider name passed to the AIStore client when addressing buckets.
        :param skip_verify: Whether to skip SSL certificate verification. Only forwarded to the AIStore client
            when ``credentials_provider`` is given.
        :param ca_cert: Path to a CA certificate file for SSL verification.
        :param timeout: Request timeout in seconds; a single float
            for both connect/read timeouts (e.g., ``5.0``), a tuple for separate connect/read
            timeouts (e.g., ``(3.0, 10.0)``), or ``None`` to disable timeout. Without a
            ``credentials_provider``, ``None`` leaves the AIStore client's own default in place.
        :param retry: ``urllib3.util.Retry`` parameters.
        :param base_path: The root prefix path within the bucket where all operations will be scoped.
        :param credentials_provider: The provider to retrieve AIStore credentials. Its token is fetched once, here.
            If omitted, no token is passed to the AIStore client (the AIStore SDK is documented to fall back to
            the ``AIS_AUTHN_TOKEN`` environment variable in that case).
        :param config_dict: Resolved MSC config.
        :param telemetry_provider: A function that provides a telemetry instance.
        :param kwargs: Additional keyword arguments; ignored by this provider.
        """
        super().__init__(
            base_path=base_path,
            provider_name=PROVIDER,
            config_dict=config_dict,
            telemetry_provider=telemetry_provider,
        )

        if endpoint is None:
            # Resolved here rather than as a default argument value, which Python evaluates once at import time
            # and would therefore ignore an AIS_ENDPOINT set after this module is imported.
            endpoint = os.getenv("AIS_ENDPOINT", "")

        # https://aistore.nvidia.com/docs/python-sdk#client.Client
        client_retry = None if retry is None else Retry(**retry)
        if credentials_provider:
            self.client = Client(
                endpoint=endpoint,
                retry=client_retry,
                skip_verify=skip_verify,
                ca_cert=ca_cert,
                timeout=timeout,
                token=credentials_provider.get_credentials().token,
            )
        else:
            # Forward ca_cert/timeout only when explicitly set so the client's defaults still apply otherwise.
            # NOTE(review): skip_verify is intentionally NOT forwarded here (unchanged behavior). Forwarding its
            # default of True would likely disable TLS verification for callers currently relying on the
            # AIStore client's own default. Confirm the intended semantics before changing this.
            optional_client_kwargs: dict[str, Any] = {}
            if ca_cert is not None:
                optional_client_kwargs["ca_cert"] = ca_cert
            if timeout is not None:
                optional_client_kwargs["timeout"] = timeout
            self.client = Client(endpoint=endpoint, retry=client_retry, **optional_client_kwargs)
        self.provider = provider

    def _translate_errors(
        self,
        func: Callable[[], _T],
        operation: str,
        bucket: str,
        key: str,
    ) -> _T:
        """
        Translates errors like timeouts and client errors.

        HTTP 404 responses (from either the AIStore SDK or ``requests``) become :py:class:`FileNotFoundError`.
        Every other exception is wrapped in a :py:class:`RuntimeError` chained to the original.

        :param func: The function that performs the actual object storage operation.
        :param operation: The type of operation being performed (e.g., ``PUT``, ``GET``, ``DELETE``).
        :param bucket: The name of the object storage bucket involved in the operation.
        :param key: The key of the object within the object storage bucket.

        :return: The result of the object storage operation, typically the return value of the `func` callable.
        :raises FileNotFoundError: If the operation failed with HTTP status 404.
        :raises RuntimeError: For any other failure.
        """

        try:
            return func()
        except AISError as error:
            status_code = error.status_code
            if status_code == 404:
                # Same translation as the HTTPError branch, so missing objects surface consistently.
                raise FileNotFoundError(f"Object {bucket}/{key} does not exist.") from error
            error_info = f"status_code: {status_code}, message: {error.message}"
            raise RuntimeError(f"Failed to {operation} object(s) at {bucket}/{key}. {error_info}") from error
        except HTTPError as error:
            # requests allows HTTPError.response to be None (e.g. when raised without a response object).
            response = error.response
            status_code = response.status_code if response is not None else None
            if status_code == 404:
                raise FileNotFoundError(f"Object {bucket}/{key} does not exist.") from error
            raise RuntimeError(
                f"Failed to {operation} object(s) at {bucket}/{key}, error type: {type(error).__name__}"
            ) from error
        except Exception as error:
            raise RuntimeError(
                f"Failed to {operation} object(s) at {bucket}/{key}, error type: {type(error).__name__}, error: {error}"
            ) from error

    def _put_object(
        self,
        path: str,
        body: bytes,
        if_match: Optional[str] = None,
        if_none_match: Optional[str] = None,
        attributes: Optional[dict[str, str]] = None,
    ) -> int:
        """
        Uploads ``body`` to ``path`` and optionally attaches custom metadata.

        AIS does not support conditional writes, so ``if_match`` and ``if_none_match`` are accepted for
        interface compatibility and ignored.

        :return: The number of bytes uploaded.
        """
        bucket, key = split_path(path)

        def _invoke_api() -> int:
            # Validate before uploading so invalid attributes don't leave a freshly written object behind.
            validated_attributes = validate_attributes(attributes)
            obj = self.client.bucket(bucket, self.provider).object(obj_name=key)
            obj.put_content(body)
            if validated_attributes:
                # Custom metadata is set in a second request after the content upload.
                obj.set_custom_props(custom_metadata=validated_attributes, replace_existing=True)

            return len(body)

        return self._translate_errors(_invoke_api, operation="PUT", bucket=bucket, key=key)

    def _get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Reads an object, or a byte range of it, fully into memory.

        :param path: ``bucket/key`` path of the object.
        :param byte_range: Optional range; ``size`` bytes starting at ``offset`` are requested.
        :return: The object's bytes.
        """
        bucket, key = split_path(path)
        # Range header end offsets are inclusive, hence the ``- 1``.
        bytes_range = (
            f"bytes={byte_range.offset}-{byte_range.offset + byte_range.size - 1}" if byte_range else None
        )

        def _invoke_api() -> bytes:
            obj = self.client.bucket(bucket, self.provider).object(obj_name=key)
            if byte_range:
                reader = obj.get(byte_range=bytes_range)  # pyright: ignore [reportArgumentType]
            else:
                reader = obj.get()
            return reader.read_all()

        return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key)

    def _copy_object(self, src_path: str, dest_path: str) -> int:
        """
        Not supported by this provider.

        :raises AttributeError: Always.
        """
        raise AttributeError("AIStore does not support copy operations")

    def _delete_object(self, path: str, if_match: Optional[str] = None) -> None:
        """
        Deletes the object at ``path``.

        :param if_match: Conditional deletion is not supported; a truthy value raises.
        :raises NotImplementedError: If ``if_match`` is given. This is raised directly rather than inside
            :py:meth:`_translate_errors`, which would rewrap it as a plain ``RuntimeError``.
            ``NotImplementedError`` subclasses ``RuntimeError``, so existing handlers still catch it.
        """
        bucket, key = split_path(path)
        # AIS doesn't support if-match deletion; there is no fallback, so reject the request up front.
        if if_match:
            raise NotImplementedError("AIStore does not support if-match deletion")

        def _invoke_api() -> None:
            self.client.bucket(bucket, self.provider).object(obj_name=key).delete()

        return self._translate_errors(_invoke_api, operation="DELETE", bucket=bucket, key=key)

    def _get_object_metadata(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Returns metadata for an object, or a synthetic entry for a directory-like path.

        ``strict`` is not used by this implementation. ``last_modified`` is always ``AWARE_DATETIME_MIN`` because
        it is not populated from AIS here.

        :raises FileNotFoundError: If a directory-like path does not exist, or the object is missing (HTTP 404).
        """
        bucket, key = split_path(path)
        if path.endswith("/") or (bucket and not key):
            # If path ends with "/" or empty key name is provided, then assume it's a "directory",
            # which metadata is not guaranteed to exist for cases such as
            # "virtual prefix" that was never explicitly created.
            if self._is_dir(path):
                return ObjectMetadata(
                    key=path,
                    type="directory",
                    content_length=0,
                    last_modified=AWARE_DATETIME_MIN,
                )
            else:
                raise FileNotFoundError(f"Directory {path} does not exist.")
        else:

            def _invoke_api() -> ObjectMetadata:
                obj = self.client.bucket(bck_name=bucket, provider=self.provider).object(obj_name=key)
                headers = obj.head()
                props = ObjectProps(headers)

                return ObjectMetadata(
                    key=key,
                    content_length=int(props.size),  # pyright: ignore [reportArgumentType]
                    last_modified=AWARE_DATETIME_MIN,
                    etag=props.checksum_value,
                    metadata=props.custom_metadata,
                )

            return self._translate_errors(_invoke_api, operation="HEAD", bucket=bucket, key=key)

    def _list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lazily lists objects under ``path``.

        Keys are filtered client-side to the window ``(start_after, end_at]``. ``include_directories`` is not
        used by this implementation.

        :return: An iterator of :py:class:`ObjectMetadata`. Errors raised while iterating are translated by
            :py:meth:`_translate_errors`, as for the other operations.
        """
        bucket, prefix = split_path(path)

        def _iter_entries() -> Iterator[ObjectMetadata]:
            # AIS has no start key option like other object stores, so the window is applied client-side.
            # NOTE(review): "cone" is not a property name confirmed from here; verify against the AIS props list.
            all_objects = self.client.bucket(bck_name=bucket, provider=self.provider).list_objects_iter(
                prefix=prefix, props="name,size,atime,checksum,cone", page_size=DEFAULT_PAGE_SIZE
            )

            # Assume AIS guarantees lexicographical order, so the first key past end_at ends the listing.
            for bucket_entry in all_objects:
                key = bucket_entry.object.name
                if end_at is not None and end_at < key:
                    return
                if start_after is not None and key <= start_after:
                    continue
                props = bucket_entry.generate_object_props()
                yield ObjectMetadata(
                    key=key,
                    content_length=int(props.size),
                    last_modified=AWARE_DATETIME_MIN,
                    etag=props.checksum_value,
                )

        def _iter_translated() -> Iterator[ObjectMetadata]:
            entries = _iter_entries()
            while True:
                # Calling a generator function runs none of its body, so each step is pulled through
                # _translate_errors to translate failures raised during paging. next(..., None) avoids
                # StopIteration, which _translate_errors would otherwise wrap as a RuntimeError.
                entry = self._translate_errors(
                    lambda: next(entries, None), operation="LIST", bucket=bucket, key=prefix
                )
                if entry is None:
                    return
                yield entry

        return _iter_translated()

    def _upload_file(self, remote_path: str, f: Union[str, IO], attributes: Optional[dict[str, str]] = None) -> int:
        """
        Uploads a local file path or file-like object to ``remote_path``.

        The whole content is read into memory before upload. Text content (e.g. from ``io.StringIO`` or a
        text-mode file) is UTF-8 encoded.

        :return: The number of bytes uploaded.
        """
        if isinstance(f, str):
            with open(f, "rb") as fp:
                body = fp.read()
        else:
            body = f.read()

        # Text streams yield str; encode so the upload and the returned size are both in bytes.
        if isinstance(body, str):
            body = body.encode("utf-8")

        self._put_object(remote_path, body, attributes=attributes)
        return len(body)

    def _download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> int:
        """
        Downloads ``remote_path`` to a local file path or file-like object.

        Text streams (``io.TextIOBase``, e.g. ``io.StringIO``) receive the content decoded as UTF-8.

        :param metadata: Pre-fetched object metadata; fetched via HEAD when omitted.
        :return: The object's ``content_length`` from its metadata.
        """
        if metadata is None:
            metadata = self._get_object_metadata(remote_path)

        if isinstance(f, str):
            # Fetch before opening: open(..., "wb") truncates, so a failed GET would otherwise destroy
            # any existing file at the destination.
            data = self._get_object(remote_path)
            if os.path.dirname(f):
                os.makedirs(os.path.dirname(f), exist_ok=True)
            with open(f, "wb") as fp:
                fp.write(data)
        elif isinstance(f, io.TextIOBase):
            f.write(self._get_object(remote_path).decode("utf-8"))
        else:
            f.write(self._get_object(remote_path))

        return metadata.content_length