Source code for multistorageclient.pathlib

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17import os
 18import stat
 19from pathlib import Path, PurePosixPath
 20from typing import Union
 21
 22from .client import StorageClient
 23from .shortcuts import resolve_storage_client
 24from .types import MSC_PROTOCOL, ObjectMetadata, SourceVersionCheckMode
 25from .utils import join_paths
 26
 27logger = logging.Logger(__name__)
 28
 29
class StatResult:
    """
    Lightweight stand-in for ``os.stat_result`` describing a remote storage path.

    Instances expose the familiar ``st_*`` attributes, filled in from the
    ObjectMetadata reported by a storage provider.
    """

    def __init__(self, metadata: ObjectMetadata):
        """
        Populate the ``st_*`` attributes from ``metadata``.

        :param metadata: Object metadata; its ``type``, ``content_length`` and ``last_modified`` fields are read.
        """
        # Permission bits are synthetic: rwxr-xr-x for directories, rw-r--r-- for regular files.
        is_directory = metadata.type == "directory"
        self.st_mode = (stat.S_IFDIR | 0o755) if is_directory else (stat.S_IFREG | 0o644)

        self.st_size = metadata.content_length

        # Only the modification time is known, so it also serves as access and change time.
        seconds = metadata.last_modified.timestamp()
        self.st_mtime = self.st_atime = self.st_ctime = seconds

        nanoseconds = int(seconds * 1_000_000_000)
        self.st_mtime_ns = self.st_atime_ns = self.st_ctime_ns = nanoseconds

        # Fields that storage providers do not supply receive fixed defaults.
        self.st_ino = 0
        self.st_dev = 0
        self.st_nlink = 1
        # Fall back to 0 when the os module does not offer getuid/getgid.
        self.st_uid = os.getuid() if hasattr(os, "getuid") else 0
        self.st_gid = os.getgid() if hasattr(os, "getgid") else 0
69 70
class MultiStoragePath:
    """
    A path object similar to pathlib.Path that supports both local and remote file systems.

    MultiStoragePath provides a unified interface for working with paths across different storage systems,
    including local files, S3, GCS, Azure Blob Storage, and more. It uses the "msc://" protocol
    prefix to identify remote storage paths.

    This implementation is based on Python 3.9's pathlib.Path interface, providing compatible behavior
    for local filesystem operations while extending support to remote storage systems.

    Examples:
        >>> import multistorageclient as msc
        >>> msc.Path("/local/path/file.txt")
        >>> msc.Path("msc://my-profile/data/file.txt")
        >>> msc.Path(pathlib.Path("relative/path"))
    """

    # Path relative to the storage client's root (absolute for the default profile).
    _internal_path: PurePosixPath
    # Client resolved from the path string passed to the constructor.
    _storage_client: StorageClient
    # The original path string as given by the caller.
    _path: str

    def __init__(self, path: Union[str, os.PathLike]):
        """
        Initialize path object supporting multiple storage backends.

        :param path: String, Path, or MultiStoragePath. Relative paths are automatically converted to absolute.
        """
        self._path = str(path)
        self._storage_client, relative_path = resolve_storage_client(self._path)
        self._internal_path = PurePosixPath(relative_path)

        # For the default profile the internal path is anchored at the filesystem root.
        if self._storage_client.is_default_profile():
            self._internal_path = PurePosixPath("/") / self._internal_path

    def __str__(self) -> str:
        """Return the plain path for the default profile, otherwise the protocol-prefixed URL."""
        if self._storage_client.is_default_profile():
            return str(self._internal_path)
        return join_paths(f"{MSC_PROTOCOL}{self._storage_client.profile}", str(self._internal_path))

    def __repr__(self) -> str:
        """Return a debugging representation that embeds ``str(self)``."""
        return f"MultiStoragePath({str(self)!r})"

    def __eq__(self, other) -> bool:
        """
        Two paths are equal when they share the same profile and the same internal path.

        Returns ``NotImplemented`` for other types so Python can try the reflected
        comparison; ``path == "text"`` still evaluates to ``False``.
        """
        if not isinstance(other, MultiStoragePath):
            return NotImplemented
        return (
            self._storage_client.profile == other._storage_client.profile
            and self._internal_path == other._internal_path
        )

    def __hash__(self) -> int:
        """
        Hash consistently with ``__eq__``.

        Defining ``__eq__`` alone would make instances unhashable, unlike pathlib paths,
        so they could not be used in sets or as dict keys.
        """
        # NOTE(review): assumes the client's ``profile`` is hashable (it is used as a string in ``__str__``).
        return hash((self._storage_client.profile, self._internal_path))

    def __fspath__(self) -> str:
        """Return ``str(self)`` to satisfy the ``os.PathLike`` protocol."""
        return str(self)
