Source code for multistorageclient.providers.manifest_metadata

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations  # Enables forward references in type hints
 17
 18import json
 19import logging
 20import os
 21from collections.abc import Iterator
 22from dataclasses import asdict, dataclass
 23from datetime import datetime, timezone
 24from typing import Any, Optional, Union
 25
 26from ..types import MetadataProvider, ObjectMetadata, ResolvedPath, StorageProvider
 27from ..utils import create_attribute_filter_evaluator, glob, matches_attribute_filter_expression
 28from .manifest_formats import ManifestFormat, get_format_handler
 29from .manifest_object_metadata import ManifestObjectMetadata
 30
 31logger = logging.getLogger(__name__)
 32
 33
 34DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
 35MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
 36MANIFEST_PARTS_CHILD_DIR = "parts"
 37MANIFEST_PART_PREFIX = "msc_manifest_part"
 38SEQUENCE_PADDING = 6  # Define padding for the sequence number (e.g., 6 for "000001")
 39
 40
@dataclass
class ManifestPartReference:
    """
    Reference from a main manifest to one of its part files.
    """

    #: The path of the manifest part relative to the main manifest.
    path: str

    @staticmethod
    def from_dict(data: dict[str, Any]) -> ManifestPartReference:
        """
        Builds a ManifestPartReference from its dictionary form.

        Keys other than ``path`` are ignored.

        :param data: Dictionary that must contain a ``path`` key.
        :return: The reconstructed reference.
        :raises ValueError: If ``path`` is absent.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])
        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Serializes this reference to a plain dictionary (``{"path": ...}``).
        """
        serialized = {"path": self.path}
        return serialized
68 69
@dataclass
class Manifest:
    """
    A data class representing a dataset manifest.
    """

    #: Defines the version of the manifest schema.
    version: str
    #: References to manifest parts.
    parts: list[ManifestPartReference]
    #: Format of manifest parts (jsonl or parquet).
    format: str = "jsonl"

    @staticmethod
    def from_dict(data: dict[str, Any]) -> Manifest:
        """
        Creates a Manifest instance from a dictionary (parsed from JSON).

        :param data: Dictionary with required ``version`` and ``parts`` keys and an optional ``format``
            key (defaults to ``"jsonl"``).
        :return: The reconstructed manifest.
        :raises ValueError: If a required field is missing (the message names the field), or if a part
            entry is missing its ``path``.
        """
        # Report exactly which field is missing instead of a generic message.
        for field_name in ("version", "parts"):
            if field_name not in data:
                raise ValueError(f"Invalid manifest data: Missing required field {field_name!r}")

        parts = [ManifestPartReference.from_dict(part) for part in data["parts"]]
        part_format = data.get("format", "jsonl")
        return Manifest(version=data["version"], parts=parts, format=part_format)

    def to_json(self) -> str:
        """
        Serializes this manifest to a JSON string.

        Keys are emitted in field order: ``version``, ``parts``, ``format``.
        """
        return json.dumps(
            {
                "version": self.version,
                "parts": [part.to_dict() for part in self.parts],
                "format": self.format,
            }
        )
