1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator, Sequence
22from concurrent.futures import ThreadPoolExecutor, as_completed
23from datetime import datetime, timezone
24from io import BytesIO
25from pathlib import PurePosixPath
26from typing import IO, Any, Optional, Union, cast
27
28from ..config import StorageClientConfig
29from ..constants import DEFAULT_SYNC_BATCH_SIZE, MEMORY_LOAD_LIMIT
30from ..file import ObjectFile, PosixFile
31from ..providers.posix_file import PosixFileStorageProvider
32from ..replica_manager import ReplicaManager
33from ..retry import batch_retry, retry
34from ..sync import SyncManager
35from ..types import (
36 AWARE_DATETIME_MIN,
37 MSC_PROTOCOL,
38 ExecutionMode,
39 ObjectMetadata,
40 PatternList,
41 Range,
42 Replica,
43 ResolvedPathState,
44 SignerType,
45 SourceVersionCheckMode,
46 StorageProvider,
47 SymlinkHandling,
48 SyncResult,
49)
50from ..utils import NullStorageClient, PatternMatcher, join_paths, resolve_symlink_handling
51from .types import AbstractStorageClient
52
53logger = logging.getLogger(__name__)
54
55
[docs]
56class SingleStorageClient(AbstractStorageClient):
57 """
58 Storage client for single-backend configurations.
59
60 Supports full read and write operations against a single storage provider.
61 """
62
63 _config: StorageClientConfig
64 _storage_provider: StorageProvider
65 _metadata_provider_lock: Optional[threading.Lock] = None
66 _stop_event: Optional[threading.Event] = None
67 _replica_manager: Optional[ReplicaManager] = None
68
def __init__(self, config: StorageClientConfig):
    """
    Build a :py:class:`SingleStorageClient` from ``config``.

    Providers and settings are wired up first; replica clients declared in
    ``config.replicas`` are created afterwards.

    :param config: Storage client configuration with ``storage_provider`` set.
    :raises ValueError: If ``config`` declares ``storage_provider_profiles`` (multi-backend)
        or has no ``storage_provider``.
    """
    self._initialize_providers(config)
    replica_definitions = config.replicas
    self._initialize_replicas(replica_definitions)
78
def _initialize_providers(self, config: StorageClientConfig) -> None:
    """
    Validate ``config`` and copy its providers and settings onto the client.

    When an auto-commit configuration and a metadata provider are both present, this also
    starts the periodic committer thread (if an interval is configured), registers the
    at-exit commit hook (if requested) and creates the metadata provider lock.

    :param config: Storage client configuration to adopt.
    :raises ValueError: If ``storage_provider_profiles`` is set or ``storage_provider`` is missing.
    """
    if config.storage_provider_profiles:
        raise ValueError(
            "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
            "Use CompositeStorageClient for multi-backend configurations."
        )
    if config.storage_provider is None:
        raise ValueError("SingleStorageClient requires storage_provider to be set.")

    self._config = config
    self._credentials_provider = config.credentials_provider
    self._storage_provider = cast(StorageProvider, config.storage_provider)
    self._metadata_provider = config.metadata_provider
    self._cache_config = config.cache_config
    self._retry_config = config.retry_config
    self._cache_manager = config.cache_manager
    self._autocommit_config = config.autocommit_config

    autocommit = self._autocommit_config
    if not autocommit:
        return

    if not self._metadata_provider:
        # Auto-commit only makes sense when there is metadata to commit.
        logger.debug("No metadata provider configured, auto-commit will not be enabled")
        return

    logger.debug("Creating auto-commiter thread")

    interval_minutes = autocommit.interval_minutes
    if interval_minutes:
        stop_event = threading.Event()
        self._stop_event = stop_event
        committer = threading.Thread(
            target=self._committer_thread,
            daemon=True,
            args=(interval_minutes, stop_event),
        )
        self._commit_thread = committer
        committer.start()

    if autocommit.at_exit:
        atexit.register(self._commit_on_exit)

    self._metadata_provider_lock = threading.Lock()
117
def _initialize_replicas(self, replicas: list[Replica]) -> None:
    """
    Create one facade ``StorageClient`` per replica and attach a replica manager.

    Replica clients are stored in ascending ``read_priority`` order. The replica manager is
    only created when at least one replica client exists.

    :param replicas: Replica definitions to instantiate.
    :raises ValueError: If a replica must be created but the client config has no config dict.
    """
    # Deferred import to avoid a circular dependency with the facade module.
    from .client import StorageClient as StorageClientFacade

    # Lowest read_priority first: the first entry is the primary replica.
    by_priority = sorted(replicas, key=lambda replica: replica.read_priority)

    clients = []
    for replica in by_priority:
        config_dict = self._config._config_dict
        if config_dict is None:
            raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
        replica_config = StorageClientConfig.from_dict(config_dict=config_dict, profile=replica.replica_profile)
        clients.append(StorageClientFacade(config=replica_config))

    self._replicas = clients
    self._replica_manager = ReplicaManager(self) if clients else None
139
140 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
141 if not stop_event:
142 raise RuntimeError("Stop event not set")
143
144 while not stop_event.is_set():
145 # Wait with the ability to exit early
146 if stop_event.wait(timeout=commit_interval_minutes * 60):
147 break
148 logger.debug("Auto-committing to metadata provider")
149 self.commit_metadata()
150
def _commit_on_exit(self):
    """Commit metadata a final time; registered as an ``atexit`` hook when configured."""
    logger.debug("Shutting down, committing metadata one last time...")
    self.commit_metadata()
154
155 def _get_source_version(self, path: str) -> Optional[str]:
156 """
157 Get etag from metadata provider or storage provider.
158 """
159 if self._metadata_provider:
160 metadata = self._metadata_provider.get_object_metadata(path)
161 else:
162 metadata = self._storage_provider.get_object_metadata(path)
163 return metadata.etag
164
165 def _is_cache_enabled(self) -> bool:
166 enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
167 return enabled
168
def _is_posix_file_storage_provider(self) -> bool:
    """
    :return: ``True`` if the storage provider is a :py:class:`PosixFileStorageProvider` instance,
        ``False`` otherwise.
    """
    provider = self._storage_provider
    return isinstance(provider, PosixFileStorageProvider)
