# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

16import codecs
17import io
18import os
19import tempfile
20from collections.abc import Callable, Iterator
21from typing import IO, Any, Optional, TypeVar, Union
22
23import boto3
24import botocore
25from boto3.s3.transfer import TransferConfig
26from botocore.credentials import RefreshableCredentials
27from botocore.exceptions import ClientError, IncompleteReadError, ReadTimeoutError, ResponseStreamingError
28from botocore.session import get_session
29
30from multistorageclient_rust import RustClient, RustRetryableError
31
32from ..instrumentation.utils import set_span_attribute
33from ..rust_utils import run_async_rust_client_method
34from ..telemetry import Telemetry
35from ..types import (
36 AWARE_DATETIME_MIN,
37 Credentials,
38 CredentialsProvider,
39 ObjectMetadata,
40 PreconditionFailedError,
41 Range,
42 RetryableError,
43)
44from ..utils import (
45 split_path,
46 validate_attributes,
47)
48from .base import BaseStorageProvider
49
# Generic return type used by the error-translation wrapper below.
_T = TypeVar("_T")

# Upper bound on boto3's HTTP connection pool size (per client).
BOTO3_MAX_POOL_CONNECTIONS = 32

MiB = 1024 * 1024

# Python and Rust share the same multipart_threshold to keep the code simple.
MULTIPART_THRESHOLD = 64 * MiB
# Part size used for multipart transfers.
MULTIPART_CHUNKSIZE = 32 * MiB
# Read/write buffer size for streaming transfers.
IO_CHUNKSIZE = 32 * MiB
# Default number of threads boto3 may use for a single transfer.
PYTHON_MAX_CONCURRENCY = 8

# Provider name reported to the base class and used to select the Rust backend.
PROVIDER = "s3"

