Source code for multistorageclient.providers.manifest_metadata

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations  # Enables forward references in type hints
 17
 18import json
 19import logging
 20import os
 21from collections.abc import Iterator
 22from dataclasses import asdict, dataclass
 23from datetime import datetime, timezone
 24from typing import Any, Optional, Union
 25
 26from ..types import (
 27    AWARE_DATETIME_MIN,
 28    MAX_SYMLINK_DEPTH,
 29    MetadataProvider,
 30    ObjectMetadata,
 31    ResolvedPath,
 32    ResolvedPathState,
 33    StorageProvider,
 34)
 35from ..utils import create_attribute_filter_evaluator, glob, matches_attribute_filter_expression
 36from .manifest_formats import ManifestFormat, get_format_handler
 37from .manifest_object_metadata import ManifestObjectMetadata
 38
 39logger = logging.getLogger(__name__)
 40
 41
 42DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
 43MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
 44MANIFEST_PARTS_CHILD_DIR = "parts"
 45MANIFEST_PART_PREFIX = "msc_manifest_part"
 46SEQUENCE_PADDING = 6  # Define padding for the sequence number (e.g., 6 for "000001")
 47
 48
@dataclass
class ManifestPartReference:
    """
    Pointer to one part file of a dataset manifest.
    """

    #: Location of the part file. Relative paths are resolved against the main manifest's directory.
    path: str

    @staticmethod
    def from_dict(data: dict[str, Any]) -> ManifestPartReference:
        """
        Builds a :py:class:`ManifestPartReference` from a dictionary (e.g. one parsed JSON entry).

        :param data: Mapping expected to hold a ``"path"`` entry.
        :return: The corresponding part reference.
        :raises ValueError: If ``"path"`` is absent.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])
        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Serializes this reference into a plain dictionary.

        :return: A dictionary of the form ``{"path": <path>}``.
        """
        return dict(path=self.path)
76 77
@dataclass
class Manifest:
    """
    A data class representing a dataset manifest (the index file that references manifest parts).
    """

    #: Defines the version of the manifest schema.
    version: str
    #: References to manifest parts.
    parts: list[ManifestPartReference]
    #: Format of manifest parts (jsonl or parquet).
    format: str = "jsonl"

    @staticmethod
    def from_dict(data: dict) -> Manifest:
        """
        Creates a Manifest instance from a dictionary (parsed from JSON).

        :param data: Dictionary with required ``version`` and ``parts`` keys and an optional ``format``
            key (defaults to ``"jsonl"``).
        :return: The parsed manifest.
        :raises ValueError: If ``version`` or ``parts`` is missing, or a part entry lacks ``path``.
        """
        try:
            version = data["version"]
            raw_parts = data["parts"]
        except KeyError as e:
            # Name the missing key so malformed manifests are easy to diagnose.
            raise ValueError(f"Invalid manifest data: Missing required field {e.args[0]!r}") from e

        parts = [ManifestPartReference.from_dict(part) for part in raw_parts]
        # Avoid shadowing the ``format`` builtin with a local name.
        part_format = data.get("format", "jsonl")
        return Manifest(version=version, parts=parts, format=part_format)

    def to_json(self) -> str:
        """
        Serializes the manifest to a JSON string.

        :return: A JSON object with ``version``, ``parts`` (a list of ``{"path": ...}``) and ``format``.
        """
        data = asdict(self)
        # Serialize parts through their own to_dict() so the part schema is defined in one place.
        data["parts"] = [part.to_dict() for part in self.parts]
        return json.dumps(data)