174
175 def _is_rust_client_enabled(self) -> bool:
176 """
177 :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
178 """
179 return getattr(self._storage_provider, "_rust_client", None) is not None
180
181 def _read_from_replica_or_primary(self, path: str) -> bytes:
182 """
183 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
184 """
185 if self._replica_manager is None:
186 raise RuntimeError("Replica manager is not initialized")
187 file_obj = BytesIO()
188 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
189 return file_obj.getvalue()
190
191 def __del__(self):
192 if self._stop_event:
193 self._stop_event.set()
194 if self._commit_thread.is_alive():
195 self._commit_thread.join(timeout=5.0)
196
197 def __getstate__(self) -> dict[str, Any]:
198 state = self.__dict__.copy()
199 del state["_credentials_provider"]
200 del state["_storage_provider"]
201 del state["_metadata_provider"]
202 del state["_cache_manager"]
203
204 if "_metadata_provider_lock" in state:
205 del state["_metadata_provider_lock"]
206
207 if "_replicas" in state:
208 del state["_replicas"]
209
210 # Replica manager could be disabled if it's set to None in the state.
211 if "_replica_manager" in state:
212 if state["_replica_manager"] is not None:
213 del state["_replica_manager"]
214
215 return state
216
217 def __setstate__(self, state: dict[str, Any]) -> None:
218 config = state["_config"]
219 self._initialize_providers(config)
220
221 # Replica manager could be disabled if it's set to None in the state.
222 if "_replica_manager" in state and state["_replica_manager"] is None:
223 self._replica_manager = None
224 else:
225 self._initialize_replicas(config.replicas)
226
227 if self._metadata_provider:
228 self._metadata_provider_lock = threading.Lock()
229
@property
def profile(self) -> str:
    """
    :return: The profile name taken from the client configuration.
    """
    config = self._config
    return config.profile
236
[docs]
def is_default_profile(self) -> bool:
    """
    :return: ``True`` if the configured profile is the reserved ``__filesystem__`` profile,
        ``False`` otherwise.
    """
    profile_name = self._config.profile
    return profile_name == "__filesystem__"
242
@property
def replicas(self) -> list[AbstractStorageClient]:
    """
    :return: The replica storage clients, in the read-priority order they were created in.
    """
    replica_clients = self._replicas
    return replica_clients
249
250 # -- Metadata resolution helpers --
251
252 def _resolve_read_path(self, logical_path: str) -> str:
253 """
254 Resolve a logical path to its physical storage path for read operations.
255
256 :param logical_path: The user-facing logical path.
257 :return: The physical storage path.
258 :raises FileNotFoundError: If the file does not exist in the metadata provider.
259 """
260 assert self._metadata_provider is not None
261 resolved = self._metadata_provider.realpath(logical_path)
262 if not resolved.exists:
263 raise FileNotFoundError(f"The file at path '{logical_path}' was not found by metadata provider.")
264 return resolved.physical_path
265
def _resolve_write_path(self, logical_path: str) -> str:
    """
    Translate a logical path into the physical path to write to.

    A path whose resolved state is ``EXISTS`` or ``DELETED`` is treated as an overwrite:
    the metadata provider must allow overwrites, and the physical path is generated with
    ``for_overwrite=True``. Any other state generates a path with ``for_overwrite=False``.

    :param logical_path: The user-facing logical path.
    :return: The physical storage path generated by the metadata provider.
    :raises FileExistsError: If the write is an overwrite and overwrites are not allowed.
    """
    assert self._metadata_provider is not None
    provider = self._metadata_provider

    current = provider.realpath(logical_path)
    is_overwrite = current.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED)
    if is_overwrite and not provider.allow_overwrites():
        raise FileExistsError(
            f"The file at path '{logical_path}' already exists; "
            f"overwriting is not allowed when using a metadata provider."
        )

    generated = provider.generate_physical_path(logical_path, for_overwrite=is_overwrite)
    return generated.physical_path
286
287 def _register_written_file(
288 self, virtual_path: str, physical_path: str, attributes: Optional[dict[str, Any]] = None
289 ) -> None:
290 """
291 Register a written file with the metadata provider.
292
293 Fetches metadata from the storage provider, optionally merges custom attributes,
294 and registers the file with the metadata provider.
295
296 Protects metadata provider mutation with ``_metadata_provider_lock`` when configured.
297
298 .. note::
299 TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
300 """
301 assert self._metadata_provider is not None
302 obj_metadata = self._storage_provider.get_object_metadata(physical_path)
303 if attributes:
304 obj_metadata.metadata = (obj_metadata.metadata or {}) | attributes
305 with self._metadata_provider_lock or contextlib.nullcontext():
306 self._metadata_provider.add_file(virtual_path, obj_metadata)
307
308 def _register_written_files(
309 self,
310 virtual_paths: Sequence[str],
311 physical_paths: Sequence[str],
312 attributes: Optional[Sequence[Optional[dict[str, Any]]]] = None,
313 max_workers: int = 16,
314 ) -> None:
315 """Register multiple written files with the metadata provider concurrently."""
316 if not virtual_paths:
317 return
318
319 def _register_one(index: int, virtual_path: str, physical_path: str) -> None:
320 file_attrs = attributes[index] if attributes is not None else None
321 self._register_written_file(virtual_path, physical_path, file_attrs)
322
323 worker_count = max(1, min(len(virtual_paths), max_workers))
324 with ThreadPoolExecutor(max_workers=worker_count) as executor:
325 future_to_virtual_path = {
326 executor.submit(_register_one, i, virtual_path, physical_path): virtual_path
327 for i, (virtual_path, physical_path) in enumerate(zip(virtual_paths, physical_paths))
328 }
329 for future in as_completed(future_to_virtual_path):
330 future.result()
331
@batch_retry(operation_name="download_files")
def _download_files_batch(
    self,
    indices: list[int],
    remote_paths: Sequence[str],
    local_paths: list[str],
    metadata: Optional[Sequence[Optional[ObjectMetadata]]],
    max_workers: int,
) -> None:
    """
    Run a single provider batch download for the items selected by ``indices``.

    The ``@batch_retry`` decorator retries this method with only failed
    indices when the provider reports item-level retryable failures.

    :param indices: Positions in the original batch to include in this attempt.
    :param remote_paths: Remote paths for the full original batch.
    :param local_paths: Local destination paths for the full original batch.
    :param metadata: Optional per-file metadata for the full original batch.
    :param max_workers: Maximum provider workers for this attempt.
    """
    selected_remote = [remote_paths[position] for position in indices]
    selected_local = [local_paths[position] for position in indices]

    selected_metadata = None
    if metadata is not None:
        selected_metadata = [metadata[position] for position in indices]

    self._storage_provider.download_files(selected_remote, selected_local, selected_metadata, max_workers)
