1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import atexit
17import contextlib
18import logging
19import os
20import threading
21from collections.abc import Iterator
22from datetime import datetime, timezone
23from io import BytesIO
24from pathlib import PurePosixPath
25from typing import IO, Any, List, Optional, Union, cast
26
27from .config import StorageClientConfig
28from .constants import MEMORY_LOAD_LIMIT
29from .file import ObjectFile, PosixFile
30from .providers.posix_file import PosixFileStorageProvider
31from .replica_manager import ReplicaManager
32from .retry import retry
33from .sync import SyncManager
34from .types import (
35 AWARE_DATETIME_MIN,
36 MSC_PROTOCOL,
37 ExecutionMode,
38 ObjectMetadata,
39 PatternList,
40 Range,
41 Replica,
42 SourceVersionCheckMode,
43 StorageProvider,
44)
45from .utils import NullStorageClient, PatternMatcher, join_paths
46
47logger = logging.getLogger(__name__)
48
49
class StorageClient:
    """
    A client for interacting with different storage providers.
    """

    # Configuration this client was built from (also used to rebuild state on unpickle).
    _config: StorageClientConfig
    _storage_provider: StorageProvider
    # Guards metadata-provider mutations while the auto-commit thread may run concurrently.
    _metadata_provider_lock: Optional[threading.Lock] = None
    # Signals the background committer thread to stop; None when auto-commit is disabled.
    _stop_event: Optional[threading.Event] = None
    # Routes reads/deletes across replicas; None when no replicas are configured.
    _replica_manager: Optional[ReplicaManager] = None
    def __init__(self, config: StorageClientConfig):
        """
        Initializes the :py:class:`StorageClient` with the given configuration.

        :param config: The configuration object for the storage client.
        """
        # Providers must be bound first; replica clients are derived from the same config.
        self._initialize_providers(config)
        self._initialize_replicas(config.replicas)
69
70 def _initialize_providers(self, config: StorageClientConfig) -> None:
71 # TODO: Support composite clients with storage_provider_profiles
72 if config.storage_provider is None or config.storage_provider_profiles:
73 raise NotImplementedError("StorageClient with storage_provider_profiles is not yet supported.")
74
75 self._config = config
76 self._credentials_provider = self._config.credentials_provider
77 self._storage_provider = cast(StorageProvider, self._config.storage_provider)
78 self._metadata_provider = self._config.metadata_provider
79 self._cache_config = self._config.cache_config
80 self._retry_config = self._config.retry_config
81 self._cache_manager = self._config.cache_manager
82 self._autocommit_config = self._config.autocommit_config
83
84 if self._autocommit_config:
85 if self._metadata_provider:
86 logger.debug("Creating auto-commiter thread")
87
88 if self._autocommit_config.interval_minutes:
89 self._stop_event = threading.Event()
90 self._commit_thread = threading.Thread(
91 target=self._committer_thread,
92 daemon=True,
93 args=(self._autocommit_config.interval_minutes, self._stop_event),
94 )
95 self._commit_thread.start()
96
97 if self._autocommit_config.at_exit:
98 atexit.register(self._commit_on_exit)
99
100 self._metadata_provider_lock = threading.Lock()
101 else:
102 logger.debug("No metadata provider configured, auto-commit will not be enabled")
103
104 def _initialize_replicas(self, replicas: list[Replica]) -> None:
105 """Initialize replica StorageClient instances."""
106 # Sort replicas by read_priority, the first one is the primary replica
107 sorted_replicas = sorted(replicas, key=lambda r: r.read_priority)
108
109 replica_clients = []
110 for replica in sorted_replicas:
111 if self._config._config_dict is None:
112 raise ValueError(f"Cannot initialize replica '{replica.replica_profile}' without a config")
113 replica_config = StorageClientConfig.from_dict(
114 config_dict=self._config._config_dict, profile=replica.replica_profile
115 )
116
117 storage_client = StorageClient(config=replica_config)
118 replica_clients.append(storage_client)
119
120 self._replicas = replica_clients
121 self._replica_manager = ReplicaManager(self) if len(self._replicas) > 0 else None
122
123 def _committer_thread(self, commit_interval_minutes: float, stop_event: threading.Event):
124 if not stop_event:
125 raise RuntimeError("Stop event not set")
126
127 while not stop_event.is_set():
128 # Wait with the ability to exit early
129 if stop_event.wait(timeout=commit_interval_minutes * 60):
130 break
131 logger.debug("Auto-committing to metadata provider")
132 self.commit_metadata()
133
    def _commit_on_exit(self):
        # atexit hook: flush any pending metadata once before interpreter shutdown.
        logger.debug("Shutting down, committing metadata one last time...")
        self.commit_metadata()