109 110
class ManifestMetadataProvider(MetadataProvider):
    """
    :py:class:`MetadataProvider` backed by a dataset manifest.

    A manifest is an index JSON file (``msc_manifest_index.json``) that references one or more part
    files (read and written through the format handlers) listing object metadata. Committed entries
    are held in memory keyed by logical path; mutations are staged as pending adds/removes until
    :py:meth:`commit_updates` writes a new timestamped manifest.
    """

    #: Storage provider used to read and write manifest files.
    _storage_provider: StorageProvider
    #: Committed object metadata keyed by logical path.
    _files: dict[str, ManifestObjectMetadata]
    #: Staged additions/overwrites keyed by logical path.
    _pending_adds: dict[str, ManifestObjectMetadata]
    #: Staged removals; remove_file() only accepts committed paths, so this is a subset of _files' keys.
    _pending_removes: set[str]
    #: Manifest location as given by the caller (index file, manifest folder, or dataset root).
    _manifest_path: str
    _writable: bool
    _allow_overwrites: bool
    #: Format used for newly written manifest parts.
    _format: Union[ManifestFormat, str]

    def __init__(
        self,
        storage_provider: StorageProvider,
        manifest_path: str,
        writable: bool = False,
        allow_overwrites: bool = False,
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> None:
        """
        Creates a :py:class:`ManifestMetadataProvider` and loads the existing manifest, if any.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path.
        :param writable: If true, allows modifications and new manifests to be written.
        :param allow_overwrites: If true, allows overwriting existing files without error.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        """
        self._storage_provider = storage_provider
        self._files = {}
        self._pending_adds = {}
        self._pending_removes = set()
        self._manifest_path = manifest_path
        self._writable = writable
        self._allow_overwrites = allow_overwrites
        # Normalize string formats (e.g. "jsonl") to the enum.
        self._format = (
            manifest_format if isinstance(manifest_format, ManifestFormat) else ManifestFormat(manifest_format)
        )

        self._load_manifest(storage_provider, self._manifest_path)

    def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
        """
        Locates and loads the manifest for ``manifest_path``.

        Resolution order:

        1. ``manifest_path`` itself, if it is a file;
        2. ``<manifest_path>/msc_manifest_index.json``;
        3. the lexicographically greatest ``<base>/*/msc_manifest_index.json``, where ``<base>`` is
           ``manifest_path`` if it already contains ``.msc_manifests`` and ``<manifest_path>/.msc_manifests``
           otherwise. The writer names these folders with UTC ISO-8601 timestamps, so this is the newest
           manifest it wrote.

        Logs a warning and leaves the provider empty when nothing is found.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path, manifest folder, or dataset root.
        """

        def helper_find_manifest_file(manifest_path: str) -> str:
            if storage_provider.is_file(manifest_path):
                return manifest_path

            if storage_provider.is_file(os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)):
                return os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)

            # Now go looking and select newest manifest.
            if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
                manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)

            candidates = storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME))
            candidates = sorted(candidates)
            return candidates[-1] if candidates else ""

        resolved_manifest_path = helper_find_manifest_file(manifest_path)
        if not resolved_manifest_path:
            logger.warning(f"No manifest found at '{manifest_path}'.")
            return

        file_content = storage_provider.get_object(resolved_manifest_path)

        # Part paths in the index are relative to the index file's directory.
        prefix = os.path.dirname(resolved_manifest_path)
        _, file_extension = os.path.splitext(resolved_manifest_path)
        self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])

    def _load_manifest_file(
        self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
    ) -> None:
        """
        Parses a manifest index and loads every part it references into ``_files``.

        Entries from later parts replace earlier entries with the same key.

        :param storage_provider: Storage provider.
        :param file_content: Manifest file content bytes.
        :param manifest_base: Manifest file base path.
        :param file_type: Manifest file type (extension without the dot).
        :raises ValueError: If the manifest version is not ``"1"``.
        :raises NotImplementedError: If ``file_type`` is not ``"json"``.
        """
        if file_type == "json":
            manifest_dict = json.loads(file_content.decode("utf-8"))
            manifest = Manifest.from_dict(manifest_dict)

            # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
            if manifest.version != "1":
                raise ValueError(f"Manifest version {manifest.version} is not supported.")

            # Load manifest parts.
            for manifest_part_reference in manifest.parts:
                object_metadata_list: list[ManifestObjectMetadata] = self._load_manifest_part_file(
                    storage_provider=storage_provider,
                    manifest_base=manifest_base,
                    manifest_part_reference=manifest_part_reference,
                    manifest_format=manifest.format,
                )

                for object_metadatum in object_metadata_list:
                    self._files[object_metadatum.key] = object_metadatum
        else:
            raise NotImplementedError(f"Manifest file type {file_type} is not supported.")

    def _load_manifest_part_file(
        self,
        storage_provider: StorageProvider,
        manifest_base: str,
        manifest_part_reference: ManifestPartReference,
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> list[ManifestObjectMetadata]:
        """
        Loads a manifest part and converts to ManifestObjectMetadata.

        The part format is taken from the part file's extension when it has one, falling back to
        ``manifest_format`` otherwise.

        :param storage_provider: Storage provider.
        :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
        :param manifest_part_reference: Manifest part reference.
        :param manifest_format: Format of the manifest part (jsonl or parquet).
        :return: The part's entries as :py:class:`ManifestObjectMetadata`.
        """
        if not os.path.isabs(manifest_part_reference.path):
            remote_path = os.path.join(manifest_base, manifest_part_reference.path)
        else:
            remote_path = manifest_part_reference.path
        manifest_part_file_content = storage_provider.get_object(remote_path)

        _, ext = os.path.splitext(remote_path)
        detected_format = ext[1:] if ext else manifest_format

        format_handler = get_format_handler(detected_format)
        object_metadata_list = format_handler.read_part(manifest_part_file_content)

        # Convert ObjectMetadata to ManifestObjectMetadata
        # The format handler returns ObjectMetadata, but may have extra attributes set (like physical_path)
        return [ManifestObjectMetadata.from_object_metadata(obj) for obj in object_metadata_list]

    def _write_manifest_files(
        self,
        storage_provider: StorageProvider,
        object_metadata: list[ManifestObjectMetadata],
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> None:
        """
        Writes the main manifest and its part files.

        Layout: ``<base>/.msc_manifests/<UTC timestamp>/msc_manifest_index.json`` plus a single part at
        ``<same folder>/parts/msc_manifest_part000001<suffix>``. ``<base>`` is the portion of the
        configured manifest path before any ``.msc_manifests`` component.

        Accepts ManifestObjectMetadata which extends ObjectMetadata, so format handlers
        (which expect ObjectMetadata) can serialize it, preserving all fields including physical_path.

        :param storage_provider: The storage provider to use for writing.
        :param object_metadata: ManifestObjectMetadata objects to include in manifest.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        """
        # NOTE(review): an empty object list writes nothing, so a commit that removes every file leaves the
        # previous manifest on storage. Confirm whether the format handlers can write an empty part.
        if not object_metadata:
            return

        base_path = self._manifest_path
        manifest_base_path = base_path

        # Strip everything from the first ".msc_manifests" component onwards so new manifests always land
        # in the dataset's own manifest directory, even when loaded from a specific older manifest.
        base_path_parts = base_path.split(os.sep)
        if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
            manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
            if manifests_index > 0:
                manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
            else:
                manifest_base_path = ""
            # os.path.join drops the leading empty component of an absolute path; restore it.
            if base_path.startswith(os.sep):
                manifest_base_path = os.sep + manifest_base_path

        # Second-resolution timestamps: two commits within the same second would target the same folder.
        current_time = datetime.now(timezone.utc)
        current_time_str = current_time.isoformat(timespec="seconds")
        manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)

        format_handler = get_format_handler(manifest_format)
        suffix = format_handler.get_file_suffix()

        # We currently write only one part by default.
        part_sequence_number = 1
        manifest_part_file_path = os.path.join(
            MANIFEST_PARTS_CHILD_DIR,
            f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{suffix}",
        )

        format_value = manifest_format.value if isinstance(manifest_format, ManifestFormat) else manifest_format
        manifest = Manifest(
            version="1", parts=[ManifestPartReference(path=manifest_part_file_path)], format=format_value
        )

        # Write the part before the index so the index never references a part that does not exist yet.
        manifest_part_content = format_handler.write_part(list(object_metadata))
        storage_provider.put_object(os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content)

        manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
        manifest_content = manifest.to_json()
        storage_provider.put_object(manifest_file_path, manifest_content.encode("utf-8"))

    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        List committed objects in the manifest, in key order.

        With ``include_directories``, objects nested below ``path`` are collapsed into one synthetic
        directory entry per immediate sub-directory, whose ``last_modified`` is the newest of its members.
        Each directory entry is emitted right after its members, before any later plain object.

        :param path: The path to filter objects by.
        :param start_after: The object to start after.
        :param end_at: The object to end at.
        :param include_directories: Whether to include directories.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        :param show_attributes: This field is not used in this implementation - It will always return attributes. This is present merely to satisfy the interface.
        :raises ValueError: If both bounds are given and ``start_after`` is not before ``end_at``.
        """

        if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
            raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")

        if path and not path.endswith("/"):
            path = path + "/"

        # create evaluator for attribute filter expression if present
        evaluator = (
            create_attribute_filter_evaluator(attribute_filter_expression) if attribute_filter_expression else None
        )

        # Note that this is a generator, not a tuple (there's no tuple comprehension).
        keys = (
            key
            for key, obj_metadata in self._files.items()
            if key.startswith(path)
            and (start_after is None or start_after < key)
            and (end_at is None or key <= end_at)
            and (evaluator is None or matches_attribute_filter_expression(obj_metadata, evaluator))
        )

        pending_directory: Optional[ObjectMetadata] = None
        for key in sorted(keys):
            obj_metadata = self._files[key]

            if include_directories:
                relative = key[len(path) :].lstrip("/")
                subdirectory = relative.split("/", 1)[0] if "/" in relative else None

                if subdirectory:
                    directory_name = f"{path}{subdirectory}/"
                    if pending_directory is not None and pending_directory.key == directory_name:
                        pending_directory.last_modified = max(
                            pending_directory.last_modified, obj_metadata.last_modified
                        )
                    else:
                        if pending_directory is not None:
                            yield pending_directory
                        pending_directory = ObjectMetadata(
                            key=directory_name,
                            type="directory",
                            last_modified=obj_metadata.last_modified,
                            content_length=0,
                        )
                    continue  # Skip yielding this key as it's part of a directory

                # Sorted keys keep a directory's members contiguous, so a plain object here means the
                # pending directory is complete; emit it first to keep the output in key order.
                if pending_directory is not None:
                    yield pending_directory
                    pending_directory = None

            yield obj_metadata

        if pending_directory is not None:
            yield pending_directory

    def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
        """
        Returns metadata for a file, or a synthetic directory entry if ``path`` is a directory prefix.

        With ``include_pending``, staged changes are visible as they will be after
        :py:meth:`commit_updates`: staged removals hide the file and staged adds (including overwrites of
        committed files) take precedence over committed metadata.

        :param path: Logical path.
        :param include_pending: Whether to take staged adds/removes into account.
        :return: The object's metadata, or a directory entry.
        :raises FileNotFoundError: If neither a file nor a directory exists at ``path``.
        """
        if include_pending:
            # Removes win over adds on commit (adds are applied first), so check them first here too.
            if path in self._pending_removes:
                raise FileNotFoundError(f"Object {path} does not exist.")
            if path in self._pending_adds:
                return self._pending_adds[path]
        if path in self._files:
            # Return ManifestObjectMetadata directly (it extends ObjectMetadata)
            return self._files[path]
        return self._get_directory_metadata(path, include_pending)

    def _get_directory_metadata(self, path: str, include_pending: bool) -> ObjectMetadata:
        """
        Returns a synthetic directory entry if any (optionally pending-aware) file lives under ``path``.

        :raises FileNotFoundError: If no file lives under ``path``.
        """
        dir_prefix = path.rstrip("/") + "/"

        has_committed = any(
            k.startswith(dir_prefix) and (not include_pending or k not in self._pending_removes) for k in self._files
        )
        has_pending = include_pending and any(k.startswith(dir_prefix) for k in self._pending_adds)

        if not has_committed and not has_pending:
            raise FileNotFoundError(f"Object {path} does not exist.")

        return ObjectMetadata(
            key=dir_prefix,
            type="directory",
            content_length=0,
            last_modified=AWARE_DATETIME_MIN,
        )

    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        Returns the keys of committed objects matching a glob pattern.

        :param pattern: The pattern to filter objects by.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        :return: Matching object keys.
        """
        all_keys = [obj.key for obj in self.list_objects("", attribute_filter_expression=attribute_filter_expression)]
        return list(glob(all_keys, pattern))

    def realpath(self, logical_path: str) -> ResolvedPath:
        """
        Resolves a logical path to its physical storage path if the object exists.
        Only checks committed files, not pending changes.
        Follows ``symlink_target`` chains in the manifest with depth limit and cycle detection.

        :param logical_path: The user-facing logical path

        :return: ResolvedPath with state EXISTS if found, UNTRACKED otherwise
        :raises ValueError: On a symlink cycle or a chain longer than ``MAX_SYMLINK_DEPTH``.
        """
        visited: set[str] = set()
        current = logical_path
        for _ in range(MAX_SYMLINK_DEPTH):
            manifest_obj = self._files.get(current)
            if manifest_obj is None:
                return ResolvedPath(physical_path=current, state=ResolvedPathState.UNTRACKED, profile=None)
            if not manifest_obj.symlink_target:
                assert manifest_obj.physical_path is not None
                return ResolvedPath(
                    physical_path=manifest_obj.physical_path, state=ResolvedPathState.EXISTS, profile=None
                )
            if current in visited:
                raise ValueError(f"Symlink cycle detected at: {current}")
            visited.add(current)
            current = ObjectMetadata.resolve_symlink_target(current, manifest_obj.symlink_target)
        raise ValueError(f"Too many levels of symlinks (>{MAX_SYMLINK_DEPTH}): {logical_path}")

    def generate_physical_path(self, logical_path: str, for_overwrite: bool = False) -> ResolvedPath:
        """
        Generates a physical storage path for a new object or for overwriting an existing object.

        For now, this simply returns the logical path (no path rewriting).
        In the future, this could generate unique paths for overwrites.

        :param logical_path: The user-facing logical path
        :param for_overwrite: When ``True``, generate a path for overwriting an existing object.

        :return: The physical storage path to use for writing
        """
        # For now, physical path = logical path. A future enhancement could generate unique paths for
        # overwrites (e.g. a UUID suffix) when overwrites are allowed.
        return ResolvedPath(physical_path=logical_path, state=ResolvedPathState.UNTRACKED, profile=None)

    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Stages an add (or overwrite) of ``path``; it becomes visible to readers after :py:meth:`commit_updates`.

        :param path: Logical path.
        :param metadata: Object metadata. A plain :py:class:`ObjectMetadata` is assumed to carry the
            physical path in ``key``.
        :raises RuntimeError: If the provider is not writable.
        :raises FileExistsError: If ``path`` is committed and overwrites are not allowed.
        :raises ValueError: If a :py:class:`ManifestObjectMetadata` key does not match ``path``.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")

        # Check if file already exists in committed state and overwrites are not allowed
        # Pending files can always be overwritten
        if not self._allow_overwrites and path in self._files:
            raise FileExistsError(f"File {path} already exists and overwrites are not allowed.")

        # Handle two cases:
        # 1. If metadata is already a ManifestObjectMetadata, use it directly
        # 2. Otherwise, create one assuming metadata.key contains the physical path
        if isinstance(metadata, ManifestObjectMetadata):
            # Already has proper logical/physical path separation
            manifest_metadata = metadata
            # Ensure the logical path matches what was requested
            if manifest_metadata.key != path:
                raise ValueError(f"Logical path mismatch: expected {path}, got {manifest_metadata.key}")
        else:
            # For backward compatibility, create a ManifestObjectMetadata from ObjectMetadata.
            # NOTE(review): only the fields listed here are copied; other attributes (e.g. symlink_target,
            # which realpath() reads on manifest entries) are not carried over. Confirm this is intended.
            manifest_metadata = ManifestObjectMetadata(
                key=path,  # Logical path (user-facing)
                content_length=metadata.content_length,
                last_modified=metadata.last_modified,
                content_type=metadata.content_type,
                etag=metadata.etag,
                metadata=metadata.metadata,
                type=metadata.type,
                physical_path=metadata.key,  # Physical path (from storage provider)
            )

        # TODO: Time travel is not supported - we do not rename file paths when overwriting.
        # When a file is overwritten, the manifest points to the new version of the file at the same location
        # without maintaining history of previous versions.
        self._pending_adds[path] = manifest_metadata

    def remove_file(self, path: str) -> None:
        """
        Stages removal of a committed file.

        :param path: Logical path.
        :raises RuntimeError: If the provider is not writable.
        :raises FileNotFoundError: If ``path`` is not committed (staged-only adds cannot be removed).
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
        if path not in self._files:
            raise FileNotFoundError(f"Object {path} does not exist.")
        self._pending_removes.add(path)

    def is_writable(self) -> bool:
        """Returns whether staging and committing updates is enabled."""
        return self._writable

    def allow_overwrites(self) -> bool:
        """Returns whether committed files may be overwritten."""
        return self._allow_overwrites

    def should_use_soft_delete(self) -> bool:
        """This provider never uses soft delete."""
        return False

    def commit_updates(self) -> None:
        """
        Persists staged changes as a new manifest.

        Adds are applied before removes, so a path staged for both ends up removed. The new manifest is
        written before in-memory state is replaced: if writing raises, the committed view and the staged
        changes are left untouched, so the commit can be retried.
        """
        if not self._pending_adds and not self._pending_removes:
            return

        updated_files = dict(self._files)
        updated_files.update(self._pending_adds)
        for path in self._pending_removes:
            updated_files.pop(path, None)

        # Serialize ManifestObjectMetadata directly
        # to_dict() will include all fields including physical_path
        self._write_manifest_files(self._storage_provider, list(updated_files.values()), manifest_format=self._format)

        # Only swap state once the manifest has been written successfully.
        self._files = updated_files
        self._pending_adds = {}
        self._pending_removes = set()