# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import contextlib
import logging
import os
import threading
from collections.abc import Iterator
from datetime import datetime, timezone
from io import BytesIO
from pathlib import PurePosixPath
from typing import IO, Any, List, Optional, Union, cast

from ..config import StorageClientConfig
from ..constants import MEMORY_LOAD_LIMIT
from ..file import ObjectFile, PosixFile
from ..providers.posix_file import PosixFileStorageProvider
from ..replica_manager import ReplicaManager
from ..retry import retry
from ..sync import SyncManager
from ..types import (
    AWARE_DATETIME_MIN,
    MSC_PROTOCOL,
    ExecutionMode,
    ObjectMetadata,
    PatternList,
    Range,
    Replica,
    ResolvedPathState,
    SourceVersionCheckMode,
    StorageProvider,
    SyncResult,
)
from ..utils import NullStorageClient, PatternMatcher, join_paths
from .types import AbstractStorageClient

logger = logging.getLogger(__name__)


class SingleStorageClient(AbstractStorageClient):
    """
    Storage client for single-backend configurations.

    Supports full read and write operations against a single storage provider.
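
    Example (an illustrative sketch, not part of the original API docs; ``my_config_dict``
    and the profile name are assumptions, and ``from_dict`` mirrors the constructor used
    for replicas below):

    .. code-block:: python

        config = StorageClientConfig.from_dict(config_dict=my_config_dict, profile="my-profile")
        client = SingleStorageClient(config)
        client.write("data/hello.txt", b"hello")
        assert client.read("data/hello.txt") == b"hello"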
58 """
59
60 _config: StorageClientConfig
61 _storage_provider: StorageProvider
62 _metadata_provider_lock: Optional[threading.Lock] = None
63 _stop_event: Optional[threading.Event] = None
64 _replica_manager: Optional[ReplicaManager] = None
65
    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set
        :raises ValueError: If config has storage_provider_profiles (multi-backend)
        """
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)

    def _initialize_providers(self, config: StorageClientConfig) -> None:
        if config.storage_provider_profiles:
            raise ValueError(
                "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
                "Use CompositeStorageClient for multi-backend configurations."
            )

        if config.storage_provider is None:
            raise ValueError("SingleStorageClient requires storage_provider to be set.")

        self._config = config
        self._credentials_provider = self._config.credentials_provider
        self._storage_provider = cast(StorageProvider, self._config.storage_provider)
        self._metadata_provider = self._config.metadata_provider
        self._cache_config = self._config.cache_config
        self._retry_config = self._config.retry_config
        self._cache_manager = self._config.cache_manager
        self._autocommit_config = self._config.autocommit_config

        if self._autocommit_config:
            if self._metadata_provider:
                logger.debug("Creating auto-committer thread")

                if self._autocommit_config.interval_minutes:
                    self._stop_event = threading.Event()
                    self._commit_thread = threading.Thread(
                        target=self._committer_thread,
                        daemon=True,
                        args=(self._autocommit_config.interval_minutes, self._stop_event),
                    )
                    self._commit_thread.start()

                if self._autocommit_config.at_exit:
                    atexit.register(self._commit_on_exit)

                self._metadata_provider_lock = threading.Lock()
            else:
                logger.debug("No metadata provider configured, auto-commit will not be enabled")

    def _initialize_replicas(self, replicas: list[Replica]) -> None:
        """Initialize replica StorageClient instances (facade)."""
        # Import here to avoid circular dependency
        from .client import StorageClient as StorageClientFacade

        # Sort replicas by read_priority; the first one is the primary replica.
        sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)

        replica_clients = []
        for replica in sorted_replicas:
            if self._config._config_dict is None:
                raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
            replica_config = StorageClientConfig.from_dict(
                config_dict=self._config._config_dict, profile=replica.replica_profile
            )

            storage_client = StorageClientFacade(config=replica_config)
            replica_clients.append(storage_client)

        self._replicas = replica_clients
        self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None

    def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
        if not stop_event:
            raise RuntimeError("Stop event not set")

        while not stop_event.is_set():
            # Wait with the ability to exit early
            if stop_event.wait(timeout=commit_interval_minutes * 60):
                break
            logger.debug("Auto-committing to metadata provider")
            self.commit_metadata()

    def _commit_on_exit(self):
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()

    def _get_source_version(self, path: str) -> Optional[str]:
        """
        Get etag from metadata provider or storage provider.
        """
        if self._metadata_provider:
            metadata = self._metadata_provider.get_object_metadata(path)
        else:
            metadata = self._storage_provider.get_object_metadata(path)
        return metadata.etag

    def _is_cache_enabled(self) -> bool:
        return self._cache_manager is not None and not self._is_posix_file_storage_provider()

    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        return isinstance(self._storage_provider, PosixFileStorageProvider)

    def _is_rust_client_enabled(self) -> bool:
        """
        :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
        """
        return hasattr(self._storage_provider, "_rust_client")

    def _read_from_replica_or_primary(self, path: str) -> bytes:
        """
        Read from a replica or the primary storage provider. Uses BytesIO to avoid creating temporary files.
        """
        if self._replica_manager is None:
            raise RuntimeError("Replica manager is not initialized")
        file_obj = BytesIO()
        self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
        return file_obj.getvalue()

    def __del__(self):
        if self._stop_event:
            self._stop_event.set()
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)

    def __getstate__(self) -> dict[str, Any]:
        state = self.__dict__.copy()
        del state["_credentials_provider"]
        del state["_storage_provider"]
        del state["_metadata_provider"]
        del state["_cache_manager"]

        if "_metadata_provider_lock" in state:
            del state["_metadata_provider_lock"]

        if "_replicas" in state:
            del state["_replicas"]

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state:
            if state["_replica_manager"] is not None:
                del state["_replica_manager"]

        return state

    def __setstate__(self, state: dict[str, Any]) -> None:
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()

    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._config.profile

    def is_default_profile(self) -> bool:
        """
        :return: ``True`` if the storage client is using the default profile, ``False`` otherwise.
        """
        return self._config.profile == "default"

    @property
    def replicas(self) -> List[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        return self._replicas

    @retry
    def read(
        self,
        path: str,
        byte_range: Optional[Range] = None,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
    ) -> bytes:
        """
        Read bytes from a file at the specified logical path.

        :param path: The logical path of the object to read.
        :param byte_range: Optional byte range to read (offset and length).
        :param check_source_version: Whether to check the source version of cached objects.
        :return: The content of the object as bytes.
        :raises FileNotFoundError: If the file at the specified path does not exist.
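
        Example (illustrative; the object key is an assumption, and the ``Range`` keywords
        mirror the ``offset``/``size`` fields used elsewhere in this module):

        .. code-block:: python

            blob = client.read("data/sample.bin")
            head = client.read("data/sample.bin", byte_range=Range(offset=0, size=1024))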
262 """
263 if self._metadata_provider:
264 resolved = self._metadata_provider.realpath(path)
265 if not resolved.exists:
266 raise FileNotFoundError(f"The file at path '{path}' was not found.")
267 path = resolved.physical_path
268
269 # Handle caching logic
270 if self._is_cache_enabled() and self._cache_manager:
271 if byte_range:
272 # Range request with cache
273 try:
274 # Fetch metadata for source version checking (if needed)
275 metadata = None
276 source_version = None
277 if check_source_version == SourceVersionCheckMode.ENABLE:
278 metadata = self._storage_provider.get_object_metadata(path)
279 source_version = metadata.etag
280 elif check_source_version == SourceVersionCheckMode.INHERIT:
281 if self._cache_manager.check_source_version():
282 metadata = self._storage_provider.get_object_metadata(path)
283 source_version = metadata.etag
284
285 # Optimization: For full-file reads (offset=0, size >= file_size), cache whole file instead of chunking
286 # This avoids creating many small chunks when the user requests the entire file.
287 # Only apply this optimization when metadata is already available (i.e., when version checking is enabled),
288 # to respect the user's choice to disable version checking and avoid extra HEAD requests.
289 if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length:
290 full_file_data = self._storage_provider.get_object(path)
291 self._cache_manager.set(path, full_file_data, source_version)
292 return full_file_data[: metadata.content_length]
293
294 # Use chunk-based caching for partial reads or when optimization doesn't apply
295 data = self._cache_manager.read(
296 key=path,
297 source_version=source_version,
298 byte_range=byte_range,
299 storage_provider=self._storage_provider,
300 )
301 if data is not None:
302 return data
303 # Fallback (should not normally happen)
304 return self._storage_provider.get_object(path, byte_range=byte_range)
305 except (FileNotFoundError, Exception):
306 # Fall back to direct read if metadata fetching fails
307 return self._storage_provider.get_object(path, byte_range=byte_range)
308 else:
309 # Full file read with cache
310 source_version = self._get_source_version(path)
311 data = self._cache_manager.read(path, source_version)
312 if data is None:
313 if self._replica_manager:
314 data = self._read_from_replica_or_primary(path)
315 else:
316 data = self._storage_provider.get_object(path)
317 self._cache_manager.set(path, data, source_version)
318 return data
319 elif self._replica_manager:
320 # No cache, but replicas available
321 return self._read_from_replica_or_primary(path)
322 else:
323 # No cache, no replicas - direct storage provider read
324 return self._storage_provider.get_object(path, byte_range=byte_range)
325
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
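
        Example (illustrative; the object key is an assumption):

        .. code-block:: python

            meta = client.info("data/sample.bin")
            print(meta.content_length, meta.last_modified, meta.etag)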
334 """
335 if not path or path == ".": # for the empty path provided by the user
336 if self._is_posix_file_storage_provider():
337 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
338 else:
339 last_modified = AWARE_DATETIME_MIN
340 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)
341
342 if not self._metadata_provider:
343 return self._storage_provider.get_object_metadata(path, strict=strict)
344
345 # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
346 # TODO: Consider passing strict argument to the metadata provider.
347 try:
348 return self._metadata_provider.get_object_metadata(path)
349 except FileNotFoundError:
350 # Try listing from the parent to determine if path is a valid directory
351 parent = os.path.dirname(path.rstrip("/")) + "/"
352 parent = "" if parent == "/" else parent
353 target = path.rstrip("/") + "/"
354
355 try:
356 entries = self._metadata_provider.list_objects(parent, include_directories=True)
357 for entry in entries:
358 if entry.key == target and entry.type == "directory":
359 return entry
360 except Exception:
361 pass
362 raise # Raise original FileNotFoundError
363
    @retry
    def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
        """
        Download a remote file to a local path or file-like object.

        :param remote_path: The logical path of the remote file to download.
        :param local_path: The local file path or file-like object to write to.
        :raises FileNotFoundError: If the remote file does not exist.
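
        Example (illustrative; the paths are assumptions):

        .. code-block:: python

            import io

            client.download_file("data/sample.bin", "/tmp/sample.bin")

            buf = io.BytesIO()  # a file-like object also works
            client.download_file("data/sample.bin", buf)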
372 """
373 if self._metadata_provider:
374 resolved = self._metadata_provider.realpath(remote_path)
375 if not resolved.exists:
376 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
377
378 metadata = self._metadata_provider.get_object_metadata(remote_path)
379 self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
380 elif self._replica_manager:
381 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
382 else:
383 self._storage_provider.download_file(remote_path, local_path)
384
    @retry
    def upload_file(
        self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, str]] = None
    ) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param local_path: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
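
        Example (illustrative; the paths and attributes are assumptions):

        .. code-block:: python

            client.upload_file("data/sample.bin", "/tmp/sample.bin", attributes={"owner": "team-a"})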
396 """
397 virtual_path = remote_path
398 if self._metadata_provider:
399 resolved = self._metadata_provider.realpath(remote_path)
400 if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
401 # File exists or has been deleted
402 if not self._metadata_provider.allow_overwrites():
403 raise FileExistsError(
404 f"The file at path '{virtual_path}' already exists; "
405 f"overwriting is not allowed when using a metadata provider."
406 )
407 # Generate path for overwrite (future: may return different path for versioning)
408 remote_path = self._metadata_provider.generate_physical_path(
409 remote_path, for_overwrite=True
410 ).physical_path
411 else:
412 # New file - generate path
413 remote_path = self._metadata_provider.generate_physical_path(
414 remote_path, for_overwrite=False
415 ).physical_path
416
417 # if metdata provider is present, we only write attributes to the metadata provider
418 self._storage_provider.upload_file(remote_path, local_path, attributes=None)
419
420 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
421 obj_metadata = self._storage_provider.get_object_metadata(remote_path)
422 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
423 with self._metadata_provider_lock or contextlib.nullcontext():
424 self._metadata_provider.add_file(virtual_path, obj_metadata)
425 else:
426 self._storage_provider.upload_file(remote_path, local_path, attributes)
427
    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
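
        Example (illustrative; the path and attributes are assumptions):

        .. code-block:: python

            client.write("logs/run-1.json", b'{"status": "ok"}', attributes={"run": "1"})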
436 """
437 virtual_path = path
438 if self._metadata_provider:
439 resolved = self._metadata_provider.realpath(path)
440 if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
441 # File exists or has been deleted
442 if not self._metadata_provider.allow_overwrites():
443 raise FileExistsError(
444 f"The file at path '{virtual_path}' already exists; "
445 f"overwriting is not allowed when using a metadata provider."
446 )
447 # Generate path for overwrite (future: may return different path for versioning)
448 path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
449 else:
450 # New file - generate path
451 path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path
452
453 # if metadata provider is present, we only write attributes to the metadata provider
454 self._storage_provider.put_object(path, body, attributes=None)
455
456 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
457 obj_metadata = self._storage_provider.get_object_metadata(path)
458 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
459 with self._metadata_provider_lock or contextlib.nullcontext():
460 self._metadata_provider.add_file(virtual_path, obj_metadata)
461 else:
462 self._storage_provider.put_object(path, body, attributes=attributes)
463
    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
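
        Example (illustrative; the paths are assumptions):

        .. code-block:: python

            client.copy("data/sample.bin", "backup/sample.bin")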
471 """
472 virtual_dest_path = dest_path
473 if self._metadata_provider:
474 # Source: must exist
475 src_resolved = self._metadata_provider.realpath(src_path)
476 if not src_resolved.exists:
477 raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
478 src_path = src_resolved.physical_path
479
480 # Destination: check for overwrites
481 dest_resolved = self._metadata_provider.realpath(dest_path)
482 if dest_resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
483 # Destination exists or has been deleted
484 if not self._metadata_provider.allow_overwrites():
485 raise FileExistsError(
486 f"The file at path '{virtual_dest_path}' already exists; "
487 f"overwriting is not allowed when using a metadata provider."
488 )
489 # Generate path for overwrite
490 dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
491 else:
492 # New file - generate path
493 dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path
494
495 self._storage_provider.copy_object(src_path, dest_path)
496
497 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
498 obj_metadata = self._storage_provider.get_object_metadata(dest_path)
499 with self._metadata_provider_lock or contextlib.nullcontext():
500 self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
501 else:
502 self._storage_provider.copy_object(src_path, dest_path)
503
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
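
        Example (illustrative; the paths are assumptions):

        .. code-block:: python

            client.delete("logs/run-1.json")        # delete a single file
            client.delete("logs/", recursive=True)  # delete a directory tree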
510 """
511 obj_metadata = self.info(path)
512 is_dir = obj_metadata and obj_metadata.type == "directory"
513 is_file = obj_metadata and obj_metadata.type == "file"
514 if recursive and is_dir:
515 self.sync_from(
516 cast(AbstractStorageClient, NullStorageClient()),
517 path,
518 path,
519 delete_unmatched_files=True,
520 num_worker_processes=1,
521 description="Deleting",
522 )
523 # If this is a posix storage provider, we need to also delete remaining directory stubs.
524 # TODO: Nofity metadata provider for the changes.
525 if self._is_posix_file_storage_provider():
526 posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
527 posix_storage_provider.rmtree(path)
528 return
529 else:
530 # 1) If path is a file: delete the file
531 # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
532 if is_file:
533 virtual_path = path
534 if self._metadata_provider:
535 resolved = self._metadata_provider.realpath(path)
536 if not resolved.exists:
537 raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")
538
539 # Check if soft-delete is enabled
540 if not self._metadata_provider.should_use_soft_delete():
541 # Hard delete: remove both physical file and metadata
542 self._storage_provider.delete_object(resolved.physical_path)
543
544 with self._metadata_provider_lock or contextlib.nullcontext():
545 self._metadata_provider.remove_file(virtual_path)
546 else:
547 self._storage_provider.delete_object(path)
548
549 # Delete the cached file if it exists
550 if self._is_cache_enabled():
551 if self._cache_manager is None:
552 raise RuntimeError("Cache manager is not initialized")
553 self._cache_manager.delete(virtual_path)
554
555 # Delete from replicas if replica manager exists
556 if self._replica_manager:
557 self._replica_manager.delete_from_replicas(virtual_path)
558 elif is_dir:
559 raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
560 else:
561 raise FileNotFoundError(f"The file at '{path}' was not found.")
562
    def glob(
        self,
        pattern: str,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
    ) -> list[str]:
        """
        Matches and retrieves a list of object keys in the storage provider that match the specified pattern.

        :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :return: A list of object paths that match the specified pattern.
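
        Example (illustrative; the pattern is an assumption):

        .. code-block:: python

            keys = client.glob("data/*.txt")
            urls = client.glob("data/*.txt", include_url_prefix=True)  # msc://<profile>/...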
576 """
577 if self._metadata_provider:
578 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
579 else:
580 results = self._storage_provider.glob(pattern, attribute_filter_expression)
581
582 if include_url_prefix:
583 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
584
585 return results
586
    def list(
        self,
        prefix: str = "",
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        include_url_prefix: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the storage provider under the specified path.

        **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
        deprecated and will be removed in a future version.

        :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
            Cannot be used together with ``prefix``.
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param attribute_filter_expression: The attribute filter expression to apply to the result.
        :param show_attributes: Whether to return attributes in the result. WARNING: Depending on the implementation, there may be a performance impact if this is set to ``True``.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching objects.
        :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
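
        Example (illustrative; the path is an assumption):

        .. code-block:: python

            for obj in client.list(path="data/2024/"):
                print(obj.key, obj.content_length)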
620 """
621 # Parameter validation - either path or prefix, not both
622 if path and prefix:
623 raise ValueError(
624 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
625 f"Please use only the 'path' parameter for new code. "
626 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
627 )
628 elif prefix:
629 logger.debug(
630 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
631 f"Please use the 'path' parameter instead. "
632 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
633 )
634
635 # Apply patterns to the objects
636 pattern_matcher = PatternMatcher(patterns) if patterns else None
637
638 # Use path if provided, otherwise fall back to prefix
639 effective_path = path if path else prefix
640
641 if effective_path:
642 if self.is_file(effective_path):
643 if pattern_matcher and not pattern_matcher.should_include_file(effective_path):
644 return iter([])
645
646 try:
647 object_metadata = self.info(effective_path)
648 if include_url_prefix:
649 if self.is_default_profile():
650 object_metadata.key = str(PurePosixPath("/") / object_metadata.key)
651 else:
652 object_metadata.key = join_paths(
653 f"{MSC_PROTOCOL}{self._config.profile}", object_metadata.key
654 )
655 yield object_metadata # short circuit if the path is a file
656 return
657 except FileNotFoundError:
658 pass
659 else:
660 effective_path = effective_path.rstrip("/") + "/"
661
662 if self._metadata_provider:
663 objects = self._metadata_provider.list_objects(
664 effective_path,
665 start_after=start_after,
666 end_at=end_at,
667 include_directories=include_directories,
668 attribute_filter_expression=attribute_filter_expression,
669 show_attributes=show_attributes,
670 )
671 else:
672 objects = self._storage_provider.list_objects(
673 effective_path,
674 start_after=start_after,
675 end_at=end_at,
676 include_directories=include_directories,
677 attribute_filter_expression=attribute_filter_expression,
678 show_attributes=show_attributes,
679 follow_symlinks=follow_symlinks,
680 )
681
682 for object in objects:
683 # Skip objects that do not match the patterns
684 if pattern_matcher and not pattern_matcher.should_include_file(object.key):
685 continue
686
687 if include_url_prefix:
688 if self.is_default_profile():
689 object.key = str(PurePosixPath("/") / object.key)
690 else:
691 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
692
693 yield object
694
    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        disable_read_cache: bool = False,
        memory_load_limit: int = MEMORY_LOAD_LIMIT,
        atomic: bool = True,
        check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
        attributes: Optional[dict[str, str]] = None,
        prefetch_file: bool = True,
    ) -> Union[PosixFile, ObjectFile]:
        """
        Open a file for reading or writing.

        :param path: The logical path of the object to open.
        :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
        :param buffering: The buffering mode. Only applies to PosixFile.
        :param encoding: The encoding to use for text files.
        :param disable_read_cache: When set to ``True``, disables caching for file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
        :param memory_load_limit: Size limit in bytes for loading files into memory.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB.
        :param atomic: When set to ``True``, the file will be written atomically (renamed upon close).
            This parameter is only applicable to PosixFile in write mode.
        :param check_source_version: Whether to check the source version of cached objects.
        :param attributes: Attributes to add to the file.
            This parameter is only applicable when the mode is "w", "wb", "a", or "ab". Defaults to None.
        :param prefetch_file: Whether to prefetch the file content.
            This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
        :return: A file-like object (PosixFile or ObjectFile) for the specified path.
        :raises FileNotFoundError: If the file does not exist (read mode).
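
        Example (illustrative; the path is an assumption):

        .. code-block:: python

            with client.open("data/sample.txt", mode="w") as f:
                f.write("hello")

            with client.open("data/sample.txt", mode="r") as f:
                print(f.read())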
728 """
729 if self._is_posix_file_storage_provider():
730 return PosixFile(
731 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
732 )
733 else:
734 if atomic is False:
735 logger.warning("Non-atomic writes are not supported for object storage providers.")
736
737 return ObjectFile(
738 self,
739 remote_path=path,
740 mode=mode,
741 encoding=encoding,
742 disable_read_cache=disable_read_cache,
743 memory_load_limit=memory_load_limit,
744 check_source_version=check_source_version,
745 attributes=attributes,
746 prefetch_file=prefetch_file,
747 )
748
    def get_posix_path(self, path: str) -> Optional[str]:
        """
        Returns the physical POSIX filesystem path for POSIX storage providers.

        :param path: The path to resolve (may be a symlink or virtual path).
        :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
        """
        if not self._is_posix_file_storage_provider():
            return None

        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            realpath = resolved.physical_path
        else:
            realpath = path

        return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)

    def is_file(self, path: str) -> bool:
        """
        Checks whether the specified path points to a file (rather than a folder or directory).

        :param path: The logical path to check.
        :return: ``True`` if the key points to a file, ``False`` otherwise.
        """
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            return resolved.exists

        return self._storage_provider.is_file(path)

    def is_empty(self, path: str) -> bool:
        """
        Check whether the specified path is empty. A path is considered empty if there are no
        objects whose keys start with the given path as a prefix.

        :param path: The logical path to check (typically a directory or folder prefix).
        :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
        """
        if self._metadata_provider:
            objects = self._metadata_provider.list_objects(path)
        else:
            objects = self._storage_provider.list_objects(path)

        try:
            # If the listing yields any object, the path is not empty.
            return next(objects) is None
        except StopIteration:
            pass

        return True

    def sync_from(
        self,
        source_client: AbstractStorageClient,
        source_path: str = "",
        target_path: str = "",
        delete_unmatched_files: bool = False,
        description: str = "Syncing",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        preserve_source_attributes: bool = False,
        follow_symlinks: bool = True,
        source_files: Optional[List[str]] = None,
        ignore_hidden: bool = True,
        commit_metadata: bool = True,
    ) -> SyncResult:
        """
        Syncs files from the source storage client to the target path.

        :param source_client: The source storage client.
        :param source_path: The logical path to sync from.
        :param target_path: The logical path to sync to.
        :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
        :param description: Description of the sync process for logging purposes.
        :param num_worker_processes: The number of worker processes to use.
        :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
            Cannot be used together with source_files.
        :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
            When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.

            .. warning::
                **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
                request for each object to retrieve attributes, which can significantly impact performance on large-scale
                sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.

        :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Defaults to ``True``.
        :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
            specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
        :param ignore_hidden: Whether to ignore hidden files and directories. Defaults to ``True``.
        :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after the sync completes.
            Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
        :raises ValueError: If both source_files and patterns are provided.
        :raises RuntimeError: If errors occur during sync operations. The sync will stop on the first error (fail-fast).
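
        Example (illustrative; ``source_client`` is assumed to be another configured client):

        .. code-block:: python

            result = client.sync_from(
                source_client,
                source_path="data/",
                target_path="data/",
                delete_unmatched_files=True,
            )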
866 """
867 if source_files and patterns:
868 raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")
869
870 pattern_matcher = PatternMatcher(patterns) if patterns else None
871
872 # Disable the replica manager during sync
873 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
874 # Import here to avoid circular dependency
875 from .client import StorageClient as StorageClientFacade
876
877 source_client = StorageClientFacade(source_client._config)
878 source_client._replica_manager = None
879
880 m = SyncManager(source_client, source_path, self, target_path)
881
882 return m.sync_objects(
883 execution_mode=execution_mode,
884 description=description,
885 num_worker_processes=num_worker_processes,
886 delete_unmatched_files=delete_unmatched_files,
887 pattern_matcher=pattern_matcher,
888 preserve_source_attributes=preserve_source_attributes,
889 follow_symlinks=follow_symlinks,
890 source_files=source_files,
891 ignore_hidden=ignore_hidden,
892 commit_metadata=commit_metadata,
893 )
894
    def sync_replicas(
        self,
        source_path: str,
        replica_indices: Optional[List[int]] = None,
        delete_unmatched_files: bool = False,
        description: str = "Syncing replica",
        num_worker_processes: Optional[int] = None,
        execution_mode: ExecutionMode = ExecutionMode.LOCAL,
        patterns: Optional[PatternList] = None,
        ignore_hidden: bool = True,
    ) -> None:
        """
        Sync files from this client to its replica storage clients.

        :param source_path: The logical path to sync from.
        :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
        :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in the source.
        :param description: Description of the sync process for logging purposes.
        :param num_worker_processes: Number of worker processes for parallel sync.
        :param execution_mode: Execution mode (LOCAL or REMOTE).
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
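
        Example (illustrative; assumes the profile defines a ``replicas`` section):

        .. code-block:: python

            client.sync_replicas("data/", replica_indices=[0])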
917 """
918 if not self._replicas:
919 logger.warning(
920 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
921 "secondary storage locations for redundancy and performance.",
922 self._config.profile,
923 )
924 return None
925
926 if replica_indices:
927 try:
928 replicas = [self._replicas[i] for i in replica_indices]
929 except IndexError as e:
930 raise ValueError(f"Replica index out of range: {replica_indices}") from e
931 else:
932 replicas = self._replicas
933
934 # Disable the replica manager during sync
935 if self._replica_manager:
936 # Import here to avoid circular dependency
937 from .client import StorageClient as StorageClientFacade
938
939 source_client = StorageClientFacade(self._config)
940 source_client._replica_manager = None
941 else:
942 source_client = self
943
944 for replica in replicas:
945 replica.sync_from(
946 source_client,
947 source_path,
948 source_path,
949 delete_unmatched_files=delete_unmatched_files,
950 description=f"{description} ({replica.profile})",
951 num_worker_processes=num_worker_processes,
952 execution_mode=execution_mode,
953 patterns=patterns,
954 ignore_hidden=ignore_hidden,
955 )