Source code for multistorageclient.providers.manifest_metadata

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations  # Enables forward references in type hints
 17
 18import json
 19import logging
 20import os
 21from collections.abc import Iterator
 22from dataclasses import asdict, dataclass
 23from datetime import datetime, timezone
 24from typing import Any, Optional, Union
 25
 26from ..types import MetadataProvider, ObjectMetadata, StorageProvider
 27from ..utils import create_attribute_filter_evaluator, glob, matches_attribute_filter_expression
 28from .manifest_formats import ManifestFormat, get_format_handler
 29
 30logger = logging.getLogger(__name__)
 31
 32
 33DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"  # Directory that holds one timestamped folder per written manifest
 34MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"  # File name of the main manifest (index) inside a manifest folder
 35MANIFEST_PARTS_CHILD_DIR = "parts"  # Subdirectory of a manifest folder where manifest part files are written
 36MANIFEST_PART_PREFIX = "msc_manifest_part"  # Part file name prefix; the padded sequence number and format suffix follow
 37SEQUENCE_PADDING = 6  # Define padding for the sequence number (e.g., 6 for "000001")
 38
 39
@dataclass
class ManifestPartReference:
    """
    Pointer from a main manifest to one of its part files.
    """

    #: The path of the manifest part relative to the main manifest.
    path: str

    @staticmethod
    def from_dict(data: dict[str, Any]) -> "ManifestPartReference":
        """
        Builds a ManifestPartReference from a parsed dictionary.

        :param data: Dictionary holding at least a ``path`` entry; other entries are ignored.
        :raises ValueError: If ``path`` is absent from ``data``.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])

        # 'path' is the only required field of a part reference.
        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Returns the dictionary form of this reference (a single ``path`` entry).
        """
        return dict(path=self.path)
67 68
@dataclass
class Manifest:
    """
    In-memory form of a main dataset manifest: schema version, part references and part format.
    """

    #: Defines the version of the manifest schema.
    version: str
    #: References to manifest parts.
    parts: list[ManifestPartReference]
    #: Format of manifest parts (jsonl or parquet).
    format: str = "jsonl"

    @staticmethod
    def from_dict(data: dict) -> "Manifest":
        """
        Builds a Manifest from a dictionary (typically the parsed manifest JSON).

        :param data: Dictionary with ``version`` and ``parts`` entries; ``format`` is optional
            and falls back to ``"jsonl"``.
        :raises ValueError: If a required key is missing.
        """
        try:
            schema_version = data["version"]
            part_references = [ManifestPartReference.from_dict(entry) for entry in data["parts"]]
            parts_format = data.get("format", "jsonl")
        except KeyError as missing:
            raise ValueError("Invalid manifest data: Missing required field") from missing

        return Manifest(version=schema_version, parts=part_references, format=parts_format)

    def to_json(self) -> str:
        """
        Serializes the manifest to a JSON string, with every part rendered through its ``to_dict``.
        """
        # Overriding "parts" inside the literal keeps the key at its original position.
        payload = {**asdict(self), "parts": [reference.to_dict() for reference in self.parts]}
        return json.dumps(payload)
