1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from datetime import datetime, timezone
23from io import BytesIO
24from pathlib import PurePosixPath
25from typing import IO, Any, Optional, Union, cast
26
27from ..config import StorageClientConfig
28from ..constants import DEFAULT_SYNC_BATCH_SIZE, MEMORY_LOAD_LIMIT
29from ..file import ObjectFile, PosixFile
30from ..providers.posix_file import PosixFileStorageProvider
31from ..replica_manager import ReplicaManager
32from ..retry import retry
33from ..sync import SyncManager
34from ..types import (
35 AWARE_DATETIME_MIN,
36 MSC_PROTOCOL,
37 ExecutionMode,
38 ObjectMetadata,
39 PatternList,
40 Range,
41 Replica,
42 ResolvedPathState,
43 SourceVersionCheckMode,
44 StorageProvider,
45 SyncResult,
46)
47from ..utils import NullStorageClient, PatternMatcher, join_paths
48from .types import AbstractStorageClient
49
50logger = logging.getLogger(__name__)
51
52
[docs]
53class SingleStorageClient(AbstractStorageClient):
54 """
55 Storage client for single-backend configurations.
56
57 Supports full read and write operations against a single storage provider.
58 """
59
    # Parsed client configuration; assigned in _initialize_providers().
    _config: StorageClientConfig
    # Backend used for all physical object operations.
    _storage_provider: StorageProvider
    # Guards metadata-provider mutations when auto-commit is enabled; None otherwise.
    _metadata_provider_lock: Optional[threading.Lock] = None
    # Signals the auto-commit thread to stop; None when no commit thread was started.
    _stop_event: Optional[threading.Event] = None
    # Coordinates replica reads/deletes; None when no replicas are configured.
    _replica_manager: Optional[ReplicaManager] = None
65
    def __init__(self, config: StorageClientConfig):
        """
        Initialize the :py:class:`SingleStorageClient` with the given configuration.

        :param config: Storage client configuration with storage_provider set
        :raises ValueError: If config has storage_provider_profiles (multi-backend)
        """
        # Providers must be bound first: replica initialization reads self._config.
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)
75
76 def _initialize_providers(self, config: StorageClientConfig) -> None:
77 if config.storage_provider_profiles:
78 raise ValueError(
79 "SingleStorageClient requires storage_provider, not storage_provider_profiles. "
80 "Use CompositeStorageClient for multi-backend configurations."
81 )
82
83 if config.storage_provider is None:
84 raise ValueError("SingleStorageClient requires storage_provider to be set.")
85
86 self._config = config
87 self._credentials_provider = self._config.credentials_provider
88 self._storage_provider = cast(StorageProvider, self._config.storage_provider)
89 self._metadata_provider = self._config.metadata_provider
90 self._cache_config = self._config.cache_config
91 self._retry_config = self._config.retry_config
92 self._cache_manager = self._config.cache_manager
93 self._autocommit_config = self._config.autocommit_config
94
95 if self._autocommit_config:
96 if self._metadata_provider:
97 logger.debug("Creating auto-commiter thread")
98
99 if self._autocommit_config.interval_minutes:
100 self._stop_event = threading.Event()
101 self._commit_thread = threading.Thread(
102 target=self._committer_thread,
103 daemon=True,
104 args=(self._autocommit_config.interval_minutes, self._stop_event),
105 )
106 self._commit_thread.start()
107
108 if self._autocommit_config.at_exit:
109 atexit.register(self._commit_on_exit)
110
111 self._metadata_provider_lock = threading.Lock()
112 else:
113 logger.debug("No metadata provider configured, auto-commit will not be enabled")
114
115 def _initialize_replicas(self, replicas: list[Replica]) -> None:
116 """Initialize replica StorageClient instances (facade)."""
117 # Import here to avoid circular dependency
118 from .client import StorageClient as StorageClientFacade
119
120 # Sort replicas by read_priority, the first one is the primary replica.
121 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)
122
123 replica_clients = []
124 for replica in sorted_replicas:
125 if self._config._config_dict is None:
126 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
127 replica_config = StorageClientConfig.from_dict(
128 config_dict=self._config._config_dict, profile=replica.replica_profile
129 )
130
131 storage_client = StorageClientFacade(config=replica_config)
132 replica_clients.append(storage_client)
133
134 self._replicas = replica_clients
135 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None
136
137 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
138 if not stop_event:
139 raise RuntimeError("Stop event not set")
140
141 while not stop_event.is_set():
142 # Wait with the ability to exit early
143 if stop_event.wait(timeout=commit_interval_minutes * 60):
144 break
145 logger.debug("Auto-committing to metadata provider")
146 self.commit_metadata()
147
    def _commit_on_exit(self):
        # atexit hook: flush any pending metadata changes before the process exits.
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()
151
152 def _get_source_version(self, path: str) -> Optional[str]:
153 """
154 Get etag from metadata provider or storage provider.
155 """
156 if self._metadata_provider:
157 metadata = self._metadata_provider.get_object_metadata(path)
158 else:
159 metadata = self._storage_provider.get_object_metadata(path)
160 return metadata.etag
161
162 def _is_cache_enabled(self) -> bool:
163 enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
164 return enabled
165
    def _is_posix_file_storage_provider(self) -> bool:
        """
        :return: ``True`` if the storage client is using a POSIX file storage provider, ``False`` otherwise.
        """
        # Several code paths (caching, rmtree, PosixFile) branch on this check.
        return isinstance(self._storage_provider, PosixFileStorageProvider)
