1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from datetime import datetime, timezone
23from io import BytesIO
24from pathlib import PurePosixPath
25from typing import IO, Any, Optional, Union, cast
26
27from ..config import StorageClientConfig
28from ..constants import DEFAULT_SYNC_BATCH_SIZE, MEMORY_LOAD_LIMIT
29from ..file import ObjectFile, PosixFile
30from ..providers.posix_file import PosixFileStorageProvider
31from ..replica_manager import ReplicaManager
32from ..retry import retry
33from ..sync import SyncManager
34from ..types import (
35 AWARE_DATETIME_MIN,
36 MSC_PROTOCOL,
37 ExecutionMode,
38 ObjectMetadata,
39 PatternList,
40 Range,
41 Replica,
42 ResolvedPathState,
43 SignerType,
44 SourceVersionCheckMode,
45 StorageProvider,
46 SyncResult,
47)
48from ..utils import NullStorageClient, PatternMatcher, join_paths
49from .types import AbstractStorageClient
50
51logger = logging.getLogger(__name__)
52
53
[docs]
class SingleStorageClient(AbstractStorageClient):
    """
    Storage client for single-backend configurations.

    Supports full read and write operations against a single storage provider.
    """

    # Parsed configuration this client was constructed from.
    _config: StorageClientConfig
    # Backend provider that performs the actual object I/O.
    _storage_provider: StorageProvider
    # Serializes metadata-provider mutations against the auto-commit thread;
    # only created when a metadata provider with auto-commit is configured.
    _metadata_provider_lock: Optional[threading.Lock] = None
    # Signals the background committer thread to stop; None unless auto-commit
    # with an interval is enabled.
    _stop_event: Optional[threading.Event] = None
    # Coordinates reads/deletes across replicas; None when no replicas are configured.
    _replica_manager: Optional[ReplicaManager] = None