# StorageClass value required when writing to S3 Express One Zone (directory) buckets.
EXPRESS_ONEZONE_STORAGE_CLASS = "EXPRESS_ONEZONE"
65
66
67def _extract_x_trans_id(response: Any) -> None:
68 """Extract x-trans-id from boto3 response and set it as span attribute."""
69 try:
70 if response and isinstance(response, dict):
71 headers = response.get("ResponseMetadata", {}).get("HTTPHeaders", {})
72 if headers and isinstance(headers, dict) and "x-trans-id" in headers:
73 set_span_attribute("x_trans_id", headers["x-trans-id"])
74 except (KeyError, AttributeError, TypeError):
75 # Silently ignore any errors in extraction
76 pass
77
78
class StaticS3CredentialsProvider(CredentialsProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.CredentialsProvider` that provides static S3 credentials.
    """

    _access_key: str
    _secret_key: str
    _session_token: Optional[str]

    def __init__(self, access_key: str, secret_key: str, session_token: Optional[str] = None):
        """
        Initializes the :py:class:`StaticS3CredentialsProvider` with a fixed access key, secret key,
        and optional session token.

        :param access_key: The access key for S3 authentication.
        :param secret_key: The secret key for S3 authentication.
        :param session_token: An optional session token for temporary credentials.
        """
        self._access_key = access_key
        self._secret_key = secret_key
        self._session_token = session_token

    def get_credentials(self) -> Credentials:
        """Return the static credentials; they never expire, so ``expiration`` is ``None``."""
        return Credentials(
            access_key=self._access_key,
            secret_key=self._secret_key,
            token=self._session_token,
            expiration=None,
        )

    def refresh_credentials(self) -> None:
        """No-op: static credentials cannot be refreshed."""
        pass
112
class S3StorageProvider(BaseStorageProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.StorageProvider` for interacting with Amazon S3 or S3-compatible object stores.
    """

    def __init__(
        self,
        region_name: str = "",
        endpoint_url: str = "",
        base_path: str = "",
        credentials_provider: Optional[CredentialsProvider] = None,
        config_dict: Optional[dict[str, Any]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initializes the :py:class:`S3StorageProvider` with the region, endpoint URL, and optional credentials provider.

        :param region_name: The AWS region where the S3 bucket is located.
        :param endpoint_url: The custom endpoint URL for the S3 service.
        :param base_path: The root prefix path within the S3 bucket where all operations will be scoped.
        :param credentials_provider: The provider to retrieve S3 credentials.
        :param config_dict: Resolved MSC config.
        :param telemetry_provider: A function that provides a telemetry instance.
        """
        super().__init__(
            base_path=base_path,
            provider_name=PROVIDER,
            config_dict=config_dict,
            telemetry_provider=telemetry_provider,
        )

        self._region_name = region_name
        self._endpoint_url = endpoint_url
        self._credentials_provider = credentials_provider

        # Client tuning knobs are passed through **kwargs; anything omitted falls
        # back to boto3 defaults or the module-level constants.
        self._signature_version = kwargs.get("signature_version", "")
        self._s3_client = self._create_s3_client(
            request_checksum_calculation=kwargs.get("request_checksum_calculation"),
            response_checksum_validation=kwargs.get("response_checksum_validation"),
            max_pool_connections=kwargs.get("max_pool_connections", BOTO3_MAX_POOL_CONNECTIONS),
            connect_timeout=kwargs.get("connect_timeout"),
            read_timeout=kwargs.get("read_timeout"),
            retries=kwargs.get("retries"),
        )
        # Governs the small/large split (multipart_threshold) used throughout this class.
        self._transfer_config = TransferConfig(
            multipart_threshold=int(kwargs.get("multipart_threshold", MULTIPART_THRESHOLD)),
            max_concurrency=int(kwargs.get("max_concurrency", PYTHON_MAX_CONCURRENCY)),
            multipart_chunksize=int(kwargs.get("multipart_chunksize", MULTIPART_CHUNKSIZE)),
            io_chunksize=int(kwargs.get("io_chunksize", IO_CHUNKSIZE)),
            use_threads=True,
        )

        # The Rust client is optional: only constructed when "rust_client" options are present.
        self._rust_client = None
        if "rust_client" in kwargs:
            # Inherit the rust client options from the kwargs
            rust_client_options = kwargs["rust_client"]
            # Mirror the shared tuning knobs into the Rust options so both clients agree.
            if "max_pool_connections" in kwargs:
                rust_client_options["max_pool_connections"] = kwargs["max_pool_connections"]
            if "max_concurrency" in kwargs:
                rust_client_options["max_concurrency"] = kwargs["max_concurrency"]
            if "multipart_chunksize" in kwargs:
                rust_client_options["multipart_chunksize"] = kwargs["multipart_chunksize"]
            if "read_timeout" in kwargs:
                rust_client_options["read_timeout"] = kwargs["read_timeout"]
            if "connect_timeout" in kwargs:
                rust_client_options["connect_timeout"] = kwargs["connect_timeout"]
            if self._signature_version == "UNSIGNED":
                rust_client_options["skip_signature"] = True
            self._rust_client = self._create_rust_client(rust_client_options)
183
184 def _is_directory_bucket(self, bucket: str) -> bool:
185 """
186 Determines if the bucket is a directory bucket based on bucket name.
187 """
188 # S3 Express buckets have a specific naming convention
189 return "--x-s3" in bucket
190
191 def _create_s3_client(
192 self,
193 request_checksum_calculation: Optional[str] = None,
194 response_checksum_validation: Optional[str] = None,
195 max_pool_connections: int = BOTO3_MAX_POOL_CONNECTIONS,
196 connect_timeout: Union[float, int, None] = None,
197 read_timeout: Union[float, int, None] = None,
198 retries: Optional[dict[str, Any]] = None,
199 ):
200 """
201 Creates and configures the boto3 S3 client, using refreshable credentials if possible.
202
203 :param request_checksum_calculation: When the underlying S3 client should calculate request checksums. See the equivalent option in the `AWS configuration file <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-a-configuration-file>`_.
204 :param response_checksum_validation: When the underlying S3 client should validate response checksums. See the equivalent option in the `AWS configuration file <https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-a-configuration-file>`_.
205 :param max_pool_connections: The maximum number of connections to keep in a connection pool.
206 :param connect_timeout: The time in seconds till a timeout exception is thrown when attempting to make a connection.
207 :param read_timeout: The time in seconds till a timeout exception is thrown when attempting to read from a connection.
208 :param retries: A dictionary for configuration related to retry behavior.
209
210 :return: The configured S3 client.
211 """
212 options = {
213 # https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
214 "config": boto3.session.Config( # pyright: ignore [reportAttributeAccessIssue]
215 max_pool_connections=max_pool_connections,
216 connect_timeout=connect_timeout,
217 read_timeout=read_timeout,
218 retries=retries or {"mode": "standard"},
219 request_checksum_calculation=request_checksum_calculation,
220 response_checksum_validation=response_checksum_validation,
221 ),
222 }
223
224 if self._region_name:
225 options["region_name"] = self._region_name
226
227 if self._endpoint_url:
228 options["endpoint_url"] = self._endpoint_url
229
230 if self._credentials_provider:
231 creds = self._fetch_credentials()
232 if "expiry_time" in creds and creds["expiry_time"]:
233 # Use RefreshableCredentials if expiry_time provided.
234 refreshable_credentials = RefreshableCredentials.create_from_metadata(
235 metadata=creds, refresh_using=self._fetch_credentials, method="custom-refresh"
236 )
237
238 botocore_session = get_session()
239 botocore_session._credentials = refreshable_credentials
240
241 boto3_session = boto3.Session(botocore_session=botocore_session)
242
243 return boto3_session.client("s3", **options)
244 else:
245 # Add static credentials to the options dictionary
246 options["aws_access_key_id"] = creds["access_key"]
247 options["aws_secret_access_key"] = creds["secret_key"]
248 if creds["token"]:
249 options["aws_session_token"] = creds["token"]
250
251 if self._signature_version:
252 signature_config = botocore.config.Config( # pyright: ignore[reportAttributeAccessIssue]
253 signature_version=botocore.UNSIGNED
254 if self._signature_version == "UNSIGNED"
255 else self._signature_version
256 )
257 options["config"] = options["config"].merge(signature_config)
258
259 # Fallback to standard credential chain.
260 return boto3.client("s3", **options)
261
262 def _create_rust_client(self, rust_client_options: Optional[dict[str, Any]] = None):
263 """
264 Creates and configures the rust client, using refreshable credentials if possible.
265 """
266 configs = {}
267 if self._region_name:
268 configs["region_name"] = self._region_name
269
270 # If the user specifies a bucket, use it. Otherwise, use the base path.
271 if rust_client_options and "bucket" in rust_client_options:
272 configs["bucket"] = rust_client_options["bucket"]
273 else:
274 bucket, _ = split_path(self._base_path)
275 configs["bucket"] = bucket
276
277 if self._endpoint_url:
278 configs["endpoint_url"] = self._endpoint_url
279
280 if rust_client_options:
281 if rust_client_options.get("allow_http", False):
282 configs["allow_http"] = True
283 if "max_concurrency" in rust_client_options:
284 configs["max_concurrency"] = rust_client_options["max_concurrency"]
285 if "max_pool_connections" in rust_client_options:
286 configs["max_pool_connections"] = rust_client_options["max_pool_connections"]
287 if "multipart_chunksize" in rust_client_options:
288 configs["multipart_chunksize"] = rust_client_options["multipart_chunksize"]
289 if "skip_signature" in rust_client_options:
290 configs["skip_signature"] = rust_client_options["skip_signature"]
291
292 return RustClient(
293 provider=PROVIDER,
294 configs=configs,
295 credentials_provider=self._credentials_provider,
296 )
297
298 def _fetch_credentials(self) -> dict:
299 """
300 Refreshes the S3 client if the current credentials are expired.
301 """
302 if not self._credentials_provider:
303 raise RuntimeError("Cannot fetch credentials if no credential provider configured.")
304 self._credentials_provider.refresh_credentials()
305 credentials = self._credentials_provider.get_credentials()
306 return {
307 "access_key": credentials.access_key,
308 "secret_key": credentials.secret_key,
309 "token": credentials.token,
310 "expiry_time": credentials.expiration,
311 }
312
    def _translate_errors(
        self,
        func: Callable[[], _T],
        operation: str,
        bucket: str,
        key: str,
    ) -> _T:
        """
        Translates errors like timeouts and client errors.

        TODO: Remove tracing from this method.

        :param func: The function that performs the actual S3 operation.
        :param operation: The type of operation being performed (e.g., "PUT", "GET", "DELETE").
        :param bucket: The name of the S3 bucket involved in the operation.
        :param key: The key of the object within the S3 bucket.

        :return: The result of the S3 operation, typically the return value of the `func` callable.
        """
        # Set basic operation attributes
        set_span_attribute("s3_operation", operation)
        set_span_attribute("s3_bucket", bucket)
        set_span_attribute("s3_key", key)

        try:
            return func()
        except ClientError as error:
            status_code = error.response["ResponseMetadata"]["HTTPStatusCode"]
            request_id = error.response["ResponseMetadata"].get("RequestId")
            host_id = error.response["ResponseMetadata"].get("HostId")
            header = error.response["ResponseMetadata"].get("HTTPHeaders", {})
            error_code = error.response["Error"]["Code"]

            # Ensure header is a dictionary before trying to get from it
            x_trans_id = header.get("x-trans-id") if isinstance(header, dict) else None

            # Record error details in span
            set_span_attribute("request_id", request_id)
            set_span_attribute("host_id", host_id)

            error_info = f"request_id: {request_id}, host_id: {host_id}, status_code: {status_code}"
            if x_trans_id:
                error_info += f", x-trans-id: {x_trans_id}"
                set_span_attribute("x_trans_id", x_trans_id)

            if status_code == 404:
                # NoSuchUpload: the multipart upload no longer exists server-side;
                # surfaced as retryable so the whole upload can be restarted.
                if error_code == "NoSuchUpload":
                    error_message = error.response["Error"]["Message"]
                    raise RetryableError(f"Multipart upload failed for {bucket}/{key}: {error_message}") from error
                raise FileNotFoundError(f"Object {bucket}/{key} does not exist. {error_info}")  # pylint: disable=raise-missing-from
            elif status_code == 412:  # Precondition Failed
                raise PreconditionFailedError(
                    f"ETag mismatch for {operation} operation on {bucket}/{key}. {error_info}"
                ) from error
            elif status_code == 429:
                # Throttled by the service; callers may back off and retry.
                raise RetryableError(
                    f"Too many request to {operation} object(s) at {bucket}/{key}. {error_info}"
                ) from error
            elif status_code == 503:
                raise RetryableError(
                    f"Service unavailable when {operation} object(s) at {bucket}/{key}. {error_info}"
                ) from error
            elif status_code == 501:
                raise NotImplementedError(
                    f"Operation {operation} not implemented for object(s) at {bucket}/{key}. {error_info}"
                ) from error
            else:
                raise RuntimeError(
                    f"Failed to {operation} object(s) at {bucket}/{key}. {error_info}, "
                    f"error_type: {type(error).__name__}"
                ) from error
        except FileNotFoundError:
            # Already translated (raised above or by `func` itself); pass through untouched.
            raise
        except (ReadTimeoutError, IncompleteReadError, ResponseStreamingError) as error:
            # Transient network failures are retryable.
            raise RetryableError(
                f"Failed to {operation} object(s) at {bucket}/{key} due to network timeout or incomplete read. "
                f"error_type: {type(error).__name__}"
            ) from error
        except RustRetryableError as error:
            # The Rust client exhausted its own internal retries; let the Python layer retry.
            raise RetryableError(
                f"Failed to {operation} object(s) at {bucket}/{key} due to exhausted retries from Rust. "
                f"error_type: {type(error).__name__}"
            ) from error
        except Exception as error:
            raise RuntimeError(
                f"Failed to {operation} object(s) at {bucket}/{key}, error type: {type(error).__name__}, error: {error}"
            ) from error
400
401 def _put_object(
402 self,
403 path: str,
404 body: bytes,
405 if_match: Optional[str] = None,
406 if_none_match: Optional[str] = None,
407 attributes: Optional[dict[str, str]] = None,
408 content_type: Optional[str] = None,
409 ) -> int:
410 """
411 Uploads an object to the specified S3 path.
412
413 :param path: The S3 path where the object will be uploaded.
414 :param body: The content of the object as bytes.
415 :param if_match: Optional If-Match header value. Use "*" to only upload if the object doesn't exist.
416 :param if_none_match: Optional If-None-Match header value. Use "*" to only upload if the object doesn't exist.
417 :param attributes: Optional attributes to attach to the object.
418 :param content_type: Optional Content-Type header value.
419 """
420 bucket, key = split_path(path)
421
422 def _invoke_api() -> int:
423 kwargs = {"Bucket": bucket, "Key": key, "Body": body}
424 if content_type:
425 kwargs["ContentType"] = content_type
426 if self._is_directory_bucket(bucket):
427 kwargs["StorageClass"] = EXPRESS_ONEZONE_STORAGE_CLASS
428 if if_match:
429 kwargs["IfMatch"] = if_match
430 if if_none_match:
431 kwargs["IfNoneMatch"] = if_none_match
432 validated_attributes = validate_attributes(attributes)
433 if validated_attributes:
434 kwargs["Metadata"] = validated_attributes
435
436 # TODO(NGCDP-5804): Add support to update ContentType header in Rust client
437 rust_unsupported_feature_keys = {"Metadata", "StorageClass", "IfMatch", "IfNoneMatch", "ContentType"}
438 if (
439 self._rust_client
440 # Rust client doesn't support creating objects with trailing /, see https://github.com/apache/arrow-rs/issues/7026
441 and not path.endswith("/")
442 and all(key not in kwargs for key in rust_unsupported_feature_keys)
443 ):
444 response = run_async_rust_client_method(self._rust_client, "put", key, body)
445 else:
446 # Capture the response from put_object
447 response = self._s3_client.put_object(**kwargs)
448
449 # Extract and set x-trans-id if present
450 _extract_x_trans_id(response)
451
452 return len(body)
453
454 return self._translate_errors(_invoke_api, operation="PUT", bucket=bucket, key=key)
455
456 def _get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
457 bucket, key = split_path(path)
458
459 def _invoke_api() -> bytes:
460 if byte_range:
461 bytes_range = f"bytes={byte_range.offset}-{byte_range.offset + byte_range.size - 1}"
462 if self._rust_client:
463 response = run_async_rust_client_method(
464 self._rust_client, "get", key, byte_range.offset, byte_range.offset + byte_range.size - 1
465 )
466 return response
467 else:
468 response = self._s3_client.get_object(Bucket=bucket, Key=key, Range=bytes_range)
469 else:
470 if self._rust_client:
471 response = run_async_rust_client_method(self._rust_client, "get", key)
472 return response
473 else:
474 response = self._s3_client.get_object(Bucket=bucket, Key=key)
475
476 # Extract and set x-trans-id if present
477 _extract_x_trans_id(response)
478
479 return response["Body"].read()
480
481 return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key)
482
483 def _copy_object(self, src_path: str, dest_path: str) -> int:
484 src_bucket, src_key = split_path(src_path)
485 dest_bucket, dest_key = split_path(dest_path)
486
487 src_object = self._get_object_metadata(src_path)
488
489 def _invoke_api() -> int:
490 response = self._s3_client.copy(
491 CopySource={"Bucket": src_bucket, "Key": src_key},
492 Bucket=dest_bucket,
493 Key=dest_key,
494 Config=self._transfer_config,
495 )
496
497 # Extract and set x-trans-id if present
498 _extract_x_trans_id(response)
499
500 return src_object.content_length
501
502 return self._translate_errors(_invoke_api, operation="COPY", bucket=dest_bucket, key=dest_key)
503
504 def _delete_object(self, path: str, if_match: Optional[str] = None) -> None:
505 bucket, key = split_path(path)
506
507 def _invoke_api() -> None:
508 # conditionally delete the object if if_match(etag) is provided, if not, delete the object unconditionally
509 if if_match:
510 response = self._s3_client.delete_object(Bucket=bucket, Key=key, IfMatch=if_match)
511 else:
512 response = self._s3_client.delete_object(Bucket=bucket, Key=key)
513
514 # Extract and set x-trans-id if present
515 _extract_x_trans_id(response)
516
517 return self._translate_errors(_invoke_api, operation="DELETE", bucket=bucket, key=key)
518
519 def _is_dir(self, path: str) -> bool:
520 # Ensure the path ends with '/' to mimic a directory
521 path = self._append_delimiter(path)
522
523 bucket, key = split_path(path)
524
525 def _invoke_api() -> bool:
526 # List objects with the given prefix
527 response = self._s3_client.list_objects_v2(Bucket=bucket, Prefix=key, MaxKeys=1, Delimiter="/")
528
529 # Extract and set x-trans-id if present
530 _extract_x_trans_id(response)
531
532 # Check if there are any contents or common prefixes
533 return bool(response.get("Contents", []) or response.get("CommonPrefixes", []))
534
535 return self._translate_errors(_invoke_api, operation="LIST", bucket=bucket, key=key)
536
    def _get_object_metadata(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Return metadata for the object (or virtual directory) at *path*.

        :param path: Bucket-qualified object path.
        :param strict: When True, a missing object triggers a fallback directory-prefix
            check before ``FileNotFoundError`` is raised.
        :raises FileNotFoundError: If neither an object nor a directory exists at *path*.
        """
        bucket, key = split_path(path)
        if path.endswith("/") or (bucket and not key):
            # If path ends with "/" or empty key name is provided, then assume it's a "directory",
            # which metadata is not guaranteed to exist for cases such as
            # "virtual prefix" that was never explicitly created.
            if self._is_dir(path):
                return ObjectMetadata(
                    key=path,
                    type="directory",
                    content_length=0,
                    last_modified=AWARE_DATETIME_MIN,
                )
            else:
                raise FileNotFoundError(f"Directory {path} does not exist.")
        else:

            def _invoke_api() -> ObjectMetadata:
                # HEAD returns the object's metadata without transferring the payload.
                response = self._s3_client.head_object(Bucket=bucket, Key=key)

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)

                return ObjectMetadata(
                    key=path,
                    type="file",
                    content_length=response["ContentLength"],
                    content_type=response["ContentType"],
                    last_modified=response["LastModified"],
                    # S3 wraps ETags in double quotes; strip them for callers.
                    etag=response["ETag"].strip('"'),
                    storage_class=response.get("StorageClass"),
                    metadata=response.get("Metadata"),
                )

            try:
                return self._translate_errors(_invoke_api, operation="HEAD", bucket=bucket, key=key)
            except FileNotFoundError as error:
                if strict:
                    # If the object does not exist on the given path, we will append a trailing slash and
                    # check if the path is a directory.
                    path = self._append_delimiter(path)
                    if self._is_dir(path):
                        return ObjectMetadata(
                            key=path,
                            type="directory",
                            content_length=0,
                            last_modified=AWARE_DATETIME_MIN,
                        )
                raise error
586
    def _list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lazily yield metadata for objects stored under *path*.

        :param path: Bucket-qualified prefix to list under.
        :param start_after: Only keys strictly after this key are listed.
        :param end_at: Inclusive upper bound; iteration stops past this key.
        :param include_directories: When True, list with "/" as delimiter and also
            yield common prefixes (and folder-marker keys) as directory entries.
        """
        bucket, prefix = split_path(path)

        def _invoke_api() -> Iterator[ObjectMetadata]:
            paginator = self._s3_client.get_paginator("list_objects_v2")
            # Delimiter="/" collapses deeper keys into CommonPrefixes (one level only).
            if include_directories:
                page_iterator = paginator.paginate(
                    Bucket=bucket, Prefix=prefix, Delimiter="/", StartAfter=(start_after or "")
                )
            else:
                page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix, StartAfter=(start_after or ""))

            for page in page_iterator:
                for item in page.get("CommonPrefixes", []):
                    yield ObjectMetadata(
                        key=os.path.join(bucket, item["Prefix"].rstrip("/")),
                        type="directory",
                        content_length=0,
                        last_modified=AWARE_DATETIME_MIN,
                    )

                # S3 guarantees lexicographical order for general purpose buckets (for
                # normal S3) but not directory buckets (for S3 Express One Zone).
                for response_object in page.get("Contents", []):
                    key = response_object["Key"]
                    if end_at is None or key <= end_at:
                        if key.endswith("/"):
                            # Zero-byte "folder marker" objects are surfaced as directories.
                            if include_directories:
                                yield ObjectMetadata(
                                    key=os.path.join(bucket, key.rstrip("/")),
                                    type="directory",
                                    content_length=0,
                                    last_modified=response_object["LastModified"],
                                )
                        else:
                            yield ObjectMetadata(
                                key=os.path.join(bucket, key),
                                type="file",
                                content_length=response_object["Size"],
                                last_modified=response_object["LastModified"],
                                etag=response_object["ETag"].strip('"'),
                                storage_class=response_object.get("StorageClass"),  # Pass storage_class
                            )
                    else:
                        # Past end_at: keys are sorted, so nothing further can match.
                        return

        return self._translate_errors(_invoke_api, operation="LIST", bucket=bucket, key=prefix)