363
@batch_retry(operation_name="upload_files")
def _upload_files_batch(
    self,
    indices: list[int],
    local_paths: list[str],
    remote_paths: Sequence[str],
    attributes: Optional[Sequence[Optional[dict[str, Any]]]],
    max_workers: int,
) -> None:
    """
    Run a single provider batch upload for the items selected by ``indices``.

    The ``@batch_retry`` decorator retries this method with only failed
    indices when the provider reports item-level retryable failures.

    :param indices: Positions in the original batch to include in this attempt.
    :param local_paths: Local source paths for the full original batch.
    :param remote_paths: Provider destination paths for the full original batch.
    :param attributes: Optional per-file attributes for the full original batch.
    :param max_workers: Maximum provider workers for this attempt.
    """
    selected_local = [local_paths[position] for position in indices]
    selected_remote = [remote_paths[position] for position in indices]

    selected_attributes = None
    if attributes is not None:
        selected_attributes = [attributes[position] for position in indices]

    self._storage_provider.upload_files(selected_local, selected_remote, selected_attributes, max_workers)
395
[docs]
@retry
def read(
    self,
    path: str,
    byte_range: Optional[Range] = None,
    check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
) -> bytes:
    """
    Read bytes from a file at the specified logical path.

    With a metadata provider the logical path is first resolved to its physical path.
    When caching is enabled the cache manager serves the read; otherwise full-file reads
    go through the replica manager when one exists, and ranged reads always go straight to
    the storage provider, because a replica download fetches the whole object and would
    ignore the requested range.

    :param path: The logical path of the object to read.
    :param byte_range: Optional byte range to read (offset and length).
    :param check_source_version: Whether to check the source version of cached objects.
    :return: The content of the object as bytes.
    :raises FileNotFoundError: If the file at the specified path does not exist.
    """
    if self._metadata_provider:
        path = self._resolve_read_path(path)

    cache_manager = self._cache_manager
    if not (self._is_cache_enabled() and cache_manager):
        # Only full-file reads may use replicas; honoring ``byte_range`` needs a ranged request.
        if self._replica_manager and not byte_range:
            return self._read_from_replica_or_primary(path)
        return self._storage_provider.get_object(path, byte_range=byte_range)

    def _wants_source_version() -> bool:
        """Decide whether the source version must be fetched for this read."""
        if check_source_version == SourceVersionCheckMode.ENABLE:
            return True
        if check_source_version == SourceVersionCheckMode.INHERIT:
            return bool(cache_manager.check_source_version())
        return False

    if byte_range:
        # Range request with cache
        try:
            metadata = None
            source_version = None
            if _wants_source_version():
                metadata = self._storage_provider.get_object_metadata(path)
                source_version = metadata.etag

            # Optimization: a range starting at 0 that covers the whole object is cached as one
            # file instead of many chunks. It only applies when metadata was already fetched for
            # version checking, so no extra HEAD request is made when version checking is off.
            if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length:
                full_file_data = self._storage_provider.get_object(path)
                cache_manager.set(path, full_file_data, source_version)
                return full_file_data[: metadata.content_length]

            # Chunk-based caching for partial reads or when the optimization does not apply.
            data = cache_manager.read(
                key=path,
                source_version=source_version,
                byte_range=byte_range,
                storage_provider=self._storage_provider,
                source_size=metadata.content_length if metadata else None,
            )
            if data is not None:
                return data
            # Fallback (should not normally happen)
            return self._storage_provider.get_object(path, byte_range=byte_range)
        except Exception:
            # Best effort: any failure in the cached path falls back to a direct ranged read.
            logger.debug("Cached range read of '%s' failed; falling back to direct read", path, exc_info=True)
            return self._storage_provider.get_object(path, byte_range=byte_range)

    # Full file read with cache; the source version is only fetched when checking is enabled.
    # NOTE(review): ``path`` is the resolved physical path here, while ``_get_source_version``
    # queries the metadata provider when one is configured -- confirm it accepts physical paths.
    source_version = self._get_source_version(path) if _wants_source_version() else None

    data = cache_manager.read(path, source_version)
    if data is None:
        if self._replica_manager:
            data = self._read_from_replica_or_primary(path)
        else:
            data = self._storage_provider.get_object(path)
        cache_manager.set(path, data, source_version)
    return data
479
[docs]
def info(self, path: str, strict: bool = True) -> ObjectMetadata:
    """
    Return metadata for the object at ``path``.

    An empty path or ``"."`` yields a synthetic directory entry with an empty key. Its
    modification time is the mtime of the current working directory for the POSIX file
    provider and ``AWARE_DATETIME_MIN`` for any other provider.

    :param path: The logical path of the object.
    :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
    :return: ObjectMetadata containing file information (size, last modified, etc.).
    :raises FileNotFoundError: If the file at the specified path does not exist.
    """
    is_root = not path or path == "."  # empty path or '.' provided by the user
    if is_root:
        last_modified = (
            datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            if self._is_posix_file_storage_provider()
            else AWARE_DATETIME_MIN
        )
        return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

    if self._metadata_provider:
        return self._metadata_provider.get_object_metadata(path, include_pending=not strict)

    return self._storage_provider.get_object_metadata(path, strict=strict)
500
[docs]
@retry
def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
    """
    Download one remote file to a local path or file-like object.

    With a metadata provider the file is fetched from its resolved physical path, passing
    along the provider's metadata for the logical path. Without one, the replica manager is
    used when present, otherwise the storage provider directly.

    :param remote_path: The logical path of the remote file to download.
    :param local_path: The local file path or file-like object to write to.
    :raises FileNotFoundError: If the remote file does not exist.
    """
    if self._metadata_provider:
        source_path = self._resolve_read_path(remote_path)
        object_metadata = self._metadata_provider.get_object_metadata(remote_path)
        self._storage_provider.download_file(source_path, local_path, object_metadata)
        return

    if self._replica_manager:
        self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
        return

    self._storage_provider.download_file(remote_path, local_path)