66
    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set
        :raises ValueError: If config has storage_provider_profiles (multi-backend)
        """
        # Order matters: _initialize_providers sets self._config, which
        # _initialize_replicas reads when building replica clients.
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)
76
77 def _initialize_providers(self, config: StorageClientConfig) -> None:
78 if config.storage_provider_profiles:
79 raise ValueError(
80 "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
81 "Use CompositeStorageClient for multi-backend configurations."
82 )
83
84 if config.storage_provider is None:
85 raise ValueError("SingleStorageClient requires storage_provider to be set.")
86
87 self._config = config
88 self._credentials_provider = self._config.credentials_provider
89 self._storage_provider = cast(StorageProvider, self._config.storage_provider)
90 self._metadata_provider = self._config.metadata_provider
91 self._cache_config = self._config.cache_config
92 self._retry_config = self._config.retry_config
93 self._cache_manager = self._config.cache_manager
94 self._autocommit_config = self._config.autocommit_config
95
96 if self._autocommit_config:
97 if self._metadata_provider:
98 logger.debug("Creating auto-commiter thread")
99
100 if self._autocommit_config.interval_minutes:
101 self._stop_event = threading.Event()
102 self._commit_thread = threading.Thread(
103 target=self._committer_thread,
104 daemon=True,
105 args=(self._autocommit_config.interval_minutes, self._stop_event),
106 )
107 self._commit_thread.start()
108
109 if self._autocommit_config.at_exit:
110 atexit.register(self._commit_on_exit)
111
112 self._metadata_provider_lock = threading.Lock()
113 else:
114 logger.debug("No metadata provider configured, auto-commit will not be enabled")
115
116 def _initialize_replicas(self, replicas: list[Replica]) -> None:
117 """Initialize replica StorageClient instances (facade)."""
118 # Import here to avoid circular dependency
119 from .client import StorageClient as StorageClientFacade
120
121 # Sort replicas by read_priority, the first one is the primary replica.
122 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)
123
124 replica_clients = []
125 for replica in sorted_replicas:
126 if self._config._config_dict is None:
127 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
128 replica_config = StorageClientConfig.from_dict(
129 config_dict=self._config._config_dict, profile=replica.replica_profile
130 )
131
132 storage_client = StorageClientFacade(config=replica_config)
133 replica_clients.append(storage_client)
134
135 self._replicas = replica_clients
136 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None
137
138 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
139 if not stop_event:
140 raise RuntimeError("Stop event not set")
141
142 while not stop_event.is_set():
143 # Wait with the ability to exit early
144 if stop_event.wait(timeout=commit_interval_minutes * 60):
145 break
146 logger.debug("Auto-committing to metadata provider")
147 self.commit_metadata()
148
    def _commit_on_exit(self):
        # atexit hook (registered in _initialize_providers): flush any pending
        # metadata changes before the process exits.
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()
152
153 def _get_source_version(self, path: str) -> Optional[str]:
154 """
155 Get etag from metadata provider or storage provider.
156 """
157 if self._metadata_provider:
158 metadata = self._metadata_provider.get_object_metadata(path)
159 else:
160 metadata = self._storage_provider.get_object_metadata(path)
161 return metadata.etag
162
163 def _is_cache_enabled(self) -> bool:
164 enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
165 return enabled
166
167 def _is_posix_file_storage_provider(self) -> bool:
168 """
169 :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
170 """
171 return isinstance(self._storage_provider, PosixFileStorageProvider)
172
173 def _is_rust_client_enabled(self) -> bool:
174 """
175 :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
176 """
177 return getattr(self._storage_provider, "_rust_client", None) is not None
178
179 def _read_from_replica_or_primary(self, path: str) -> bytes:
180 """
181 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
182 """
183 if self._replica_manager is None:
184 raise RuntimeError("Replica manager is not initialized")
185 file_obj = BytesIO()
186 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
187 return file_obj.getvalue()
188
    def __del__(self):
        # Stop the auto-commit thread, if one was started. _commit_thread is
        # assigned in the same branch that sets _stop_event, so the attribute
        # exists whenever _stop_event is non-None.
        if self._stop_event:
            self._stop_event.set()
            # Bounded join: don't hang interpreter shutdown on a slow commit.
            if self._commit_thread.is_alive():
                self._commit_thread.join(timeout=5.0)
194
195 def __getstate__(self) -> dict[str, Any]:
196 state = self.__dict__.copy()
197 del state["_credentials_provider"]
198 del state["_storage_provider"]
199 del state["_metadata_provider"]
200 del state["_cache_manager"]
201
202 if "_metadata_provider_lock" in state:
203 del state["_metadata_provider_lock"]
204
205 if "_replicas" in state:
206 del state["_replicas"]
207
208 # Replica manager could be disabled if it's set to None in the state.
209 if "_replica_manager" in state:
210 if state["_replica_manager"] is not None:
211 del state["_replica_manager"]
212
213 return state
214
    def __setstate__(self, state: dict[str, Any]) -> None:
        # NOTE: only "_config" and the "_replica_manager" sentinel are read from
        # *state*; every other attribute is re-derived from the config by
        # _initialize_providers / _initialize_replicas.
        config = state["_config"]
        self._initialize_providers(config)

        # Replica manager could be disabled if it's set to None in the state.
        if "_replica_manager" in state and state["_replica_manager"] is None:
            self._replica_manager = None
        else:
            self._initialize_replicas(config.replicas)

        # Locks cannot be pickled, so recreate the metadata lock here.
        if self._metadata_provider:
            self._metadata_provider_lock = threading.Lock()
227
    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        # Sourced directly from the parsed configuration.
        return self._config.profile
234
[docs]
235 def is_default_profile(self) -> bool:
236 """
237 :return: ``True`` if the storage client is using the reserved POSIX profile, ``False`` otherwise.
238 """
239 return self._config.profile == "__filesystem__"
240
    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        # Populated by _initialize_replicas; empty when no replicas are configured.
        return self._replicas
247
248 @retry
249 def read(
250 self,
251 path: str,
252 byte_range: Optional[Range] = None,
253 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
254 ) -> bytes:
255 """
256 Read bytes from a file at the specified logical path.
257
258 :param path: The logical path of the object to read.
259 :param byte_range: Optional byte range to read (offset and length).
260 :param check_source_version: Whether to check the source version of cached objects.
261 :return: The content of the object as bytes.
262 :raises FileNotFoundError: If the file at the specified path does not exist.
263 """
264 if self._metadata_provider:
265 resolved = self._metadata_provider.realpath(path)
266 if not resolved.exists:
267 raise FileNotFoundError(f"The file at path '{path}' was not found.")
268 path = resolved.physical_path
269
270 # Handle caching logic
271 if self._is_cache_enabled() and self._cache_manager:
272 if byte_range:
273 # Range request with cache
274 try:
275 # Fetch metadata for source version checking (if needed)
276 metadata = None
277 source_version = None
278 if check_source_version == SourceVersionCheckMode.ENABLE:
279 metadata = self._storage_provider.get_object_metadata(path)
280 source_version = metadata.etag
281 elif check_source_version == SourceVersionCheckMode.INHERIT:
282 if self._cache_manager.check_source_version():
283 metadata = self._storage_provider.get_object_metadata(path)
284 source_version = metadata.etag
285
286 # Optimization: For full-file reads (offset=0, size >= file_size), cache whole file instead of chunking
287 # This avoids creating many small chunks when the user requests the entire file.
288 # Only apply this optimization when metadata is already available (i.e., when version checking is enabled),
289 # to respect the user's choice to disable version checking and avoid extra HEAD requests.
290 if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length:
291 full_file_data = self._storage_provider.get_object(path)
292 self._cache_manager.set(path, full_file_data, source_version)
293 return full_file_data[: metadata.content_length]
294
295 # Use chunk-based caching for partial reads or when optimization doesn't apply
296 data = self._cache_manager.read(
297 key=path,
298 source_version=source_version,
299 byte_range=byte_range,
300 storage_provider=self._storage_provider,
301 source_size=metadata.content_length if metadata else None,
302 )
303 if data is not None:
304 return data
305 # Fallback (should not normally happen)
306 return self._storage_provider.get_object(path, byte_range=byte_range)
307 except (FileNotFoundError, Exception):
308 # Fall back to direct read if metadata fetching fails
309 return self._storage_provider.get_object(path, byte_range=byte_range)
310 else:
311 # Full file read with cache
312 # Only fetch source version if check_source_version is enabled
313 source_version = None
314 if check_source_version == SourceVersionCheckMode.ENABLE:
315 source_version = self._get_source_version(path)
316 elif check_source_version == SourceVersionCheckMode.INHERIT:
317 if self._cache_manager.check_source_version():
318 source_version = self._get_source_version(path)
319
320 data = self._cache_manager.read(path, source_version)
321 if data is None:
322 if self._replica_manager:
323 data = self._read_from_replica_or_primary(path)
324 else:
325 data = self._storage_provider.get_object(path)
326 self._cache_manager.set(path, data, source_version)
327 return data
328 elif self._replica_manager:
329 # No cache, but replicas available
330 return self._read_from_replica_or_primary(path)
331 else:
332 # No cache, no replicas - direct storage provider read
333 return self._storage_provider.get_object(path, byte_range=byte_range)
334
[docs]
335 def info(self, path: str, strict: bool = True) -> ObjectMetadata:
336 """
337 Get metadata for a file at the specified path.
338
339 :param path: The logical path of the object.
340 :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
341 :return: ObjectMetadata containing file information (size, last modified, etc.).
342 :raises FileNotFoundError: If the file at the specified path does not exist.
343 """
344 if not path or path == ".": # empty path or '.' provided by the user
345 if self._is_posix_file_storage_provider():
346 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
347 else:
348 last_modified = AWARE_DATETIME_MIN
349 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)
350
351 if not self._metadata_provider:
352 return self._storage_provider.get_object_metadata(path, strict=strict)
353
354 return self._metadata_provider.get_object_metadata(path, include_pending=not strict)
355
356 @retry
357 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
358 """
359 Download a remote file to a local path or file-like object.
360
361 :param remote_path: The logical path of the remote file to download.
362 :param local_path: The local file path or file-like object to write to.
363 :raises FileNotFoundError: If the remote file does not exist.
364 """
365 if self._metadata_provider:
366 resolved = self._metadata_provider.realpath(remote_path)
367 if not resolved.exists:
368 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
369
370 metadata = self._metadata_provider.get_object_metadata(remote_path)
371 self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
372 elif self._replica_manager:
373 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
374 else:
375 self._storage_provider.download_file(remote_path, local_path)
376
377 @retry
378 def upload_file(
379 self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, str]] = None
380 ) -> None:
381 """
382 Uploads a file from the local file system to the storage provider.
383
384 :param remote_path: The path where the object will be stored.
385 :param local_path: The source file to upload. This can either be a string representing the local
386 file path, or a file-like object (e.g., an open file handle).
387 :param attributes: The attributes to add to the file if a new file is created.
388 """
389 virtual_path = remote_path
390 if self._metadata_provider:
391 resolved = self._metadata_provider.realpath(remote_path)
392 if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
393 # File exists or has been deleted
394 if not self._metadata_provider.allow_overwrites():
395 raise FileExistsError(
396 f"The file at path '{virtual_path}' already exists; "
397 f"overwriting is not allowed when using a metadata provider."
398 )
399 # Generate path for overwrite (future: may return different path for versioning)
400 remote_path = self._metadata_provider.generate_physical_path(
401 remote_path, for_overwrite=True
402 ).physical_path
403 else:
404 # New file - generate path
405 remote_path = self._metadata_provider.generate_physical_path(
406 remote_path, for_overwrite=False
407 ).physical_path
408
409 # if metadata provider is present, we only write attributes to the metadata provider
410 self._storage_provider.upload_file(remote_path, local_path, attributes=None)
411
412 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
413 obj_metadata = self._storage_provider.get_object_metadata(remote_path)
414 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
415 with self._metadata_provider_lock or contextlib.nullcontext():
416 self._metadata_provider.add_file(virtual_path, obj_metadata)
417 else:
418 self._storage_provider.upload_file(remote_path, local_path, attributes)
419
420 @retry
421 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
422 """
423 Write bytes to a file at the specified path.
424
425 :param path: The logical path where the object will be written.
426 :param body: The content to write as bytes.
427 :param attributes: Optional attributes to add to the file.
428 """
429 virtual_path = path
430 if self._metadata_provider:
431 resolved = self._metadata_provider.realpath(path)
432 if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
433 # File exists or has been deleted
434 if not self._metadata_provider.allow_overwrites():
435 raise FileExistsError(
436 f"The file at path '{virtual_path}' already exists; "
437 f"overwriting is not allowed when using a metadata provider."
438 )
439 # Generate path for overwrite (future: may return different path for versioning)
440 path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
441 else:
442 # New file - generate path
443 path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path
444
445 # if metadata provider is present, we only write attributes to the metadata provider
446 self._storage_provider.put_object(path, body, attributes=None)
447
448 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
449 obj_metadata = self._storage_provider.get_object_metadata(path)
450 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
451 with self._metadata_provider_lock or contextlib.nullcontext():
452 self._metadata_provider.add_file(virtual_path, obj_metadata)
453 else:
454 self._storage_provider.put_object(path, body, attributes=attributes)
455
[docs]
456 def copy(self, src_path: str, dest_path: str) -> None:
457 """
458 Copy a file from source path to destination path.
459
460 :param src_path: The logical path of the source object.
461 :param dest_path: The logical path where the object will be copied to.
462 :raises FileNotFoundError: If the source file does not exist.
463 """
464 virtual_dest_path = dest_path
465 if self._metadata_provider:
466 # Source: must exist
467 src_resolved = self._metadata_provider.realpath(src_path)
468 if not src_resolved.exists:
469 raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
470 src_path = src_resolved.physical_path
471
472 # Destination: check for overwrites
473 dest_resolved = self._metadata_provider.realpath(dest_path)
474 if dest_resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
475 # Destination exists or has been deleted
476 if not self._metadata_provider.allow_overwrites():
477 raise FileExistsError(
478 f"The file at path '{virtual_dest_path}' already exists; "
479 f"overwriting is not allowed when using a metadata provider."
480 )
481 # Generate path for overwrite
482 dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
483 else:
484 # New file - generate path
485 dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path
486
487 self._storage_provider.copy_object(src_path, dest_path)
488
489 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
490 obj_metadata = self._storage_provider.get_object_metadata(dest_path)
491 with self._metadata_provider_lock or contextlib.nullcontext():
492 self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
493 else:
494 self._storage_provider.copy_object(src_path, dest_path)
495
[docs]
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If *path* is a directory and ``recursive`` is ``False``.
        :raises FileNotFoundError: If nothing exists at *path*.
        """
        # info() raises FileNotFoundError for missing paths; type tells us file vs directory.
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive delete is implemented as a sync against a null source:
            # every file under `path` is "unmatched" and therefore deleted.
            self.sync_from(
                cast(AbstractStorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider of the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    # Check if soft-delete is enabled
                    if not self._metadata_provider.should_use_soft_delete():
                        # Hard delete: remove both physical file and metadata
                        self._storage_provider.delete_object(resolved.physical_path)

                    # Metadata entry is removed in both soft- and hard-delete modes.
                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
554
[docs]
555 def delete_many(self, paths: list[str]) -> None:
556 """
557 Delete multiple files at the specified paths. Only files are supported; directories are not deleted.
558
559 :param paths: List of logical paths of the files to delete.
560 """
561 physical_paths_to_delete: list[str] = []
562 for path in paths:
563 if self._metadata_provider:
564 resolved = self._metadata_provider.realpath(path)
565 if not resolved.exists:
566 raise FileNotFoundError(f"The file at path '{path}' was not found.")
567 if not self._metadata_provider.should_use_soft_delete():
568 physical_paths_to_delete.append(resolved.physical_path)
569 else:
570 physical_paths_to_delete.append(path)
571
572 if physical_paths_to_delete:
573 self._storage_provider.delete_objects(physical_paths_to_delete)
574
575 for path in paths:
576 virtual_path = path
577 if self._metadata_provider:
578 with self._metadata_provider_lock or contextlib.nullcontext():
579 self._metadata_provider.remove_file(virtual_path)
580 if self._is_cache_enabled():
581 if self._cache_manager is None:
582 raise RuntimeError("Cache manager is not initialized")
583 self._cache_manager.delete(virtual_path)
584 if self._replica_manager:
585 self._replica_manager.delete_from_replicas(virtual_path)
586
[docs]
587 def glob(
588 self,
589 pattern: str,
590 include_url_prefix: bool = False,
591 attribute_filter_expression: Optional[str] = None,
592 ) -> list[str]:
593 """
594 Matches and retrieves a list of object keys in the storage provider that match the specified pattern.
595
596 :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
597 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
598 :param attribute_filter_expression: The attribute filter expression to apply to the result.
599 :return: A list of object paths that match the specified pattern.
600 """
601 if self._metadata_provider:
602 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
603 else:
604 results = self._storage_provider.glob(pattern, attribute_filter_expression)
605
606 if include_url_prefix:
607 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
608
609 return results
610
611 def _resolve_single_file(
612 self,
613 path: str,
614 start_after: Optional[str],
615 end_at: Optional[str],
616 include_url_prefix: bool,
617 pattern_matcher: Optional[PatternMatcher],
618 ) -> tuple[Optional[ObjectMetadata], Optional[str]]:
619 """
620 Resolve whether ``path`` should be handled as a single-file listing result.
621
622 :param path: Candidate file path or directory prefix to resolve.
623 :param start_after: Exclusive lower bound for file key filtering.
624 :param end_at: Inclusive upper bound for file key filtering.
625 :param include_url_prefix: Whether to prefix returned keys with ``msc://profile``.
626 :param pattern_matcher: Optional include/exclude matcher for file keys.
627 :return: A tuple of ``(single_file, normalized_path)``. Returns file metadata and
628 the original path when ``path`` resolves to a file that passes filters;
629 returns ``(None, normalized_directory_path)`` when the caller should
630 continue with directory listing; returns ``(None, None)`` when filtering
631 excludes the single-file candidate and listing should stop.
632 """
633 if not path:
634 return None, path
635
636 if self.is_file(path):
637 if pattern_matcher and not pattern_matcher.should_include_file(path):
638 return None, None
639
640 try:
641 object_metadata = self.info(path)
642 if start_after and object_metadata.key <= start_after:
643 return None, None
644 if end_at and object_metadata.key > end_at:
645 return None, None
646 if include_url_prefix:
647 self._prepend_url_prefix(object_metadata)
648 return object_metadata, path
649 except FileNotFoundError:
650 return None, path.rstrip("/") + "/"
651 else:
652 return None, path.rstrip("/") + "/"
653
654 def _prepend_url_prefix(self, obj: ObjectMetadata) -> None:
655 if self.is_default_profile():
656 obj.key = str(PurePosixPath("/") / obj.key)
657 else:
658 obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)
659
660 def _filter_and_decorate(
661 self,
662 objects: Iterator[ObjectMetadata],
663 include_url_prefix: bool,
664 pattern_matcher: Optional[PatternMatcher],
665 ) -> Iterator[ObjectMetadata]:
666 for obj in objects:
667 if pattern_matcher and not pattern_matcher.should_include_file(obj.key):
668 continue
669 if include_url_prefix:
670 self._prepend_url_prefix(obj)
671 yield obj
672
[docs]
673 def list_recursive(
674 self,
675 path: str = "",
676 start_after: Optional[str] = None,
677 end_at: Optional[str] = None,
678 max_workers: int = 32,
679 look_ahead: int = 2,
680 include_url_prefix: bool = False,
681 follow_symlinks: bool = True,
682 patterns: Optional[PatternList] = None,
683 ) -> Iterator[ObjectMetadata]:
684 """
685 List files recursively in the storage provider under the specified path.
686
687 :param path: The directory or file path to list objects under. This should be a
688 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
689 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
690 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
691 :param max_workers: Maximum concurrent workers for provider-level recursive listing.
692 :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing.
693 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
694 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
695 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
696 :return: An iterator over ObjectMetadata for matching files.
697 """
698 pattern_matcher = PatternMatcher(patterns) if patterns else None
699
700 single_file, effective_path = self._resolve_single_file(
701 path, start_after, end_at, include_url_prefix, pattern_matcher
702 )
703 if single_file is not None:
704 yield single_file
705 return
706 if effective_path is None:
707 return
708
709 if self._metadata_provider:
710 objects = self._metadata_provider.list_objects(
711 effective_path,
712 start_after=start_after,
713 end_at=end_at,
714 include_directories=False,
715 )
716 else:
717 objects = self._storage_provider.list_objects_recursive(
718 effective_path,
719 start_after=start_after,
720 end_at=end_at,
721 max_workers=max_workers,
722 look_ahead=look_ahead,
723 follow_symlinks=follow_symlinks,
724 )
725
726 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
727
[docs]
728 def open(
729 self,
730 path: str,
731 mode: str = "rb",
732 buffering: int = -1,
733 encoding: Optional[str] = None,
734 disable_read_cache: bool = False,
735 memory_load_limit: int = MEMORY_LOAD_LIMIT,
736 atomic: bool = True,
737 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
738 attributes: Optional[dict[str, str]] = None,
739 prefetch_file: bool = True,
740 ) -> Union[PosixFile, ObjectFile]:
741 """
742 Open a file for reading or writing.
743
744 :param path: The logical path of the object to open.
745 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
746 :param buffering: The buffering mode. Only applies to PosixFile.
747 :param encoding: The encoding to use for text files.
748 :param disable_read_cache: When set to ``True``, disables caching for file content.
749 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
750 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
751 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB.
752 :param atomic: When set to ``True``, file will be written atomically (rename upon close).
753 This parameter is only applicable to PosixFile in write mode.
754 :param check_source_version: Whether to check the source version of cached objects.
755 :param attributes: Attributes to add to the file.
756 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None.
757 :param prefetch_file: Whether to prefetch the file content.
758 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
759 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
760 :raises FileNotFoundError: If the file does not exist (read mode).
761 """
762 if self._is_posix_file_storage_provider():
763 return PosixFile(
764 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
765 )
766 else:
767 if atomic is False:
768 logger.warning("Non-atomic writes are not supported for object storage providers.")
769
770 return ObjectFile(
771 self,
772 remote_path=path,
773 mode=mode,
774 encoding=encoding,
775 disable_read_cache=disable_read_cache,
776 memory_load_limit=memory_load_limit,
777 check_source_version=check_source_version,
778 attributes=attributes,
779 prefetch_file=prefetch_file,
780 )
781
[docs]
782 def get_posix_path(self, path: str) -> Optional[str]:
783 """
784 Returns the physical POSIX filesystem path for POSIX storage providers.
785
786 :param path: The path to resolve (may be a symlink or virtual path).
787 :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
788 """
789 if not self._is_posix_file_storage_provider():
790 return None
791
792 if self._metadata_provider:
793 resolved = self._metadata_provider.realpath(path)
794 realpath = resolved.physical_path
795 else:
796 realpath = path
797
798 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
799
[docs]
800 def is_file(self, path: str) -> bool:
801 """
802 Checks whether the specified path points to a file (rather than a folder or directory).
803
804 :param path: The logical path to check.
805 :return: ``True`` if the key points to a file, ``False`` otherwise.
806 """
807 if self._metadata_provider:
808 resolved = self._metadata_provider.realpath(path)
809 return resolved.exists
810
811 return self._storage_provider.is_file(path)
812
832
[docs]
833 def is_empty(self, path: str) -> bool:
834 """
835 Check whether the specified path is empty. A path is considered empty if there are no
836 objects whose keys start with the given path as a prefix.
837
838 :param path: The logical path to check (typically a directory or folder prefix).
839 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
840 """
841 if self._metadata_provider:
842 objects = self._metadata_provider.list_objects(path)
843 else:
844 objects = self._storage_provider.list_objects(path)
845
846 try:
847 return next(objects) is None
848 except StopIteration:
849 pass
850
851 return True
852
[docs]
853 def sync_from(
854 self,
855 source_client: AbstractStorageClient,
856 source_path: str = "",
857 target_path: str = "",
858 delete_unmatched_files: bool = False,
859 description: str = "Syncing",
860 num_worker_processes: Optional[int] = None,
861 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
862 patterns: Optional[PatternList] = None,
863 preserve_source_attributes: bool = False,
864 follow_symlinks: bool = True,
865 source_files: Optional[list[str]] = None,
866 ignore_hidden: bool = True,
867 commit_metadata: bool = True,
868 dryrun: bool = False,
869 dryrun_output_path: Optional[str] = None,
870 ) -> SyncResult:
871 """
872 Syncs files from the source storage client to "path/".
873
874 :param source_client: The source storage client.
875 :param source_path: The logical path to sync from.
876 :param target_path: The logical path to sync to.
877 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
878 :param description: Description of sync process for logging purposes.
879 :param num_worker_processes: The number of worker processes to use.
880 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
881 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
882 Cannot be used together with source_files.
883 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
884 When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.
885
886 .. warning::
887 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
888 request for each object to retrieve attributes, which can significantly impact performance on large-scale
889 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
890
891 :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``.
892 :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
893 specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
894 :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
895 :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes.
896 Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
897 :param dryrun: If ``True``, only enumerate and compare objects without performing any copy/delete operations.
898 The returned :py:class:`SyncResult` will include a :py:class:`DryrunResult` with paths to JSONL files.
899 :param dryrun_output_path: Directory to write dryrun JSONL files into. If ``None`` (default), a temporary
900 directory is created automatically. Ignored when ``dryrun`` is ``False``.
901 :raises ValueError: If both source_files and patterns are provided.
902 :raises RuntimeError: If errors occur during sync operations. The sync will stop on first error (fail-fast).
903 """
904 if source_files and patterns:
905 raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")
906
907 pattern_matcher = PatternMatcher(patterns) if patterns else None
908
909 # Disable the replica manager during sync
910 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
911 # Import here to avoid circular dependency
912 from .client import StorageClient as StorageClientFacade
913
914 source_client = StorageClientFacade(source_client._config)
915 source_client._replica_manager = None
916
917 m = SyncManager(source_client, source_path, self, target_path)
918 batch_size = int(os.environ.get("MSC_SYNC_BATCH_SIZE", DEFAULT_SYNC_BATCH_SIZE))
919
920 return m.sync_objects(
921 execution_mode=execution_mode,
922 description=description,
923 num_worker_processes=num_worker_processes,
924 delete_unmatched_files=delete_unmatched_files,
925 pattern_matcher=pattern_matcher,
926 preserve_source_attributes=preserve_source_attributes,
927 follow_symlinks=follow_symlinks,
928 source_files=source_files,
929 ignore_hidden=ignore_hidden,
930 commit_metadata=commit_metadata,
931 batch_size=batch_size,
932 dryrun=dryrun,
933 dryrun_output_path=dryrun_output_path,
934 )
935
[docs]
936 def sync_replicas(
937 self,
938 source_path: str,
939 replica_indices: Optional[list[int]] = None,
940 delete_unmatched_files: bool = False,
941 description: str = "Syncing replica",
942 num_worker_processes: Optional[int] = None,
943 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
944 patterns: Optional[PatternList] = None,
945 ignore_hidden: bool = True,
946 ) -> None:
947 """
948 Sync files from this client to its replica storage clients.
949
950 :param source_path: The logical path to sync from.
951 :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
952 :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source.
953 :param description: Description of sync process for logging purposes.
954 :param num_worker_processes: Number of worker processes for parallel sync.
955 :param execution_mode: Execution mode (LOCAL or REMOTE).
956 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
957 :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
958 """
959 if not self._replicas:
960 logger.warning(
961 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
962 "secondary storage locations for redundancy and performance.",
963 self._config.profile,
964 )
965 return None
966
967 if replica_indices:
968 try:
969 replicas = [self._replicas[i] for i in replica_indices]
970 except IndexError as e:
971 raise ValueError(f"Replica index out of range: {replica_indices}") from e
972 else:
973 replicas = self._replicas
974
975 # Disable the replica manager during sync
976 if self._replica_manager:
977 # Import here to avoid circular dependency
978 from .client import StorageClient as StorageClientFacade
979
980 source_client = StorageClientFacade(self._config)
981 source_client._replica_manager = None
982 else:
983 source_client = self
984
985 for replica in replicas:
986 replica.sync_from(
987 source_client,
988 source_path,
989 source_path,
990 delete_unmatched_files=delete_unmatched_files,
991 description=f"{description} ({replica.profile})",
992 num_worker_processes=num_worker_processes,
993 execution_mode=execution_mode,
994 patterns=patterns,
995 ignore_hidden=ignore_hidden,
996 )
997
[docs]
998 def list(
999 self,
1000 prefix: str = "",
1001 path: str = "",
1002 start_after: Optional[str] = None,
1003 end_at: Optional[str] = None,
1004 include_directories: bool = False,
1005 include_url_prefix: bool = False,
1006 attribute_filter_expression: Optional[str] = None,
1007 show_attributes: bool = False,
1008 follow_symlinks: bool = True,
1009 patterns: Optional[PatternList] = None,
1010 ) -> Iterator[ObjectMetadata]:
1011 """
1012 List objects in the storage provider under the specified path.
1013
1014 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
1015 deprecated and will be removed in a future version.
1016
1017 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
1018 :param path: The directory or file path to list objects under. This should be a
1019 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
1020 Cannot be used together with ``prefix``.
1021 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
1022 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
1023 :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects.
1024 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
1025 :param attribute_filter_expression: The attribute filter expression to apply to the result.
1026 :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation, there may be a performance impact if this is set to ``True``.
1027 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
1028 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
1029 :return: An iterator over ObjectMetadata for matching objects.
1030 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
1031 """
1032 # Parameter validation - either path or prefix, not both
1033 if path and prefix:
1034 raise ValueError(
1035 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
1036 f"Please use only the 'path' parameter for new code. "
1037 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
1038 )
1039 elif prefix:
1040 logger.debug(
1041 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
1042 f"Please use the 'path' parameter instead. "
1043 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
1044 )
1045
1046 pattern_matcher = PatternMatcher(patterns) if patterns else None
1047 effective_path = path if path else prefix
1048
1049 single_file, effective_path = self._resolve_single_file(
1050 effective_path, start_after, end_at, include_url_prefix, pattern_matcher
1051 )
1052 if single_file is not None:
1053 yield single_file
1054 return
1055 if effective_path is None:
1056 return
1057
1058 if self._metadata_provider:
1059 objects = self._metadata_provider.list_objects(
1060 effective_path,
1061 start_after=start_after,
1062 end_at=end_at,
1063 include_directories=include_directories,
1064 attribute_filter_expression=attribute_filter_expression,
1065 show_attributes=show_attributes,
1066 )
1067 else:
1068 objects = self._storage_provider.list_objects(
1069 effective_path,
1070 start_after=start_after,
1071 end_at=end_at,
1072 include_directories=include_directories,
1073 attribute_filter_expression=attribute_filter_expression,
1074 show_attributes=show_attributes,
1075 follow_symlinks=follow_symlinks,
1076 )
1077
1078 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
1079
[docs]
1080 def generate_presigned_url(
1081 self,
1082 path: str,
1083 *,
1084 method: str = "GET",
1085 signer_type: Optional[SignerType] = None,
1086 signer_options: Optional[dict[str, Any]] = None,
1087 ) -> str:
1088 return self._storage_provider.generate_presigned_url(
1089 path, method=method, signer_type=signer_type, signer_options=signer_options
1090 )