def joinpath(self, *pathsegments):
    """
    Return a new path formed by combining this path with each of ``pathsegments``.

    :param pathsegments: Segments to append, in order.
    :return: Whatever ``with_segments`` produces for the given segments.
    """
    joined = self.with_segments(*pathsegments)
    return joined
def __truediv__(self, key):
    """Implement ``self / key`` by joining ``key`` onto this path."""
    try:
        joined = self.joinpath(key)
    except TypeError:
        return NotImplemented
    return joined

def __rtruediv__(self, key):
    """Implement ``key / self`` by building a path from ``key`` followed by this path."""
    try:
        combined = self.with_segments(key, self)
    except TypeError:
        return NotImplemented
    return combined

@property
def anchor(self) -> str:
    """
    The concatenation of the drive and root, or ''.
    """
    internal = self._internal_path
    return internal.anchor

@property
def name(self) -> str:
    """
    The final path component, if any.
    """
    internal = self._internal_path
    return internal.name

@property
def suffix(self) -> str:
    """
    The final component's last suffix, if any.

    This includes the leading period. For example: '.txt'
    """
    internal = self._internal_path
    return internal.suffix

@property
def suffixes(self) -> list[str]:
    """
    A list of the final component's suffixes, if any.

    These include the leading periods. For example: ['.tar', '.gz']
    """
    internal = self._internal_path
    return internal.suffixes

@property
def stem(self) -> str:
    """
    The final path component, minus its last suffix.
    """
    internal = self._internal_path
    return internal.stem

@property
def parent(self) -> "MultiStoragePath":
    """
    The logical parent of the path.
    """
    logical_parent = self._internal_path.parent
    if not self._storage_client.is_default_profile():
        # Remote paths are rebuilt as a protocol-prefixed URL under the same profile.
        return MultiStoragePath(join_paths(f"{MSC_PROTOCOL}{self._storage_client.profile}", str(logical_parent)))
    return MultiStoragePath(str(logical_parent))

@property
def parents(self) -> list["MultiStoragePath"]:
    """
    A sequence of this path's logical parents.
    """
    if not self._storage_client.is_default_profile():
        # Remote paths are rebuilt as protocol-prefixed URLs under the same profile.
        return [
            MultiStoragePath(join_paths(f"{MSC_PROTOCOL}{self._storage_client.profile}", str(ancestor)))
            for ancestor in self._internal_path.parents
        ]
    return [MultiStoragePath(str(ancestor)) for ancestor in self._internal_path.parents]

@property
def parts(self):
    """
    An object providing sequence-like access to the components in the filesystem path (does not
    include the msc:// and the profile name).
    """
    internal = self._internal_path
    return internal.parts
def as_posix(self) -> str:
    """
    Return the string representation of the path with forward (/) slashes.

    For a remote path the object is opened through the storage client and the
    local filesystem path reported by the opened file is returned instead, so the
    content can be reached with ordinary filesystem operations.
    """
    if not self._storage_client.is_default_profile():
        # The opened file object knows where its content lives on the local filesystem.
        with self._storage_client.open(str(self._internal_path), mode="rb") as handle:
            return handle.resolve_filesystem_path()

    return self._internal_path.as_posix()
222
def is_absolute(self) -> bool:
    """
    Report whether the path is absolute.

    :return: Always ``True``; paths of this class are treated as absolute.
    """
    return True
228
def is_relative_to(self, other: "MultiStoragePath") -> bool:
    """
    Return True if the path is relative to another path or False.

    Both paths must belong to the same storage profile. Anything that is not a
    MultiStoragePath is never considered a base of this path.

    :param other: Candidate base path.
    :return: True when ``other`` has the same profile and its internal path is a prefix of this one.
    """
    if not isinstance(other, MultiStoragePath):
        return False

    # The same key under a different profile is a different object, so compare
    # profiles (as ``__eq__`` does) before comparing path components.
    if self._storage_client.profile != other._storage_client.profile:
        return False

    return self._internal_path.is_relative_to(other._internal_path)