137
138 def _get_source_version(self, path: str) -> Optional[str]:
139 """
140 Get etag from metadata provider or storage provider.
141 """
142 if self._metadata_provider:
143 metadata = self._metadata_provider.get_object_metadata(path)
144 else:
145 metadata = self._storage_provider.get_object_metadata(path)
146 return metadata.etag
147
148 def _is_cache_enabled(self) -> bool:
149 enabled = self._cache_manager is not None and not self._is_posix_file_storage_provider()
150 return enabled
151
    def _is_posix_file_storage_provider(self) -> bool:
        # True when the backing provider is the local POSIX filesystem implementation.
        return isinstance(self._storage_provider, PosixFileStorageProvider)
154
    def _is_rust_client_enabled(self) -> bool:
        """
        Return True if the storage provider is using the Rust client.
        """
        # Duck-typed: Rust-backed providers expose a private _rust_client attribute.
        return hasattr(self._storage_provider, "_rust_client")
160
161 def _read_from_replica_or_primary(self, path: str) -> bytes:
162 """
163 Read from replica or primary storage provider. Use BytesIO to avoid creating temporary files.
164 """
165 if self._replica_manager is None:
166 raise RuntimeError("Replica manager is not initialized")
167 file_obj = BytesIO()
168 self._replica_manager.download_from_replica_or_primary(path, file_obj, self._storage_provider)
169 return file_obj.getvalue()
170
171 def __del__(self):
172 if self._stop_event:
173 self._stop_event.set()
174 if self._commit_thread.is_alive():
175 self._commit_thread.join(timeout=5.0)
176
177 def __getstate__(self) -> dict[str, Any]:
178 state = self.__dict__.copy()
179 del state["_credentials_provider"]
180 del state["_storage_provider"]
181 del state["_metadata_provider"]
182 del state["_cache_manager"]
183
184 if "_metadata_provider_lock" in state:
185 del state["_metadata_provider_lock"]
186
187 if "_replicas" in state:
188 del state["_replicas"]
189
190 # Replica manager could be disabled if it's set to None in the state.
191 if "_replica_manager" in state:
192 if state["_replica_manager"] is not None:
193 del state["_replica_manager"]
194
195 return state
196
197 def __setstate__(self, state: dict[str, Any]) -> None:
198 config = state["_config"]
199 self._initialize_providers(config)
200
201 # Replica manager could be disabled if it's set to None in the state.
202 if "_replica_manager" in state and state["_replica_manager"] is None:
203 self._replica_manager = None
204 else:
205 self._initialize_replicas(config.replicas)
206
207 if self._metadata_provider:
208 self._metadata_provider_lock = threading.Lock()
209
    def is_default_profile(self) -> bool:
        """
        Return True if the storage client is using the default profile.
        """
        # Callers (e.g. list()) format result keys without the msc:// prefix for the default profile.
        return self._config.profile == "default"
215
    @property
    def profile(self) -> str:
        """
        The profile name this client was configured with.

        :return: The profile name of the storage client.
        """
        return self._config.profile
222
    @property
    def replicas(self) -> list["StorageClient"]:
        """
        :return: StorageClient instances for all replicas, sorted by read priority.
        """
        # Populated by _initialize_replicas; empty when the profile defines no replicas.
        return self._replicas