171
172 def _is_rust_client_enabled(self) -> bool:
173 """
174 :return: ``True`` if the storage provider is using the Rust client, ``False`` otherwise.
175 """
176 return getattr(self._storage_provider, "_rust_client", None) is not None
177
178 def _read_from_replica_or_primary(self, path: str) -> bytes:
179 """
180 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
181 """
182 if self._replica_manager is None:
183 raise RuntimeError("Replica manager is not initialized")
184 file_obj = BytesIO()
185 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
186 return file_obj.getvalue()
187
188 def __del__(self):
189 if self._stop_event:
190 self._stop_event.set()
191 if self._commit_thread.is_alive():
192 self._commit_thread.join(timeout=5.0)
193
194 def __getstate__(self) -> dict[str, Any]:
195 state = self.__dict__.copy()
196 del state["_credentials_provider"]
197 del state["_storage_provider"]
198 del state["_metadata_provider"]
199 del state["_cache_manager"]
200
201 if "_metadata_provider_lock" in state:
202 del state["_metadata_provider_lock"]
203
204 if "_replicas" in state:
205 del state["_replicas"]
206
207 # Replica manager could be disabled if it's set to None in the state.
208 if "_replica_manager" in state:
209 if state["_replica_manager"] is not None:
210 del state["_replica_manager"]
211
212 return state
213
214 def __setstate__(self, state: dict[str, Any]) -> None:
215 config = state["_config"]
216 self._initialize_providers(config)
217
218 # Replica manager could be disabled if it's set to None in the state.
219 if "_replica_manager" in state and state["_replica_manager"] is None:
220 self._replica_manager = None
221 else:
222 self._initialize_replicas(config.replicas)
223
224 if self._metadata_provider:
225 self._metadata_provider_lock = threading.Lock()
226
    @property
    def profile(self) -> str:
        """
        :return: The profile name of the storage client.
        """
        return self._config.profile
233
[docs]
    def is_default_profile(self) -> bool:
        """
        :return: ``True`` if the storage client is using the reserved POSIX profile, ``False`` otherwise.
        """
        # "__filesystem__" is the reserved profile name for direct filesystem access.
        return self._config.profile == "__filesystem__"
239
    @property
    def replicas(self) -> list[AbstractStorageClient]:
        """
        :return: List of replica storage clients, sorted by read priority.
        """
        # Populated by _initialize_replicas(); empty when no replicas are configured.
        return self._replicas