234
def is_reserved(self) -> bool:
    """
    Return whether the local path is reserved, as reported by the underlying pure path.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.is_reserved() is unsupported for remote storage paths")
    return self._internal_path.is_reserved()
239
def match(self, pattern) -> bool:
    """
    Return True if this path matches the given pattern.

    :param pattern: Pattern handed to ``pathlib.Path.match``.
    """
    concrete = Path(self._internal_path)
    return concrete.match(pattern)
245
def relative_to(self, other: "MultiStoragePath") -> "MultiStoragePath":
    """
    Unsupported operation.

    :param other: Ignored.
    :raises NotImplementedError: Always.
    """
    message = "MultiStoragePath.relative_to() is unsupported"
    raise NotImplementedError(message)
251
def with_name(self, name: str) -> "MultiStoragePath":
    """
    Return a new path with the file name changed.

    :param name: Replacement for the final path component.
    :return: A new MultiStoragePath under the same profile.
    """
    if not self._storage_client.is_default_profile():
        # Remote paths are rebuilt as a protocol-prefixed URL under the same profile.
        remote_url = join_paths(
            f"{MSC_PROTOCOL}{self._storage_client.profile}", str(self._internal_path.with_name(name))
        )
        return MultiStoragePath(remote_url)

    renamed = self._internal_path.with_name(name)
    return MultiStoragePath(str(renamed))
262
def with_stem(self, stem: str) -> "MultiStoragePath":
    """
    Return a new path with the stem changed.

    :param stem: Replacement for the final component without its last suffix.
    :return: A new MultiStoragePath under the same profile.
    """
    if not self._storage_client.is_default_profile():
        # Remote paths are rebuilt as a protocol-prefixed URL under the same profile.
        remote_url = join_paths(
            f"{MSC_PROTOCOL}{self._storage_client.profile}", str(self._internal_path.with_stem(stem))
        )
        return MultiStoragePath(remote_url)

    restemmed = self._internal_path.with_stem(stem)
    return MultiStoragePath(str(restemmed))
273
def with_suffix(self, suffix: str) -> "MultiStoragePath":
    """
    Return a new path with the file suffix changed. If the path has no suffix, add given suffix.
    If the given suffix is an empty string, remove the suffix from the path.

    :param suffix: New suffix, or an empty string to drop the current one.
    :return: A new MultiStoragePath under the same profile.
    """
    if not self._storage_client.is_default_profile():
        # Remote paths are rebuilt as a protocol-prefixed URL under the same profile.
        remote_url = join_paths(
            f"{MSC_PROTOCOL}{self._storage_client.profile}", str(self._internal_path.with_suffix(suffix))
        )
        return MultiStoragePath(remote_url)

    resuffixed = self._internal_path.with_suffix(suffix)
    return MultiStoragePath(str(resuffixed))
287
def with_segments(self, *pathsegments) -> "MultiStoragePath":
    """
    Construct a new path object from any number of path-like objects.

    The segments are joined onto this path's internal path; the result stays
    under the same profile.

    :param pathsegments: Path-like segments to join, in order.
    """
    is_local = self._storage_client.is_default_profile()
    combined = self._internal_path.joinpath(*pathsegments)
    if is_local:
        return MultiStoragePath(str(combined))
    return MultiStoragePath(join_paths(f"{MSC_PROTOCOL}{self._storage_client.profile}", str(combined)))
# Expanding and resolving paths
@classmethod
def home(cls):
    """
    Return a new path pointing to the user's home directory.

    :return: The value of ``pathlib.Path.home()``.
    """
    home_directory = Path.home()
    return home_directory
307
def expanduser(self):
    """
    Return a new path with expanded ~ and ~user constructs (as returned by os.path.expanduser).

    :return: A ``pathlib.Path`` for local paths.
    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.expanduser() is unsupported for remote storage paths")
    return Path(self._internal_path).expanduser()
317
@classmethod
def cwd(cls):
    """
    Return a new path pointing to the current working directory.

    :return: The value of ``pathlib.Path.cwd()``.
    """
    working_directory = Path.cwd()
    return working_directory
324
def absolute(self):
    """
    Return this very object; paths of this class are always absolute.

    :return: ``self``, unchanged.
    """
    return self