518
[docs]
def download_files(
    self,
    remote_paths: list[str],
    local_paths: list[str],
    metadata: Optional[Sequence[Optional[ObjectMetadata]]] = None,
    max_workers: int = 16,
) -> None:
    """
    Download several remote files to local paths.

    With a metadata provider the logical paths are resolved to physical paths and
    downloaded as one batch. Without one, a configured replica manager makes each file go
    through :py:meth:`download_file` in turn; otherwise the storage provider downloads the batch.

    :param remote_paths: List of logical paths of remote files to download.
    :param local_paths: List of local file paths to save the downloaded files to.
    :param metadata: Optional per-file metadata used to decide between regular and multipart download.
    :param max_workers: Maximum number of concurrent download workers (default: 16).
    :raises ValueError: If remote_paths and local_paths have different lengths.
    :raises ValueError: If metadata is provided and has a different length than remote_paths.
    :raises FileNotFoundError: If any remote file does not exist.
    """
    total = len(remote_paths)
    if total != len(local_paths):
        raise ValueError("remote_paths and local_paths must have the same length")
    if metadata is not None and len(metadata) != total:
        raise ValueError("metadata must have the same length as remote_paths and local_paths")

    if self._metadata_provider:
        resolved_paths = [self._resolve_read_path(logical) for logical in remote_paths]
        self._download_files_batch(list(range(total)), resolved_paths, local_paths, metadata, max_workers)
        return

    if self._replica_manager:
        for source, destination in zip(remote_paths, local_paths):
            self.download_file(source, destination)
        return

    self._download_files_batch(list(range(total)), remote_paths, local_paths, metadata, max_workers)
552
[docs]
@retry
def upload_file(
    self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, Any]] = None
) -> None:
    """
    Upload a local file, or file-like object, to the storage provider.

    :param remote_path: The path where the object will be stored.
    :param local_path: The source file to upload. This can either be a string representing the local
        file path, or a file-like object (e.g., an open file handle).
    :param attributes: The attributes to add to the file if a new file is created.
    """
    if not self._metadata_provider:
        self._storage_provider.upload_file(remote_path, local_path, attributes)
        return

    destination = self._resolve_write_path(remote_path)
    # Attributes belong to logical metadata: the physical write gets none, registration stores them.
    self._storage_provider.upload_file(destination, local_path, attributes=None)
    self._register_written_file(remote_path, destination, attributes)
573
[docs]
def upload_files(
    self,
    remote_paths: list[str],
    local_paths: list[str],
    attributes: Optional[Sequence[Optional[dict[str, Any]]]] = None,
    max_workers: int = 16,
) -> None:
    """
    Upload several local files to remote storage as one batch.

    :param remote_paths: List of logical paths where the files will be uploaded.
    :param local_paths: List of local file paths to upload.
    :param attributes: Optional list of per-file attributes to add. When provided, must have the same length
        as remote_paths/local_paths. Each element may be ``None`` for files that need no attributes.
    :param max_workers: Maximum number of concurrent upload workers (default: 16).
    :raises ValueError: If remote_paths and local_paths have different lengths.
    :raises ValueError: If attributes is provided and has a different length than remote_paths.
    """
    total = len(remote_paths)
    if total != len(local_paths):
        raise ValueError("remote_paths and local_paths must have the same length")
    if attributes is not None and len(attributes) != total:
        raise ValueError("attributes must have the same length as remote_paths and local_paths")

    if not self._metadata_provider:
        self._upload_files_batch(list(range(total)), local_paths, remote_paths, attributes, max_workers)
        return

    destinations = [self._resolve_write_path(logical) for logical in remote_paths]

    # Attributes belong to logical metadata: physical writes get none, registration stores them.
    self._upload_files_batch(list(range(total)), local_paths, destinations, None, max_workers)
    self._register_written_files(remote_paths, destinations, attributes, max_workers)
613
[docs]
@retry
def write(self, path: str, body: bytes, attributes: Optional[dict[str, Any]] = None) -> None:
    """
    Write ``body`` to the object at ``path``.

    :param path: The logical path where the object will be written.
    :param body: The content to write as bytes.
    :param attributes: Optional attributes to add to the file.
    """
    if not self._metadata_provider:
        self._storage_provider.put_object(path, body, attributes=attributes)
        return

    destination = self._resolve_write_path(path)
    # Attributes belong to logical metadata: the physical write gets none, registration stores them.
    self._storage_provider.put_object(destination, body, attributes=None)
    self._register_written_file(path, destination, attributes)
631
[docs]
def copy(self, src_path: str, dest_path: str) -> None:
    """
    Copy an object from ``src_path`` to ``dest_path``.

    With a metadata provider both paths are resolved to physical paths first, and the copy
    is registered under the logical destination path afterwards.

    :param src_path: The logical path of the source object.
    :param dest_path: The logical path where the object will be copied to.
    :raises FileNotFoundError: If the source file does not exist.
    """
    if not self._metadata_provider:
        self._storage_provider.copy_object(src_path, dest_path)
        return

    physical_src = self._resolve_read_path(src_path)
    physical_dest = self._resolve_write_path(dest_path)
    self._storage_provider.copy_object(physical_src, physical_dest)
    self._register_written_file(dest_path, physical_dest)
648
[docs]
def make_symlink(self, path: str, target: str) -> None:
    """
    Create a symbolic link at ``path`` that points to ``target``.

    Without a metadata provider the storage provider creates the link. With one, the link
    is recorded purely as a metadata entry with an encoded symlink target.

    :param path: The logical path where the symlink will be created.
    :param target: The logical key that the symlink points to.
    :raises FileExistsError: If ``path`` already exists in the metadata provider and overwrites
        are not allowed.
    """
    provider = self._metadata_provider
    if not provider:
        self._storage_provider.make_symlink(path, target)
        return

    try:
        # The lookup raises FileNotFoundError when the path is free, which skips the overwrite check.
        provider.get_object_metadata(path)
        overwrite_allowed = provider.allow_overwrites()
        if not overwrite_allowed:
            raise FileExistsError(
                f"The file at path '{path}' already exists; "
                f"overwriting is not allowed when using a metadata provider."
            )
    except FileNotFoundError:
        pass

    link_metadata = ObjectMetadata(
        key=path,
        content_length=0,
        last_modified=datetime.now(tz=timezone.utc),
        symlink_target=ObjectMetadata.encode_symlink_target(path, target),
    )
    guard = self._metadata_provider_lock or contextlib.nullcontext()
    with guard:
        provider.add_file(path, link_metadata)
