Source code for multistorageclient.providers.posix_file

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import glob
 17import json
 18import logging
 19import os
 20import shutil
 21import tempfile
 22from collections.abc import Callable, Iterator
 23from datetime import datetime, timezone
 24from enum import Enum
 25from io import BytesIO, StringIO
 26from typing import IO, Any, Optional, TypeVar, Union
 27
 28import xattr
 29
 30from ..telemetry import Telemetry
 31from ..types import AWARE_DATETIME_MIN, ObjectMetadata, Range, SymlinkHandling
 32from ..utils import (
 33    create_attribute_filter_evaluator,
 34    matches_attribute_filter_expression,
 35    safe_makedirs,
 36    validate_attributes,
 37)
 38from .base import BaseStorageProvider
 39
# Generic return type of the callable wrapped by ``PosixFileStorageProvider._translate_errors``.
_T = TypeVar("_T")

# Provider name handed to ``BaseStorageProvider`` as ``provider_name``.
PROVIDER = "file"
# Size argument passed to every ``read()`` call when data is copied chunk by chunk.
READ_CHUNK_SIZE = 8192

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
 46
 47
class _EntryType(Enum):
    """
    An enum representing the type of an entry in a directory.

    Used by ``PosixFileStorageProvider._explore_directory`` to tag each collected
    entry so it knows what to yield for it.
    """

    # Regular file; yielded with its size and modification time.
    FILE = 1
    # Directory yielded as a single entry (listing was asked to include directories).
    DIRECTORY = 2
    # Directory that is not yielded itself but recursed into.
    DIRECTORY_TO_EXPLORE = 3
    # Symbolic link yielded as a link (``SymlinkHandling.PRESERVE``) instead of being dereferenced.
    SYMLINK = 4
 57
 58
def atomic_write(source: Union[str, IO], destination: str, attributes: Optional[dict[str, str]] = None):
    """
    Writes the contents of a file to the specified destination path.

    This function ensures that the file write operation is atomic, meaning the output file is either fully written or not modified at all.
    This is achieved by writing to a temporary file first and then renaming it to the destination path.
    The temporary file is created in the destination's directory (so the rename stays on one file system)
    and is removed again if anything fails before the rename completes.

    :param source: The input file to read from. It can be a string representing the path to a file, or an open file-like object (IO).
        File-like objects are read from their current position; text chunks (``str``) are encoded as UTF-8.
    :param destination: The path to the destination file where the contents should be written.
    :param attributes: The attributes to set on the file. They are stored as JSON in the ``user.json``
        extended attribute; failure to set the extended attribute is logged at debug level and otherwise ignored.
    """
    temp_file_path: Optional[str] = None
    try:
        with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=os.path.dirname(destination), prefix=".") as fp:
            temp_file_path = fp.name
            if isinstance(source, str):
                with open(source, mode="rb") as src:
                    while chunk := src.read(READ_CHUNK_SIZE):
                        fp.write(chunk)
            else:
                while chunk := source.read(READ_CHUNK_SIZE):
                    # Text-mode sources (e.g. ``StringIO``) hand back ``str``; the temp file is binary.
                    fp.write(chunk.encode("utf-8") if isinstance(chunk, str) else chunk)

            # Set attributes on temp file if provided
            validated_attributes = validate_attributes(attributes)
            if validated_attributes:
                try:
                    xattr.setxattr(temp_file_path, "user.json", json.dumps(validated_attributes).encode("utf-8"))
                except OSError as e:
                    logger.debug(f"Failed to set extended attributes on temp file {temp_file_path}: {e}")

        os.rename(src=temp_file_path, dst=destination)
    except BaseException:
        # ``delete=False`` means nobody else cleans up: drop the partial temp file so failed
        # writes do not leave hidden files next to the destination, then re-raise unchanged.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError:
                # Best-effort cleanup; the original error is the one worth reporting.
                pass
        raise