330
def resolve(self, strict=False):
    """
    Return the absolute path.

    Local paths are resolved with ``pathlib.Path.resolve``. Remote paths are
    rebuilt from their profile and internal path; ``strict`` is not consulted for them.

    :param strict: Passed to ``pathlib.Path.resolve`` for local paths.
    """
    if not self._storage_client.is_default_profile():
        return MultiStoragePath(join_paths(f"{MSC_PROTOCOL}{self._storage_client.profile}", str(self._internal_path)))

    resolved = Path(self._internal_path).resolve(strict=strict)
    return MultiStoragePath(str(resolved))
# Querying file type and status
def stat(self):
    """
    Return the result of the stat() system call on this path, like os.stat() does.

    If the path is a remote path, the result is a :py:class:`multistorageclient.pathlib.StatResult` object
    built from the metadata returned by the storage client.
    """
    if not self._storage_client.is_default_profile():
        metadata = self._storage_client.info(str(self._internal_path))
        return StatResult(metadata)
    return Path(self._internal_path).stat()
361
def lstat(self):
    """
    Like stat(), except if the path points to a symlink, the symlink's status information
    is returned, rather than its target's.

    If the path is a remote path, the result is a :py:class:`multistorageclient.pathlib.StatResult` object
    built from the metadata returned by the storage client.
    """
    if not self._storage_client.is_default_profile():
        metadata = self._storage_client.info(str(self._internal_path))
        return StatResult(metadata)
    return Path(self._internal_path).lstat()
373
def exists(self) -> bool:
    """
    Return True if the path exists.

    For remote paths, existence means the storage client's ``info`` call does not
    raise ``FileNotFoundError``.
    """
    if self._storage_client.is_default_profile():
        return Path(self._internal_path).exists()

    try:
        self._storage_client.info(str(self._internal_path))
    except FileNotFoundError:
        return False
    return True
386
def is_file(self, strict: bool = True) -> bool:
    """
    Return True if the path exists and is a regular file.

    :param strict: Forwarded to the storage client's ``info`` call for remote paths; unused for local paths.
    :return: False when the object is missing, is not a file, or its metadata cannot be fetched.
    """
    if self._storage_client.is_default_profile():
        return Path(self._internal_path).is_file()

    try:
        key = str(self._internal_path)
        # A key ending with "/" is assumed to denote a directory.
        if key.endswith("/"):
            return False
        return self._storage_client.info(key, strict=strict).type == "file"
    except FileNotFoundError:
        return False
    except Exception as error:
        # Best effort: any other failure is logged and reported as "not a file".
        logger.warning("Error occurred while fetching file info at %s, caused by: %s", self._internal_path, error)
        return False
407
def is_dir(self, strict: bool = True) -> bool:
    """
    Return True if the path exists and is a directory.

    :param strict: Forwarded to the storage client's ``info`` call for remote paths; unused for local paths.
    :return: False when the object is missing, is not a directory, or its metadata cannot be fetched.
    """
    if self._storage_client.is_default_profile():
        return Path(self._internal_path).is_dir()

    try:
        key = str(self._internal_path)
        # Query with a trailing "/" so the key is looked up as a directory.
        if not key.endswith("/"):
            key = key + "/"
        return self._storage_client.info(key, strict=strict).type == "directory"
    except FileNotFoundError:
        return False
    except Exception as error:
        # Best effort: any other failure is logged and reported as "not a directory".
        logger.warning("Error occurred while fetching file info at %s, caused by: %s", self._internal_path, error)
        return False
428 438
def is_mount(self):
    """
    Return True if the path exists and is a mount point.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.is_mount() is unsupported for remote storage paths")
    return Path(self._internal_path).is_mount()
448
def is_socket(self):
    """
    Return True if the path exists and is a socket.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.is_socket() is unsupported for remote storage paths")
    return Path(self._internal_path).is_socket()
458
def is_fifo(self):
    """
    Return True if the path exists and is a FIFO.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.is_fifo() is unsupported for remote storage paths")
    return Path(self._internal_path).is_fifo()
468
def is_block_device(self):
    """
    Return True if the path exists and is a block device.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.is_block_device() is unsupported for remote storage paths")
    return Path(self._internal_path).is_block_device()