676
[docs]
def delete(self, path: str, recursive: bool = False) -> None:
    """
    Delete the object at ``path``, or a whole directory when ``recursive`` is set.

    :param path: The logical path of the object or directory to delete.
    :param recursive: Whether to delete objects in the path recursively.
    :raises ValueError: If ``path`` is a directory and ``recursive`` is ``False``.
    :raises FileNotFoundError: If ``path`` is neither a file nor a directory, or the metadata
        provider reports that it does not exist.
    """
    target = self.info(path)
    is_dir = target and target.type == "directory"
    is_file = target and target.type == "file"

    if recursive and is_dir:
        # Syncing from an empty source with delete_unmatched_files removes everything under ``path``.
        self.sync_from(
            cast(AbstractStorageClient, NullStorageClient()),
            path,
            path,
            delete_unmatched_files=True,
            num_worker_processes=1,
            description="Deleting",
        )
        # If this is a posix storage provider, we need to also delete remaining directory stubs.
        # TODO: Notify metadata provider of the changes.
        if self._is_posix_file_storage_provider():
            cast(PosixFileStorageProvider, self._storage_provider).rmtree(path)
        return

    if is_dir:
        raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
    if not is_file:
        raise FileNotFoundError(f"The file at '{path}' was not found.")

    # From here on ``path`` is a single file, addressed by its logical path.
    if self._metadata_provider:
        resolved = self._metadata_provider.realpath(path)
        if not resolved.exists:
            raise FileNotFoundError(f"The file at path '{path}' was not found.")

        # Hard delete removes the physical object too; soft delete only touches the metadata.
        if not self._metadata_provider.should_use_soft_delete():
            self._storage_provider.delete_object(resolved.physical_path)

        with self._metadata_provider_lock or contextlib.nullcontext():
            self._metadata_provider.remove_file(path)
    else:
        self._storage_provider.delete_object(path)

    # Delete the cached file if it exists
    if self._is_cache_enabled():
        if self._cache_manager is None:
            raise RuntimeError("Cache manager is not initialized")
        self._cache_manager.delete(path)

    # Delete from replicas if replica manager exists
    if self._replica_manager:
        self._replica_manager.delete_from_replicas(path)
735
[docs]
def delete_many(self, paths: list[str]) -> None:
    """
    Delete several files. Only files are supported; directories are not deleted.

    Physical objects are removed in a single ``delete_objects`` call; afterwards each path
    is removed from the metadata provider, the cache and the replicas where those are configured.

    :param paths: List of logical paths of the files to delete.
    :raises FileNotFoundError: If the metadata provider reports that a path does not exist.
    """
    doomed_objects: list[str] = []
    for logical in paths:
        if not self._metadata_provider:
            doomed_objects.append(logical)
            continue
        resolved = self._metadata_provider.realpath(logical)
        if not resolved.exists:
            raise FileNotFoundError(f"The file at path '{logical}' was not found.")
        # Soft delete keeps the physical object and only drops the metadata entry below.
        if not self._metadata_provider.should_use_soft_delete():
            doomed_objects.append(resolved.physical_path)

    if doomed_objects:
        self._storage_provider.delete_objects(doomed_objects)

    for logical in paths:
        if self._metadata_provider:
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.remove_file(logical)
        if self._is_cache_enabled():
            if self._cache_manager is None:
                raise RuntimeError("Cache manager is not initialized")
            self._cache_manager.delete(logical)
        if self._replica_manager:
            self._replica_manager.delete_from_replicas(logical)
767
[docs]
def glob(
    self,
    pattern: str,
    include_url_prefix: bool = False,
    attribute_filter_expression: Optional[str] = None,
) -> list[str]:
    """
    Return the object keys that match ``pattern``.

    The metadata provider performs the match when one is configured, otherwise the storage provider does.

    :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
    :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.
    :return: A list of object paths that match the specified pattern.
    """
    source = self._metadata_provider if self._metadata_provider else self._storage_provider
    matches = source.glob(pattern, attribute_filter_expression)

    if not include_url_prefix:
        return matches

    return [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", match) for match in matches]
791
def _resolve_single_file(
    self,
    path: str,
    start_after: Optional[str],
    end_at: Optional[str],
    include_url_prefix: bool,
    pattern_matcher: Optional[PatternMatcher],
) -> tuple[Optional[ObjectMetadata], Optional[str]]:
    """
    Decide whether ``path`` names one file or should be listed as a directory prefix.

    :param path: Candidate file path or directory prefix to resolve.
    :param start_after: Exclusive lower bound for file key filtering.
    :param end_at: Inclusive upper bound for file key filtering.
    :param include_url_prefix: Whether to prefix the returned key via ``_prepend_url_prefix``.
    :param pattern_matcher: Optional include/exclude matcher for file keys.
    :return: A ``(single_file, normalized_path)`` tuple with three possible shapes:

        * ``(metadata, path)`` when ``path`` is a file that passes every filter;
        * ``(None, directory_path)`` when the caller should continue with a directory
          listing (``directory_path`` is ``path`` with exactly one trailing ``/``, or the
          empty ``path`` unchanged);
        * ``(None, None)`` when ``path`` is a file but a filter rejects it, so the
          listing should produce nothing.
    """
    # An empty path can never name a single file; hand it back untouched.
    if not path:
        return None, path

    if not self.is_file(path):
        return None, path.rstrip("/") + "/"

    if pattern_matcher and not pattern_matcher.should_include_file(path):
        return None, None

    try:
        metadata = self.info(path)
        # start_after is exclusive, end_at is inclusive; unset bounds are skipped.
        outside_range = (start_after and metadata.key <= start_after) or (end_at and metadata.key > end_at)
        if outside_range:
            return None, None
        if include_url_prefix:
            self._prepend_url_prefix(metadata)
        return metadata, path
    except FileNotFoundError:
        # is_file() reported a file but info() could not find it: fall back to a directory listing.
        return None, path.rstrip("/") + "/"
834
def _prepend_url_prefix(self, obj: ObjectMetadata) -> None:
    """
    Rewrite ``obj.key`` in place so that it carries this client's URL prefix.

    For the default profile the key is anchored at the POSIX root ``/``. For any other
    profile it is joined onto ``MSC_PROTOCOL`` followed by the profile name.

    :param obj: The object metadata whose ``key`` is mutated.
    """
    if not self.is_default_profile():
        obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)
        return

    obj.key = str(PurePosixPath("/").joinpath(obj.key))