640
    def _upload_file(
        self,
        remote_path: str,
        f: Union[str, IO],
        attributes: Optional[dict[str, str]] = None,
        content_type: Optional[str] = None,
    ) -> int:
        """
        Upload a local file path or an open file object to *remote_path*.

        Payloads at or below the transfer config's ``multipart_threshold`` go through a
        single PUT; larger ones use boto3's managed (multipart) transfer.

        :param remote_path: Destination S3 path.
        :param f: Local file path, or an open binary/text file object positioned anywhere
            (it is rewound before reading).
        :param attributes: Optional object metadata attributes.
        :param content_type: Optional Content-Type header value.
        :return: The number of bytes uploaded.
        """
        file_size: int = 0

        if isinstance(f, str):
            bucket, key = split_path(remote_path)
            file_size = os.path.getsize(f)

            # Upload small files
            if file_size <= self._transfer_config.multipart_threshold:
                # Rust path only handles plain uploads (no metadata / content type).
                if self._rust_client and not attributes and not content_type:
                    run_async_rust_client_method(self._rust_client, "upload", f, key)
                else:
                    with open(f, "rb") as fp:
                        self._put_object(remote_path, fp.read(), attributes=attributes, content_type=content_type)
                return file_size

            # Upload large files using TransferConfig
            def _invoke_api() -> int:
                extra_args = {}
                if content_type:
                    extra_args["ContentType"] = content_type
                if self._is_directory_bucket(bucket):
                    extra_args["StorageClass"] = EXPRESS_ONEZONE_STORAGE_CLASS
                validated_attributes = validate_attributes(attributes)
                if validated_attributes:
                    extra_args["Metadata"] = validated_attributes
                # Rust multipart upload is only usable when no extra args are required.
                if self._rust_client and not extra_args:
                    response = run_async_rust_client_method(self._rust_client, "upload_multipart_from_file", f, key)
                else:
                    response = self._s3_client.upload_file(
                        Filename=f,
                        Bucket=bucket,
                        Key=key,
                        Config=self._transfer_config,
                        ExtraArgs=extra_args,
                    )

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)

                return file_size

            return self._translate_errors(_invoke_api, operation="PUT", bucket=bucket, key=key)
        else:
            # Upload small files
            # Measure the stream by seeking to its end, then rewind before reading.
            f.seek(0, io.SEEK_END)
            file_size = f.tell()
            f.seek(0)

            if file_size <= self._transfer_config.multipart_threshold:
                if isinstance(f, io.StringIO):
                    # Text buffers must be encoded before the byte-oriented PUT.
                    self._put_object(
                        remote_path, f.read().encode("utf-8"), attributes=attributes, content_type=content_type
                    )
                else:
                    self._put_object(remote_path, f.read(), attributes=attributes, content_type=content_type)
                return file_size

            # Upload large files using TransferConfig
            bucket, key = split_path(remote_path)

            def _invoke_api() -> int:
                extra_args = {}
                if content_type:
                    extra_args["ContentType"] = content_type
                if self._is_directory_bucket(bucket):
                    extra_args["StorageClass"] = EXPRESS_ONEZONE_STORAGE_CLASS
                validated_attributes = validate_attributes(attributes)
                if validated_attributes:
                    extra_args["Metadata"] = validated_attributes
                self._s3_client.upload_fileobj(
                    Fileobj=f,
                    Bucket=bucket,
                    Key=key,
                    Config=self._transfer_config,
                    ExtraArgs=extra_args,
                )

                return file_size

            return self._translate_errors(_invoke_api, operation="PUT", bucket=bucket, key=key)