478
def is_char_device(self):
    """
    Return True if the path exists and is a character device.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.is_char_device() is unsupported for remote storage paths")
    return Path(self._internal_path).is_char_device()
488
def samefile(self, other_path):
    """
    Return True if both paths point to the same file or directory.

    Local paths are compared with ``pathlib.Path.samefile``. For remote storage
    paths the result is simply ``self == other_path``.

    :param other_path: Path to compare against.
    """
    if not self._storage_client.is_default_profile():
        return self == other_path
    return Path(self._internal_path).samefile(other_path)
# Reading and writing files
def open(
    self,
    mode="r",
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    check_source_version=SourceVersionCheckMode.INHERIT,
):
    """
    Open the file and return a file object.

    The call is delegated to the storage client for both local and remote paths.

    :param mode: Forwarded to the storage client.
    :param buffering: Forwarded to the storage client.
    :param encoding: Forwarded to the storage client.
    :param errors: Accepted for signature compatibility; not forwarded to the storage client.
    :param newline: Accepted for signature compatibility; not forwarded to the storage client.
    :param check_source_version: Forwarded to the storage client.
    :return: The object returned by the storage client's ``open``.
    """
    forwarded = {
        "mode": mode,
        "buffering": buffering,
        "encoding": encoding,
        "check_source_version": check_source_version,
    }
    return self._storage_client.open(str(self._internal_path), **forwarded)
520
def read_bytes(self) -> bytes:
    """
    Read the whole object through the storage client and return its content as bytes.
    """
    key = str(self._internal_path)
    return self._storage_client.read(key)
526
def read_text(self, encoding: str = "utf-8", errors: str = "strict") -> str:
    """
    Read the whole object through the storage client and decode it to text.

    :param encoding: Codec used to decode the bytes.
    :param errors: Error handling scheme passed to ``bytes.decode`` (for example "strict", "ignore", "replace").
    :return: The decoded content.
    :raises UnicodeDecodeError: If decoding fails under the "strict" scheme.
    """
    raw = self._storage_client.read(str(self._internal_path))
    # Pass ``errors`` through; it was previously accepted but ignored.
    return raw.decode(encoding, errors)
532
def write_bytes(self, data: bytes) -> None:
    """
    Write ``data`` to this path through the storage client.

    :param data: Bytes to store.
    """
    key = str(self._internal_path)
    self._storage_client.write(key, data)
538
def write_text(self, data: str, encoding: str = "utf-8", errors: str = "strict") -> None:
    """
    Encode ``data`` and write it to this path through the storage client.

    :param data: Text to store.
    :param encoding: Codec used to encode the text.
    :param errors: Error handling scheme passed to ``str.encode`` (for example "strict", "ignore", "replace").
    :raises UnicodeEncodeError: If encoding fails under the "strict" scheme.
    """
    # Pass ``errors`` through; it was previously accepted but ignored.
    encoded = data.encode(encoding, errors)
    self._storage_client.write(str(self._internal_path), encoded)
# Reading directories
def iterdir(self):
    """
    Yield path objects of the directory contents.

    Local directories are listed with ``pathlib.Path.iterdir``. Remote directories
    are listed through the storage client, with directories included and each
    entry's key (requested with the URL prefix) wrapped in a new MultiStoragePath.
    """
    if self._storage_client.is_default_profile():
        for entry in Path(self._internal_path).iterdir():
            yield MultiStoragePath(str(entry))
        return

    prefix = str(self._internal_path)
    # List with a trailing "/" so the key is treated as a directory prefix.
    if not prefix.endswith("/"):
        prefix = prefix + "/"
    for entry in self._storage_client.list(prefix, include_directories=True, include_url_prefix=True):
        yield MultiStoragePath(entry.key)
560
def glob(self, pattern):
    """
    Return all existing files (of any kind, including directories) in this subtree
    that match the given relative pattern.

    :param pattern: Pattern relative to this path.
    :return: A list of MultiStoragePath objects, one per match.
    """
    if self._storage_client.is_default_profile():
        matches = Path(self._internal_path).glob(pattern)
    else:
        matches = self._storage_client.glob(str(self._internal_path / pattern), include_url_prefix=True)
    return [MultiStoragePath(str(found)) for found in matches]
573
def rglob(self, pattern):
    """
    Return all existing files (of any kind, including directories) matching the
    given relative pattern, anywhere in this subtree.

    :param pattern: Pattern relative to this path; remote lookups prefix it with ``**/``.
    :return: A list of MultiStoragePath objects, one per match.
    """
    if self._storage_client.is_default_profile():
        matches = Path(self._internal_path).rglob(pattern)
    else:
        recursive_pattern = f"**/{pattern}"
        matches = self._storage_client.glob(str(self._internal_path / recursive_pattern), include_url_prefix=True)
    return [MultiStoragePath(str(found)) for found in matches]
589
def walk(self, top_down=True, on_error=None, follow_symlinks=False):
    """
    Walk the directory tree from this directory, similar to os.walk().

    The arguments are passed positionally to ``pathlib.Path.walk``.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.walk() is unsupported for remote storage paths")
    local_path = Path(self._internal_path)
    return local_path.walk(top_down, on_error, follow_symlinks)  # pyright: ignore[reportAttributeAccessIssue]