840
def _filter_and_decorate(
    self,
    objects: Iterator[ObjectMetadata],
    include_url_prefix: bool,
    pattern_matcher: Optional[PatternMatcher],
) -> Iterator[ObjectMetadata]:
    """
    Lazily filter a stream of objects and optionally add the URL prefix to each key.

    :param objects: The objects to process, consumed one at a time.
    :param include_url_prefix: When ``True``, ``_prepend_url_prefix`` is applied to every yielded object.
    :param pattern_matcher: Optional matcher; objects whose key it rejects are dropped.
    :return: An iterator over the objects that pass the matcher.
    """
    for candidate in objects:
        # The matcher is consulted with the raw key, before any URL prefix is added.
        excluded = pattern_matcher and not pattern_matcher.should_include_file(candidate.key)
        if not excluded:
            if include_url_prefix:
                self._prepend_url_prefix(candidate)
            yield candidate
853
[docs]
def list_recursive(
    self,
    path: str = "",
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    max_workers: int = 32,
    look_ahead: int = 2,
    include_url_prefix: bool = False,
    follow_symlinks: Optional[bool] = None,
    patterns: Optional[PatternList] = None,
    symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
) -> Iterator[ObjectMetadata]:
    """
    Recursively list the files found under ``path``.

    If ``path`` resolves to a single file, only that file is yielded (provided it passes
    the key bounds and patterns). Otherwise the listing comes from the metadata provider
    when one is configured, and from the storage provider's recursive listing otherwise.

    :param path: The directory or file path to list objects under. This should be a
        complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
    :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
    :param max_workers: Maximum concurrent workers for provider-level recursive listing.
        Only forwarded on the storage-provider path.
    :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing.
        Only forwarded on the storage-provider path.
    :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
    :param follow_symlinks: **Deprecated.** Use ``symlink_handling`` instead.
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param symlink_handling: How to handle symbolic links during listing. Only applicable for POSIX file
        storage providers.
    :return: An iterator over ObjectMetadata for matching files.
    """
    symlink_handling = resolve_symlink_handling(follow_symlinks, symlink_handling)
    matcher = PatternMatcher(patterns) if patterns else None

    resolved_file, listing_root = self._resolve_single_file(
        path, start_after, end_at, include_url_prefix, matcher
    )
    if resolved_file is not None:
        # ``path`` named one file that passed every filter: it is the whole result.
        yield resolved_file
        return
    if listing_root is None:
        # ``path`` named one file that was filtered out: nothing to list.
        return

    key_bounds = {"start_after": start_after, "end_at": end_at}
    if self._metadata_provider:
        entries = self._metadata_provider.list_objects(
            listing_root,
            include_directories=False,
            **key_bounds,
        )
    else:
        entries = self._storage_provider.list_objects_recursive(
            listing_root,
            max_workers=max_workers,
            look_ahead=look_ahead,
            symlink_handling=symlink_handling,
            **key_bounds,
        )

    yield from self._filter_and_decorate(entries, include_url_prefix, matcher)
912
[docs]
def open(
    self,
    path: str,
    mode: str = "rb",
    buffering: int = -1,
    encoding: Optional[str] = None,
    disable_read_cache: bool = False,
    memory_load_limit: int = MEMORY_LOAD_LIMIT,
    atomic: bool = True,
    check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    attributes: Optional[dict[str, Any]] = None,
    prefetch_file: Optional[bool] = None,
) -> Union[PosixFile, ObjectFile]:
    """
    Open a file for reading or writing.

    A :py:class:`PosixFile` is returned when the storage provider is a POSIX file
    provider; every other provider gets an :py:class:`ObjectFile`.

    :param path: The logical path of the object to open.
    :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
    :param buffering: The buffering mode. Only applies to PosixFile.
    :param encoding: The encoding to use for text files.
    :param disable_read_cache: When set to ``True``, disables caching for file content.
        This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
    :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
        This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
    :param atomic: When set to ``True``, file will be written atomically (rename upon close).
        This parameter is only applicable to PosixFile in write mode. Passing ``False`` with an
        object storage provider only logs a warning.
    :param check_source_version: Whether to check the source version of cached objects.
        Only forwarded to ObjectFile.
    :param attributes: Attributes to add to the file.
        This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None.
    :param prefetch_file: Whether to prefetch the file content.
        This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        If None, inherits from cache configuration.
    :return: A file-like object (PosixFile or ObjectFile) for the specified path.
    :raises FileNotFoundError: If the file does not exist (read mode).
    """
    if not self._is_posix_file_storage_provider():
        # Only an explicit ``False`` triggers the warning; the value is never passed to ObjectFile.
        if atomic is False:
            logger.warning("Non-atomic writes are not supported for object storage providers.")

        return ObjectFile(
            self,
            remote_path=path,
            mode=mode,
            encoding=encoding,
            disable_read_cache=disable_read_cache,
            memory_load_limit=memory_load_limit,
            check_source_version=check_source_version,
            attributes=attributes,
            prefetch_file=prefetch_file,
        )

    return PosixFile(
        self,
        path=path,
        mode=mode,
        buffering=buffering,
        encoding=encoding,
        atomic=atomic,
        attributes=attributes,
    )