728
    def _download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> int:
        """
        Download *remote_path* into a local file path or an open file object.

        Objects at or below the transfer config's ``multipart_threshold`` use a single
        GET; larger ones use boto3's managed (multipart) transfer.

        :param remote_path: Source S3 path.
        :param f: Destination local file path, or an open binary/text file object.
        :param metadata: Pre-fetched object metadata; fetched via HEAD when omitted.
        :return: The number of bytes downloaded.
        """
        if metadata is None:
            metadata = self._get_object_metadata(remote_path)

        if isinstance(f, str):
            bucket, key = split_path(remote_path)
            if os.path.dirname(f):
                os.makedirs(os.path.dirname(f), exist_ok=True)

            # Download small files
            if metadata.content_length <= self._transfer_config.multipart_threshold:
                if self._rust_client:
                    run_async_rust_client_method(self._rust_client, "download", key, f)
                else:
                    # Write to a hidden temp file in the destination directory, then rename
                    # so readers never observe a partially written file.
                    with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=os.path.dirname(f), prefix=".") as fp:
                        temp_file_path = fp.name
                        fp.write(self._get_object(remote_path))
                    os.rename(src=temp_file_path, dst=f)
                return metadata.content_length

            # Download large files using TransferConfig
            def _invoke_api() -> int:
                response = None
                # NOTE(review): if the transfer below raises, the delete=False temp file is
                # left behind (no cleanup on failure) — confirm whether that is intentional.
                with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=os.path.dirname(f), prefix=".") as fp:
                    temp_file_path = fp.name
                    if self._rust_client:
                        response = run_async_rust_client_method(
                            self._rust_client, "download_multipart_to_file", key, temp_file_path
                        )
                    else:
                        response = self._s3_client.download_fileobj(
                            Bucket=bucket,
                            Key=key,
                            Fileobj=fp,
                            Config=self._transfer_config,
                        )

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)
                # Atomically move the completed download into place.
                os.rename(src=temp_file_path, dst=f)

                return metadata.content_length

            return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key)
        else:
            # Download small files
            if metadata.content_length <= self._transfer_config.multipart_threshold:
                response = self._get_object(remote_path)
                # Python client returns `bytes`, but Rust client returns a object implements buffer protocol,
                # so we need to check whether `.decode()` is available.
                if isinstance(f, io.StringIO):
                    if hasattr(response, "decode"):
                        f.write(response.decode("utf-8"))
                    else:
                        f.write(codecs.decode(memoryview(response), "utf-8"))
                else:
                    f.write(response)
                return metadata.content_length

            # Download large files using TransferConfig
            bucket, key = split_path(remote_path)

            def _invoke_api() -> int:
                response = self._s3_client.download_fileobj(
                    Bucket=bucket,
                    Key=key,
                    Fileobj=f,
                    Config=self._transfer_config,
                )

                # Extract and set x-trans-id if present
                _extract_x_trans_id(response)

                return metadata.content_length

            return self._translate_errors(_invoke_api, operation="GET", bucket=bucket, key=key)