100 101
class ManifestMetadataProvider(MetadataProvider):
    """
    Metadata provider that serves object metadata from a manifest.

    The main manifest and its parts are read through the storage provider when the instance is
    created and kept in memory, keyed by object key. Additions and removals are only staged by
    :py:meth:`add_file` / :py:meth:`remove_file`; they are applied, and a new manifest is written,
    by :py:meth:`commit_updates`.
    """

    _storage_provider: StorageProvider
    #: Committed objects, keyed by object key.
    _files: dict[str, ObjectMetadata]
    #: Objects staged for addition (or overwrite) at the next commit.
    _pending_adds: dict[str, ObjectMetadata]
    #: Keys staged for removal at the next commit.
    _pending_removes: set[str]
    _manifest_path: str
    _writable: bool
    _format: Union[ManifestFormat, str]

    def __init__(
        self,
        storage_provider: StorageProvider,
        manifest_path: str,
        writable: bool = False,
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> None:
        """
        Creates a :py:class:`ManifestMetadataProvider`.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path.
        :param writable: If true, allows modifications and new manifests to be written.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        """
        self._storage_provider = storage_provider
        self._files = {}
        self._pending_adds = {}
        self._pending_removes = set()
        self._manifest_path = manifest_path
        self._writable = writable
        # Accept either the enum member or its string value.
        self._format = (
            manifest_format if isinstance(manifest_format, ManifestFormat) else ManifestFormat(manifest_format)
        )

        self._load_manifest(storage_provider, self._manifest_path)

    def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
        """
        Loads manifest.

        If no manifest can be located, a warning is logged and the provider starts out empty.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path
        """

        def helper_find_manifest_file(manifest_path: str) -> str:
            # 1. The path is the manifest file itself.
            if storage_provider.is_file(manifest_path):
                return manifest_path

            # 2. The path is a manifest folder that contains the index file.
            index_path = os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)
            if storage_provider.is_file(index_path):
                return index_path

            # 3. Now go looking and select newest manifest.
            if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
                manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)

            candidates = storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME))
            # Manifest folders are named after their creation timestamp, so the
            # lexicographically greatest candidate is picked.
            return max(candidates, default="")

        resolved_manifest_path = helper_find_manifest_file(manifest_path)
        if not resolved_manifest_path:
            logger.warning("No manifest found at '%s'.", manifest_path)
            return

        file_content = storage_provider.get_object(resolved_manifest_path)

        prefix = os.path.dirname(resolved_manifest_path)
        _, file_extension = os.path.splitext(resolved_manifest_path)
        # Strip the leading dot of the extension ("json", not ".json").
        self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])

    def _load_manifest_file(
        self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
    ) -> None:
        """
        Loads a manifest.

        :param storage_provider: Storage provider.
        :param file_content: Manifest file content bytes.
        :param manifest_base: Manifest file base path.
        :param file_type: Manifest file type.
        :raises ValueError: If the manifest version is not ``"1"`` or the manifest is malformed.
        :raises NotImplementedError: If ``file_type`` is not ``"json"``.
        """
        if file_type != "json":
            raise NotImplementedError(f"Manifest file type {file_type} is not supported.")

        manifest_dict = json.loads(file_content.decode("utf-8"))
        manifest = Manifest.from_dict(manifest_dict)

        # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
        if manifest.version != "1":
            raise ValueError(f"Manifest version {manifest.version} is not supported.")

        # Load manifest parts. Later parts overwrite earlier entries that share a key.
        for manifest_part_reference in manifest.parts:
            object_metadata: list[ObjectMetadata] = self._load_manifest_part_file(
                storage_provider=storage_provider,
                manifest_base=manifest_base,
                manifest_part_reference=manifest_part_reference,
                manifest_format=manifest.format,
            )

            for object_metadatum in object_metadata:
                self._files[object_metadatum.key] = object_metadatum

    def _load_manifest_part_file(
        self,
        storage_provider: StorageProvider,
        manifest_base: str,
        manifest_part_reference: ManifestPartReference,
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> list[ObjectMetadata]:
        """
        Loads a manifest part.

        :param storage_provider: Storage provider.
        :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
        :param manifest_part_reference: Manifest part reference.
        :param manifest_format: Format of the manifest part (jsonl or parquet).
        :return: Object metadata entries read from the part.
        """
        if os.path.isabs(manifest_part_reference.path):
            remote_path = manifest_part_reference.path
        else:
            remote_path = os.path.join(manifest_base, manifest_part_reference.path)
        manifest_part_file_content = storage_provider.get_object(remote_path)

        # The file extension wins over the format declared in the main manifest;
        # the declared format is only used for extension-less part paths.
        _, ext = os.path.splitext(remote_path)
        detected_format = ext[1:] if ext else manifest_format

        format_handler = get_format_handler(detected_format)
        return format_handler.read_part(manifest_part_file_content)

    def _write_manifest_files(
        self,
        storage_provider: StorageProvider,
        object_metadata: list[ObjectMetadata],
        manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
    ) -> None:
        """
        Writes the main manifest and its part files.

        Nothing is written when ``object_metadata`` is empty.

        :param storage_provider: The storage provider to use for writing.
        :param object_metadata: objects to include in manifest.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        """
        if not object_metadata:
            return

        base_path = self._manifest_path
        manifest_base_path = base_path

        # When the configured path already points inside a manifest directory, write the new
        # manifest next to the existing ones instead of nesting a second manifest directory.
        base_path_parts = base_path.split(os.sep)
        if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
            manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
            if manifests_index > 0:
                manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
            else:
                manifest_base_path = ""
            # Splitting an absolute path drops its leading separator; put it back.
            if base_path.startswith(os.sep):
                manifest_base_path = os.sep + manifest_base_path

        # Each manifest gets its own folder named after the current UTC time.
        current_time = datetime.now(timezone.utc)
        current_time_str = current_time.isoformat(timespec="seconds")
        manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)

        format_handler = get_format_handler(manifest_format)
        suffix = format_handler.get_file_suffix()

        # We currently write only one part by default.
        part_sequence_number = 1
        manifest_part_file_path = os.path.join(
            MANIFEST_PARTS_CHILD_DIR,
            f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{suffix}",
        )

        format_value = manifest_format.value if isinstance(manifest_format, ManifestFormat) else manifest_format
        manifest = Manifest(
            version="1", parts=[ManifestPartReference(path=manifest_part_file_path)], format=format_value
        )

        # Write the part before the index that references it.
        manifest_part_content = format_handler.write_part(object_metadata)
        storage_provider.put_object(os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content)

        manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
        manifest_content = manifest.to_json()
        storage_provider.put_object(manifest_file_path, manifest_content.encode("utf-8"))

    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the manifest.

        Entries are yielded in ascending key order. With ``include_directories``, objects nested
        below ``path`` are collapsed into one synthetic ``directory`` entry per immediate
        subdirectory, whose ``last_modified`` is the newest of the objects it stands for.

        :param path: The path to filter objects by. A trailing ``/`` is appended when missing.
        :param start_after: The object to start after.
        :param end_at: The object to end at.
        :param include_directories: Whether to include directories.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        :param show_attributes: This field is not used in this implementation - It will always return attributes. This is present merely to satisfy the interface.
        :raises ValueError: If ``start_after`` is not before ``end_at``. As this is a generator,
            the error surfaces when iteration starts.
        """

        if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
            raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")

        if path and not path.endswith("/"):
            path = path + "/"

        # create evaluator for attribute filter expression if present
        evaluator = (
            create_attribute_filter_evaluator(attribute_filter_expression) if attribute_filter_expression else None
        )

        # Note that this is a generator, not a tuple (there's no tuple comprehension).
        keys = (
            key
            for key, obj_metadata in self._files.items()
            if key.startswith(path)
            and (start_after is None or start_after < key)
            and (end_at is None or key <= end_at)
            and (
                evaluator is None or matches_attribute_filter_expression(obj_metadata, evaluator)
            )  # filter by evaluator if present
        )

        pending_directory: Optional[ObjectMetadata] = None
        for key in sorted(keys):
            if include_directories:
                relative = key[len(path) :].lstrip("/")
                subdirectory = relative.split("/", 1)[0] if "/" in relative else None

                if subdirectory:
                    directory_name = f"{path}{subdirectory}/"
                    last_modified = self._files[key].last_modified

                    if pending_directory is not None and pending_directory.key == directory_name:
                        # Same directory as the previous key: only track the newest timestamp.
                        pending_directory.last_modified = max(pending_directory.last_modified, last_modified)
                    else:
                        if pending_directory is not None:
                            yield pending_directory
                        pending_directory = ObjectMetadata(
                            key=directory_name,
                            type="directory",
                            last_modified=last_modified,
                            content_length=0,
                        )
                    continue  # Skip yielding this key as it's part of a directory

                # Keys are sorted, so a plain file means the pending directory is complete.
                # Emit it now so it does not end up after files that sort behind it.
                if pending_directory is not None:
                    yield pending_directory
                    pending_directory = None

            obj = self._files[key]
            obj.key = key  # use key without base_path
            yield obj

        if pending_directory is not None:
            yield pending_directory

    def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
        """
        Returns the metadata of a single object.

        :param path: Object key.
        :param include_pending: If true, staged changes are taken into account: a key staged for
            removal is reported as missing and a staged addition takes precedence over the
            committed entry with the same key.
        :raises FileNotFoundError: If the object does not exist in the requested view.
        """
        if include_pending:
            if path in self._pending_removes:
                raise FileNotFoundError(f"Object {path} does not exist.")
            # A staged add may overwrite a committed object; the staged metadata is the current one.
            if path in self._pending_adds:
                return self._pending_adds[path]

        if path in self._files:
            return self._files[path]

        raise FileNotFoundError(f"Object {path} does not exist.")

    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        List the keys of objects in the manifest that match a glob pattern.

        :param pattern: The pattern to filter objects by.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        :return: Matching object keys.
        """

        all_objects = [
            obj.key for obj in self.list_objects("", attribute_filter_expression=attribute_filter_expression)
        ]
        # ``glob`` here is the module-level helper, not this method.
        return list(glob(all_objects, pattern))

    def realpath(self, path: str) -> tuple[str, bool]:
        """
        Returns ``path`` unchanged together with whether it is a committed object in the manifest.
        """
        return path, path in self._files

    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Stages an object for addition (or overwrite) at the next :py:meth:`commit_updates`.

        :param path: Object key.
        :param metadata: Metadata to record for the object.
        :raises RuntimeError: If the provider is not writable.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")
        # The latest operation wins: re-adding a key cancels an earlier staged removal, which
        # would otherwise delete the re-added object at commit time.
        self._pending_removes.discard(path)
        self._pending_adds[path] = metadata

    def remove_file(self, path: str) -> None:
        """
        Stages a committed object for removal at the next :py:meth:`commit_updates`.

        :param path: Object key.
        :raises RuntimeError: If the provider is not writable.
        :raises FileNotFoundError: If ``path`` is not a committed object.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
        if path not in self._files:
            raise FileNotFoundError(f"Object {path} does not exist.")
        self._pending_removes.add(path)

    def is_writable(self) -> bool:
        """
        Returns whether this provider accepts modifications.
        """
        return self._writable

    def commit_updates(self) -> None:
        """
        Applies all staged additions and removals and writes the resulting manifest.

        The in-memory state is only replaced after the manifest has been written, so staged
        changes are kept if writing raises. Does nothing when no changes are staged.
        """
        if not self._pending_adds and not self._pending_removes:
            return

        # Build the post-commit view on a copy: adds first, then removes.
        updated_files = {**self._files, **self._pending_adds}
        for path in self._pending_removes:
            updated_files.pop(path, None)

        # Collect metadata for each object to write out in this part file.
        object_metadata = [
            ObjectMetadata(
                key=file_path,
                content_length=metadata.content_length,
                last_modified=metadata.last_modified,
                metadata=metadata.metadata,
            )
            for file_path, metadata in updated_files.items()
        ]
        self._write_manifest_files(self._storage_provider, object_metadata, manifest_format=self._format)

        self._files = updated_files
        self._pending_adds = {}
        self._pending_removes = set()