90 91
class PosixFileStorageProvider(BaseStorageProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.StorageProvider` for interacting with POSIX file systems.
    """

    def __init__(
        self,
        base_path: str,
        config_dict: Optional[dict[str, Any]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        **kwargs: Any,
    ) -> None:
        """
        :param base_path: The root prefix path within the POSIX file system where all operations will be scoped.
            An empty string is treated as ``/``.
        :param config_dict: Resolved MSC config.
        :param telemetry_provider: A function that provides a telemetry instance.
        :param kwargs: Accepted but not used by this constructor.

        :raises ValueError: If ``base_path`` is not empty and does not start with ``/``.
        """

        # Validate POSIX path
        if base_path == "":
            base_path = "/"

        if not base_path.startswith("/"):
            raise ValueError(f"The base_path {base_path} must be an absolute path.")

        super().__init__(
            base_path=base_path,
            provider_name=PROVIDER,
            config_dict=config_dict,
            telemetry_provider=telemetry_provider,
        )

    def _translate_errors(
        self,
        func: Callable[[], _T],
        operation: str,
        path: str,
    ) -> _T:
        """
        Translates errors like timeouts and client errors.

        :param func: The function that performs the actual file operation.
        :param operation: The type of operation being performed (e.g., "PUT", "GET", "DELETE").
        :param path: The path to the object.

        :return: The result of the file operation, typically the return value of the `func` callable.

        :raises FileNotFoundError: Re-raised unchanged.
        :raises RuntimeError: For any other exception raised by ``func`` (chained to the original).
        """
        try:
            return func()
        except FileNotFoundError:
            raise
        except Exception as error:
            raise RuntimeError(f"Failed to {operation} object(s) at {path}, error: {error}") from error

    def _put_object(
        self,
        path: str,
        body: bytes,
        if_match: Optional[str] = None,
        if_none_match: Optional[str] = None,
        attributes: Optional[dict[str, str]] = None,
    ) -> int:
        """
        Atomically write ``body`` to ``path``, creating parent directories first.

        ``if_match`` and ``if_none_match`` are accepted but not used by this implementation.

        :return: The number of bytes in ``body``.
        """

        def _invoke_api() -> int:
            safe_makedirs(os.path.dirname(path))
            atomic_write(source=BytesIO(body), destination=path, attributes=attributes)
            return len(body)

        return self._translate_errors(_invoke_api, operation="PUT", path=path)

    def _get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        """
        Read the file at ``path``; with ``byte_range`` only ``size`` bytes starting at ``offset`` are read.
        """

        def _invoke_api() -> bytes:
            with open(path, "rb") as f:
                if byte_range:
                    f.seek(byte_range.offset)
                    return f.read(byte_range.size)
                return f.read()

        return self._translate_errors(_invoke_api, operation="GET", path=path)

    def _copy_object(self, src_path: str, dest_path: str) -> int:
        """
        Atomically copy ``src_path`` to ``dest_path``, carrying over the source's metadata attributes.

        :return: The content length reported for the source object.
        """
        src_object = self._get_object_metadata(src_path)

        def _invoke_api() -> int:
            safe_makedirs(os.path.dirname(dest_path))
            atomic_write(source=src_path, destination=dest_path, attributes=src_object.metadata)

            return src_object.content_length

        return self._translate_errors(_invoke_api, operation="COPY", path=src_path)

    def _delete_object(self, path: str, if_match: Optional[str] = None) -> None:
        """
        Remove ``path`` if it is a regular file; anything else (missing path, directory) is left alone.

        ``if_match`` is accepted but not used by this implementation.
        """

        def _invoke_api() -> None:
            # ``isfile`` is already False for missing paths, so no separate existence check is needed.
            if os.path.isfile(path):
                os.remove(path)

        return self._translate_errors(_invoke_api, operation="DELETE", path=path)

    def _make_symlink(self, path: str, target: str) -> None:
        """
        Create (or replace) a symbolic link at ``path``.

        The link value is whatever ``ObjectMetadata.encode_symlink_target(path, target)`` returns
        (presumably a target relative to the link's parent -- defined outside this file).
        """

        def _invoke_api() -> None:
            safe_makedirs(os.path.dirname(path))
            relative_target = ObjectMetadata.encode_symlink_target(path, target)
            # ``lexists`` also sees broken links, which must be removed before re-creating the link.
            if os.path.lexists(path):
                os.remove(path)
            os.symlink(relative_target, path)

        self._translate_errors(_invoke_api, operation="SYMLINK", path=path)

    def _get_object_metadata(self, path: str, strict: bool = True) -> ObjectMetadata:
        """
        Build the :py:class:`ObjectMetadata` for a file or directory.

        Custom attributes are read from the ``user.json`` extended attribute; any failure to read or
        decode it is logged at debug level and treated as "no metadata".

        ``strict`` is accepted but not used by this implementation.
        """
        is_dir = os.path.isdir(path)
        if is_dir:
            path = self._append_delimiter(path)

        def _invoke_api() -> ObjectMetadata:
            metadata_dict = {}
            try:
                json_bytes = xattr.getxattr(path, "user.json")
                metadata_dict = json.loads(json_bytes.decode("utf-8"))
            except (OSError, KeyError, ValueError, AttributeError) as e:
                # ``ValueError`` covers both ``json.JSONDecodeError`` and ``UnicodeDecodeError``.
                logger.debug(f"Failed to read extended attributes from {path}: {e}")

            # ``os.readlink`` may return an absolute path; normalise to the
            # parent-relative form used by every backend.
            symlink_target: Optional[str] = None
            if os.path.islink(path):
                raw = os.readlink(path)
                symlink_target = ObjectMetadata.encode_symlink_target(path, raw) if os.path.isabs(raw) else raw

            return ObjectMetadata(
                key=path,
                type="directory" if is_dir else "file",
                content_length=0 if is_dir else os.path.getsize(path),
                last_modified=datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc),
                metadata=metadata_dict if metadata_dict else None,
                symlink_target=symlink_target,
            )

        return self._translate_errors(_invoke_api, operation="HEAD", path=path)

    def _list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects under ``path`` with keys relative to the provider's base path.

        If ``path`` is a regular file, a single entry for it is yielded. If it is a directory, its
        contents are yielded via :py:meth:`_explore_directory`.

        NOTE: the inner function is a generator, so ``_translate_errors`` only wraps the creation of
        the generator object. Errors raised while the listing is iterated reach the caller untranslated.
        """
        start_after = os.path.relpath(start_after, self._base_path) if start_after else None
        end_at = os.path.relpath(end_at, self._base_path) if end_at else None

        def _invoke_api() -> Iterator[ObjectMetadata]:
            if os.path.isfile(path):
                yield ObjectMetadata(
                    key=os.path.relpath(path, self._base_path),
                    content_length=os.path.getsize(path),
                    last_modified=datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc),
                )
            dir_path = path.rstrip("/") + "/"
            if not os.path.isdir(dir_path):
                return

            yield from self._explore_directory(dir_path, start_after, end_at, include_directories, symlink_handling)

        return self._translate_errors(_invoke_api, operation="LIST", path=path)

    @property
    def supports_parallel_listing(self) -> bool:
        """
        Whether this provider supports heap-based parallel recursive listing.

        :return: ``True`` for POSIX file storage.
        """
        return True

    def _shallow_list(self, path: str, symlink_handling: SymlinkHandling) -> tuple[list[str], list[ObjectMetadata]]:
        """
        Adapt POSIX relative listing keys to the full-key contract expected by the recursive listing heap.

        :return: ``(prefixes, objects)`` -- child directory prefixes (with trailing ``/``) and the
            remaining entries with their keys rewritten to full keys.
        """
        prefixes: list[str] = []
        objects: list[ObjectMetadata] = []

        for item in self._list_objects(path, include_directories=True, symlink_handling=symlink_handling):
            full_key = self._prepend_base_path(item.key)
            if item.type == "directory" and item.symlink_target is None:
                child_prefix = full_key + "/"
                # Never report the listed directory itself as its own child prefix.
                if child_prefix != path.rstrip("/") + "/":
                    prefixes.append(child_prefix)
            else:
                item.key = full_key
                objects.append(item)

        return prefixes, objects

    @staticmethod
    def _subtree_may_be_in_range(relative_dir: str, start_after: Optional[str], end_at: Optional[str]) -> bool:
        """
        Tell whether a directory can contain keys inside the ``(start_after, end_at]`` window.

        Every key below the directory starts with ``<relative_dir>/``. Such a key can sort after
        ``start_after`` when ``start_after`` sorts before that prefix or itself lies under it.
        Comparing against the bare directory name instead would skip directory ``a`` for
        ``start_after="a.txt"`` even though ``a/b.txt`` sorts after ``a.txt``.
        """
        child_prefix = relative_dir + "/"
        if start_after is not None and not (start_after < child_prefix or start_after.startswith(child_prefix)):
            return False
        return end_at is None or relative_dir <= end_at

    def _explore_directory(
        self,
        dir_path: str,
        start_after: Optional[str],
        end_at: Optional[str],
        include_directories: bool,
        symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
    ) -> Iterator[ObjectMetadata]:
        """
        Recursively explore a directory and yield objects in lexicographical order.

        :param dir_path: The directory path to explore
        :param start_after: The key to start after
        :param end_at: The key to end at
        :param include_directories: Whether to include directories in the result
        :param symlink_handling: How to handle symbolic links during listing.

        :raises FileNotFoundError: For a broken symlink under ``FOLLOW`` or ``PRESERVE``.
        :raises ValueError: Under ``PRESERVE``, for a symlink whose target lies outside the base path.

        Other ``OSError`` failures are logged as a warning and end the listing of this directory.
        """
        try:
            dir_entries = os.listdir(dir_path)
            dir_entries.sort()

            entries: list[tuple[str, str, _EntryType]] = []
            symlink_info: dict[str, tuple[str, str]] = {}

            for entry in dir_entries:
                full_path = os.path.join(dir_path, entry)
                is_link = os.path.islink(full_path)

                if is_link and symlink_handling == SymlinkHandling.SKIP:
                    continue

                relative_path = os.path.relpath(full_path, self._base_path)
                in_range = (start_after is None or start_after < relative_path) and (
                    end_at is None or relative_path <= end_at
                )

                if is_link and symlink_handling == SymlinkHandling.PRESERVE:
                    real_target = os.path.realpath(full_path)
                    if not os.path.exists(real_target):
                        raise FileNotFoundError(
                            f"Broken symlink '{relative_path}' points to a missing target "
                            f"({full_path} -> {real_target}). Use symlink_handling=SKIP to ignore."
                        )

                    try:
                        target_key = os.path.relpath(real_target, self._base_path)
                    except ValueError:
                        target_key = None

                    # Only a leading ``..`` *component* means the target escapes the base path;
                    # a name that merely begins with two dots (``..cache``) is still inside it.
                    if (
                        target_key is None
                        or target_key == os.pardir
                        or target_key.startswith(os.pardir + os.sep)
                    ):
                        raise ValueError(
                            f"Symlink '{relative_path}' points outside the base directory "
                            f"({full_path} -> {real_target}). Use symlink_handling=FOLLOW "
                            f"to dereference or symlink_handling=SKIP to ignore."
                        )

                    relative_target = ObjectMetadata.encode_symlink_target(full_path, real_target)

                    target_type = "directory" if os.path.isdir(full_path) else "file"

                    if in_range:
                        entries.append((relative_path, full_path, _EntryType.SYMLINK))
                        symlink_info[relative_path] = (relative_target, target_type)
                    continue

                if is_link and symlink_handling == SymlinkHandling.FOLLOW:
                    # Broken symlinks have no target to dereference: isfile/isdir both return
                    # False, so the entry would be silently dropped. Fail fast instead.
                    real_target = os.path.realpath(full_path)
                    if not os.path.exists(real_target):
                        raise FileNotFoundError(
                            f"Broken symlink '{relative_path}' points to a missing target "
                            f"({full_path} -> {real_target}). Use symlink_handling=SKIP to ignore."
                        )

                if os.path.isfile(full_path):
                    if in_range:
                        entries.append((relative_path, full_path, _EntryType.FILE))
                elif os.path.isdir(full_path):
                    if include_directories:
                        if in_range:
                            entries.append((relative_path, full_path, _EntryType.DIRECTORY))
                    elif self._subtree_may_be_in_range(relative_path, start_after, end_at):
                        # The directory is not emitted itself; its children are range-checked
                        # individually during the recursion.
                        entries.append((relative_path, full_path, _EntryType.DIRECTORY_TO_EXPLORE))

            # Sort keys must mirror the keys S3 ``list_objects_v2`` would return so POSIX
            # listings stay in the same raw-UTF-8-byte order. For directories (expanded or
            # returned as-is) the emitted keys live under ``<name>/``, so the trailing
            # delimiter must be part of the sort key. Otherwise the bare name ``a`` sorts
            # before sibling file ``a.txt`` (since ``""`` < ``".txt"``), but S3 orders the
            # nested key ``a/b.txt`` *after* ``a.txt`` because ``.`` (0x2E) < ``/`` (0x2F).
            def _sort_key(entry: tuple[str, str, _EntryType]) -> str:
                relative, _, entry_type = entry
                if entry_type in (_EntryType.DIRECTORY, _EntryType.DIRECTORY_TO_EXPLORE):
                    return relative + "/"
                return relative

            entries.sort(key=_sort_key)

            for relative_path, full_path, entry_type in entries:
                if entry_type == _EntryType.FILE:
                    yield ObjectMetadata(
                        key=relative_path,
                        content_length=os.path.getsize(full_path),
                        last_modified=datetime.fromtimestamp(os.path.getmtime(full_path), tz=timezone.utc),
                    )
                elif entry_type == _EntryType.DIRECTORY:
                    yield ObjectMetadata(
                        key=relative_path,
                        content_length=0,
                        type="directory",
                        last_modified=AWARE_DATETIME_MIN,
                    )
                elif entry_type == _EntryType.DIRECTORY_TO_EXPLORE:
                    yield from self._explore_directory(
                        full_path, start_after, end_at, include_directories, symlink_handling
                    )
                elif entry_type == _EntryType.SYMLINK:
                    relative_target, target_type = symlink_info[relative_path]
                    yield ObjectMetadata(
                        key=relative_path,
                        content_length=0,
                        last_modified=datetime.fromtimestamp(os.path.getmtime(full_path), tz=timezone.utc),
                        type=target_type,
                        symlink_target=relative_target,
                    )

        except FileNotFoundError:
            raise
        except OSError as e:
            # ``PermissionError`` is an ``OSError`` subclass and is handled here as well.
            logger.warning(f"Failed to list contents of {dir_path}, caused by: {e}")
            return

    def _upload_file(self, remote_path: str, f: Union[str, IO], attributes: Optional[dict[str, str]] = None) -> int:
        """
        Atomically write a local file or file-like object to ``remote_path``.

        :param remote_path: Destination path.
        :param f: Source file path, or an open file-like object (read from its current position).
        :param attributes: The attributes to set on the written file.

        :return: The size of the source in bytes. For in-memory buffers exposing ``getvalue()`` this
            is the size of the whole buffer; for other file objects it is the number of bytes between
            the current position and the end of the stream.
        """
        safe_makedirs(os.path.dirname(remote_path))

        filesize: int = 0
        if isinstance(f, str):
            filesize = os.path.getsize(f)
        elif isinstance(f, StringIO):
            filesize = len(f.getvalue().encode("utf-8"))
        elif hasattr(f, "getvalue"):
            filesize = len(f.getvalue())  # type: ignore
        else:
            # Plain file objects have no ``getvalue()``; measure what is left to read instead.
            position = f.tell()
            f.seek(0, os.SEEK_END)
            filesize = f.tell() - position
            f.seek(position)

        def _invoke_api() -> int:
            source: Union[str, IO] = f
            if isinstance(f, StringIO):
                # The temp file is opened in binary mode, so text must be encoded before writing.
                source = BytesIO(f.read().encode("utf-8"))
            atomic_write(source=source, destination=remote_path, attributes=attributes)

            return filesize

        return self._translate_errors(_invoke_api, operation="PUT", path=remote_path)

    def _download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> int:
        """
        Copy ``remote_path`` to a local file path or into an open file-like object.

        :param remote_path: Source path.
        :param f: Destination file path (written atomically), a ``StringIO`` (the source is read as
            UTF-8 text), or any other writable binary file-like object.
        :param metadata: Optional pre-fetched metadata; when given, its ``content_length`` is returned
            instead of stat-ing the source.

        :return: The size of the source in bytes.
        """
        filesize = metadata.content_length if metadata else os.path.getsize(remote_path)

        if isinstance(f, str):

            def _invoke_api() -> int:
                # A bare file name has no directory part to create.
                if os.path.dirname(f):
                    safe_makedirs(os.path.dirname(f))
                atomic_write(source=remote_path, destination=f)

                return filesize

            return self._translate_errors(_invoke_api, operation="GET", path=remote_path)
        elif isinstance(f, StringIO):

            def _invoke_api() -> int:
                with open(remote_path, "r", encoding="utf-8") as src:
                    while chunk := src.read(READ_CHUNK_SIZE):
                        f.write(chunk)

                return filesize

            return self._translate_errors(_invoke_api, operation="GET", path=remote_path)
        else:

            def _invoke_api() -> int:
                with open(remote_path, "rb") as src:
                    while chunk := src.read(READ_CHUNK_SIZE):
                        f.write(chunk)

                return filesize

            return self._translate_errors(_invoke_api, operation="GET", path=remote_path)

    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        Return the paths matching ``pattern`` (``**`` is recursive), relative to the base path.

        :param pattern: Glob pattern, interpreted relative to the provider's base path.
        :param attribute_filter_expression: Optional expression; when given, only paths whose
            metadata matches it are returned.
        """
        pattern = self._prepend_base_path(pattern)
        keys = glob.glob(pattern, recursive=True)
        if attribute_filter_expression:
            evaluator = create_attribute_filter_evaluator(attribute_filter_expression)
            keys = [
                key
                for key in keys
                if matches_attribute_filter_expression(self._get_object_metadata(key), evaluator)
            ]
        if self._base_path == "/":
            return keys
        # NOTE: PosixStorageProvider does not have the concept of bucket and prefix.
        # So we drop the base_path from it.
        return [key.replace(self._base_path, "", 1).lstrip("/") for key in keys]

    def is_file(self, path: str) -> bool:
        """
        Tell whether ``path`` (relative to the base path) is an existing regular file.
        """
        path = self._prepend_base_path(path)
        return os.path.isfile(path)

    def rmtree(self, path: str) -> None:
        """
        Recursively delete the directory at ``path`` (relative to the base path).

        Errors from :py:func:`shutil.rmtree` propagate unchanged.
        """
        path = self._prepend_base_path(path)
        shutil.rmtree(path)