101 102
class ManifestMetadataProvider(MetadataProvider):
    """
    Metadata provider backed by a dataset manifest (a main index file plus part files).

    Committed object metadata is loaded from the manifest at construction time. When writable, additions
    and removals are staged as pending changes and persisted as a new timestamped manifest folder by
    :py:meth:`commit_updates`.
    """

    _storage_provider: StorageProvider
    _files: dict[str, ManifestObjectMetadata]
    _pending_adds: dict[str, ManifestObjectMetadata]
    _pending_removes: set[str]
    _manifest_path: str
    _writable: bool
    _allow_overwrites: bool
    _format: Union[ManifestFormat, str]

    def __init__(
        self,
        storage_provider: StorageProvider,
        manifest_path: str,
        writable: bool = False,
        allow_overwrites: bool = False,
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> None:
        """
        Creates a :py:class:`ManifestMetadataProvider`.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path.
        :param writable: If true, allows modifications and new manifests to be written.
        :param allow_overwrites: If true, allows overwriting existing files without error.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        """
        self._storage_provider = storage_provider
        self._files = {}
        self._pending_adds = {}
        self._pending_removes = set()
        self._manifest_path = manifest_path
        self._writable = writable
        self._allow_overwrites = allow_overwrites
        self._format = (
            manifest_format if isinstance(manifest_format, ManifestFormat) else ManifestFormat(manifest_format)
        )

        self._load_manifest(storage_provider, self._manifest_path)

    def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
        """
        Loads manifest.

        The manifest file is resolved in this order:

        1. ``manifest_path`` itself, if it is a file.
        2. ``<manifest_path>/msc_manifest_index.json``.
        3. The lexicographically last ``<dir>/*/msc_manifest_index.json``. Here ``<dir>`` is ``manifest_path``
           if it already contains a ``.msc_manifests`` component; otherwise it is
           ``<manifest_path>/.msc_manifests``.

        If nothing is found, a warning is logged and no objects are loaded.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path
        """

        def helper_find_manifest_file(manifest_path: str) -> str:
            if storage_provider.is_file(manifest_path):
                return manifest_path

            if storage_provider.is_file(os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)):
                return os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)

            # Now go looking and select newest manifest. Folders written by _write_manifest_files are named by
            # ISO-8601 UTC timestamps, so the lexicographically last candidate is the newest.
            if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
                manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)

            candidates = storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME))
            candidates = sorted(candidates)
            return candidates[-1] if candidates else ""

        resolved_manifest_path = helper_find_manifest_file(manifest_path)
        if not resolved_manifest_path:
            logger.warning(f"No manifest found at '{manifest_path}'.")
            return

        file_content = storage_provider.get_object(resolved_manifest_path)

        # Part paths in the manifest are resolved relative to the manifest's own directory.
        prefix = os.path.dirname(resolved_manifest_path)
        _, file_extension = os.path.splitext(resolved_manifest_path)
        self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])

    def _load_manifest_file(
        self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
    ) -> None:
        """
        Loads a manifest.

        :param storage_provider: Storage provider.
        :param file_content: Manifest file content bytes.
        :param manifest_base: Manifest file base path.
        :param file_type: Manifest file type (file extension without the dot). Only ``"json"`` is supported.
        :raises ValueError: If the manifest version is not ``"1"``.
        :raises NotImplementedError: If ``file_type`` is not ``"json"``.
        """
        if file_type == "json":
            manifest_dict = json.loads(file_content.decode("utf-8"))
            manifest = Manifest.from_dict(manifest_dict)

            # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
            if manifest.version != "1":
                raise ValueError(f"Manifest version {manifest.version} is not supported.")

            # Load manifest parts.
            for manifest_part_reference in manifest.parts:
                object_metadata_list: list[ManifestObjectMetadata] = self._load_manifest_part_file(
                    storage_provider=storage_provider,
                    manifest_base=manifest_base,
                    manifest_part_reference=manifest_part_reference,
                    manifest_format=manifest.format,
                )

                # Later parts override earlier ones for the same key.
                for object_metadatum in object_metadata_list:
                    self._files[object_metadatum.key] = object_metadatum
        else:
            raise NotImplementedError(f"Manifest file type {file_type} is not supported.")

    def _load_manifest_part_file(
        self,
        storage_provider: StorageProvider,
        manifest_base: str,
        manifest_part_reference: ManifestPartReference,
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> list[ManifestObjectMetadata]:
        """
        Loads a manifest part and converts to ManifestObjectMetadata.

        The part's file extension, when present, takes precedence over ``manifest_format`` when choosing the
        format handler.

        :param storage_provider: Storage provider.
        :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
        :param manifest_part_reference: Manifest part reference.
        :param manifest_format: Format of the manifest part (jsonl or parquet).
        """
        if not os.path.isabs(manifest_part_reference.path):
            remote_path = os.path.join(manifest_base, manifest_part_reference.path)
        else:
            remote_path = manifest_part_reference.path
        manifest_part_file_content = storage_provider.get_object(remote_path)

        _, ext = os.path.splitext(remote_path)
        detected_format = ext[1:] if ext else manifest_format

        format_handler = get_format_handler(detected_format)
        object_metadata_list = format_handler.read_part(manifest_part_file_content)

        # Convert ObjectMetadata to ManifestObjectMetadata
        # The format handler returns ObjectMetadata, but may have extra attributes set (like physical_path)
        return [ManifestObjectMetadata.from_object_metadata(obj) for obj in object_metadata_list]

    def _write_manifest_files(
        self,
        storage_provider: StorageProvider,
        object_metadata: list[ManifestObjectMetadata],
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> None:
        """
        Writes the main manifest and its part files.

        Output goes to a new ``<base>/.msc_manifests/<UTC timestamp>/`` folder. ``<base>`` is the part of the
        configured manifest path that precedes any ``.msc_manifests`` component.

        Accepts ManifestObjectMetadata which extends ObjectMetadata, so format handlers
        (which expect ObjectMetadata) can serialize it, preserving all fields including physical_path.

        An empty ``object_metadata`` list still produces a manifest with zero parts. Skipping the write would
        leave the previous manifest as the newest one, and removed objects would reappear on the next load.

        :param storage_provider: The storage provider to use for writing.
        :param object_metadata: ManifestObjectMetadata objects to include in manifest.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        """
        base_path = self._manifest_path
        manifest_base_path = base_path

        # Split on "/" (not os.sep) to match how _load_manifest detects the manifests directory.
        base_path_parts = base_path.split("/")
        if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
            manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
            if manifests_index > 0:
                manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
            else:
                manifest_base_path = ""
            if base_path.startswith("/"):
                manifest_base_path = "/" + manifest_base_path

        current_time_str = datetime.now(timezone.utc).isoformat(timespec="seconds")
        manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)

        format_value = manifest_format.value if isinstance(manifest_format, ManifestFormat) else manifest_format

        parts: list[ManifestPartReference] = []
        if object_metadata:
            format_handler = get_format_handler(manifest_format)
            suffix = format_handler.get_file_suffix()

            # We currently write only one part by default.
            part_sequence_number = 1
            manifest_part_file_path = os.path.join(
                MANIFEST_PARTS_CHILD_DIR,
                f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{suffix}",
            )

            # Write the part before the index so the index never references a missing part.
            manifest_part_content = format_handler.write_part(list(object_metadata))
            storage_provider.put_object(
                os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content
            )
            parts.append(ManifestPartReference(path=manifest_part_file_path))

        manifest = Manifest(version="1", parts=parts, format=format_value)
        manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
        storage_provider.put_object(manifest_file_path, manifest.to_json().encode("utf-8"))

    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the manifest.

        Only committed objects are listed. Results are yielded in sorted key order. With
        ``include_directories``, keys nested below an immediate sub-directory of ``path`` are collapsed into
        one synthetic ``"directory"`` entry. That entry's ``last_modified`` is the latest among its members.

        :param path: The path to filter objects by.
        :param start_after: The object to start after.
        :param end_at: The object to end at.
        :param include_directories: Whether to include directories.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        :param show_attributes: This field is not used in this implementation - It will always return attributes. This is present merely to satisfy the interface.
        :raises ValueError: If both ``start_after`` and ``end_at`` are given and ``start_after >= end_at``.
        """

        if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
            raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")

        if path and not path.endswith("/"):
            path = path + "/"

        # create evaluator for attribute filter expression if present
        evaluator = (
            create_attribute_filter_evaluator(attribute_filter_expression) if attribute_filter_expression else None
        )

        keys = (
            key
            for key, obj_metadata in self._files.items()
            if key.startswith(path)
            and (start_after is None or start_after < key)
            and (end_at is None or key <= end_at)
            and (evaluator is None or matches_attribute_filter_expression(obj_metadata, evaluator))
        )

        pending_directory: Optional[ObjectMetadata] = None
        for key in sorted(keys):
            obj_metadata = self._files[key]
            if include_directories:
                relative = key[len(path) :].lstrip("/")
                subdirectory = relative.split("/", 1)[0] if "/" in relative else None

                if subdirectory:
                    directory_name = f"{path}{subdirectory}/"
                    if pending_directory is not None and pending_directory.key == directory_name:
                        pending_directory.last_modified = max(
                            pending_directory.last_modified, obj_metadata.last_modified
                        )
                    else:
                        if pending_directory is not None:
                            yield pending_directory
                        pending_directory = ObjectMetadata(
                            key=directory_name,
                            type="directory",
                            last_modified=obj_metadata.last_modified,
                            content_length=0,
                        )
                    continue  # Skip yielding this key as it's part of a directory

                # A plain file ends any directory run. Flush the directory first so output stays sorted.
                if pending_directory is not None:
                    yield pending_directory
                    pending_directory = None

            yield obj_metadata

        if pending_directory is not None:
            yield pending_directory

    def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
        """
        Returns the metadata for ``path``.

        :param path: Logical object path.
        :param include_pending: If true, pending changes are taken into account. A pending addition
            (including a pending overwrite) is returned in preference to the committed entry, and a path
            pending removal is reported as missing.
        :raises FileNotFoundError: If the object does not exist.
        """
        if include_pending:
            if path in self._pending_adds:
                return self._pending_adds[path]
            if path in self._pending_removes:
                raise FileNotFoundError(f"Object {path} does not exist.")

        # Return ManifestObjectMetadata directly (it extends ObjectMetadata)
        committed = self._files.get(path)
        if committed is None:
            raise FileNotFoundError(f"Object {path} does not exist.")
        return committed

    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        List objects in the manifest.

        :param pattern: The pattern to filter objects by.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        :return: Keys of committed objects that match ``pattern``.
        """

        all_keys = [
            obj.key for obj in self.list_objects("", attribute_filter_expression=attribute_filter_expression)
        ]
        return list(glob(all_keys, pattern))

    def realpath(self, logical_path: str) -> ResolvedPath:
        """
        Resolves a logical path to its physical storage path if the object exists.
        Only checks committed files, not pending changes.

        :param logical_path: The user-facing logical path

        :return: ResolvedPath with exists=True if found, exists=False otherwise
        """
        # Only check committed files
        manifest_obj = self._files.get(logical_path)
        if manifest_obj is not None:
            assert manifest_obj.physical_path is not None
            return ResolvedPath(physical_path=manifest_obj.physical_path, exists=True, profile=None)
        return ResolvedPath(physical_path=logical_path, exists=False, profile=None)

    def generate_physical_path(self, logical_path: str, for_overwrite: bool = False) -> ResolvedPath:
        """
        Generates a physical storage path for a new object or for overwriting an existing object.

        For now, this simply returns the logical path (no path rewriting).
        In the future, this could generate unique paths for overwrites.

        :param logical_path: The user-facing logical path
        :param for_overwrite: If True, generate a path for overwriting an existing object

        :return: The physical storage path to use for writing
        """
        # For now, physical path = logical path
        # Future enhancement: generate unique paths for overwrites
        # if for_overwrite and self._allow_overwrites:
        #     return f"{logical_path}-{uuid.uuid4().hex}"
        return ResolvedPath(physical_path=logical_path, exists=False, profile=None)

    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Stages an addition (or overwrite) of ``path``.

        A pending removal of the same path is cancelled. Otherwise, add-after-remove would be undone at commit
        time, because removals are applied after additions.

        :param path: Logical object path.
        :param metadata: Object metadata. A plain ObjectMetadata's ``key`` is taken as the physical path.
        :raises RuntimeError: If the manifest is not writable.
        :raises FileExistsError: If ``path`` is committed and overwrites are not allowed.
        :raises ValueError: If a ManifestObjectMetadata's ``key`` differs from ``path``.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")

        # Check if file already exists in committed state and overwrites are not allowed
        # Pending files can always be overwritten
        if not self._allow_overwrites and path in self._files:
            raise FileExistsError(f"File {path} already exists and overwrites are not allowed.")

        # Handle two cases:
        # 1. If metadata is already a ManifestObjectMetadata, use it directly
        # 2. Otherwise, create one assuming metadata.key contains the physical path
        if isinstance(metadata, ManifestObjectMetadata):
            # Already has proper logical/physical path separation
            manifest_metadata = metadata
            # Ensure the logical path matches what was requested
            if manifest_metadata.key != path:
                raise ValueError(f"Logical path mismatch: expected {path}, got {manifest_metadata.key}")
        else:
            # For backward compatibility, create a ManifestObjectMetadata from ObjectMetadata
            manifest_metadata = ManifestObjectMetadata(
                key=path,  # Logical path (user-facing)
                content_length=metadata.content_length,
                last_modified=metadata.last_modified,
                content_type=metadata.content_type,
                etag=metadata.etag,
                metadata=metadata.metadata,
                type=metadata.type,
                physical_path=metadata.key,  # Physical path (from storage provider)
            )

        # TODO: Time travel is not supported - we do not rename file paths when overwriting.
        # When a file is overwritten, the manifest points to the new version of the file at the same location
        # without maintaining history of previous versions.
        self._pending_adds[path] = manifest_metadata
        self._pending_removes.discard(path)

    def remove_file(self, path: str) -> None:
        """
        Stages removal of ``path``.

        A committed object is scheduled for removal at the next commit. A pending (uncommitted) addition for
        the same path is discarded, so a removed object is not resurrected by the next commit.

        :param path: Logical object path.
        :raises RuntimeError: If the manifest is not writable.
        :raises FileNotFoundError: If ``path`` is neither committed nor a pending addition.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")

        had_pending_add = self._pending_adds.pop(path, None) is not None
        if path in self._files:
            self._pending_removes.add(path)
        elif not had_pending_add:
            raise FileNotFoundError(f"Object {path} does not exist.")

    def is_writable(self) -> bool:
        """Returns whether this provider accepts additions and removals."""
        return self._writable

    def allow_overwrites(self) -> bool:
        """Returns whether committed files may be overwritten by :py:meth:`add_file`."""
        return self._allow_overwrites

    def commit_updates(self) -> None:
        """
        Applies pending additions and removals and persists them as a new manifest.

        The new state is written before it replaces the in-memory state. If the write fails, the committed
        view and the pending changes are left untouched, so the commit can be retried.
        """
        if not self._pending_adds and not self._pending_removes:
            return

        updated_files = dict(self._files)
        updated_files.update(self._pending_adds)
        for path in self._pending_removes:
            updated_files.pop(path, None)

        # Serialize ManifestObjectMetadata directly
        # to_dict() will include all fields including physical_path
        self._write_manifest_files(
            self._storage_provider, list(updated_files.values()), manifest_format=self._format
        )

        self._files = updated_files
        self._pending_adds = {}
        self._pending_removes = set()