1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import logging
17import os
18import stat
19from pathlib import Path, PurePosixPath
20from typing import Union
21
22from .client import StorageClient
23from .shortcuts import resolve_storage_client
24from .types import MSC_PROTOCOL, ObjectMetadata, SourceVersionCheckMode
25from .utils import join_paths
26
# Module-level logger. Obtained through getLogger() so that it is registered in the
# logging hierarchy and honours handlers/levels configured by the application
# (directly instantiating logging.Logger bypasses the hierarchy entirely).
logger = logging.getLogger(__name__)
28
29
[docs]
class StatResult:
    """
    A stat-like result object that mimics os.stat_result for remote storage paths.

    Instances expose the usual ``st_*`` attributes (attribute access only) and are
    populated from the ``type``, ``content_length`` and ``last_modified`` fields of an
    ObjectMetadata obtained from a storage provider.
    """

    def __init__(self, metadata: "ObjectMetadata"):
        """
        Initialize StatResult from ObjectMetadata.

        :param metadata: Object metadata; ``type``, ``content_length`` and ``last_modified`` are read.
        """
        # File type and mode bits:
        #   directory    -> S_IFDIR + 0o755 (rwxr-xr-x)
        #   anything else -> S_IFREG + 0o644 (rw-r--r--)
        if metadata.type == "directory":
            self.st_mode = stat.S_IFDIR | 0o755
        else:
            self.st_mode = stat.S_IFREG | 0o644

        # File size
        self.st_size = metadata.content_length

        # Timestamps - only the modification time is available, so it is reused for
        # access and change times. Float epoch seconds:
        last_modified = metadata.last_modified
        mtime = last_modified.timestamp()
        self.st_mtime = mtime
        self.st_atime = mtime
        self.st_ctime = mtime

        # Nanosecond precision timestamps. Computed with integer arithmetic because a
        # float cannot represent present-day epoch nanoseconds exactly; multiplying the
        # float seconds by 1e9 would yield values that disagree with the datetime's
        # microsecond field.
        whole_seconds = int(last_modified.replace(microsecond=0).timestamp())
        mtime_ns = whole_seconds * 1_000_000_000 + last_modified.microsecond * 1_000
        self.st_mtime_ns = mtime_ns
        self.st_atime_ns = mtime_ns
        self.st_ctime_ns = mtime_ns

        # Default values for fields we don't have from storage providers
        self.st_ino = 0
        self.st_dev = 0
        self.st_nlink = 1
        # os.getuid/os.getgid are not available on every platform; fall back to 0.
        self.st_uid = os.getuid() if hasattr(os, "getuid") else 0  # User ID
        self.st_gid = os.getgid() if hasattr(os, "getgid") else 0  # Group ID

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(st_mode={self.st_mode!r}, st_size={self.st_size!r}, "
            f"st_mtime={self.st_mtime!r})"
        )