967
[docs]
def get_posix_path(self, path: str) -> Optional[str]:
    """
    Return the physical POSIX filesystem path behind ``path``.

    :param path: The path to resolve (may be a symlink or virtual path). When a metadata
        provider is configured it is first translated to its physical path.
    :return: The provider's base path prepended to the physical path, or ``None`` when the
        storage provider is not a POSIX file provider.
    """
    if not self._is_posix_file_storage_provider():
        return None

    physical_path = self._metadata_provider.realpath(path).physical_path if self._metadata_provider else path

    # The cast only informs the type checker; the provider kind was verified above.
    posix_provider = cast(PosixFileStorageProvider, self._storage_provider)
    return posix_provider._prepend_base_path(physical_path)
985
[docs]
def is_file(self, path: str) -> bool:
    """
    Report whether ``path`` points to a file (rather than a folder or directory).

    With a metadata provider configured, the answer is the ``exists`` flag of the
    provider's ``realpath`` result; otherwise the storage provider is asked directly.

    :param path: The logical path to check.
    :return: ``True`` if the key points to a file, ``False`` otherwise.
    """
    metadata_provider = self._metadata_provider
    if not metadata_provider:
        return self._storage_provider.is_file(path)

    return metadata_provider.realpath(path).exists
998
1018
[docs]
def is_empty(self, path: str) -> bool:
    """
    Report whether nothing is stored under ``path``.

    A path is considered empty if there are no objects whose keys start with the given
    path as a prefix. Only the first entry of the listing is requested.

    :param path: The logical path to check (typically a directory or folder prefix).
    :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
    """
    lister = self._metadata_provider if self._metadata_provider else self._storage_provider
    listing = lister.list_objects(path)

    # A private sentinel distinguishes "listing exhausted" from any value the listing yields.
    exhausted = object()
    first_entry = next(listing, exhausted)
    return first_entry is exhausted or first_entry is None
1038
[docs]
def sync_from(
    self,
    source_client: AbstractStorageClient,
    source_path: str = "",
    target_path: str = "",
    delete_unmatched_files: bool = False,
    description: str = "Syncing",
    num_worker_processes: Optional[int] = None,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
    preserve_source_attributes: bool = False,
    follow_symlinks: Optional[bool] = None,
    source_files: Optional[list[str]] = None,
    ignore_hidden: bool = True,
    commit_metadata: bool = True,
    dryrun: bool = False,
    dryrun_output_path: Optional[str] = None,
    symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
) -> SyncResult:
    """
    Sync files from ``source_path`` on ``source_client`` into ``target_path`` on this client.

    The work is delegated to :py:class:`SyncManager`. The batch size is read from the
    ``MSC_SYNC_BATCH_SIZE`` environment variable and falls back to ``DEFAULT_SYNC_BATCH_SIZE``.

    :param source_client: The source storage client. If it has a replica manager, a fresh client
        built from the same config with the replica manager disabled is used for the sync instead.
    :param source_path: The logical path to sync from.
    :param target_path: The logical path to sync to.
    :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
    :param description: Description of sync process for logging purposes.
    :param num_worker_processes: The number of worker processes to use.
    :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        Cannot be used together with source_files.
    :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
        When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also
        preserved.

        .. warning::
            **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
            request for each object to retrieve attributes, which can significantly impact performance on large-scale
            sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

    :param follow_symlinks: **Deprecated.** Use ``symlink_handling`` instead. If the source StorageClient is
        PosixFile, whether to follow symbolic links. ``True`` maps to :py:attr:`SymlinkHandling.FOLLOW`; ``False``
        maps to :py:attr:`SymlinkHandling.SKIP`. Cannot be combined with a non-default ``symlink_handling``.
    :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
        specific files will be synced, skipping enumeration of the source path. Cannot be used together with
        patterns.
    :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
    :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync
        completes. Set to ``False`` to skip the commit, allowing batching of multiple sync operations before
        committing manually.
    :param dryrun: If ``True``, only enumerate and compare objects without performing any copy/delete operations.
        The returned :py:class:`SyncResult` will include a :py:class:`DryrunResult` with paths to JSONL files.
    :param dryrun_output_path: Directory to write dryrun JSONL files into. If ``None`` (default), a temporary
        directory is created automatically. Ignored when ``dryrun`` is ``False``.
    :param symlink_handling: How to handle symbolic links during sync.
        :py:attr:`SymlinkHandling.FOLLOW` (default) dereferences symlinks and copies the target's bytes.
        :py:attr:`SymlinkHandling.SKIP` excludes symlinks from the sync.
        :py:attr:`SymlinkHandling.PRESERVE` recreates symlinks on the target via :py:meth:`make_symlink`
        instead of copying bytes (required for round-trip preservation of symlinks).
    :return: The :py:class:`SyncResult` produced by :py:meth:`SyncManager.sync_objects`.
    :raises ValueError: If both source_files and patterns are provided, or if ``follow_symlinks`` is combined
        with a non-default ``symlink_handling``.
    :raises RuntimeError: If errors occur during sync operations. The sync will stop on first error (fail-fast).
    """
    if source_files and patterns:
        raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")

    symlink_handling = resolve_symlink_handling(follow_symlinks, symlink_handling)
    matcher = PatternMatcher(patterns) if patterns else None

    # Disable the replica manager during sync by reading through a fresh client without one.
    source_has_replicas = not isinstance(source_client, NullStorageClient) and source_client._replica_manager
    if source_has_replicas:
        # Import here to avoid circular dependency
        from .client import StorageClient as StorageClientFacade

        source_client = StorageClientFacade(source_client._config)
        source_client._replica_manager = None

    sync_manager = SyncManager(source_client, source_path, self, target_path)
    configured_batch_size = os.environ.get("MSC_SYNC_BATCH_SIZE", DEFAULT_SYNC_BATCH_SIZE)

    sync_options = {
        "execution_mode": execution_mode,
        "description": description,
        "num_worker_processes": num_worker_processes,
        "delete_unmatched_files": delete_unmatched_files,
        "pattern_matcher": matcher,
        "preserve_source_attributes": preserve_source_attributes,
        "symlink_handling": symlink_handling,
        "source_files": source_files,
        "ignore_hidden": ignore_hidden,
        "commit_metadata": commit_metadata,
        "batch_size": int(configured_batch_size),
        "dryrun": dryrun,
        "dryrun_output_path": dryrun_output_path,
    }
    return sync_manager.sync_objects(**sync_options)
