1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from datetime import datetime, timezone
23from io import BytesIO
24from pathlib import PurePosixPath
25from typing import IO, Any, List, Optional, Union, cast
26
27from .config import StorageClientConfig
28from .constants import MEMORY_LOAD_LIMIT
29from .file import ObjectFile, PosixFile
30from .providers.posix_file import PosixFileStorageProvider
31from .replica_manager import ReplicaManager
32from .retry import retry
33from .sync import SyncManager
34from .types import (
35 AWARE_DATETIME_MIN,
36 MSC_PROTOCOL,
37 ExecutionMode,
38 ObjectMetadata,
39 PatternList,
40 Range,
41 Replica,
42 SourceVersionCheckMode,
43)
44from .utils import NullStorageClient, PatternMatcher, join_paths
45
46logger = logging.getLogger(__name__)
47
48
[docs]
49class StorageClient:
50 """
51 A client for interacting with different storage providers.
52 """
53
54 _config: StorageClientConfig
55 _metadata_provider_lock: Optional[threading.Lock] = None
56 _stop_event: Optional[threading.Event] = None
57 _replica_manager: Optional[ReplicaManager] = None
58
59 def __init__(self, config: StorageClientConfig):
60 """
61 Initializes the :py:class:`StorageClient` with the given configuration.
62
63 :param config: The configuration object for the storage client.
64 """
65 self._initialize_providers(config)
66 self._initialize_replicas(config.replicas)
67
68 def _initialize_providers(self, config: StorageClientConfig) -> None:
69 self._config = config
70 self._credentials_provider = self._config.credentials_provider
71 self._storage_provider = self._config.storage_provider
72 self._metadata_provider = self._config.metadata_provider
73 self._cache_config = self._config.cache_config
74 self._retry_config = self._config.retry_config
75 self._cache_manager = self._config.cache_manager
76 self._autocommit_config = self._config.autocommit_config
77
78 if self._autocommit_config:
79 if self._metadata_provider:
80 logger.debug("Creating auto-commiter thread")
81
82 if self._autocommit_config.interval_minutes:
83 self._stop_event = threading.Event()
84 self._commit_thread = threading.Thread(
85 target=self._committer_thread,
86 daemon=True,
87 args=(self._autocommit_config.interval_minutes, self._stop_event),
88 )
89 self._commit_thread.start()
90
91 if self._autocommit_config.at_exit:
92 atexit.register(self._commit_on_exit)
93
94 self._metadata_provider_lock = threading.Lock()
95 else:
96 logger.debug("No metadata provider configured, auto-commit will not be enabled")
97
98 def _initialize_replicas(self, replicas: list[Replica]) -> None:
99 """Initialize replica StorageClient instances."""
100 # Sort replicas by read_priority, the first one is the primary replica
101 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)
102
103 replica_clients = []
104 for replica in sorted_replicas:
105 if self._config._config_dict is None:
106 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
107 replica_config = StorageClientConfig.from_dict(
108 config_dict=self._config._config_dict, profile=replica.replica_profile
109 )
110
111 storage_client = StorageClient(config=replica_config)
112 replica_clients.append(storage_client)
113
114 self._replicas = replica_clients
115 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None
116
117 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
118 if not stop_event:
119 raise RuntimeError("Stop event not set")
120
121 while not stop_event.is_set():
122 # Wait with the ability to exit early
123 if stop_event.wait(timeout=commit_interval_minutes * 60):
124 break
125 logger.debug("Auto-committing to metadata provider")
126 self.commit_metadata()
127
128 def _commit_on_exit(self):
129 logger.debug("Shutting down, committing metadata one last time...")
130 self.commit_metadata()
131
132 def _get_source_version(self, path: str) -> Optional[str]:
133 """
134 Get etag from metadata provider or storage provider.
135 """
136 if self._metadata_provider:
137 metadata = self._metadata_provider.get_object_metadata(path)
138 else:
139 metadata = self._storage_provider.get_object_metadata(path)
140 return metadata.etag
141
142 def _is_cache_enabled(self) -> bool:
143 enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
144 return enabled
145
    def _is_posix_file_storage_provider(self) -> bool:
        """Return True if the underlying storage provider is a POSIX filesystem provider."""
        return isinstance(self._storage_provider, PosixFileStorageProvider)