246
247 @retry
248 def read(
249 self,
250 path: str,
251 byte_range: Optional[Range] = None,
252 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
253 ) -> bytes:
254 """
255 Read bytes from a file at the specified logical path.
256
257 :param path: The logical path of the object to read.
258 :param byte_range: Optional byte range to read (offset and length).
259 :param check_source_version: Whether to check the source version of cached objects.
260 :return: The content of the object as bytes.
261 :raises FileNotFoundError: If the file at the specified path does not exist.
262 """
263 if self._metadata_provider:
264 resolved = self._metadata_provider.realpath(path)
265 if not resolved.exists:
266 raise FileNotFoundError(f"The file at path '{path}' was not found.")
267 path = resolved.physical_path
268
269 # Handle caching logic
270 if self._is_cache_enabled() and self._cache_manager:
271 if byte_range:
272 # Range request with cache
273 try:
274 # Fetch metadata for source version checking (if needed)
275 metadata = None
276 source_version = None
277 if check_source_version == SourceVersionCheckMode.ENABLE:
278 metadata = self._storage_provider.get_object_metadata(path)
279 source_version = metadata.etag
280 elif check_source_version == SourceVersionCheckMode.INHERIT:
281 if self._cache_manager.check_source_version():
282 metadata = self._storage_provider.get_object_metadata(path)
283 source_version = metadata.etag
284
285 # Optimization: For full-file reads (offset=0, size >= file_size), cache whole file instead of chunking
286 # This avoids creating many small chunks when the user requests the entire file.
287 # Only apply this optimization when metadata is already available (i.e., when version checking is enabled),
288 # to respect the user's choice to disable version checking and avoid extra HEAD requests.
289 if byte_range.offset == 0 and metadata and byte_range.size >= metadata.content_length:
290 full_file_data = self._storage_provider.get_object(path)
291 self._cache_manager.set(path, full_file_data, source_version)
292 return full_file_data[: metadata.content_length]
293
294 # Use chunk-based caching for partial reads or when optimization doesn't apply
295 data = self._cache_manager.read(
296 key=path,
297 source_version=source_version,
298 byte_range=byte_range,
299 storage_provider=self._storage_provider,
300 )
301 if data is not None:
302 return data
303 # Fallback (should not normally happen)
304 return self._storage_provider.get_object(path, byte_range=byte_range)
305 except (FileNotFoundError, Exception):
306 # Fall back to direct read if metadata fetching fails
307 return self._storage_provider.get_object(path, byte_range=byte_range)
308 else:
309 # Full file read with cache
310 # Only fetch source version if check_source_version is enabled
311 source_version = None
312 if check_source_version == SourceVersionCheckMode.ENABLE:
313 source_version = self._get_source_version(path)
314 elif check_source_version == SourceVersionCheckMode.INHERIT:
315 if self._cache_manager.check_source_version():
316 source_version = self._get_source_version(path)
317
318 data = self._cache_manager.read(path, source_version)
319 if data is None:
320 if self._replica_manager:
321 data = self._read_from_replica_or_primary(path)
322 else:
323 data = self._storage_provider.get_object(path)
324 self._cache_manager.set(path, data, source_version)
325 return data
326 elif self._replica_manager:
327 # No cache, but replicas available
328 return self._read_from_replica_or_primary(path)
329 else:
330 # No cache, no replicas - direct storage provider read
331 return self._storage_provider.get_object(path, byte_range=byte_range)
332
[docs]
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Get metadata for a file at the specified path.

        :param path: The logical path of the object.
        :param strict: When ``True``, only return committed metadata. When ``False``, include pending changes.
        :return: ObjectMetadata containing file information (size, last modified, etc.).
        :raises FileNotFoundError: If the file at the specified path does not exist.
        """
        if not path or path == ".":  # empty path or '.' provided by the user
            # Synthesize a root directory entry; POSIX backends can report a real mtime.
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        # TODO: Consider passing strict argument to the metadata provider.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Best-effort directory probe; fall through and re-raise the original error.
                pass
            raise  # Raise original FileNotFoundError
370
371 @retry
372 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
373 """
374 Download a remote file to a local path or file-like object.
375
376 :param remote_path: The logical path of the remote file to download.
377 :param local_path: The local file path or file-like object to write to.
378 :raises FileNotFoundError: If the remote file does not exist.
379 """
380 if self._metadata_provider:
381 resolved = self._metadata_provider.realpath(remote_path)
382 if not resolved.exists:
383 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
384
385 metadata = self._metadata_provider.get_object_metadata(remote_path)
386 self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
387 elif self._replica_manager:
388 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
389 else:
390 self._storage_provider.download_file(remote_path, local_path)
391
    @retry
    def upload_file(
        self, remote_path: str, local_path: Union[str, IO], attributes: Optional[dict[str, str]] = None
    ) -> None:
        """
        Uploads a file from the local file system to the storage provider.

        :param remote_path: The path where the object will be stored.
        :param local_path: The source file to upload. This can either be a string representing the local
            file path, or a file-like object (e.g., an open file handle).
        :param attributes: The attributes to add to the file if a new file is created.
        :raises FileExistsError: If the target exists and the metadata provider forbids overwrites.
        """
        # Keep the caller-visible path; remote_path may be rewritten to a physical path below.
        virtual_path = remote_path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(remote_path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=True
                ).physical_path
            else:
                # New file - generate path
                remote_path = self._metadata_provider.generate_physical_path(
                    remote_path, for_overwrite=False
                ).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.upload_file(remote_path, local_path, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(remote_path)
            # Merge storage-reported metadata with caller-supplied attributes (caller wins on conflict).
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.upload_file(remote_path, local_path, attributes)
434
    @retry
    def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
        """
        Write bytes to a file at the specified path.

        :param path: The logical path where the object will be written.
        :param body: The content to write as bytes.
        :param attributes: Optional attributes to add to the file.
        :raises FileExistsError: If the target exists and the metadata provider forbids overwrites.
        """
        # Keep the caller-visible path; path may be rewritten to a physical path below.
        virtual_path = path
        if self._metadata_provider:
            resolved = self._metadata_provider.realpath(path)
            if resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # File exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite (future: may return different path for versioning)
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path

            # if metadata provider is present, we only write attributes to the metadata provider
            self._storage_provider.put_object(path, body, attributes=None)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(path)
            # Merge storage-reported metadata with caller-supplied attributes (caller wins on conflict).
            obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_path, obj_metadata)
        else:
            self._storage_provider.put_object(path, body, attributes=attributes)
470
[docs]
    def copy(self, src_path: str, dest_path: str) -> None:
        """
        Copy a file from source path to destination path.

        :param src_path: The logical path of the source object.
        :param dest_path: The logical path where the object will be copied to.
        :raises FileNotFoundError: If the source file does not exist.
        :raises FileExistsError: If the destination exists and the metadata provider forbids overwrites.
        """
        # Keep the caller-visible destination; dest_path may be rewritten below.
        virtual_dest_path = dest_path
        if self._metadata_provider:
            # Source: must exist
            src_resolved = self._metadata_provider.realpath(src_path)
            if not src_resolved.exists:
                raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
            src_path = src_resolved.physical_path

            # Destination: check for overwrites
            dest_resolved = self._metadata_provider.realpath(dest_path)
            if dest_resolved.state in (ResolvedPathState.EXISTS, ResolvedPathState.DELETED):
                # Destination exists or has been deleted
                if not self._metadata_provider.allow_overwrites():
                    raise FileExistsError(
                        f"The file at path '{virtual_dest_path}' already exists; "
                        f"overwriting is not allowed when using a metadata provider."
                    )
                # Generate path for overwrite
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
            else:
                # New file - generate path
                dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path

            self._storage_provider.copy_object(src_path, dest_path)

            # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
            obj_metadata = self._storage_provider.get_object_metadata(dest_path)
            with self._metadata_provider_lock or contextlib.nullcontext():
                self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
        else:
            self._storage_provider.copy_object(src_path, dest_path)
510
[docs]
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If ``path`` is a directory and ``recursive`` is ``False``.
        :raises FileNotFoundError: If no object exists at ``path``.
        """
        # info() raises FileNotFoundError if the path does not exist at all.
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive delete is implemented as a sync against a null (empty) source
            # with delete_unmatched_files=True, removing every object under path.
            self.sync_from(
                cast(AbstractStorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider of the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    # Check if soft-delete is enabled
                    if not self._metadata_provider.should_use_soft_delete():
                        # Hard delete: remove both physical file and metadata
                        self._storage_provider.delete_object(resolved.physical_path)

                    # Metadata entry is removed in both soft- and hard-delete cases.
                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
569
[docs]
570 def delete_many(self, paths: list[str]) -> None:
571 """
572 Delete multiple files at the specified paths. Only files are supported; directories are not deleted.
573
574 :param paths: List of logical paths of the files to delete.
575 """
576 physical_paths_to_delete: list[str] = []
577 for path in paths:
578 if self._metadata_provider:
579 resolved = self._metadata_provider.realpath(path)
580 if not resolved.exists:
581 raise FileNotFoundError(f"The file at path '{path}' was not found.")
582 if not self._metadata_provider.should_use_soft_delete():
583 physical_paths_to_delete.append(resolved.physical_path)
584 else:
585 physical_paths_to_delete.append(path)
586
587 if physical_paths_to_delete:
588 self._storage_provider.delete_objects(physical_paths_to_delete)
589
590 for path in paths:
591 virtual_path = path
592 if self._metadata_provider:
593 with self._metadata_provider_lock or contextlib.nullcontext():
594 self._metadata_provider.remove_file(virtual_path)
595 if self._is_cache_enabled():
596 if self._cache_manager is None:
597 raise RuntimeError("Cache manager is not initialized")
598 self._cache_manager.delete(virtual_path)
599 if self._replica_manager:
600 self._replica_manager.delete_from_replicas(virtual_path)
601
[docs]
602 def glob(
603 self,
604 pattern: str,
605 include_url_prefix: bool = False,
606 attribute_filter_expression: Optional[str] = None,
607 ) -> list[str]:
608 """
609 Matches and retrieves a list of object keys in the storage provider that match the specified pattern.
610
611 :param pattern: The pattern to match object keys against, supporting wildcards (e.g., ``*.txt``).
612 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
613 :param attribute_filter_expression: The attribute filter expression to apply to the result.
614 :return: A list of object paths that match the specified pattern.
615 """
616 if self._metadata_provider:
617 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
618 else:
619 results = self._storage_provider.glob(pattern, attribute_filter_expression)
620
621 if include_url_prefix:
622 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
623
624 return results
625
    def _resolve_single_file(
        self,
        path: str,
        start_after: Optional[str],
        end_at: Optional[str],
        include_url_prefix: bool,
        pattern_matcher: Optional[PatternMatcher],
    ) -> tuple[Optional[ObjectMetadata], Optional[str]]:
        """
        Resolve whether ``path`` should be handled as a single-file listing result.

        :param path: Candidate file path or directory prefix to resolve.
        :param start_after: Exclusive lower bound for file key filtering.
        :param end_at: Inclusive upper bound for file key filtering.
        :param include_url_prefix: Whether to prefix returned keys with ``msc://profile``.
        :param pattern_matcher: Optional include/exclude matcher for file keys.
        :return: A tuple of ``(single_file, normalized_path)``. Returns file metadata and
            the original path when ``path`` resolves to a file that passes filters;
            returns ``(None, normalized_directory_path)`` when the caller should
            continue with directory listing; returns ``(None, None)`` when filtering
            excludes the single-file candidate and listing should stop.
        """
        if not path:
            # Empty path is always treated as a directory listing root.
            return None, path

        if self.is_file(path):
            if pattern_matcher and not pattern_matcher.should_include_file(path):
                # The lone candidate is excluded by pattern: nothing to list.
                return None, None

            try:
                object_metadata = self.info(path)
                # Apply the same key-range bounds a directory listing would enforce.
                if start_after and object_metadata.key <= start_after:
                    return None, None
                if end_at and object_metadata.key > end_at:
                    return None, None
                if include_url_prefix:
                    self._prepend_url_prefix(object_metadata)
                return object_metadata, path
            except FileNotFoundError:
                # Raced away or not actually a file: fall back to directory listing.
                return None, path.rstrip("/") + "/"
        else:
            # Not a file: normalize to a directory prefix with a trailing slash.
            return None, path.rstrip("/") + "/"
668
    def _prepend_url_prefix(self, obj: ObjectMetadata) -> None:
        """Rewrite ``obj.key`` in place to carry the client's URL (or filesystem) prefix."""
        if self.is_default_profile():
            # Default (filesystem) profile: keys become absolute POSIX paths.
            obj.key = str(PurePosixPath("/") / obj.key)
        else:
            obj.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", obj.key)
674
675 def _filter_and_decorate(
676 self,
677 objects: Iterator[ObjectMetadata],
678 include_url_prefix: bool,
679 pattern_matcher: Optional[PatternMatcher],
680 ) -> Iterator[ObjectMetadata]:
681 for obj in objects:
682 if pattern_matcher and not pattern_matcher.should_include_file(obj.key):
683 continue
684 if include_url_prefix:
685 self._prepend_url_prefix(obj)
686 yield obj
687
[docs]
    def list_recursive(
        self,
        path: str = "",
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        max_workers: int = 32,
        look_ahead: int = 2,
        include_url_prefix: bool = False,
        follow_symlinks: bool = True,
        patterns: Optional[PatternList] = None,
    ) -> Iterator[ObjectMetadata]:
        """
        List files recursively in the storage provider under the specified path.

        :param path: The directory or file path to list objects under. This should be a
            complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
        :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
        :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
        :param max_workers: Maximum concurrent workers for provider-level recursive listing.
        :param look_ahead: Prefixes to buffer per worker for provider-level recursive listing.
        :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
        :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
        :param patterns: PatternList for include/exclude filtering. If None, all files are included.
        :return: An iterator over ObjectMetadata for matching files.
        """
        pattern_matcher = PatternMatcher(patterns) if patterns else None

        # If path names a single file, yield just that file (or nothing if filtered out).
        single_file, effective_path = self._resolve_single_file(
            path, start_after, end_at, include_url_prefix, pattern_matcher
        )
        if single_file is not None:
            yield single_file
            return
        if effective_path is None:
            # The single-file candidate was excluded by filters: nothing to list.
            return

        if self._metadata_provider:
            # Metadata provider listings are inherently recursive over files.
            objects = self._metadata_provider.list_objects(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                include_directories=False,
            )
        else:
            objects = self._storage_provider.list_objects_recursive(
                effective_path,
                start_after=start_after,
                end_at=end_at,
                max_workers=max_workers,
                look_ahead=look_ahead,
                follow_symlinks=follow_symlinks,
            )

        yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)
742
[docs]
743 def open(
744 self,
745 path: str,
746 mode: str = "rb",
747 buffering: int = -1,
748 encoding: Optional[str] = None,
749 disable_read_cache: bool = False,
750 memory_load_limit: int = MEMORY_LOAD_LIMIT,
751 atomic: bool = True,
752 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
753 attributes: Optional[dict[str, str]] = None,
754 prefetch_file: bool = True,
755 ) -> Union[PosixFile, ObjectFile]:
756 """
757 Open a file for reading or writing.
758
759 :param path: The logical path of the object to open.
760 :param mode: The file mode. Supported modes: "r", "rb", "w", "wb", "a", "ab".
761 :param buffering: The buffering mode. Only applies to PosixFile.
762 :param encoding: The encoding to use for text files.
763 :param disable_read_cache: When set to ``True``, disables caching for file content.
764 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
765 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
766 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to 512MB.
767 :param atomic: When set to ``True``, file will be written atomically (rename upon close).
768 This parameter is only applicable to PosixFile in write mode.
769 :param check_source_version: Whether to check the source version of cached objects.
770 :param attributes: Attributes to add to the file.
771 This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab". Defaults to None.
772 :param prefetch_file: Whether to prefetch the file content.
773 This parameter is only applicable to ObjectFile when the mode is "r" or "rb". Defaults to True.
774 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
775 :raises FileNotFoundError: If the file does not exist (read mode).
776 """
777 if self._is_posix_file_storage_provider():
778 return PosixFile(
779 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
780 )
781 else:
782 if atomic is False:
783 logger.warning("Non-atomic writes are not supported for object storage providers.")
784
785 return ObjectFile(
786 self,
787 remote_path=path,
788 mode=mode,
789 encoding=encoding,
790 disable_read_cache=disable_read_cache,
791 memory_load_limit=memory_load_limit,
792 check_source_version=check_source_version,
793 attributes=attributes,
794 prefetch_file=prefetch_file,
795 )
796
[docs]
797 def get_posix_path(self, path: str) -> Optional[str]:
798 """
799 Returns the physical POSIX filesystem path for POSIX storage providers.
800
801 :param path: The path to resolve (may be a symlink or virtual path).
802 :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
803 """
804 if not self._is_posix_file_storage_provider():
805 return None
806
807 if self._metadata_provider:
808 resolved = self._metadata_provider.realpath(path)
809 realpath = resolved.physical_path
810 else:
811 realpath = path
812
813 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
814
[docs]
815 def is_file(self, path: str) -> bool:
816 """
817 Checks whether the specified path points to a file (rather than a folder or directory).
818
819 :param path: The logical path to check.
820 :return: ``True`` if the key points to a file, ``False`` otherwise.
821 """
822 if self._metadata_provider:
823 resolved = self._metadata_provider.realpath(path)
824 return resolved.exists
825
826 return self._storage_provider.is_file(path)
827
847
[docs]
848 def is_empty(self, path: str) -> bool:
849 """
850 Check whether the specified path is empty. A path is considered empty if there are no
851 objects whose keys start with the given path as a prefix.
852
853 :param path: The logical path to check (typically a directory or folder prefix).
854 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
855 """
856 if self._metadata_provider:
857 objects = self._metadata_provider.list_objects(path)
858 else:
859 objects = self._storage_provider.list_objects(path)
860
861 try:
862 return next(objects) is None
863 except StopIteration:
864 pass
865
866 return True
867
[docs]
868 def sync_from(
869 self,
870 source_client: AbstractStorageClient,
871 source_path: str = "",
872 target_path: str = "",
873 delete_unmatched_files: bool = False,
874 description: str = "Syncing",
875 num_worker_processes: Optional[int] = None,
876 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
877 patterns: Optional[PatternList] = None,
878 preserve_source_attributes: bool = False,
879 follow_symlinks: bool = True,
880 source_files: Optional[list[str]] = None,
881 ignore_hidden: bool = True,
882 commit_metadata: bool = True,
883 ) -> SyncResult:
884 """
885 Syncs files from the source storage client to "path/".
886
887 :param source_client: The source storage client.
888 :param source_path: The logical path to sync from.
889 :param target_path: The logical path to sync to.
890 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
891 :param description: Description of sync process for logging purposes.
892 :param num_worker_processes: The number of worker processes to use.
893 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
894 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
895 Cannot be used together with source_files.
896 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
897 When ``False`` (default), only file content is copied. When ``True``, custom metadata attributes are also preserved.
898
899 .. warning::
900 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
901 request for each object to retrieve attributes, which can significantly impact performance on large-scale
902 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
903
904 :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is ``True``.
905 :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
906 specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
907 :param ignore_hidden: Whether to ignore hidden files and directories. Default is ``True``.
908 :param commit_metadata: When ``True`` (default), calls :py:meth:`StorageClient.commit_metadata` after sync completes.
909 Set to ``False`` to skip the commit, allowing batching of multiple sync operations before committing manually.
910 :raises ValueError: If both source_files and patterns are provided.
911 :raises RuntimeError: If errors occur during sync operations. The sync will stop on first error (fail-fast).
912 """
913 if source_files and patterns:
914 raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")
915
916 pattern_matcher = PatternMatcher(patterns) if patterns else None
917
918 # Disable the replica manager during sync
919 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
920 # Import here to avoid circular dependency
921 from .client import StorageClient as StorageClientFacade
922
923 source_client = StorageClientFacade(source_client._config)
924 source_client._replica_manager = None
925
926 m = SyncManager(source_client, source_path, self, target_path)
927 batch_size = int(os.environ.get("MSC_SYNC_BATCH_SIZE", DEFAULT_SYNC_BATCH_SIZE))
928
929 return m.sync_objects(
930 execution_mode=execution_mode,
931 description=description,
932 num_worker_processes=num_worker_processes,
933 delete_unmatched_files=delete_unmatched_files,
934 pattern_matcher=pattern_matcher,
935 preserve_source_attributes=preserve_source_attributes,
936 follow_symlinks=follow_symlinks,
937 source_files=source_files,
938 ignore_hidden=ignore_hidden,
939 commit_metadata=commit_metadata,
940 batch_size=batch_size,
941 )
942
[docs]
943 def sync_replicas(
944 self,
945 source_path: str,
946 replica_indices: Optional[list[int]] = None,
947 delete_unmatched_files: bool = False,
948 description: str = "Syncing replica",
949 num_worker_processes: Optional[int] = None,
950 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
951 patterns: Optional[PatternList] = None,
952 ignore_hidden: bool = True,
953 ) -> None:
954 """
955 Sync files from this client to its replica storage clients.
956
957 :param source_path: The logical path to sync from.
958 :param replica_indices: Specific replica indices to sync to (0-indexed). If None, syncs to all replicas.
959 :param delete_unmatched_files: When set to ``True``, delete files in replicas that don't exist in source.
960 :param description: Description of sync process for logging purposes.
961 :param num_worker_processes: Number of worker processes for parallel sync.
962 :param execution_mode: Execution mode (LOCAL or REMOTE).
963 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
964 :param ignore_hidden: When set to ``True``, ignore hidden files (starting with '.'). Defaults to ``True``.
965 """
966 if not self._replicas:
967 logger.warning(
968 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
969 "secondary storage locations for redundancy and performance.",
970 self._config.profile,
971 )
972 return None
973
974 if replica_indices:
975 try:
976 replicas = [self._replicas[i] for i in replica_indices]
977 except IndexError as e:
978 raise ValueError(f"Replica index out of range: {replica_indices}") from e
979 else:
980 replicas = self._replicas
981
982 # Disable the replica manager during sync
983 if self._replica_manager:
984 # Import here to avoid circular dependency
985 from .client import StorageClient as StorageClientFacade
986
987 source_client = StorageClientFacade(self._config)
988 source_client._replica_manager = None
989 else:
990 source_client = self
991
992 for replica in replicas:
993 replica.sync_from(
994 source_client,
995 source_path,
996 source_path,
997 delete_unmatched_files=delete_unmatched_files,
998 description=f"{description} ({replica.profile})",
999 num_worker_processes=num_worker_processes,
1000 execution_mode=execution_mode,
1001 patterns=patterns,
1002 ignore_hidden=ignore_hidden,
1003 )
1004
[docs]
1005 def list(
1006 self,
1007 prefix: str = "",
1008 path: str = "",
1009 start_after: Optional[str] = None,
1010 end_at: Optional[str] = None,
1011 include_directories: bool = False,
1012 include_url_prefix: bool = False,
1013 attribute_filter_expression: Optional[str] = None,
1014 show_attributes: bool = False,
1015 follow_symlinks: bool = True,
1016 patterns: Optional[PatternList] = None,
1017 ) -> Iterator[ObjectMetadata]:
1018 """
1019 List objects in the storage provider under the specified path.
1020
1021 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
1022 deprecated and will be removed in a future version.
1023
1024 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
1025 :param path: The directory or file path to list objects under. This should be a
1026 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
1027 Cannot be used together with ``prefix``.
1028 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
1029 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
1030 :param include_directories: Whether to include directories in the result. When ``True``, directories are returned alongside objects.
1031 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
1032 :param attribute_filter_expression: The attribute filter expression to apply to the result.
1033 :param show_attributes: Whether to return attributes in the result. WARNING: Depending on implementation, there may be a performance impact if this is set to ``True``.
1034 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When ``False``, symlinks are skipped during listing.
1035 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
1036 :return: An iterator over ObjectMetadata for matching objects.
1037 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
1038 """
1039 # Parameter validation - either path or prefix, not both
1040 if path and prefix:
1041 raise ValueError(
1042 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
1043 f"Please use only the 'path' parameter for new code. "
1044 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
1045 )
1046 elif prefix:
1047 logger.debug(
1048 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
1049 f"Please use the 'path' parameter instead. "
1050 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
1051 )
1052
1053 pattern_matcher = PatternMatcher(patterns) if patterns else None
1054 effective_path = path if path else prefix
1055
1056 single_file, effective_path = self._resolve_single_file(
1057 effective_path, start_after, end_at, include_url_prefix, pattern_matcher
1058 )
1059 if single_file is not None:
1060 yield single_file
1061 return
1062 if effective_path is None:
1063 return
1064
1065 if self._metadata_provider:
1066 objects = self._metadata_provider.list_objects(
1067 effective_path,
1068 start_after=start_after,
1069 end_at=end_at,
1070 include_directories=include_directories,
1071 attribute_filter_expression=attribute_filter_expression,
1072 show_attributes=show_attributes,
1073 )
1074 else:
1075 objects = self._storage_provider.list_objects(
1076 effective_path,
1077 start_after=start_after,
1078 end_at=end_at,
1079 include_directories=include_directories,
1080 attribute_filter_expression=attribute_filter_expression,
1081 show_attributes=show_attributes,
1082 follow_symlinks=follow_symlinks,
1083 )
1084
1085 yield from self._filter_and_decorate(objects, include_url_prefix, pattern_matcher)