1132
[docs]
def sync_replicas(
    self,
    source_path: str,
    replica_indices: Optional[list[int]] = None,
    delete_unmatched_files: bool = False,
    description: str = "Syncing replica",
    num_worker_processes: Optional[int] = None,
    execution_mode: ExecutionMode = ExecutionMode.LOCAL,
    patterns: Optional[PatternList] = None,
    ignore_hidden: bool = True,
    symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
) -> None:
    """
    Sync files from this client to its replica storage clients.

    Each selected replica's ``sync_from`` is called in turn, with ``source_path`` used as both
    the source path and the target path. When no replicas are configured, a warning is logged
    and nothing else happens.

    :param source_path: The logical path to sync from; the same path is used on every replica.
    :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
        An empty list is treated the same way as ``None``.
    :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source.
    :param description: Description of sync process for logging purposes. The replica's profile is appended
        in parentheses for each replica.
    :param num_worker_processes: Number of worker processes for parallel sync.
    :param execution_mode: Execution mode, forwarded unchanged to each replica's ``sync_from``.
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
    :param symlink_handling: How to handle symbolic links during sync.
        :py:attr:`SymlinkHandling.FOLLOW` (default) dereferences symlinks and copies the target's bytes.
        :py:attr:`SymlinkHandling.SKIP` excludes symlinks from the sync.
        :py:attr:`SymlinkHandling.PRESERVE` recreates symlinks on each replica via
        :py:meth:`make_symlink` instead of copying bytes.
    :raises ValueError: If an entry of ``replica_indices`` is out of range.
    """
    if not self._replicas:
        logger.warning(
            "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
            "secondary storage locations for redundancy and performance.",
            self._config.profile,
        )
        return None

    if not replica_indices:
        targets = self._replicas
    else:
        try:
            targets = [self._replicas[index] for index in replica_indices]
        except IndexError as e:
            raise ValueError(f"Replica index out of range: {replica_indices}") from e

    # Disable the replica manager during sync by reading through a fresh client without one.
    if not self._replica_manager:
        source_client = self
    else:
        # Import here to avoid circular dependency
        from .client import StorageClient as StorageClientFacade

        source_client = StorageClientFacade(self._config)
        source_client._replica_manager = None

    for target in targets:
        target.sync_from(
            source_client,
            source_path,
            source_path,
            delete_unmatched_files=delete_unmatched_files,
            description=f"{description} ({target.profile})",
            num_worker_processes=num_worker_processes,
            execution_mode=execution_mode,
            patterns=patterns,
            ignore_hidden=ignore_hidden,
            symlink_handling=symlink_handling,
        )
1201
[docs]
def list(
    self,
    prefix: str = "",
    path: str = "",
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
    include_url_prefix: bool = False,
    attribute_filter_expression: Optional[str] = None,
    show_attributes: bool = False,
    follow_symlinks: Optional[bool] = None,
    patterns: Optional[PatternList] = None,
    symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
) -> Iterator[ObjectMetadata]:
    """
    List objects in the storage provider under the specified path.

    **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
    deprecated and will be removed in a future version.

    If the effective path resolves to a single file, only that file is yielded (provided it
    passes the key bounds and patterns). Otherwise the listing comes from the metadata provider
    when one is configured, and from the storage provider otherwise.

    :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
    :param path: The directory or file path to list objects under. This should be a
        complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
        Cannot be used together with ``prefix``.
    :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
    :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
    :param include_directories: Whether to include directories in the result. When ``True``, directories are
        returned alongside objects.
    :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
    :param attribute_filter_expression: The attribute filter expression to apply to the result.
    :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation,
        there may be a performance impact if this is set to ``True``.
    :param follow_symlinks: **Deprecated.** Use ``symlink_handling`` instead.
    :param patterns: PatternList for include/exclude filtering. If None, all files are included.
    :param symlink_handling: How to handle symbolic links during listing. Only applicable for POSIX file
        storage providers; it is only forwarded on the storage-provider path.
    :return: An iterator over ObjectMetadata for matching objects.
    :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
    """
    symlink_handling = resolve_symlink_handling(follow_symlinks, symlink_handling)

    # Parameter validation - either path or prefix, not both
    if prefix:
        if path:
            raise ValueError(
                f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
                f"Please use only the 'path' parameter for new code. "
                f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
            )
        logger.debug(
            f"The 'prefix' parameter is deprecated and will be removed in a future version. "
            f"Please use the 'path' parameter instead. "
            f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
        )

    matcher = PatternMatcher(patterns) if patterns else None

    resolved_file, listing_root = self._resolve_single_file(
        path or prefix, start_after, end_at, include_url_prefix, matcher
    )
    if resolved_file is not None:
        # The requested path named one file that passed every filter: it is the whole result.
        yield resolved_file
        return
    if listing_root is None:
        # The requested path named one file that was filtered out: nothing to list.
        return

    shared_options = {
        "start_after": start_after,
        "end_at": end_at,
        "include_directories": include_directories,
        "attribute_filter_expression": attribute_filter_expression,
        "show_attributes": show_attributes,
    }
    if self._metadata_provider:
        entries = self._metadata_provider.list_objects(listing_root, **shared_options)
    else:
        entries = self._storage_provider.list_objects(
            listing_root,
            symlink_handling=symlink_handling,
            **shared_options,
        )

    yield from self._filter_and_decorate(entries, include_url_prefix, matcher)
1287
[docs]
def generate_presigned_url(
    self,
    path: str,
    *,
    method: str = "GET",
    signer_type: Optional[SignerType] = None,
    signer_options: Optional[dict[str, Any]] = None,
) -> str:
    """
    Ask the storage provider for a presigned URL for ``path``.

    Every argument is forwarded unchanged to the storage provider's
    ``generate_presigned_url``; ``path`` is not translated through the metadata provider here.

    :param path: The path of the object to sign, passed to the storage provider as given.
    :param method: The HTTP method the URL is generated for. Defaults to ``"GET"``.
    :param signer_type: Optional signer type forwarded to the storage provider.
    :param signer_options: Optional signer options forwarded to the storage provider.
    :return: The URL string returned by the storage provider.
    """
    provider = self._storage_provider
    return provider.generate_presigned_url(
        path,
        method=method,
        signer_type=signer_type,
        signer_options=signer_options,
    )