229
230 @retry
231 def read(
232 self,
233 path: str,
234 byte_range: Optional[Range] = None,
235 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
236 ) -> bytes:
237 """
238 Reads an object from the specified logical path.
239
240 :param path: The logical path of the object to read.
241 :param byte_range: Optional byte range to read.
242 :param check_source_version: Whether to check the source version of cached objects.
243 :param size: Optional file size for range reads.
244 :return: The content of the object.
245 """
246 if self._metadata_provider:
247 resolved = self._metadata_provider.realpath(path)
248 if not resolved.exists:
249 raise FileNotFoundError(f"The file at path '{path}' was not found.")
250 path = resolved.physical_path
251
252 # Handle caching logic
253 if self._is_cache_enabled() and self._cache_manager:
254 if byte_range:
255 # Range request with cache
256 try:
257 if check_source_version == SourceVersionCheckMode.ENABLE:
258 metadata = self._storage_provider.get_object_metadata(path)
259 source_version = metadata.etag
260 elif check_source_version == SourceVersionCheckMode.INHERIT:
261 if self._cache_manager.check_source_version():
262 metadata = self._storage_provider.get_object_metadata(path)
263 source_version = metadata.etag
264 else:
265 metadata = None
266 source_version = None
267 else:
268 metadata = None
269 source_version = None
270
271 data = self._cache_manager.read(
272 key=path,
273 source_version=source_version,
274 byte_range=byte_range,
275 storage_provider=self._storage_provider,
276 )
277 if data is not None:
278 return data
279 # Fallback (should not normally happen)
280 return self._storage_provider.get_object(path, byte_range=byte_range)
281 except (FileNotFoundError, Exception):
282 # Fall back to direct read if metadata fetching fails
283 return self._storage_provider.get_object(path, byte_range=byte_range)
284 else:
285 # Full file read with cache
286 source_version = self._get_source_version(path)
287 data = self._cache_manager.read(path, source_version)
288
289 # Read from cache if the file exists
290 if self._is_cache_enabled():
291 if self._cache_manager is None:
292 raise RuntimeError("Cache manager is not initialized")
293 source_version = self._get_source_version(path)
294 data = self._cache_manager.read(path, source_version)
295
296 if data is None:
297 if self._replica_manager:
298 data = self._read_from_replica_or_primary(path)
299 else:
300 data = self._storage_provider.get_object(path)
301 self._cache_manager.set(path, data, source_version)
302 return data
303 elif self._replica_manager:
304 # No cache, but replicas available
305 return self._read_from_replica_or_primary(path)
306 else:
307 # No cache, no replicas - direct storage provider read
308 return self._storage_provider.get_object(path, byte_range=byte_range)
309
[docs]
310 def info(self, path: str, strict: bool = True) -> ObjectMetadata:
311 """
312 Retrieves metadata or information about an object stored at the specified path.
313
314 :param path: The logical path to the object for which metadata or information is being retrieved.
315 :param strict: If True, performs additional validation to determine whether the path refers to a directory.
316
317 :return: A dictionary containing metadata about the object.
318 """
319 if not path or path == ".": # for the empty path provided by the user
320 if self._is_posix_file_storage_provider():
321 last_modified = datetime.fromtimestamp(os.path.getmtime("."), tz=timezone.utc)
322 else:
323 last_modified = AWARE_DATETIME_MIN
324 return ObjectMetadata(key="", type="directory", content_length=0, last_modified=last_modified)
325
326 if not self._metadata_provider:
327 return self._storage_provider.get_object_metadata(path, strict=strict)
328
329 # For metadata_provider, first check if the path exists as a file, then fallback to detecting if path is a directory.
330 # TODO: Consider passing strict argument to the metadata provider.
331 try:
332 return self._metadata_provider.get_object_metadata(path)
333 except FileNotFoundError:
334 # Try listing from the parent to determine if path is a valid directory
335 parent = os.path.dirname(path.rstrip("/")) + "/"
336 parent = "" if parent == "/" else parent
337 target = path.rstrip("/") + "/"
338
339 try:
340 entries = self._metadata_provider.list_objects(parent, include_directories=True)
341 for entry in entries:
342 if entry.key == target and entry.type == "directory":
343 return entry
344 except Exception:
345 pass
346 raise # Raise original FileNotFoundError
347
348 @retry
349 def download_file(self, remote_path: str, local_path: Union[str, IO]) -> None:
350 """
351 Downloads a file to the local file system.
352
353 :param remote_path: The logical path of the file in the storage provider.
354 :param local_path: The local path where the file should be downloaded.
355 """
356 if self._metadata_provider:
357 resolved = self._metadata_provider.realpath(remote_path)
358 if not resolved.exists:
359 raise FileNotFoundError(f"The file at path '{remote_path}' was not found by metadata provider.")
360
361 metadata = self._metadata_provider.get_object_metadata(remote_path)
362 self._storage_provider.download_file(resolved.physical_path, local_path, metadata)
363 elif self._replica_manager:
364 self._replica_manager.download_from_replica_or_primary(remote_path, local_path, self._storage_provider)
365 else:
366 self._storage_provider.download_file(remote_path, local_path)
367
368 @retry
369 def upload_file(self, remote_path: str, local_path: str, attributes: Optional[dict[str, str]] = None) -> None:
370 """
371 Uploads a file from the local file system.
372
373 :param remote_path: The logical path where the file should be stored.
374 :param local_path: The local path of the file to upload.
375 :param attributes: The attributes to add to the file.
376 """
377 virtual_path = remote_path
378 if self._metadata_provider:
379 resolved = self._metadata_provider.realpath(remote_path)
380 if resolved.exists:
381 # File exists
382 if not self._metadata_provider.allow_overwrites():
383 raise FileExistsError(
384 f"The file at path '{virtual_path}' already exists; "
385 f"overwriting is not allowed when using a metadata provider."
386 )
387 # Generate path for overwrite (future: may return different path for versioning)
388 remote_path = self._metadata_provider.generate_physical_path(
389 remote_path, for_overwrite=True
390 ).physical_path
391 else:
392 # New file - generate path
393 remote_path = self._metadata_provider.generate_physical_path(
394 remote_path, for_overwrite=False
395 ).physical_path
396
397 # if metdata provider is present, we only write attributes to the metadata provider
398 self._storage_provider.upload_file(remote_path, local_path, attributes=None)
399
400 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
401 obj_metadata = self._storage_provider.get_object_metadata(remote_path)
402 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
403 with self._metadata_provider_lock or contextlib.nullcontext():
404 self._metadata_provider.add_file(virtual_path, obj_metadata)
405 else:
406 self._storage_provider.upload_file(remote_path, local_path, attributes)
407
408 @retry
409 def write(self, path: str, body: bytes, attributes: Optional[dict[str, str]] = None) -> None:
410 """
411 Writes an object at the specified path.
412
413 :param path: The logical path where the object should be written.
414 :param body: The content to write to the object.
415 :param attributes: The attributes to add to the file.
416 """
417 virtual_path = path
418 if self._metadata_provider:
419 resolved = self._metadata_provider.realpath(path)
420 if resolved.exists:
421 # File exists
422 if not self._metadata_provider.allow_overwrites():
423 raise FileExistsError(
424 f"The file at path '{virtual_path}' already exists; "
425 f"overwriting is not allowed when using a metadata provider."
426 )
427 # Generate path for overwrite (future: may return different path for versioning)
428 path = self._metadata_provider.generate_physical_path(path, for_overwrite=True).physical_path
429 else:
430 # New file - generate path
431 path = self._metadata_provider.generate_physical_path(path, for_overwrite=False).physical_path
432
433 # if metadata provider is present, we only write attributes to the metadata provider
434 self._storage_provider.put_object(path, body, attributes=None)
435
436 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
437 obj_metadata = self._storage_provider.get_object_metadata(path)
438 obj_metadata.metadata = (obj_metadata.metadata or {}) | (attributes or {})
439 with self._metadata_provider_lock or contextlib.nullcontext():
440 self._metadata_provider.add_file(virtual_path, obj_metadata)
441 else:
442 self._storage_provider.put_object(path, body, attributes=attributes)
443
[docs]
444 def copy(self, src_path: str, dest_path: str) -> None:
445 """
446 Copies an object from source to destination path.
447
448 :param src_path: The logical path of the source object to copy.
449 :param dest_path: The logical path of the destination.
450 """
451 virtual_dest_path = dest_path
452 if self._metadata_provider:
453 # Source: must exist
454 src_resolved = self._metadata_provider.realpath(src_path)
455 if not src_resolved.exists:
456 raise FileNotFoundError(f"The file at path '{src_path}' was not found.")
457 src_path = src_resolved.physical_path
458
459 # Destination: check for overwrites
460 dest_resolved = self._metadata_provider.realpath(dest_path)
461 if dest_resolved.exists:
462 # Destination exists
463 if not self._metadata_provider.allow_overwrites():
464 raise FileExistsError(
465 f"The file at path '{virtual_dest_path}' already exists; "
466 f"overwriting is not allowed when using a metadata provider."
467 )
468 # Generate path for overwrite
469 dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=True).physical_path
470 else:
471 # New file - generate path
472 dest_path = self._metadata_provider.generate_physical_path(dest_path, for_overwrite=False).physical_path
473
474 self._storage_provider.copy_object(src_path, dest_path)
475
476 # TODO(NGCDP-3016): Handle eventual consistency of Swiftstack, without wait.
477 obj_metadata = self._storage_provider.get_object_metadata(dest_path)
478 with self._metadata_provider_lock or contextlib.nullcontext():
479 self._metadata_provider.add_file(virtual_dest_path, obj_metadata)
480 else:
481 self._storage_provider.copy_object(src_path, dest_path)
482
    def delete(self, path: str, recursive: bool = False) -> None:
        """
        Deletes an object at the specified path.

        :param path: The logical path of the object or directory to delete.
        :param recursive: Whether to delete objects in the path recursively.
        :raises ValueError: If *path* is a directory and ``recursive`` is False.
        :raises FileNotFoundError: If nothing exists at *path*.
        """
        obj_metadata = self.info(path)
        is_dir = obj_metadata and obj_metadata.type == "directory"
        is_file = obj_metadata and obj_metadata.type == "file"
        if recursive and is_dir:
            # Recursive directory delete is implemented as a sync against an empty
            # (null) source client with delete_unmatched_files=True, which removes
            # every object under the prefix.
            self.sync_from(
                cast(StorageClient, NullStorageClient()),
                path,
                path,
                delete_unmatched_files=True,
                num_worker_processes=1,
                description="Deleting",
            )
            # If this is a posix storage provider, we need to also delete remaining directory stubs.
            # TODO: Notify metadata provider for the changes.
            if self._is_posix_file_storage_provider():
                posix_storage_provider = cast(PosixFileStorageProvider, self._storage_provider)
                posix_storage_provider.rmtree(path)
            return
        else:
            # 1) If path is a file: delete the file
            # 2) If path is a directory: raise an error to prompt the user to use the recursive flag
            if is_file:
                virtual_path = path
                if self._metadata_provider:
                    resolved = self._metadata_provider.realpath(path)
                    if not resolved.exists:
                        raise FileNotFoundError(f"The file at path '{virtual_path}' was not found.")

                    # Delete the physical object first, then drop the logical entry.
                    self._storage_provider.delete_object(resolved.physical_path)

                    with self._metadata_provider_lock or contextlib.nullcontext():
                        self._metadata_provider.remove_file(virtual_path)
                else:
                    self._storage_provider.delete_object(path)

                # Delete the cached file if it exists
                if self._is_cache_enabled():
                    if self._cache_manager is None:
                        raise RuntimeError("Cache manager is not initialized")
                    self._cache_manager.delete(virtual_path)

                # Delete from replicas if replica manager exists
                if self._replica_manager:
                    self._replica_manager.delete_from_replicas(virtual_path)
            elif is_dir:
                raise ValueError(f"'{path}' is a directory. Set recursive=True to delete entire directory.")
            else:
                raise FileNotFoundError(f"The file at '{path}' was not found.")