# Creating files and directories
def touch(self, mode=0o666, exist_ok=False):
    """
    Create this file with the given access mode, if it doesn't exist.

    For remote paths an empty object is written when nothing exists at the path;
    an existing object is left untouched and a warning is logged.

    :param mode: Passed to ``pathlib.Path.touch`` for local paths.
    :param exist_ok: Passed to ``pathlib.Path.touch`` for local paths.
    """
    if self._storage_client.is_default_profile():
        Path(self._internal_path).touch(mode, exist_ok)
        return

    if not self.exists():
        self._storage_client.write(str(self._internal_path), b"")
        return

    # object storage does not support updating the last modified time of a object without writing the object
    logger.warning("MultiStoragePath.touch() is not supported for remote storage paths")
614
def mkdir(self, mode=0o777, parents=False, exist_ok=False) -> None:
    """
    Create a new directory at this given path.

    For remote storage paths, this operation is a no-op.

    :param mode: Passed to ``pathlib.Path.mkdir`` for local paths.
    :param parents: Passed to ``pathlib.Path.mkdir`` for local paths.
    :param exist_ok: Passed to ``pathlib.Path.mkdir`` for local paths.
    """
    if not self._storage_client.is_default_profile():
        return
    Path(self._internal_path).mkdir(mode, parents, exist_ok)
# Renaming and deleting
def rename(self, target) -> "MultiStoragePath":
    """
    Rename this path to the target path.

    :param target: Destination; converted to a MultiStoragePath when it is not one already.
    :return: The destination as a MultiStoragePath.
    """
    destination = target if isinstance(target, MultiStoragePath) else MultiStoragePath(target)

    if self._storage_client.is_default_profile():
        Path(self._internal_path).rename(str(destination._internal_path))
        return destination

    # Note: This operation is not atomic, and the target path must be a single file.
    # It is performed as a copy followed by a delete, both through this path's storage client.
    self._storage_client.copy(str(self._internal_path), str(destination._internal_path))
    self._storage_client.delete(str(self._internal_path))
    return destination
652
def replace(self, target):
    """
    Rename this path to the target path, overwriting if that path exists.

    :param target: Destination passed to ``pathlib.Path.replace``.
    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.replace() is unsupported for remote storage paths")
    Path(self._internal_path).replace(target)
663 676
def rmdir(self) -> None:
    """
    Remove this directory. The directory must be empty.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.rmdir() is unsupported for remote storage paths")
    Path(self._internal_path).rmdir()
# Permissions and ownership
def owner(self):
    """
    Return the login name of the file owner.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.owner() is unsupported for remote storage paths")
    return Path(self._internal_path).owner()
699
def group(self):
    """
    Return the group name of the file gid.

    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.group() is unsupported for remote storage paths")
    return Path(self._internal_path).group()
709
def chmod(self, mode):
    """
    Change the permissions of the path, like os.chmod().

    :param mode: Permission bits passed to ``pathlib.Path.chmod``.
    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.chmod() is unsupported for remote storage paths")
    Path(self._internal_path).chmod(mode)
720
def lchmod(self, mode):
    """
    Like chmod(), except if the path points to a symlink, the symlink's permissions are changed, rather
    than its target's.

    :param mode: Permission bits passed to ``pathlib.Path.lchmod``.
    :raises NotImplementedError: If the path is a remote storage path.
    """
    if not self._storage_client.is_default_profile():
        raise NotImplementedError("MultiStoragePath.lchmod() is unsupported for remote storage paths")
    Path(self._internal_path).lchmod(mode)