Source code for multistorageclient.providers.posix_file

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import glob
import json
import logging
import os
import shutil
import tempfile
from collections.abc import Callable, Iterator
from datetime import datetime, timezone
from enum import Enum
from io import BytesIO, StringIO
from typing import IO, Any, Optional, TypeVar, Union

import xattr

from ..telemetry import Telemetry
from ..types import AWARE_DATETIME_MIN, ObjectMetadata, Range, SymlinkHandling
from ..utils import (
    create_attribute_filter_evaluator,
    matches_attribute_filter_expression,
    safe_makedirs,
    validate_attributes,
)
from .base import BaseStorageProvider

_T = TypeVar("_T")

PROVIDER = "file"
READ_CHUNK_SIZE = 8192

logger = logging.getLogger(__name__)


class _EntryType(Enum):
    """
    An enum representing the type of an entry in a directory.
    """

    FILE = 1
    DIRECTORY = 2
    DIRECTORY_TO_EXPLORE = 3
    SYMLINK = 4


def atomic_write(source: Union[str, IO], destination: str, attributes: Optional[dict[str, str]] = None):
    """
    Writes the contents of a file to the specified destination path.

    This function ensures that the file write operation is atomic, meaning the output file is
    either fully written or not modified at all. This is achieved by writing to a temporary
    file first and then renaming it to the destination path.

    :param source: The input file to read from. It can be a string representing the path to a file, or an open file-like object (IO).
    :param destination: The path to the destination file where the contents should be written.
    :param attributes: The attributes to set on the file.
    """

    with tempfile.NamedTemporaryFile(mode="wb", delete=False, dir=os.path.dirname(destination), prefix=".") as fp:
        temp_file_path = fp.name
        if isinstance(source, str):
            with open(source, mode="rb") as src:
                while chunk := src.read(READ_CHUNK_SIZE):
                    fp.write(chunk)
        else:
            while chunk := source.read(READ_CHUNK_SIZE):
                fp.write(chunk)

    # Set attributes on temp file if provided
    validated_attributes = validate_attributes(attributes)
    if validated_attributes:
        try:
            xattr.setxattr(temp_file_path, "user.json", json.dumps(validated_attributes).encode("utf-8"))
        except OSError as e:
            logger.debug(f"Failed to set extended attributes on temp file {temp_file_path}: {e}")

    os.rename(src=temp_file_path, dst=destination)
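
# A minimal usage sketch for ``atomic_write`` (illustrative only; the paths and
# attribute values below are hypothetical). Because the final ``os.rename`` into
# place is atomic on POSIX filesystems, concurrent readers observe either the old
# contents of the destination or the new contents, never a partial write:
#
#     atomic_write(source=BytesIO(b"hello world"), destination="/tmp/example.bin")
#     atomic_write(
#         source="/tmp/example.bin",
#         destination="/tmp/copy.bin",
#         attributes={"owner": "demo"},  # persisted in the ``user.json`` xattr
#     )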


class PosixFileStorageProvider(BaseStorageProvider):
    """
    A concrete implementation of the :py:class:`multistorageclient.types.StorageProvider` for interacting with POSIX file systems.
    """

    def __init__(
        self,
        base_path: str,
        config_dict: Optional[dict[str, Any]] = None,
        telemetry_provider: Optional[Callable[[], Telemetry]] = None,
        **kwargs: Any,
    ) -> None:
        """
        :param base_path: The root prefix path within the POSIX file system where all operations will be scoped.
        :param config_dict: Resolved MSC config.
        :param telemetry_provider: A function that provides a telemetry instance.
        """

        # Validate POSIX path
        if base_path == "":
            base_path = "/"

        if not base_path.startswith("/"):
            raise ValueError(f"The base_path {base_path} must be an absolute path.")

        super().__init__(
            base_path=base_path,
            provider_name=PROVIDER,
            config_dict=config_dict,
            telemetry_provider=telemetry_provider,
        )

    def _translate_errors(
        self,
        func: Callable[[], _T],
        operation: str,
        path: str,
    ) -> _T:
        """
        Translates errors like timeouts and client errors.

        :param func: The function that performs the actual file operation.
        :param operation: The type of operation being performed (e.g., "PUT", "GET", "DELETE").
        :param path: The path to the object.

        :return: The result of the file operation, typically the return value of the `func` callable.
        """
        try:
            return func()
        except FileNotFoundError:
            raise
        except Exception as error:
            raise RuntimeError(f"Failed to {operation} object(s) at {path}, error: {error}") from error

    def _put_object(
        self,
        path: str,
        body: bytes,
        if_match: Optional[str] = None,
        if_none_match: Optional[str] = None,
        attributes: Optional[dict[str, str]] = None,
    ) -> int:
        def _invoke_api() -> int:
            safe_makedirs(os.path.dirname(path))
            atomic_write(source=BytesIO(body), destination=path, attributes=attributes)
            return len(body)

        return self._translate_errors(_invoke_api, operation="PUT", path=path)

    def _get_object(self, path: str, byte_range: Optional[Range] = None) -> bytes:
        def _invoke_api() -> bytes:
            if byte_range:
                with open(path, "rb") as f:
                    f.seek(byte_range.offset)
                    return f.read(byte_range.size)
            else:
                with open(path, "rb") as f:
                    return f.read()

        return self._translate_errors(_invoke_api, operation="GET", path=path)

    def _copy_object(self, src_path: str, dest_path: str) -> int:
        src_object = self._get_object_metadata(src_path)

        def _invoke_api() -> int:
            safe_makedirs(os.path.dirname(dest_path))
            atomic_write(source=src_path, destination=dest_path, attributes=src_object.metadata)

            return src_object.content_length

        return self._translate_errors(_invoke_api, operation="COPY", path=src_path)

    def _delete_object(self, path: str, if_match: Optional[str] = None) -> None:
        def _invoke_api() -> None:
            if os.path.exists(path) and os.path.isfile(path):
                os.remove(path)

        return self._translate_errors(_invoke_api, operation="DELETE", path=path)

    def _make_symlink(self, path: str, target: str) -> None:
        def _invoke_api() -> None:
            safe_makedirs(os.path.dirname(path))
            relative_target = ObjectMetadata.encode_symlink_target(path, target)
            if os.path.lexists(path):
                os.remove(path)
            os.symlink(relative_target, path)

        self._translate_errors(_invoke_api, operation="SYMLINK", path=path)

    def _get_object_metadata(self, path: str, strict: bool = True) -> ObjectMetadata:
        is_dir = os.path.isdir(path)
        if is_dir:
            path = self._append_delimiter(path)

        def _invoke_api() -> ObjectMetadata:
            metadata_dict = {}
            try:
                json_bytes = xattr.getxattr(path, "user.json")
                metadata_dict = json.loads(json_bytes.decode("utf-8"))
            except (OSError, IOError, KeyError, json.JSONDecodeError, AttributeError) as e:
                logger.debug(f"Failed to read extended attributes from {path}: {e}")

            # ``os.readlink`` may return an absolute path; normalise to the
            # parent-relative form used by every backend.
            symlink_target: Optional[str] = None
            if os.path.islink(path):
                raw = os.readlink(path)
                symlink_target = ObjectMetadata.encode_symlink_target(path, raw) if os.path.isabs(raw) else raw

            return ObjectMetadata(
                key=path,
                type="directory" if is_dir else "file",
                content_length=0 if is_dir else os.path.getsize(path),
                last_modified=datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc),
                metadata=metadata_dict if metadata_dict else None,
                symlink_target=symlink_target,
            )

        return self._translate_errors(_invoke_api, operation="HEAD", path=path)

    def _list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
    ) -> Iterator[ObjectMetadata]:
        start_after = os.path.relpath(start_after, self._base_path) if start_after else None
        end_at = os.path.relpath(end_at, self._base_path) if end_at else None

        def _invoke_api() -> Iterator[ObjectMetadata]:
            if os.path.isfile(path):
                yield ObjectMetadata(
                    key=os.path.relpath(path, self._base_path),
                    content_length=os.path.getsize(path),
                    last_modified=datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc),
                )
            dir_path = path.rstrip("/") + "/"
            if not os.path.isdir(dir_path):
                return

            yield from self._explore_directory(dir_path, start_after, end_at, include_directories, symlink_handling)

        return self._translate_errors(_invoke_api, operation="LIST", path=path)

    @property
    def supports_parallel_listing(self) -> bool:
        """
        Whether this provider supports heap-based parallel recursive listing.

        :return: ``True`` for POSIX file storage.
        """
        return True

    def _shallow_list(self, path: str, symlink_handling: SymlinkHandling) -> tuple[list[str], list[ObjectMetadata]]:
        """
        Adapt POSIX relative listing keys to the full-key contract expected by the recursive listing heap.
        """
        prefixes: list[str] = []
        objects: list[ObjectMetadata] = []

        for item in self._list_objects(path, include_directories=True, symlink_handling=symlink_handling):
            full_key = self._prepend_base_path(item.key)
            if item.type == "directory" and item.symlink_target is None:
                child_prefix = full_key + "/"
                if child_prefix != path.rstrip("/") + "/":
                    prefixes.append(child_prefix)
            else:
                item.key = full_key
                objects.append(item)

        return prefixes, objects

    def _explore_directory(
        self,
        dir_path: str,
        start_after: Optional[str],
        end_at: Optional[str],
        include_directories: bool,
        symlink_handling: SymlinkHandling = SymlinkHandling.FOLLOW,
    ) -> Iterator[ObjectMetadata]:
        """
        Recursively explore a directory and yield objects in lexicographical order.

        :param dir_path: The directory path to explore.
        :param start_after: The key to start after.
        :param end_at: The key to end at.
        :param include_directories: Whether to include directories in the result.
        :param symlink_handling: How to handle symbolic links during listing.
        """
        try:
            dir_entries = os.listdir(dir_path)
            dir_entries.sort()

            entries: list[tuple[str, str, _EntryType]] = []
            symlink_info: dict[str, tuple[str, str]] = {}

            for entry in dir_entries:
                full_path = os.path.join(dir_path, entry)
                is_link = os.path.islink(full_path)

                if is_link and symlink_handling == SymlinkHandling.SKIP:
                    continue

                if is_link and symlink_handling == SymlinkHandling.PRESERVE:
                    relative_path = os.path.relpath(full_path, self._base_path)

                    real_target = os.path.realpath(full_path)
                    if not os.path.exists(real_target):
                        raise ValueError(
                            f"Broken symlink '{relative_path}' points to a missing target "
                            f"({full_path} -> {real_target}). Use symlink_handling=SKIP to ignore."
                        )

                    try:
                        target_key = os.path.relpath(real_target, self._base_path)
                    except ValueError:
                        target_key = None

                    if target_key is None or target_key.startswith(".."):
                        raise ValueError(
                            f"Symlink '{relative_path}' points outside the base directory "
                            f"({full_path} -> {real_target}). Use symlink_handling=FOLLOW "
                            f"to dereference or symlink_handling=SKIP to ignore."
                        )

                    relative_target = ObjectMetadata.encode_symlink_target(full_path, real_target)

                    target_type = "directory" if os.path.isdir(full_path) else "file"

                    if (start_after is None or start_after < relative_path) and (
                        end_at is None or relative_path <= end_at
                    ):
                        entries.append((relative_path, full_path, _EntryType.SYMLINK))
                        symlink_info[relative_path] = (relative_target, target_type)
                    continue

                relative_path = os.path.relpath(full_path, self._base_path)

                if (start_after is None or start_after < relative_path) and (end_at is None or relative_path <= end_at):
                    if os.path.isfile(full_path):
                        entries.append((relative_path, full_path, _EntryType.FILE))
                    elif os.path.isdir(full_path):
                        if include_directories:
                            entries.append((relative_path, full_path, _EntryType.DIRECTORY))
                        else:
                            entries.append((relative_path, full_path, _EntryType.DIRECTORY_TO_EXPLORE))

            # Sort keys must mirror the keys S3 ``list_objects_v2`` would return so POSIX
            # listings stay in the same raw-UTF-8-byte order. For directories (expanded or
            # returned as-is) the emitted keys live under ``<name>/``, so the trailing
            # delimiter must be part of the sort key.
            # Otherwise the bare name ``a`` sorts before sibling file ``a.txt``
            # (since ``""`` < ``".txt"``), but S3 orders the nested key ``a/b.txt``
            # *after* ``a.txt`` because ``.`` (0x2E) < ``/`` (0x2F).
            def _sort_key(entry: tuple[str, str, _EntryType]) -> str:
                relative, _, entry_type = entry
                if entry_type in (_EntryType.DIRECTORY, _EntryType.DIRECTORY_TO_EXPLORE):
                    return relative + "/"
                return relative

            entries.sort(key=_sort_key)

            for relative_path, full_path, entry_type in entries:
                if entry_type == _EntryType.FILE:
                    yield ObjectMetadata(
                        key=relative_path,
                        content_length=os.path.getsize(full_path),
                        last_modified=datetime.fromtimestamp(os.path.getmtime(full_path), tz=timezone.utc),
                    )
                elif entry_type == _EntryType.DIRECTORY:
                    yield ObjectMetadata(
                        key=relative_path,
                        content_length=0,
                        type="directory",
                        last_modified=AWARE_DATETIME_MIN,
                    )
                elif entry_type == _EntryType.DIRECTORY_TO_EXPLORE:
                    yield from self._explore_directory(
                        full_path, start_after, end_at, include_directories, symlink_handling
                    )
                elif entry_type == _EntryType.SYMLINK:
                    relative_target, target_type = symlink_info[relative_path]
                    yield ObjectMetadata(
                        key=relative_path,
                        content_length=0,
                        last_modified=datetime.fromtimestamp(os.path.getmtime(full_path), tz=timezone.utc),
                        type=target_type,
                        symlink_target=relative_target,
                    )

        except (OSError, PermissionError) as e:
            logger.warning(f"Failed to list contents of {dir_path}, caused by: {e}")
            return

    def _upload_file(self, remote_path: str, f: Union[str, IO], attributes: Optional[dict[str, str]] = None) -> int:
        safe_makedirs(os.path.dirname(remote_path))

        filesize: int = 0
        if isinstance(f, str):
            filesize = os.path.getsize(f)
        elif isinstance(f, StringIO):
            filesize = len(f.getvalue().encode("utf-8"))
        else:
            filesize = len(f.getvalue())  # type: ignore

        def _invoke_api() -> int:
            atomic_write(source=f, destination=remote_path, attributes=attributes)

            return filesize

        return self._translate_errors(_invoke_api, operation="PUT", path=remote_path)

    def _download_file(self, remote_path: str, f: Union[str, IO], metadata: Optional[ObjectMetadata] = None) -> int:
        filesize = metadata.content_length if metadata else os.path.getsize(remote_path)

        if isinstance(f, str):

            def _invoke_api() -> int:
                if os.path.dirname(f):
                    safe_makedirs(os.path.dirname(f))
                atomic_write(source=remote_path, destination=f)

                return filesize

            return self._translate_errors(_invoke_api, operation="GET", path=remote_path)
        elif isinstance(f, StringIO):

            def _invoke_api() -> int:
                with open(remote_path, "r", encoding="utf-8") as src:
                    while chunk := src.read(READ_CHUNK_SIZE):
                        f.write(chunk)

                return filesize

            return self._translate_errors(_invoke_api, operation="GET", path=remote_path)
        else:

            def _invoke_api() -> int:
                with open(remote_path, "rb") as src:
                    while chunk := src.read(READ_CHUNK_SIZE):
                        f.write(chunk)

                return filesize

            return self._translate_errors(_invoke_api, operation="GET", path=remote_path)
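
    # A small ordering illustration for the ``_sort_key`` helper in
    # ``_explore_directory`` above (hypothetical names, plain Python sorting).
    # Raw byte order puts ``.`` (0x2E) before ``/`` (0x2F), so keying a
    # directory ``a`` as ``a/`` places its children after ``a.txt``, matching
    # what S3 ``list_objects_v2`` would return:
    #
    #     sorted(["a.txt", "a/"])  # -> ["a.txt", "a/"]
    #     sorted(["a.txt", "a"])   # -> ["a", "a.txt"]  (bare name sorts first)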

    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        pattern = self._prepend_base_path(pattern)
        keys = list(glob.glob(pattern, recursive=True))
        if attribute_filter_expression:
            filtered_keys = []
            evaluator = create_attribute_filter_evaluator(attribute_filter_expression)
            for key in keys:
                obj_metadata = self._get_object_metadata(key)
                if matches_attribute_filter_expression(obj_metadata, evaluator):
                    filtered_keys.append(key)
            keys = filtered_keys
        if self._base_path == "/":
            return keys
        else:
            # NOTE: PosixStorageProvider does not have the concept of bucket and prefix,
            # so we drop the base_path from the keys.
            return [key.replace(self._base_path, "", 1).lstrip("/") for key in keys]
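
    # A hedged usage sketch for ``glob`` (the base path and pattern below are
    # hypothetical). With a base path other than "/", returned keys are made
    # relative to it:
    #
    #     provider = PosixFileStorageProvider(base_path="/data")
    #     provider.glob("**/*.txt")  # e.g. ["logs/a.txt", "notes/b.txt"]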

    def is_file(self, path: str) -> bool:
        path = self._prepend_base_path(path)
        return os.path.isfile(path)

    def rmtree(self, path: str) -> None:
        path = self._prepend_base_path(path)
        shutil.rmtree(path)
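
# A minimal end-to-end sketch of this provider (illustrative only; the paths
# are hypothetical, and the underscore-prefixed hooks are normally invoked via
# the BaseStorageProvider public API rather than called directly):
#
#     provider = PosixFileStorageProvider(base_path="/data")
#     provider._put_object("/data/example/hello.txt", b"hi", attributes={"k": "v"})
#     provider.is_file("example/hello.txt")            # -> True
#     provider._get_object("/data/example/hello.txt")  # -> b"hi"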