538
[docs]
539 def glob(
540 self,
541 pattern: str,
542 include_url_prefix: bool = False,
543 attribute_filter_expression: Optional[str] = None,
544 ) -> list[str]:
545 """
546 Matches and retrieves a list of objects in the storage provider that
547 match the specified pattern.
548
549 :param pattern: The pattern to match object paths against, supporting wildcards (e.g., ``*.txt``).
550 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
551 :param attribute_filter_expression: The attribute filter expression to apply to the result.
552
553 :return: A list of object paths that match the pattern.
554 """
555 if self._metadata_provider:
556 results = self._metadata_provider.glob(pattern, attribute_filter_expression)
557 else:
558 results = self._storage_provider.glob(pattern, attribute_filter_expression)
559
560 if include_url_prefix:
561 results = [join_paths(f"{MSC_PROTOCOL}{self._config.profile}", path) for path in results]
562
563 return results
564
[docs]
565 def list(
566 self,
567 prefix: str = "",
568 path: str = "",
569 start_after: Optional[str] = None,
570 end_at: Optional[str] = None,
571 include_directories: bool = False,
572 include_url_prefix: bool = False,
573 attribute_filter_expression: Optional[str] = None,
574 show_attributes: bool = False,
575 follow_symlinks: bool = True,
576 ) -> Iterator[ObjectMetadata]:
577 """
578 Lists objects in the storage provider under the specified path.
579
580 **IMPORTANT**: Use the ``path`` parameter for new code. The ``prefix`` parameter is
581 deprecated and will be removed in a future version.
582
583 :param prefix: [DEPRECATED] Use ``path`` instead. The prefix to list objects under.
584 :param path: The directory or file path to list objects under. This should be a
585 complete filesystem path (e.g., "my-bucket/documents/" or "data/2024/").
586 Cannot be used together with ``prefix``.
587 :param start_after: The key to start after (i.e. exclusive). An object with this key doesn't have to exist.
588 :param end_at: The key to end at (i.e. inclusive). An object with this key doesn't have to exist.
589 :param include_directories: Whether to include directories in the result. When True, directories are returned alongside objects.
590 :param include_url_prefix: Whether to include the URL prefix ``msc://profile`` in the result.
591 :param attribute_filter_expression: The attribute filter expression to apply to the result.
592 :param show_attributes: Whether to return attributes in the result. WARNING: Depend on implementation, there might be performance impact if this set to True.
593 :param follow_symlinks: Whether to follow symbolic links. Only applicable for POSIX file storage providers. When False, symlinks are skipped during listing.
594
595 :return: An iterator over objects.
596
597 :raises ValueError: If both ``path`` and ``prefix`` parameters are provided (both non-empty).
598 """
599 # Parameter validation - either path or prefix, not both
600 if path and prefix:
601 raise ValueError(
602 f"Cannot specify both 'path' ({path!r}) and 'prefix' ({prefix!r}). "
603 f"Please use only the 'path' parameter for new code. "
604 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
605 )
606 elif prefix:
607 logger.debug(
608 f"The 'prefix' parameter is deprecated and will be removed in a future version. "
609 f"Please use the 'path' parameter instead. "
610 f"Migration guide: Replace list(prefix={prefix!r}) with list(path={prefix!r})"
611 )
612
613 # Use path if provided, otherwise fall back to prefix
614 effective_path = path if path else prefix
615
616 if effective_path:
617 if self.is_file(effective_path):
618 try:
619 object_metadata = self.info(effective_path)
620 if include_url_prefix:
621 if self.is_default_profile():
622 object_metadata.key = str(PurePosixPath("/") / object_metadata.key)
623 else:
624 object_metadata.key = join_paths(
625 f"{MSC_PROTOCOL}{self._config.profile}", object_metadata.key
626 )
627 yield object_metadata # short circuit if the path is a file
628 return
629 except FileNotFoundError:
630 pass
631 else:
632 effective_path = effective_path.rstrip("/") + "/"
633
634 if self._metadata_provider:
635 objects = self._metadata_provider.list_objects(
636 effective_path,
637 start_after=start_after,
638 end_at=end_at,
639 include_directories=include_directories,
640 attribute_filter_expression=attribute_filter_expression,
641 show_attributes=show_attributes,
642 )
643 else:
644 objects = self._storage_provider.list_objects(
645 effective_path,
646 start_after=start_after,
647 end_at=end_at,
648 include_directories=include_directories,
649 attribute_filter_expression=attribute_filter_expression,
650 show_attributes=show_attributes,
651 follow_symlinks=follow_symlinks,
652 )
653
654 for object in objects:
655 if include_url_prefix:
656 if self.is_default_profile():
657 object.key = str(PurePosixPath("/") / object.key)
658 else:
659 object.key = join_paths(f"{MSC_PROTOCOL}{self._config.profile}", object.key)
660 yield object
661
[docs]
662 def open(
663 self,
664 path: str,
665 mode: str = "rb",
666 buffering: int = -1,
667 encoding: Optional[str] = None,
668 disable_read_cache: bool = False,
669 memory_load_limit: int = MEMORY_LOAD_LIMIT,
670 atomic: bool = True,
671 check_source_version: SourceVersionCheckMode = SourceVersionCheckMode.INHERIT,
672 attributes: Optional[dict[str, str]] = None,
673 prefetch_file: bool = True,
674 ) -> Union[PosixFile, ObjectFile]:
675 """
676 Returns a file-like object from the specified path.
677
678 :param path: The logical path of the object to read.
679 :param mode: The file mode, only "w", "r", "a", "wb", "rb" and "ab" are supported.
680 :param buffering: The buffering mode. Only applies to PosixFile.
681 :param encoding: The encoding to use for text files.
682 :param disable_read_cache: When set to True, disables caching for the file content.
683 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
684 :param memory_load_limit: Size limit in bytes for loading files into memory. Defaults to 512MB.
685 This parameter is only applicable to ObjectFile when the mode is "r" or "rb".
686 :param atomic: When set to True, the file will be written atomically (rename upon close).
687 This parameter is only applicable to PosixFile in write mode.
688 :param check_source_version: Whether to check the source version of cached objects.
689 :param attributes: The attributes to add to the file. This parameter is only applicable when the mode is "w" or "wb" or "a" or "ab".
690
691 :return: A file-like object (PosixFile or ObjectFile) for the specified path.
692 """
693 if self._is_posix_file_storage_provider():
694 return PosixFile(
695 self, path=path, mode=mode, buffering=buffering, encoding=encoding, atomic=atomic, attributes=attributes
696 )
697 else:
698 if atomic is False:
699 logger.warning("Non-atomic writes are not supported for object storage providers.")
700
701 return ObjectFile(
702 self,
703 remote_path=path,
704 mode=mode,
705 encoding=encoding,
706 disable_read_cache=disable_read_cache,
707 memory_load_limit=memory_load_limit,
708 check_source_version=check_source_version,
709 attributes=attributes,
710 prefetch_file=prefetch_file,
711 )
712
[docs]
713 def get_posix_path(self, path: str) -> Optional[str]:
714 """
715 Returns the physical POSIX filesystem path for POSIX storage providers.
716
717 :param path: The path to resolve (may be a symlink or virtual path).
718 :return: Physical POSIX filesystem path if POSIX storage, None otherwise.
719 """
720 if not self._is_posix_file_storage_provider():
721 return None
722
723 if self._metadata_provider:
724 resolved = self._metadata_provider.realpath(path)
725 realpath = resolved.physical_path
726 else:
727 realpath = path
728
729 return cast(PosixFileStorageProvider, self._storage_provider)._prepend_base_path(realpath)
730
[docs]
731 def is_file(self, path: str) -> bool:
732 """
733 Checks whether the specified path points to a file (rather than a directory or folder).
734
735 :param path: The logical path to check.
736
737 :return: ``True`` if the path points to a file, ``False`` otherwise.
738 """
739 if self._metadata_provider:
740 resolved = self._metadata_provider.realpath(path)
741 return resolved.exists
742
743 return self._storage_provider.is_file(path)
744
766
[docs]
767 def is_empty(self, path: str) -> bool:
768 """
769 Checks whether the specified path is empty. A path is considered empty if there are no
770 objects whose keys start with the given path as a prefix.
771
772 :param path: The logical path to check. This is typically a prefix representing a directory or folder.
773
774 :return: ``True`` if no objects exist under the specified path prefix, ``False`` otherwise.
775 """
776 if self._metadata_provider:
777 objects = self._metadata_provider.list_objects(path)
778 else:
779 objects = self._storage_provider.list_objects(path)
780
781 try:
782 return next(objects) is None
783 except StopIteration:
784 pass
785
786 return True
787
[docs]
788 def sync_from(
789 self,
790 source_client: "StorageClient",
791 source_path: str = "",
792 target_path: str = "",
793 delete_unmatched_files: bool = False,
794 description: str = "Syncing",
795 num_worker_processes: Optional[int] = None,
796 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
797 patterns: Optional[PatternList] = None,
798 preserve_source_attributes: bool = False,
799 follow_symlinks: bool = True,
800 source_files: Optional[List[str]] = None,
801 ignore_hidden: bool = True,
802 ) -> None:
803 """
804 Syncs files from the source storage client to "path/".
805
806 :param source_client: The source storage client.
807 :param source_path: The logical path to sync from.
808 :param target_path: The logical path to sync to.
809 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
810 :param description: Description of sync process for logging purposes.
811 :param num_worker_processes: The number of worker processes to use.
812 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
813 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
814 Cannot be used together with source_files.
815 :param preserve_source_attributes: Whether to preserve source file metadata attributes during synchronization.
816 When False (default), only file content is copied. When True, custom metadata attributes are also preserved.
817
818 .. warning::
819 **Performance Impact**: When enabled without a ``metadata_provider`` configured, this will make a HEAD
820 request for each object to retrieve attributes, which can significantly impact performance on large-scale
821 sync operations. For production use at scale, configure a ``metadata_provider`` in your storage profile.
822
823 :param follow_symlinks: If the source StorageClient is PosixFile, whether to follow symbolic links. Default is True.
824 :param source_files: Optional list of file paths (relative to source_path) to sync. When provided, only these
825 specific files will be synced, skipping enumeration of the source path. Cannot be used together with patterns.
826 :param ignore_hidden: Whether to ignore hidden files and directories. Default is True.
827 :raises ValueError: If both source_files and patterns are provided.
828 """
829 if source_files and patterns:
830 raise ValueError("Cannot specify both 'source_files' and 'patterns'. Please use only one filtering method.")
831
832 pattern_matcher = PatternMatcher(patterns) if patterns else None
833
834 # Disable the replica manager during sync
835 if not isinstance(source_client, NullStorageClient) and source_client._replica_manager:
836 source_client = StorageClient(source_client._config)
837 source_client._replica_manager = None
838
839 m = SyncManager(source_client, source_path, self, target_path)
840
841 m.sync_objects(
842 execution_mode=execution_mode,
843 description=description,
844 num_worker_processes=num_worker_processes,
845 delete_unmatched_files=delete_unmatched_files,
846 pattern_matcher=pattern_matcher,
847 preserve_source_attributes=preserve_source_attributes,
848 follow_symlinks=follow_symlinks,
849 source_files=source_files,
850 ignore_hidden=ignore_hidden,
851 )
852
[docs]
853 def sync_replicas(
854 self,
855 source_path: str,
856 replica_indices: Optional[List[int]] = None,
857 delete_unmatched_files: bool = False,
858 description: str = "Syncing replica",
859 num_worker_processes: Optional[int] = None,
860 execution_mode: ExecutionMode = ExecutionMode.LOCAL,
861 patterns: Optional[PatternList] = None,
862 ignore_hidden: bool = True,
863 ) -> None:
864 """
865 Sync files from the source storage client to target replicas.
866
867 :param source_path: The logical path to sync from.
868 :param replica_indices: Specify the indices of the replicas to sync to. If not provided, all replicas will be synced. Index starts from 0.
869 :param delete_unmatched_files: Whether to delete files at the target that are not present at the source.
870 :param description: Description of sync process for logging purposes.
871 :param num_worker_processes: The number of worker processes to use.
872 :param execution_mode: The execution mode to use. Currently supports "local" and "ray".
873 :param patterns: PatternList for include/exclude filtering. If None, all files are included.
874 :param ignore_hidden: Whether to ignore hidden files and directories (starting with dot). Default is True.
875 """
876 if not self._replicas:
877 logger.warning(
878 "No replicas found in profile '%s'. Add a 'replicas' section to your profile configuration to enable "
879 "secondary storage locations for redundancy and performance.",
880 self._config.profile,
881 )
882 return None
883
884 if replica_indices:
885 try:
886 replicas = [self._replicas[i] for i in replica_indices]
887 except IndexError as e:
888 raise ValueError(f"Replica index out of range: {replica_indices}") from e
889 else:
890 replicas = self._replicas
891
892 # Disable the replica manager during sync
893 if self._replica_manager:
894 source_client = StorageClient(self._config)
895 source_client._replica_manager = None
896 else:
897 source_client = self
898
899 for replica in replicas:
900 replica.sync_from(
901 source_client,
902 source_path,
903 source_path,
904 delete_unmatched_files=delete_unmatched_files,
905 description=f"{description} ({replica.profile})",
906 num_worker_processes=num_worker_processes,
907 execution_mode=execution_mode,
908 patterns=patterns,
909 ignore_hidden=ignore_hidden,
910 )