Source code for multistorageclient.providers.s3

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import codecs
import io
import os
import tempfile
from collections.abc import Callable, Iterator
from typing import IO, Any, Optional, TypeVar, Union

import boto3
import botocore
from boto3.s3.transfer import TransferConfig
from botocore.credentials import RefreshableCredentials
from botocore.exceptions import ClientError, IncompleteReadError, ReadTimeoutError, ResponseStreamingError
from botocore.session import get_session

from multistorageclient_rust import RustClient, RustRetryableError

from ..instrumentation.utils import set_span_attribute
from ..rust_utils import run_async_rust_client_method
from ..telemetry import Telemetry
from ..types import (
    AWARE_DATETIME_MIN,
    Credentials,
    CredentialsProvider,
    ObjectMetadata,
    PreconditionFailedError,
    Range,
    RetryableError,
)
from ..utils import (
    split_path,
    validate_attributes,
)
from .base import BaseStorageProvider

_T = TypeVar("_T")

BOTO3_MAX_POOL_CONNECTIONS = 32

MiB = 1024 * 1024

# Python and Rust share the same multipart_threshold to keep the code simple.
MULTIPART_THRESHOLD = 64 * MiB
MULTIPART_CHUNKSIZE = 32 * MiB
IO_CHUNKSIZE = 32 * MiB
PYTHON_MAX_CONCURRENCY = 8

PROVIDER = "s3"

EXPRESS_ONEZONE_STORAGE_CLASS = "EXPRESS_ONEZONE"

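# Illustrative sizing arithmetic (editor's note, not part of the library): with
# the defaults above, the upload paths in this module send a file of up to
# 64 MiB (MULTIPART_THRESHOLD) as a single PUT, while a 100 MiB file becomes a
# multipart upload of ceil(100 / 32) = 4 parts of MULTIPART_CHUNKSIZE (32 MiB).
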
def _extract_x_trans_id(response: Any) -> None:
    """Extract x-trans-id from a boto3 response and set it as a span attribute."""
    try:
        if response and isinstance(response, dict):
            headers = response.get("ResponseMetadata", {}).get("HTTPHeaders", {})
            if headers and isinstance(headers, dict) and "x-trans-id" in headers:
                set_span_attribute("x_trans_id", headers["x-trans-id"])
    except (KeyError, AttributeError, TypeError):
        # Silently ignore any errors in extraction
        pass

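# Illustrative sketch of the boto3 response shape this helper reads (the header
# value is a placeholder):
#
#   response = {
#       "ResponseMetadata": {
#           "HTTPHeaders": {"x-trans-id": "tx0000000000000000000-placeholder"},
#       },
#   }
#   _extract_x_trans_id(response)  # records "x_trans_id" on the current span, if tracing is active
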
class StaticS3CredentialsProvider(CredentialsProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.CredentialsProvider` that provides static S3 credentials.
    """

    _access_key: str
    _secret_key: str
    _session_token: Optional[str]

    def __init__(self, access_key: str, secret_key: str, session_token: Optional[str] = None):
        """
        Initializes the :py:class:`StaticS3CredentialsProvider` with the provided access key, secret key, and optional
        session token.

        :param access_key: The access key for S3 authentication.
        :param secret_key: The secret key for S3 authentication.
        :param session_token: An optional session token for temporary credentials.
        """
        self._access_key = access_key
        self._secret_key = secret_key
        self._session_token = session_token

    def get_credentials(self) -> Credentials:
        return Credentials(
            access_key=self._access_key,
            secret_key=self._secret_key,
            token=self._session_token,
            expiration=None,
        )

    def refresh_credentials(self) -> None:
        pass

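# Illustrative usage (editor's sketch; bucket name and keys are placeholders):
# static credentials can be handed to the storage provider below instead of
# relying on the default AWS credential chain.
#
#   credentials_provider = StaticS3CredentialsProvider(
#       access_key="AKIA-PLACEHOLDER",
#       secret_key="SECRET-PLACEHOLDER",
#   )
#   provider = S3StorageProvider(
#       region_name="us-east-1",
#       base_path="my-bucket/prefix",
#       credentials_provider=credentials_provider,
#   )
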
class S3StorageProvider(BaseStorageProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.StorageProvider` for interacting with Amazon S3 or S3-compatible object stores.
    """

    def __init__(
        self,
        region_name: str = "",
        endpoint_url: str = "",
        base_path: str = "",
        credentials_provider: Optional[CredentialsProvider] = None,
        config_dict: Optional[dict[str, Any]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initializes the :py:class:`S3StorageProvider` with the region, endpoint URL, and optional credentials provider.

        :param region_name: The AWS region where the S3 bucket is located.
        :param endpoint_url: The custom endpoint URL for the S3 service.
        :param base_path: The root prefix path within the S3 bucket where all operations will be scoped.
        :param credentials_provider: The provider to retrieve S3 credentials.
        :param config_dict: Resolved MSC config.
        :param telemetry_provider: A function that provides a telemetry instance.
        """
        super().__init__(
            base_path=base_path,
            provider_name=PROVIDER,
            config_dict=config_dict,
            telemetry_provider=telemetry_provider,
        )

        self._region_name = region_name
        self._endpoint_url = endpoint_url
        self._credentials_provider = credentials_provider

        self._signature_version = kwargs.get("signature_version", "")
        self._s3_client = self._create_s3_client(
            request_checksum_calculation=kwargs.get("request_checksum_calculation"),
            response_checksum_validation=kwargs.get("response_checksum_validation"),
            max_pool_connections=kwargs.get("max_pool_connections", BOTO3_MAX_POOL_CONNECTIONS),
            connect_timeout=kwargs.get("connect_timeout"),
            read_timeout=kwargs.get("read_timeout"),
            retries=kwargs.get("retries"),
        )
        self._transfer_config = TransferConfig(
            multipart_threshold=int(kwargs.get("multipart_threshold", MULTIPART_THRESHOLD)),
            max_concurrency=int(kwargs.get("max_concurrency", PYTHON_MAX_CONCURRENCY)),
            multipart_chunksize=int(kwargs.get("multipart_chunksize", MULTIPART_CHUNKSIZE)),
            io_chunksize=int(kwargs.get("io_chunksize", IO_CHUNKSIZE)),
            use_threads=True,
        )

        self._rust_client = None
        if "rust_client" in kwargs:
            # Inherit the rust client options from the kwargs
            rust_client_options = kwargs["rust_client"]
            if "max_pool_connections" in kwargs:
                rust_client_options["max_pool_connections"] = kwargs["max_pool_connections"]
            if "max_concurrency" in kwargs:
                rust_client_options["max_concurrency"] = kwargs["max_concurrency"]
            if "multipart_chunksize" in kwargs:
                rust_client_options["multipart_chunksize"] = kwargs["multipart_chunksize"]
            if "read_timeout" in kwargs:
                rust_client_options["read_timeout"] = kwargs["read_timeout"]
            if "connect_timeout" in kwargs:
                rust_client_options["connect_timeout"] = kwargs["connect_timeout"]
            if self._signature_version == "UNSIGNED":
                rust_client_options["skip_signature"] = True
            self._rust_client = self._create_rust_client(rust_client_options)

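    # Illustrative construction (editor's sketch; values are placeholders):
    # passing a "rust_client" dict opts into the Rust client, and shared tuning
    # such as max_concurrency or multipart_chunksize is mirrored into it by the
    # constructor above.
    #
    #   provider = S3StorageProvider(
    #       region_name="us-east-1",
    #       base_path="my-bucket/prefix",
    #       max_concurrency=8,
    #       multipart_chunksize=32 * MiB,
    #       rust_client={"allow_http": False},
    #   )
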
187 """ 188 # S3 Express buckets have a specific naming convention 189 return "--x-s3" in bucket 190 191 def _create_s3_client( 192 self, 193 request_checksum_calculation: Optional[str] = None, 194 response_checksum_validation: Optional[str] = None, 195 max_pool_connections: int = BOTO3_MAX_POOL_CONNECTIONS, 196 connect_timeout: Union[float, int, None] = None, 197 read_timeout: Union[float, int, None] = None, 198 retries: Optional[dict[str, Any]] = None, 199 ): 200 """ 201 Creates and configures the boto3 S3 client, using refreshable credentials if possible. 202 203 :param request_checksum_calculation: When the underlying S3 client should calculate request checksums. See the equivalent option in the `AWS configuration file <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-a-configuration-file>`_. 204 :param response_checksum_validation: When the underlying S3 client should validate response checksums. See the equivalent option in the `AWS configuration file <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-a-configuration-file>`_. 205 :param max_pool_connections: The maximum number of connections to keep in a connection pool. 206 :param connect_timeout: The time in seconds till a timeout exception is thrown when attempting to make a connection. 207 :param read_timeout: The time in seconds till a timeout exception is thrown when attempting to read from a connection. 208 :param retries: A dictionary for configuration related to retry behavior. 209 210 :return: The configured S3 client. 211 """ 212 options = { 213 # https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html 214 "config": boto3.session.Config( # pyright: ignore [reportAttributeAccessIssue] 215 max_pool_connections=max_pool_connections, 216 connect_timeout=connect_timeout, 217 read_timeout=read_timeout, 218 retries=retries or {"mode": "standard"}, 219 request_checksum_calculation=request_checksum_calculation, 220 response_checksum_validation=response_checksum_validation, 221 ), 222 } 223 224 if self._region_name: 225 options["region_name"] = self._region_name 226 227 if self._endpoint_url: 228 options["endpoint_url"] = self._endpoint_url 229 230 if self._credentials_provider: 231 creds = self._fetch_credentials() 232 if "expiry_time" in creds and creds["expiry_time"]: 233 # Use RefreshableCredentials if expiry_time provided. 234 refreshable_credentials = RefreshableCredentials.create_from_metadata( 235 metadata=creds, refresh_using=self._fetch_credentials, method="custom-refresh" 236 ) 237 238 botocore_session = get_session() 239 botocore_session._credentials = refreshable_credentials 240 241 boto3_session = boto3.Session(botocore_session=botocore_session) 242 243 return boto3_session.client("s3", **options) 244 else: 245 # Add static credentials to the options dictionary 246 options["aws_access_key_id"] = creds["access_key"] 247 options["aws_secret_access_key"] = creds["secret_key"] 248 if creds["token"]: 249 options["aws_session_token"] = creds["token"] 250 251 if self._signature_version: 252 signature_config = botocore.config.Config( # pyright: ignore[reportAttributeAccessIssue] 253 signature_version=botocore.UNSIGNED 254 if self._signature_version == "UNSIGNED" 255 else self._signature_version 256 ) 257 options["config"] = options["config"].merge(signature_config) 258 259 # Fallback to standard credential chain. 
    def _create_rust_client(self, rust_client_options: Optional[dict[str, Any]] = None):
        """
        Creates and configures the Rust client, using refreshable credentials if possible.
        """
        configs = {}
        if self._region_name:
            configs["region_name"] = self._region_name

        # If the user specifies a bucket, use it. Otherwise, use the base path.
        if rust_client_options and "bucket" in rust_client_options:
            configs["bucket"] = rust_client_options["bucket"]
        else:
            bucket, _ = split_path(self._base_path)
            configs["bucket"] = bucket

        if self._endpoint_url:
            configs["endpoint_url"] = self._endpoint_url

        if rust_client_options:
            if rust_client_options.get("allow_http", False):
                configs["allow_http"] = True
            if "max_concurrency" in rust_client_options:
                configs["max_concurrency"] = rust_client_options["max_concurrency"]
            if "max_pool_connections" in rust_client_options:
                configs["max_pool_connections"] = rust_client_options["max_pool_connections"]
            if "multipart_chunksize" in rust_client_options:
                configs["multipart_chunksize"] = rust_client_options["multipart_chunksize"]
            if "skip_signature" in rust_client_options:
                configs["skip_signature"] = rust_client_options["skip_signature"]

        return RustClient(
            provider=PROVIDER,
            configs=configs,
            credentials_provider=self._credentials_provider,
        )

    def _fetch_credentials(self) -> dict:
        """
        Refreshes the credentials via the configured provider and returns them in the
        metadata format expected by :py:class:`botocore.credentials.RefreshableCredentials`.
        """
        if not self._credentials_provider:
            raise RuntimeError("Cannot fetch credentials if no credential provider configured.")
        self._credentials_provider.refresh_credentials()
        credentials = self._credentials_provider.get_credentials()
        return {
            "access_key": credentials.access_key,
            "secret_key": credentials.secret_key,
            "token": credentials.token,
            "expiry_time": credentials.expiration,
        }

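    # Illustrative refreshable provider (editor's sketch; the class below is
    # hypothetical and all values are placeholders): any CredentialsProvider
    # whose Credentials carry an expiration causes _create_s3_client to build
    # RefreshableCredentials, which re-invokes _fetch_credentials near expiry.
    #
    #   class AssumeRoleCredentialsProvider(CredentialsProvider):
    #       def refresh_credentials(self) -> None:
    #           ...  # e.g. call STS AssumeRole and cache the result
    #
    #       def get_credentials(self) -> Credentials:
    #           return Credentials(
    #               access_key="AKIA-PLACEHOLDER",
    #               secret_key="SECRET-PLACEHOLDER",
    #               token="TOKEN-PLACEHOLDER",
    #               expiration="2030-01-01T00:00:00Z",  # placeholder expiry
    #           )
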
331 """ 332 # Set basic operation attributes 333 set_span_attribute("s3_operation", operation) 334 set_span_attribute("s3_bucket", bucket) 335 set_span_attribute("s3_key", key) 336 337 try: 338 return func() 339 except ClientError as error: 340 status_code = error.response["ResponseMetadata"]["HTTPStatusCode"] 341 request_id = error.response["ResponseMetadata"].get("RequestId") 342 host_id = error.response["ResponseMetadata"].get("HostId") 343 header = error.response["ResponseMetadata"].get("HTTPHeaders", {}) 344 error_code = error.response["Error"]["Code"] 345 346 # Ensure header is a dictionary before trying to get from it 347 x_trans_id = header.get("x-trans-id") if isinstance(header, dict) else None 348 349 # Record error details in span 350 set_span_attribute("request_id", request_id) 351 set_span_attribute("host_id", host_id) 352 353 error_info = f"request_id: {request_id}, host_id: {host_id}, status_code: {status_code}" 354 if x_trans_id: 355 error_info += f", x-trans-id: {x_trans_id}" 356 set_span_attribute("x_trans_id", x_trans_id) 357 358 if status_code == 404: 359 if error_code == "NoSuchUpload": 360 error_message = error.response["Error"]["Message"] 361 raise RetryableError(f"Multipart upload failed for {bucket}/{key}: {error_message}") from error 362 raise FileNotFoundError(f"Object {bucket}/{key} does not exist. {error_info}") # pylint: disable=raise-missing-from 363 elif status_code == 412: # Precondition Failed 364 raise PreconditionFailedError( 365 f"ETag mismatch for {operation} operation on {bucket}/{key}. {error_info}" 366 ) from error 367 elif status_code == 429: 368 raise RetryableError( 369 f"Too many request to {operation} object(s) at {bucket}/{key}. {error_info}" 370 ) from error 371 elif status_code == 503: 372 raise RetryableError( 373 f"Service unavailable when {operation} object(s) at {bucket}/{key}. {error_info}" 374 ) from error 375 elif status_code == 501: 376 raise NotImplementedError( 377 f"Operation {operation} not implemented for object(s) at {bucket}/{key}. {error_info}" 378 ) from error 379 else: 380 raise RuntimeError( 381 f"Failed to {operation} object(s) at {bucket}/{key}. {error_info}, " 382 f"error_type: {type(error).__name__}" 383 ) from error 384 except FileNotFoundError: 385 raise 386 except (ReadTimeoutError, IncompleteReadError, ResponseStreamingError) as error: 387 raise RetryableError( 388 f"Failed to {operation} object(s) at {bucket}/{key} due to network timeout or incomplete read. " 389 f"error_type: {type(error).__name__}" 390 ) from error 391 except RustRetryableError as error: 392 raise RetryableError( 393 f"Failed to {operation} object(s) at {bucket}/{key} due to exhausted retries from Rust. " 394 f"error_type: {type(error).__name__}" 395 ) from error 396 except Exception as error: 397 raise RuntimeError( 398 f"Failed to {operation} object(s) at {bucket}/{key}, error type: {type(error).__name__}, error: {error}" 399 ) from error 400 401 def _put_object( 402 self, 403 path: str, 404 body: bytes, 405 if_match: Optional[str] = None, 406 if_none_match: Optional[str] = None, 407 attributes: Optional[dict[str, str]] = None, 408 content_type: Optional[str] = None, 409 ) -> int: 410 """ 411 Uploads an object to the specified S3 path. 412 413 :param path: The S3 path where the object will be uploaded. 414 :param body: The content of the object as bytes. 415 :param if_match: Optional If-Match header value. Use "*" to only upload if the object doesn't exist. 416 :param if_none_match: Optional If-None-Match header value. 
Use "*" to only upload if the object doesn't exist. 417 :param attributes: Optional attributes to attach to the object. 418 :param content_type: Optional Content-Type header value. 419 """ 420 bucket, key = split_path(path) 421 422 def _invoke_api() -> int: 423 kwargs = {"Bucket": bucket, "Key": key, "Body": body} 424 if content_type: 425 kwargs["ContentType"] = content_type 426 if self._is_directory_bucket(bucket): 427 kwargs["StorageClass"] = EXPRESS_ONEZONE_STORAGE_CLASS 428 if if_match: 429 kwargs["IfMatch"] = if_match 430 if if_none_match: 431 kwargs["IfNoneMatch"] = if_none_match 432 validated_attributes = validate_attributes(attributes) 433 if validated_attributes: 434 kwargs["Metadata"] = validated_attributes 435 436 # TODO(NGCDP-5804): Add support to update ContentType header in Rust client 437 rust_unsupported_feature_keys = {"Metadata", "StorageClass", "IfMatch", "IfNoneMatch", "ContentType"} 438 if ( 439 self._rust_client 440 # Rust client doesn't support creating objects with trailing /, see https://github.com/apache/arrow-rs/issues/7026 441 and not path.endswith("/") 442 and all(key not in kwargs for key in rust_unsupported_feature_keys) 443 ): 444 response = run_async_rust_client_method(self._rust_client, "put", key, body) 445 else: 446 # Capture the response from put_object 447 response = self._s3_client.put_object(**kwargs) 448 449 # Extract and set x-trans-id if present 450 _extract_x_trans_id(response) 451 452 return len(body) 453 454 return self._translate_errors(_invoke_api, operation="PUT", bucket=bucket, key=key) 455 456 def _get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes: 457 bucket, key = split_path(path) 458 459 def _invoke_api() -> bytes: 460 if byte_range: 461 bytes_range = f"bytes={byte_range.offset}-{byte_range.offset + byte_range.size - 1}" 462 if self._rust_client: 463 response = run_async_rust_client_method( 464 self._rust_client, "get", key, byte_range.offset, byte_range.offset + byte_range.size - 1 465 ) 466 return response 467 else: 468 response = self._s3_client.get_object(Bucket=bucket, Key=key, Range=bytes_range) 469 else: 470 if self._rust_client: 471 response = run_async_rust_client_method(self._rust_client, "get", key) 472 return response 473 else: 474 response = self._s3_client.get_object(Bucket=bucket, Key=key) 475 476 # Extract and set x-trans-id if present 477 _extract_x_trans_id(response) 478 479 return response["Body"].read() 480 481 return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key) 482 483 def _copy_object(self, src_path: str, dest_path: str) -> int: 484 src_bucket, src_key = split_path(src_path) 485 dest_bucket, dest_key = split_path(dest_path) 486 487 src_object = self._get_object_metadata(src_path) 488 489 def _invoke_api() -> int: 490 response = self._s3_client.copy( 491 CopySource={"Bucket": src_bucket, "Key": src_key}, 492 Bucket=dest_bucket, 493 Key=dest_key, 494 Config=self._transfer_config, 495 ) 496 497 # Extract and set x-trans-id if present 498 _extract_x_trans_id(response) 499 500 return src_object.content_length 501 502 return self._translate_errors(_invoke_api, operation="COPY", bucket=dest_bucket, key=dest_key) 503 504 def _delete_object(self, path: str, if_match: Optional[str] = None) -> None: 505 bucket, key = split_path(path) 506 507 def _invoke_api() -> None: 508 # conditionally delete the object if if_match(etag) is provided, if not, delete the object unconditionally 509 if if_match: 510 response = self._s3_client.delete_object(Bucket=bucket, Key=key, 
    def _get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        bucket, key = split_path(path)

        def _invoke_api() -> bytes:
            if byte_range:
                bytes_range = f"bytes={byte_range.offset}-{byte_range.offset + byte_range.size - 1}"
                if self._rust_client:
                    response = run_async_rust_client_method(
                        self._rust_client, "get", key, byte_range.offset, byte_range.offset + byte_range.size - 1
                    )
                    return response
                else:
                    response = self._s3_client.get_object(Bucket=bucket, Key=key, Range=bytes_range)
            else:
                if self._rust_client:
                    response = run_async_rust_client_method(self._rust_client, "get", key)
                    return response
                else:
                    response = self._s3_client.get_object(Bucket=bucket, Key=key)

            # Extract and set x-trans-id if present
            _extract_x_trans_id(response)

            return response["Body"].read()

        return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key)

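    # Illustrative ranged read (editor's sketch; the path is a placeholder, and
    # Range is assumed to take offset/size keywords matching the attributes used
    # above): Range(offset, size) maps to an HTTP "bytes=offset-(offset+size-1)"
    # header, so this reads bytes 1024..2047 of the object.
    #
    #   chunk = provider._get_object("my-bucket/key", byte_range=Range(offset=1024, size=1024))
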
    def _copy_object(self, src_path: str, dest_path: str) -> int:
        src_bucket, src_key = split_path(src_path)
        dest_bucket, dest_key = split_path(dest_path)

        src_object = self._get_object_metadata(src_path)

        def _invoke_api() -> int:
            response = self._s3_client.copy(
                CopySource={"Bucket": src_bucket, "Key": src_key},
                Bucket=dest_bucket,
                Key=dest_key,
                Config=self._transfer_config,
            )

            # Extract and set x-trans-id if present
            _extract_x_trans_id(response)

            return src_object.content_length

        return self._translate_errors(_invoke_api, operation="COPY", bucket=dest_bucket, key=dest_key)

    def _delete_object(self, path: str, if_match: Optional[str] = None) -> None:
        bucket, key = split_path(path)

        def _invoke_api() -> None:
            # Conditionally delete the object if if_match (ETag) is provided; otherwise delete it unconditionally.
            if if_match:
                response = self._s3_client.delete_object(Bucket=bucket, Key=key, IfMatch=if_match)
            else:
                response = self._s3_client.delete_object(Bucket=bucket, Key=key)

            # Extract and set x-trans-id if present
            _extract_x_trans_id(response)

        return self._translate_errors(_invoke_api, operation="DELETE", bucket=bucket, key=key)

    def _is_dir(self, path: str) -> bool:
        # Ensure the path ends with '/' to mimic a directory
        path = self._append_delimiter(path)

        bucket, key = split_path(path)

        def _invoke_api() -> bool:
            # List objects with the given prefix
            response = self._s3_client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1, Delimiter="/")

            # Extract and set x-trans-id if present
            _extract_x_trans_id(response)

            # Check if there are any contents or common prefixes
            return bool(response.get("Contents", []) or response.get("CommonPrefixes", []))

        return self._translate_errors(_invoke_api, operation="LIST", bucket=bucket, key=key)

    def _get_object_metadata(self, path: str, strict: bool = True) -> ObjectMetadata:
        bucket, key = split_path(path)
        if path.endswith("/") or (bucket and not key):
            # If the path ends with "/" or an empty key name is provided, assume it's a "directory",
            # whose metadata is not guaranteed to exist for cases such as a
            # "virtual prefix" that was never explicitly created.
            if self._is_dir(path):
                return ObjectMetadata(
                    key=path,
                    type="directory",
                    content_length=0,
                    last_modified=AWARE_DATETIME_MIN,
                )
            else:
                raise FileNotFoundError(f"Directory {path} does not exist.")
        else:

            def _invoke_api() -> ObjectMetadata:
                response = self._s3_client.head_object(Bucket=bucket, Key=key)

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)

                return ObjectMetadata(
                    key=path,
                    type="file",
                    content_length=response["ContentLength"],
                    content_type=response["ContentType"],
                    last_modified=response["LastModified"],
                    etag=response["ETag"].strip('"'),
                    storage_class=response.get("StorageClass"),
                    metadata=response.get("Metadata"),
                )

            try:
                return self._translate_errors(_invoke_api, operation="HEAD", bucket=bucket, key=key)
            except FileNotFoundError as error:
                if strict:
                    # If the object does not exist at the given path, append a trailing slash and
                    # check if the path is a directory.
                    path = self._append_delimiter(path)
                    if self._is_dir(path):
                        return ObjectMetadata(
                            key=path,
                            type="directory",
                            content_length=0,
                            last_modified=AWARE_DATETIME_MIN,
                        )
                raise error

    def _list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
    ) -> Iterator[ObjectMetadata]:
        bucket, prefix = split_path(path)

        def _invoke_api() -> Iterator[ObjectMetadata]:
            paginator = self._s3_client.get_paginator("list_objects_v2")
            if include_directories:
                page_iterator = paginator.paginate(
                    Bucket=bucket, Prefix=prefix, Delimiter="/", StartAfter=(start_after or "")
                )
            else:
                page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix, StartAfter=(start_after or ""))

            for page in page_iterator:
                for item in page.get("CommonPrefixes", []):
                    yield ObjectMetadata(
                        key=os.path.join(bucket, item["Prefix"].rstrip("/")),
                        type="directory",
                        content_length=0,
                        last_modified=AWARE_DATETIME_MIN,
                    )

                # S3 guarantees lexicographical order for general purpose buckets (for
                # normal S3) but not directory buckets (for S3 Express One Zone).
                for response_object in page.get("Contents", []):
                    key = response_object["Key"]
                    if end_at is None or key <= end_at:
                        if key.endswith("/"):
                            if include_directories:
                                yield ObjectMetadata(
                                    key=os.path.join(bucket, key.rstrip("/")),
                                    type="directory",
                                    content_length=0,
                                    last_modified=response_object["LastModified"],
                                )
                        else:
                            yield ObjectMetadata(
                                key=os.path.join(bucket, key),
                                type="file",
                                content_length=response_object["Size"],
                                last_modified=response_object["LastModified"],
                                etag=response_object["ETag"].strip('"'),
                                storage_class=response_object.get("StorageClass"),  # Pass storage_class
                            )
                    else:
                        return

        return self._translate_errors(_invoke_api, operation="LIST", bucket=bucket, key=prefix)

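    # Illustrative listing (editor's sketch; the prefix and keys are
    # placeholders): start_after is exclusive (S3 StartAfter semantics) while
    # end_at bounds the listing inclusively via the key <= end_at check above.
    #
    #   for obj in provider._list_objects(
    #       "my-bucket/logs/", start_after="logs/2024-01-01", end_at="logs/2024-12-31"
    #   ):
    #       print(obj.key, obj.content_length)
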
    def _upload_file(
        self,
        remote_path: str,
        f: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
        content_type: Optional[str] = None,
    ) -> int:
        file_size: int = 0

        if isinstance(f, str):
            bucket, key = split_path(remote_path)
            file_size = os.path.getsize(f)

            # Upload small files
            if file_size <= self._transfer_config.multipart_threshold:
                if self._rust_client and not attributes and not content_type:
                    run_async_rust_client_method(self._rust_client, "upload", f, key)
                else:
                    with open(f, "rb") as fp:
                        self._put_object(remote_path, fp.read(), attributes=attributes, content_type=content_type)
                return file_size

            # Upload large files using TransferConfig
            def _invoke_api() -> int:
                extra_args = {}
                if content_type:
                    extra_args["ContentType"] = content_type
                if self._is_directory_bucket(bucket):
                    extra_args["StorageClass"] = EXPRESS_ONEZONE_STORAGE_CLASS
                validated_attributes = validate_attributes(attributes)
                if validated_attributes:
                    extra_args["Metadata"] = validated_attributes
                if self._rust_client and not extra_args:
                    response = run_async_rust_client_method(self._rust_client, "upload_multipart_from_file", f, key)
                else:
                    response = self._s3_client.upload_file(
                        Filename=f,
                        Bucket=bucket,
                        Key=key,
                        Config=self._transfer_config,
                        ExtraArgs=extra_args,
                    )

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)

                return file_size

            return self._translate_errors(_invoke_api, operation="PUT", bucket=bucket, key=key)
        else:
            # Upload small files
            f.seek(0, io.SEEK_END)
            file_size = f.tell()
            f.seek(0)

            if file_size <= self._transfer_config.multipart_threshold:
                if isinstance(f, io.StringIO):
                    self._put_object(
                        remote_path, f.read().encode("utf-8"), attributes=attributes, content_type=content_type
                    )
                else:
                    self._put_object(remote_path, f.read(), attributes=attributes, content_type=content_type)
                return file_size

            # Upload large files using TransferConfig
            bucket, key = split_path(remote_path)

            def _invoke_api() -> int:
                extra_args = {}
                if content_type:
                    extra_args["ContentType"] = content_type
                if self._is_directory_bucket(bucket):
                    extra_args["StorageClass"] = EXPRESS_ONEZONE_STORAGE_CLASS
                validated_attributes = validate_attributes(attributes)
                if validated_attributes:
                    extra_args["Metadata"] = validated_attributes
                self._s3_client.upload_fileobj(
                    Fileobj=f,
                    Bucket=bucket,
                    Key=key,
                    Config=self._transfer_config,
                    ExtraArgs=extra_args,
                )

                return file_size

            return self._translate_errors(_invoke_api, operation="PUT", bucket=bucket, key=key)

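    # Note (editor's): the Rust fast paths above are taken only when no extra
    # upload arguments (attributes, content type, storage class) are needed;
    # any of those falls back to boto3, which supports the full argument set.
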
    def _download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> int:
        if metadata is None:
            metadata = self._get_object_metadata(remote_path)

        if isinstance(f, str):
            bucket, key = split_path(remote_path)
            if os.path.dirname(f):
                os.makedirs(os.path.dirname(f), exist_ok=True)

            # Download small files
            if metadata.content_length <= self._transfer_config.multipart_threshold:
                if self._rust_client:
                    run_async_rust_client_method(self._rust_client, "download", key, f)
                else:
                    with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=os.path.dirname(f), prefix=".") as fp:
                        temp_file_path = fp.name
                        fp.write(self._get_object(remote_path))
                    os.rename(src=temp_file_path, dst=f)
                return metadata.content_length

            # Download large files using TransferConfig
            def _invoke_api() -> int:
                response = None
                with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=os.path.dirname(f), prefix=".") as fp:
                    temp_file_path = fp.name
                    if self._rust_client:
                        response = run_async_rust_client_method(
                            self._rust_client, "download_multipart_to_file", key, temp_file_path
                        )
                    else:
                        response = self._s3_client.download_fileobj(
                            Bucket=bucket,
                            Key=key,
                            Fileobj=fp,
                            Config=self._transfer_config,
                        )

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)
                os.rename(src=temp_file_path, dst=f)

                return metadata.content_length

            return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key)
        else:
            # Download small files
            if metadata.content_length <= self._transfer_config.multipart_threshold:
                response = self._get_object(remote_path)
                # The Python client returns `bytes`, but the Rust client returns an object that implements
                # the buffer protocol, so we need to check whether `.decode()` is available.
                if isinstance(f, io.StringIO):
                    if hasattr(response, "decode"):
                        f.write(response.decode("utf-8"))
                    else:
                        f.write(codecs.decode(memoryview(response), "utf-8"))
                else:
                    f.write(response)
                return metadata.content_length

            # Download large files using TransferConfig
            bucket, key = split_path(remote_path)

            def _invoke_api() -> int:
                response = self._s3_client.download_fileobj(
                    Bucket=bucket,
                    Key=key,
                    Fileobj=f,
                    Config=self._transfer_config,
                )

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)

                return metadata.content_length

            return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key)
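
# Illustrative download (editor's sketch; paths are placeholders): small
# objects are fetched with a single GET into a temporary file that is
# atomically renamed into place, while larger ones take the multipart path.
#
#   provider._download_file("my-bucket/data.bin", "/tmp/data.bin")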