69
70
[docs]
class MultiStoragePath:
    """
    A path object similar to pathlib.Path that supports both local and remote file systems.

    MultiStoragePath provides a unified interface for working with paths across different storage systems,
    including local files, S3, GCS, Azure Blob Storage, and more. It uses the "msc://" protocol
    prefix to identify remote storage paths.

    This implementation is based on Python 3.9's pathlib.Path interface, providing compatible behavior
    for local filesystem operations while extending support to remote storage systems.

    Paths that resolve to the default profile are treated as local filesystem paths and most
    operations are delegated to :py:class:`pathlib.Path`. All other paths are served by the
    resolved storage client; operations without a remote equivalent raise ``NotImplementedError``.

    Examples:
        >>> import multistorageclient as msc
        >>> msc.Path("/local/path/file.txt")
        >>> msc.Path("msc://my-profile/data/file.txt")
        >>> msc.Path(pathlib.Path("relative/path"))
    """

    # Path inside the storage profile (always rooted at "/" for the default profile).
    _internal_path: PurePosixPath
    # Client resolved from the path; decides between local and remote behavior.
    _storage_client: StorageClient
    # The original string the object was constructed from.
    _path: str

    def __init__(self, path: Union[str, os.PathLike]):
        """
        Initialize path object supporting multiple storage backends.

        :param path: String, Path, or MultiStoragePath. Relative paths are automatically converted to absolute.
        """
        self._path = str(path)
        self._storage_client, relative_path = resolve_storage_client(self._path)
        self._internal_path = PurePosixPath(relative_path)

        if self._storage_client.is_default_profile():
            # Local paths are always kept absolute (rooted at "/").
            self._internal_path = PurePosixPath("/") / self._internal_path

    # Internal helpers

    def _format(self, internal_path) -> str:
        """
        Render ``internal_path`` the way this object's own path is rendered: as a plain
        path for the default profile, otherwise prefixed with the protocol and profile name.
        """
        if self._storage_client.is_default_profile():
            return str(internal_path)
        return join_paths(f"{MSC_PROTOCOL}{self._storage_client.profile}", str(internal_path))

    def _with_internal_path(self, internal_path) -> "MultiStoragePath":
        """
        Build a new MultiStoragePath in the same profile as this one for ``internal_path``.
        """
        return MultiStoragePath(self._format(internal_path))

    # Representation, comparison and joining

    def __str__(self) -> str:
        return self._format(self._internal_path)

    def __repr__(self) -> str:
        return f"MultiStoragePath({str(self)!r})"

    def __eq__(self, other) -> bool:
        if not isinstance(other, MultiStoragePath):
            return False
        return (
            self._storage_client.profile == other._storage_client.profile
            and self._internal_path == other._internal_path
        )

    def __hash__(self) -> int:
        # Defining __eq__ without __hash__ makes a class unhashable; hash on exactly the
        # fields __eq__ compares so equal paths can be used in sets and as dict keys.
        return hash((self._storage_client.profile, self._internal_path))

    def __fspath__(self) -> str:
        return str(self)

    def joinpath(self, *pathsegments):
        """
        Combine this path with each of the given segments in turn.
        """
        return self.with_segments(*pathsegments)

    def __truediv__(self, key):
        try:
            return self.joinpath(key)
        except TypeError:
            return NotImplemented

    def __rtruediv__(self, key):
        # ``key / self``: paths of this class are always absolute, so (as with pathlib) the
        # right-hand absolute operand wins and the result equals this path. Joining the
        # string form of a remote path onto ``key`` would treat "msc://..." as a relative
        # segment and produce a bogus path.
        try:
            PurePosixPath(key)  # only validates that key is str / os.PathLike
        except TypeError:
            return NotImplemented
        return MultiStoragePath(str(self))

    # Pure path properties

    @property
    def anchor(self) -> str:
        """
        The concatenation of the drive and root, or ''.
        """
        return self._internal_path.anchor

    @property
    def name(self) -> str:
        """
        The final path component, if any.
        """
        return self._internal_path.name

    @property
    def suffix(self) -> str:
        """
        The final component's last suffix, if any.

        This includes the leading period. For example: '.txt'
        """
        return self._internal_path.suffix

    @property
    def suffixes(self) -> list[str]:
        """
        A list of the final component's suffixes, if any.

        These include the leading periods. For example: ['.tar', '.gz']
        """
        return self._internal_path.suffixes

    @property
    def stem(self) -> str:
        """
        The final path component, minus its last suffix.
        """
        return self._internal_path.stem

    @property
    def parent(self) -> "MultiStoragePath":
        """
        The logical parent of the path.
        """
        return self._with_internal_path(self._internal_path.parent)

    @property
    def parents(self) -> list["MultiStoragePath"]:
        """
        A sequence of this path's logical parents.
        """
        return [self._with_internal_path(p) for p in self._internal_path.parents]

    @property
    def parts(self):
        """
        An object providing sequence-like access to the components in the filesystem path (does not
        include the msc:// and the profile name).
        """
        return self._internal_path.parts

    def as_posix(self) -> str:
        """
        Return the string representation of the path with forward (/) slashes.

        If the path is a remote path, the file content is downloaded to local storage
        (either cached or temporary file) and the local filesystem path is returned.
        This enables access to remote file content through standard filesystem operations.
        """
        if self._storage_client.is_default_profile():
            return self._internal_path.as_posix()

        # Return the local path of the file
        with self._storage_client.open(str(self._internal_path), mode="rb") as fp:
            return fp.resolve_filesystem_path()

    def is_absolute(self) -> bool:
        """
        Paths are always absolute.
        """
        return True

    def is_relative_to(self, other: "MultiStoragePath") -> bool:
        """
        Return True if the path is relative to another path or False.

        Paths that belong to different storage profiles are never relative to each other.
        """
        return (
            isinstance(other, MultiStoragePath)
            and self._storage_client.profile == other._storage_client.profile
            and self._internal_path.is_relative_to(other._internal_path)
        )

    def is_reserved(self) -> bool:
        """
        Return True if the path is considered reserved.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return self._internal_path.is_reserved()
        raise NotImplementedError("MultiStoragePath.is_reserved() is unsupported for remote storage paths")

    def match(self, pattern) -> bool:
        """
        Return True if this path matches the given pattern.
        """
        return Path(self._internal_path).match(pattern)

    def relative_to(self, other: "MultiStoragePath") -> "MultiStoragePath":
        """
        Not implemented.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError("MultiStoragePath.relative_to() is unsupported")

    def with_name(self, name: str) -> "MultiStoragePath":
        """
        Return a new path with the file name changed.
        """
        return self._with_internal_path(self._internal_path.with_name(name))

    def with_stem(self, stem: str) -> "MultiStoragePath":
        """
        Return a new path with the stem changed.
        """
        return self._with_internal_path(self._internal_path.with_stem(stem))

    def with_suffix(self, suffix: str) -> "MultiStoragePath":
        """
        Return a new path with the file suffix changed. If the path has no suffix, add given suffix.
        If the given suffix is an empty string, remove the suffix from the path.
        """
        return self._with_internal_path(self._internal_path.with_suffix(suffix))

    def with_segments(self, *pathsegments) -> "MultiStoragePath":
        """
        Construct a new path object by joining any number of path-like objects onto this path.
        """
        return self._with_internal_path(self._internal_path.joinpath(*pathsegments))

    # Expanding and resolving paths

    @classmethod
    def home(cls):
        """
        Return a path pointing to the user's home directory.

        The result is a :py:class:`pathlib.Path`.
        """
        return Path.home()

    def expanduser(self):
        """
        Return a new path with expanded ~ and ~user constructs (as returned by os.path.expanduser).

        The result is a :py:class:`pathlib.Path`. Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).expanduser()
        raise NotImplementedError("MultiStoragePath.expanduser() is unsupported for remote storage paths")

    @classmethod
    def cwd(cls):
        """
        Return a path pointing to the current working directory.

        The result is a :py:class:`pathlib.Path`.
        """
        return Path.cwd()

    def absolute(self):
        """
        Return the path itself since it is always absolute.
        """
        return self

    def resolve(self, strict=False):
        """
        Return the absolute path.

        Local paths are resolved through :py:meth:`pathlib.Path.resolve`; remote paths are
        returned unchanged (as a new object).
        """
        if self._storage_client.is_default_profile():
            return MultiStoragePath(str(Path(self._internal_path).resolve(strict=strict)))
        return self._with_internal_path(self._internal_path)

    def readlink(self):
        """
        Return the path to which the symbolic link points.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return MultiStoragePath(str(Path(self._internal_path).readlink()))
        raise NotImplementedError("MultiStoragePath.readlink() is unsupported for remote storage paths")

    # Querying file type and status

    def stat(self):
        """
        Return the result of the stat() system call on this path, like os.stat() does.

        If the path is a remote path, the result is a :py:class:`multistorageclient.pathlib.StatResult` object.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).stat()
        info = self._storage_client.info(str(self._internal_path))
        return StatResult(info)

    def lstat(self):
        """
        Like stat(), except if the path points to a symlink, the symlink's status information
        is returned, rather than its target's.

        If the path is a remote path, the result is a :py:class:`multistorageclient.pathlib.StatResult` object.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).lstat()
        info = self._storage_client.info(str(self._internal_path))
        return StatResult(info)

    def exists(self) -> bool:
        """
        Return True if the path exists.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).exists()

        try:
            self._storage_client.info(str(self._internal_path))
        except FileNotFoundError:
            return False
        return True

    def is_file(self, strict: bool = True) -> bool:
        """
        Return True if the path exists and is a regular file.

        :param strict: Forwarded to the storage client's ``info()`` for remote paths; unused for local paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_file()

        try:
            # If the path ends with a "/", assume it is a directory.
            path = str(self._internal_path)
            if path.endswith("/"):
                return False

            meta = self._storage_client.info(path, strict=strict)
            return meta.type == "file"
        except FileNotFoundError:
            return False
        except Exception as e:
            # Best effort: any other provider error is reported and treated as "not a file".
            logger.warning("Error occurred while fetching file info at %s, caused by: %s", self._internal_path, e)
            return False

    def is_dir(self, strict: bool = True) -> bool:
        """
        Return True if the path exists and is a directory.

        :param strict: Forwarded to the storage client's ``info()`` for remote paths; unused for local paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_dir()

        try:
            # If the path does not end with a "/", append it to ensure the path is a directory.
            path = str(self._internal_path)
            if not path.endswith("/"):
                path += "/"

            meta = self._storage_client.info(path, strict=strict)
            return meta.type == "directory"
        except FileNotFoundError:
            return False
        except Exception as e:
            # Best effort: any other provider error is reported and treated as "not a directory".
            logger.warning("Error occurred while fetching file info at %s, caused by: %s", self._internal_path, e)
            return False

    def is_symlink(self):
        """
        Return True if the path exists and is a symbolic link.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_symlink()
        raise NotImplementedError("MultiStoragePath.is_symlink() is unsupported for remote storage paths")

    def is_mount(self):
        """
        Return True if the path exists and is a mount point.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_mount()
        raise NotImplementedError("MultiStoragePath.is_mount() is unsupported for remote storage paths")

    def is_socket(self):
        """
        Return True if the path exists and is a socket.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_socket()
        raise NotImplementedError("MultiStoragePath.is_socket() is unsupported for remote storage paths")

    def is_fifo(self):
        """
        Return True if the path exists and is a FIFO.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_fifo()
        raise NotImplementedError("MultiStoragePath.is_fifo() is unsupported for remote storage paths")

    def is_block_device(self):
        """
        Return True if the path exists and is a block device.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_block_device()
        raise NotImplementedError("MultiStoragePath.is_block_device() is unsupported for remote storage paths")

    def is_char_device(self):
        """
        Return True if the path exists and is a character device.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).is_char_device()
        raise NotImplementedError("MultiStoragePath.is_char_device() is unsupported for remote storage paths")

    def samefile(self, other_path):
        """
        Return True if both paths point to the same file or directory.

        For remote storage paths this is an equality comparison (same profile and same
        path); no storage lookup is performed.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).samefile(other_path)
        return self == other_path

    # Reading and writing files

    def open(
        self,
        mode="r",
        buffering=-1,
        encoding=None,
        errors=None,
        newline=None,
        check_source_version=SourceVersionCheckMode.INHERIT,
    ):
        """
        Open the file and return a file object.

        ``mode``, ``buffering``, ``encoding`` and ``check_source_version`` are forwarded to the
        storage client. ``errors`` and ``newline`` are accepted for signature compatibility with
        :py:meth:`pathlib.Path.open` but are currently not forwarded.
        """
        return self._storage_client.open(
            str(self._internal_path),
            mode=mode,
            buffering=buffering,
            encoding=encoding,
            check_source_version=check_source_version,
        )

    def read_bytes(self) -> bytes:
        """
        Read the whole file and return its content as bytes.
        """
        return self._storage_client.read(str(self._internal_path))

    def read_text(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """
        Read the whole file and return its content decoded as text.

        :param encoding: Text encoding; ``None`` falls back to "utf-8".
        :param errors: Decoding error handler; ``None`` falls back to "strict".
        """
        raw = self._storage_client.read(str(self._internal_path))
        return raw.decode(encoding or "utf-8", errors or "strict")

    def write_bytes(self, data: bytes) -> None:
        """
        Write ``data`` to the file.
        """
        self._storage_client.write(str(self._internal_path), data)

    def write_text(self, data: str, encoding: str = "utf-8", errors: str = "strict") -> None:
        """
        Encode ``data`` and write it to the file.

        :param encoding: Text encoding; ``None`` falls back to "utf-8".
        :param errors: Encoding error handler; ``None`` falls back to "strict".
        """
        self._storage_client.write(str(self._internal_path), data.encode(encoding or "utf-8", errors or "strict"))

    # Reading directories

    def iterdir(self):
        """
        Yield path objects of the directory contents.
        """
        if self._storage_client.is_default_profile():
            for item in Path(self._internal_path).iterdir():
                yield MultiStoragePath(str(item))
        else:
            # List with a trailing "/" so the path is treated as a directory prefix.
            path = str(self._internal_path)
            if not path.endswith("/"):
                path += "/"
            for item in self._storage_client.list(path, include_directories=True, include_url_prefix=True):
                yield MultiStoragePath(item.key)

    def glob(self, pattern):
        """
        Return a list of all existing files (of any kind, including directories) in this subtree
        matching the given relative pattern.
        """
        if self._storage_client.is_default_profile():
            matches = Path(self._internal_path).glob(pattern)
        else:
            matches = self._storage_client.glob(str(self._internal_path / pattern), include_url_prefix=True)
        return [MultiStoragePath(str(p)) for p in matches]

    def rglob(self, pattern):
        """
        Return a list of all existing files (of any kind, including directories) matching the
        given relative pattern, anywhere in this subtree.
        """
        if self._storage_client.is_default_profile():
            matches = Path(self._internal_path).rglob(pattern)
        else:
            recursive_pattern = f"**/{pattern}"
            matches = self._storage_client.glob(
                str(self._internal_path / recursive_pattern), include_url_prefix=True
            )
        return [MultiStoragePath(str(p)) for p in matches]

    def walk(self, top_down=True, on_error=None, follow_symlinks=False):
        """
        Walk the directory tree from this directory, similar to os.walk().

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).walk(top_down, on_error, follow_symlinks)  # pyright: ignore[reportAttributeAccessIssue]
        raise NotImplementedError("MultiStoragePath.walk() is unsupported for remote storage paths")

    # Creating files and directories

    def touch(self, mode=0o666, exist_ok=False):
        """
        Create this file with the given access mode, if it doesn't exist.

        For remote storage paths an empty object is written when the path does not exist;
        when it already exists only a warning is logged (``mode`` and ``exist_ok`` are not used).
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).touch(mode, exist_ok)
        else:
            if self.exists():
                # object storage does not support updating the last modified time of a object without writing the object
                logger.warning("MultiStoragePath.touch() is not supported for remote storage paths")
            else:
                self._storage_client.write(str(self._internal_path), b"")

    def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None:
        """
        Create a new directory at this given path.

        For remote storage paths, this operation is a no-op.
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).mkdir(mode, parents, exist_ok)

    def symlink_to(self, target, target_is_directory=False):
        """
        Make this path a symlink pointing to the target path.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).symlink_to(target, target_is_directory)
        else:
            raise NotImplementedError("MultiStoragePath.symlink_to() is unsupported for remote storage paths")

    # Renaming and deleting

    def rename(self, target) -> "MultiStoragePath":
        """
        Rename this path to the target path.

        :param target: Destination; converted to a MultiStoragePath when it is not one already.
        :return: The target as a MultiStoragePath.
        """
        if not isinstance(target, MultiStoragePath):
            target = MultiStoragePath(target)

        # NOTE(review): only the target's internal path is used below, on this path's own
        # storage client; a target in a different profile is not detected - confirm callers
        # never rename across profiles.
        if self._storage_client.is_default_profile():
            Path(self._internal_path).rename(str(target._internal_path))
        else:
            # Note: This operation is not atomic, and the target path must be a single file.
            self._storage_client.copy(str(self._internal_path), str(target._internal_path))
            self._storage_client.delete(str(self._internal_path))

        return target

    def replace(self, target):
        """
        Rename this path to the target path, overwriting if that path exists.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).replace(target)
        else:
            raise NotImplementedError("MultiStoragePath.replace() is unsupported for remote storage paths")

    def unlink(self, missing_ok: bool = False) -> None:
        """
        Remove this file or link. If the path is a directory, use rmdir() instead.

        :param missing_ok: When True, a missing file is not an error.
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).unlink(missing_ok=missing_ok)
        else:
            try:
                self._storage_client.delete(str(self._internal_path))
            except FileNotFoundError:
                if not missing_ok:
                    raise

    def rmdir(self) -> None:
        """
        Remove this directory. The directory must be empty.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).rmdir()
        else:
            raise NotImplementedError("MultiStoragePath.rmdir() is unsupported for remote storage paths")

    # Permissions and ownership

    def owner(self):
        """
        Return the login name of the file owner.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).owner()
        raise NotImplementedError("MultiStoragePath.owner() is unsupported for remote storage paths")

    def group(self):
        """
        Return the group name of the file gid.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            return Path(self._internal_path).group()
        raise NotImplementedError("MultiStoragePath.group() is unsupported for remote storage paths")

    def chmod(self, mode):
        """
        Change the permissions of the path, like os.chmod().

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).chmod(mode)
        else:
            raise NotImplementedError("MultiStoragePath.chmod() is unsupported for remote storage paths")

    def lchmod(self, mode):
        """
        Like chmod(), except if the path points to a symlink, the symlink's permissions are changed, rather
        than its target's.

        Not supported for remote storage paths.
        """
        if self._storage_client.is_default_profile():
            Path(self._internal_path).lchmod(mode)
        else:
            raise NotImplementedError("MultiStoragePath.lchmod() is unsupported for remote storage paths")