148
149 def _is_rust_client_enabled(self) -> bool:
150 """
151 Return True if the storage provider is using the Rust client.
152 """
153 return hasattr(self._storage_provider, "_rust_client")
154
155 def _read_from_replica_or_primary(self, path: str) -> bytes:
156 """
157 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
158 """
159 if self._replica_manager is None:
160 raise RuntimeError("Replica manager is not initialized")
161 file_obj = BytesIO()
162 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
163 return file_obj.getvalue()
164
165 def __del__(self):
166 if self._stop_event:
167 self._stop_event.set()
168 if self._commit_thread.is_alive():
169 self._commit_thread.join(timeout=5.0)
170
171 def __getstate__(self) -> dict[str, Any]:
172 state = self.__dict__.copy()
173 del state["_credentials_provider"]
174 del state["_storage_provider"]
175 del state["_metadata_provider"]
176 del state["_cache_manager"]
177
178 if "_metadata_provider_lock" in state:
179 del state["_metadata_provider_lock"]
180
181 if "_replicas" in state:
182 del state["_replicas"]
183
184 # Replica manager could be disabled if it's set to None in the state.
185 if "_replica_manager" in state:
186 if state["_replica_manager"] is not None:
187 del state["_replica_manager"]
188
189 return state
190
191 def __setstate__(self, state: dict[str, Any]) -> None:
192 config = state["_config"]
193 self._initialize_providers(config)
194
195 # Replica manager could be disabled if it's set to None in the state.
196 if "_replica_manager" in state and state["_replica_manager"] is None:
197 self._replica_manager = None
198 else:
199 self._initialize_replicas(config.replicas)
200
201 if self._metadata_provider:
202 self._metadata_provider_lock = threading.Lock()
203
[docs]
204 def is_default_profile(self) -> bool:
205 """
206 Return True if the storage client is using the default profile.
207 """
208 return self._config.profile == "default"
209
210 @property
211 def profile(self) -> str:
212 """
213 :return: The profile name of the storage client.
214 """
215 return self._config.profile
216
217 @property
218 def replicas(self) -> list["StorageClient"]:
219 """
220 :return: StorageClient instances for all replicas, sorted by read priority.
221 """
222 return self._replicas
223
224 @retry
225 def read(
226 self,
227 path: str,
228 byte_range: Optional[Range] = None,
229 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
230 ) -> bytes:
231 """
232 Reads an object from the specified logical path.
233
234 :param path: The logical path of the object to read.
235 :param byte_range: Optional byte range to read.
236 :param check_source_version: Whether to check the source version of cached objects.
237 :param size: Optional file size for range reads.
238 :return: The content of the object.
239 """
240 if self._metadata_provider:
241 path, exists = self._metadata_provider.realpath(path)
242 if not exists:
243 raise FileNotFoundError(f"The file at path '{path}' was not found.")
244
245 # Handle caching logic
246 if self._is_cache_enabled() and self._cache_manager:
247 if byte_range:
248 # Range request with cache
249 try:
250 if check_source_version == SourceVersionCheckMode.ENABLE:
251 metadata = self._storage_provider.get_object_metadata(path)
252 source_version = metadata.etag
253 elif check_source_version == SourceVersionCheckMode.INHERIT:
254 if self._cache_manager.check_source_version():
255 metadata = self._storage_provider.get_object_metadata(path)
256 source_version = metadata.etag
257 else:
258 metadata = None
259 source_version = None
260 else:
261 metadata = None
262 source_version = None
263
264 data = self._cache_manager.read(
265 key=path,
266 source_version=source_version,
267 byte_range=byte_range,
268 storage_provider=self._storage_provider,
269 )
270 if data is not None:
271 return data
272 # Fallback (should not normally happen)
273 return self._storage_provider.get_object(path, byte_range=byte_range)
274 except (FileNotFoundError, Exception):
275 # Fall back to direct read if metadata fetching fails
276 return self._storage_provider.get_object(path, byte_range=byte_range)
277 else:
278 # Full file read with cache
279 source_version = self._get_source_version(path)
280 data = self._cache_manager.read(path, source_version)
281
282 # Read from cache if the file exists
283 if self._is_cache_enabled():
284 if self._cache_manager is None:
285 raise RuntimeError("Cache manager is not initialized")
286 source_version = self._get_source_version(path)
287 data = self._cache_manager.read(path, source_version)
288
289 if data is None:
290 if self._replica_manager:
291 data = self._read_from_replica_or_primary(path)
292 else:
293 data = self._storage_provider.get_object(path)
294 self._cache_manager.set(path, data, source_version)
295 return data
296 elif self._replica_manager:
297 # No cache, but replicas available
298 return self._read_from_replica_or_primary(path)
299 else:
300 # No cache, no replicas - direct storage provider read
301 return self._storage_provider.get_object(path, byte_range=byte_range)
302
[docs]
    def info(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Retrieves metadata or information about an object stored at the specified path.

        :param path: The logical path to the object for which metadata or information is being retrieved.
        :param strict: If True, performs additional validation to determine whether the path refers to a directory.

        :return: An :py:class:`ObjectMetadata` describing the object (or directory).
        :raises FileNotFoundError: If the path is neither a known file nor a listable directory
            (metadata-provider case: the original lookup error is re-raised).
        """
        if not path or path == ".":  # for the empty path provided by the user
            # The root is always reported as a directory; POSIX storage can
            # stat the CWD for a real mtime, otherwise use the aware-datetime
            # minimum as a sentinel.
            if self._is_posix_file_storage_provider():
                last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
            else:
                last_modified = AWARE_DATETIME_MIN
            return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)

        if not self._metadata_provider:
            return self._storage_provider.get_object_metadata(path, strict=strict)

        # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
        # TODO: Consider passing strict argument to the metadata provider.
        try:
            return self._metadata_provider.get_object_metadata(path)
        except FileNotFoundError:
            # Try listing from the parent to determine if path is a valid directory
            parent = os.path.dirname(path.rstrip("/")) + "/"
            parent = "" if parent == "/" else parent  # "" means root-level listing
            target = path.rstrip("/") + "/"

            try:
                entries = self._metadata_provider.list_objects(parent, include_directories=True)
                for entry in entries:
                    if entry.key == target and entry.type == "directory":
                        return entry
            except Exception:
                # Best-effort directory probe: any listing failure falls
                # through to re-raising the original FileNotFoundError below.
                pass
            raise  # Raise original FileNotFoundError
340
341 @retry
342 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
343 """
344 Downloads a file to the local file system.
345
346 :param remote_path: The logical path of the file in the storage provider.
347 :param local_path: The local path where the file should be downloaded.
348 """
349 if self._metadata_provider:
350 real_path, exists = self._metadata_provider.realpath(remote_path)
351 if not exists:
352 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
353
354 metadata = self._metadata_provider.get_object_metadata(remote_path)
355 self._storage_provider.download_file(real_path, local_path, metadata)
356 elif self._replica_manager:
357 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
358 else:
359 self._storage_provider.download_file(remote_path, local_path)
360
361 @retry
362 def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
363 """
364 Uploads a file from the local file system.
365
366 :param remote_path: The logical path where the file should be stored.
367 :param local_path: The local path of the file to upload.
368 :param attributes: The attributes to add to the file.
369 """
370 virtual_path = remote_path
371 if self._metadata_provider:
372 remote_path, exists = self._metadata_provider.realpath(remote_path)
373 if exists:
374 raise FileExistsError(
375 f"The file at path '{virtual_path}' already exists; "
376 f"overwriting is not yet allowed when using a metadata provider."
377 )
378
379 # if metdata provider is present, we only write attributes to the metadata provider
380 self._storage_provider.upload_file(remote_path, local_path, attributes=None)
381
382 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
383 obj_metadata = self._storage_provider.get_object_metadata(remote_path)
384 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
385 with self._metadata_provider_lock or contextlib.nullcontext():
386 self._metadata_provider.add_file(virtual_path, obj_metadata)
387 else:
388 self._storage_provider.upload_file(remote_path, local_path, attributes)
389
390 @retry
391 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
392 """
393 Writes an object at the specified path.
394
395 :param path: The logical path where the object should be written.
396 :param body: The content to write to the object.
397 :param attributes: The attributes to add to the file.
398 """
399 virtual_path = path
400 if self._metadata_provider:
401 path, exists = self._metadata_provider.realpath(path)
402 if exists:
403 raise FileExistsError(
404 f"The file at path '{virtual_path}' already exists; "
405 f"overwriting is not yet allowed when using a metadata provider."
406 )
407
408 # if metadata provider is present, we only write attributes to the metadata provider
409 self._storage_provider.put_object(path, body, attributes=None)
410
411 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
412 obj_metadata = self._storage_provider.get_object_metadata(path)
413 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
414 with self._metadata_provider_lock or contextlib.nullcontext():
415 self._metadata_provider.add_file(virtual_path, obj_metadata)
416 else:
417 self._storage_provider.put_object(path, body, attributes=attributes)
418
[docs]
419 def copy(self, src_path: str, dest_path: str) -> None:
420 """
421 Copies an object from source to destination path.
422
423 :param src_path: The logical path of the source object to copy.
424 :param dest_path: The logical path of the destination.
425 """
426 virtual_dest_path = dest_path
427 if self._metadata_provider:
428 src_path, exists = self._metadata_provider.realpath(src_path)
429 if not exists:
430 raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
431
432 dest_path, exists = self._metadata_provider.realpath(dest_path)
433 if exists:
434 raise FileExistsError(
435 f"The file at path '{virtual_dest_path}' already exists; "
436 f"overwriting is not yet allowed when using a metadata provider."
437 )
438
439 self._storage_provider.copy_object(src_path, dest_path)
440
441 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
442 obj_metadata = self._storage_provider.get_object_metadata(dest_path)
443 with self._metadata_provider_lock or contextlib.nullcontext():
444 self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
445 else:
446 self._storage_provider.copy_object(src_path, dest_path)
447
[docs]
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If *path* is a directory and ``recursive`` is False.
        :raises FileNotFoundError: If nothing exists at *path*.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive directory delete is implemented as a sync against a
            # NullStorageClient source with delete_unmatched_files=True, which
            # removes every object under the prefix.
            self.sync_from(
                cast(StorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider for the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    # Resolve the logical path to the physical one before deleting.
                    path, exists = self._metadata_provider.realpath(path)
                    if not exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    self._storage_provider.delete_object(path)

                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
503
[docs]
504 def glob(
505 self,
506 pattern: str,
507 include_url_prefix: bool = False,
508 attribute_filter_expression: Optional[str] = None,
509 ) -> list[str]:
510 """
511 Matches and retrieves a list of objects in the storage provider that
512 match the specified pattern.
513
514 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
515 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
516 :param attribute_filter_expression: The attribute filter expression to apply to the result.
517
518 :return: A list of object paths that match the pattern.
519 """
520 if self._metadata_provider:
521 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
522 else:
523 results = self._storage_provider.glob(pattern, attribute_filter_expression)
524
525 if include_url_prefix:
526 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
527
528 return results
529
[docs]
530 def list(
531 self,
532 prefix: str = "",
533 path: str = "",
534 start_after: Optional[str] = None,
535 end_at: Optional[str] = None,
536 include_directories: bool = False,
537 include_url_prefix: bool = False,
538 attribute_filter_expression: Optional[str] = None,
539 show_attributes: bool = False,
540 follow_symlinks: bool = True,
541 ) -> Iterator[ObjectMetadata]:
542 """
543 Lists objects in the storage provider under the specified path.
544
545 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
546 deprecated and will be removed in a future version.
547
548 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
549 :param path: The directory or file path to list objects under. This should be a
550 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
551 Cannot be used together with ``prefix``.
552 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
553 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
554 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
555 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
556 :param attribute_filter_expression: The attribute filter expression to apply to the result.
557 :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to True.
558 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When False, symlinks are skipped during listing.
559
560 :return: An iterator over objects.
561
562 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
563 """
564 # Parameter validation - either path or prefix, not both
565 if path and prefix:
566 raise ValueError(
567 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
568 f"Please use only the 'path' parameter for new code. "
569 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
570 )
571 elif prefix:
572 logger.debug(
573 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
574 f"Please use the 'path' parameter instead. "
575 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
576 )
577
578 # Use path if provided, otherwise fall back to prefix
579 effective_path = path if path else prefix
580 if effective_path and not self.is_file(effective_path):
581 effective_path = (
582 effective_path.rstrip("/") + "/"
583 ) # assume it's a directory if the effective path is not empty
584
585 if self._metadata_provider:
586 objects = self._metadata_provider.list_objects(
587 effective_path,
588 start_after=start_after,
589 end_at=end_at,
590 include_directories=include_directories,
591 attribute_filter_expression=attribute_filter_expression,
592 show_attributes=show_attributes,
593 )
594 else:
595 objects = self._storage_provider.list_objects(
596 effective_path,
597 start_after=start_after,
598 end_at=end_at,
599 include_directories=include_directories,
600 attribute_filter_expression=attribute_filter_expression,
601 show_attributes=show_attributes,
602 follow_symlinks=follow_symlinks,
603 )
604
605 for object in objects:
606 if include_url_prefix:
607 if self.is_default_profile():
608 object.key = str(PurePosixPath("/") / object.key)
609 else:
610 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
611 yield object
612
[docs]
613 def open(
614 self,
615 path: str,
616 mode: str = "rb",
617 buffering: int = -1,
618 encoding: Optional[str] = None,
619 disable_read_cache: bool = False,
620 memory_load_limit: int = MEMORY_LOAD_LIMIT,
621 atomic: bool = True,
622 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
623 attributes: Optional[dict[str, str]] = None,
624 prefetch_file: bool = True,
625 ) -> Union[PosixFile, ObjectFile]:
626 """
627 Returns a file-like object from the specified path.
628
629 :param path: The logical path of the object to read.
630 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
631 :param buffering: The buffering mode. Only applies to PosixFile.
632 :param encoding: The encoding to use for text files.
633 :param disable_read_cache: When set to True, disables caching for the file content.
634 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
635 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
636 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
637 :param atomic: When set to True, the file will be written atomically (rename upon close).
638 This parameter is only applicable to PosixFile in write mode.
639 :param check_source_version: Whether to check the source version of cached objects.
640 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
641
642 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
643 """
644 if self._is_posix_file_storage_provider():
645 return PosixFile(
646 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
647 )
648 else:
649 if atomic is False:
650 logger.warning("Non-atomic writes are not supported for object storage providers.")
651
652 return ObjectFile(
653 self,
654 remote_path=path,
655 mode=mode,
656 encoding=encoding,
657 disable_read_cache=disable_read_cache,
658 memory_load_limit=memory_load_limit,
659 check_source_version=check_source_version,
660 attributes=attributes,
661 prefetch_file=prefetch_file,
662 )
663
[docs]
664 def get_posix_path(self, path: str) -> Optional[str]:
665 """
666 Returns the physical POSIX filesystem path for POSIX storage providers.
667
668 :param path: The path to resolve (may be a symlink or virtual path).
669 :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
670 """
671 if not self._is_posix_file_storage_provider():
672 return None
673
674 if self._metadata_provider:
675 realpath, _ = self._metadata_provider.realpath(path)
676 else:
677 realpath = path
678
679 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
680
[docs]
681 def is_file(self, path: str) -> bool:
682 """
683 Checks whether the specified path points to a file (rather than a directory or folder).
684
685 :param path: The logical path to check.
686
687 :return: ``True`` if the path points to a file, ``False`` otherwise.
688 """
689 if self._metadata_provider:
690 _, exists = self._metadata_provider.realpath(path)
691 return exists
692
693 return self._storage_provider.is_file(path)
694
712
[docs]
713 def is_empty(self, path: str) -> bool:
714 """
715 Checks whether the specified path is empty. A path is considered empty if there are no
716 objects whose keys start with the given path as a prefix.
717
718 :param path: The logical path to check. This is typically a prefix representing a directory or folder.
719
720 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
721 """
722 if self._metadata_provider:
723 objects = self._metadata_provider.list_objects(path)
724 else:
725 objects = self._storage_provider.list_objects(path)
726
727 try:
728 return next(objects) is None
729 except StopIteration:
730 pass
731
732 return True
733
[docs]
734 def sync_from(
735 self,
736 source_client: "StorageClient",
737 source_path: str = "",
738 target_path: str = "",
739 delete_unmatched_files: bool = False,
740 description: str = "Syncing",
741 num_worker_processes: Optional[int] = None,
742 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
743 patterns: Optional[PatternList] = None,
744 preserve_source_attributes: bool = False,
745 follow_symlinks: bool = True,
746 source_files: Optional[List[str]] = None,
747 ignore_hidden: bool = True,
748 ) -> None:
749 """
750 Syncs files from the source storage client to "path/".
751
752 :param source_client: The source storage client.
753 :param source_path: The logical path to sync from.
754 :param target_path: The logical path to sync to.
755 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
756 :param description: Description of sync process for logging purposes.
757 :param num_worker_processes: The number of worker processes to use.
758 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
759 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
760 Cannot be used together with source_files.
761 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
762 When False (default), only file content is copied. When True, custom metadata attributes are also preserved.
763
764 .. warning::
765 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
766 request for each object to retrieve attributes, which can significantly impact performance on large-scale
767 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
768
769 :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is True.
770 :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
771 specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
772 :param ignore_hidden: Whether to ignore hidden files and directories. Default is True.
773 :raises ValueError: If both source_files and patterns are provided.
774 """
775 if source_files and patterns:
776 raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")
777
778 pattern_matcher = PatternMatcher(patterns) if patterns else None
779
780 # Disable the replica manager during sync
781 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
782 source_client = StorageClient(source_client._config)
783 source_client._replica_manager = None
784
785 m = SyncManager(source_client, source_path, self, target_path)
786
787 m.sync_objects(
788 execution_mode=execution_mode,
789 description=description,
790 num_worker_processes=num_worker_processes,
791 delete_unmatched_files=delete_unmatched_files,
792 pattern_matcher=pattern_matcher,
793 preserve_source_attributes=preserve_source_attributes,
794 follow_symlinks=follow_symlinks,
795 source_files=source_files,
796 ignore_hidden=ignore_hidden,
797 )
798
[docs]
799 def sync_replicas(
800 self,
801 source_path: str,
802 replica_indices: Optional[List[int]] = None,
803 delete_unmatched_files: bool = False,
804 description: str = "Syncing replica",
805 num_worker_processes: Optional[int] = None,
806 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
807 patterns: Optional[PatternList] = None,
808 ignore_hidden: bool = True,
809 ) -> None:
810 """
811 Sync files from the source storage client to target replicas.
812
813 :param source_path: The logical path to sync from.
814 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
815 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
816 :param description: Description of sync process for logging purposes.
817 :param num_worker_processes: The number of worker processes to use.
818 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
819 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
820 :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True.
821 """
822 if not self._replicas:
823 logger.warning(
824 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
825 "secondary storage locations for redundancy and performance.",
826 self._config.profile,
827 )
828 return None
829
830 if replica_indices:
831 try:
832 replicas = [self._replicas[i] for i in replica_indices]
833 except IndexError as e:
834 raise ValueError(f"Replica index out of range: {replica_indices}") from e
835 else:
836 replicas = self._replicas
837
838 # Disable the replica manager during sync
839 if self._replica_manager:
840 source_client = StorageClient(self._config)
841 source_client._replica_manager = None
842 else:
843 source_client = self
844
845 for replica in replicas:
846 replica.sync_from(
847 source_client,
848 source_path,
849 source_path,
850 delete_unmatched_files=delete_unmatched_files,
851 description=f"{description} ({replica.profile})",
852 num_worker_processes=num_worker_processes,
853 execution_mode=execution_mode,
854 patterns=patterns,
855 ignore_hidden=ignore_hidden,
856 )