Source code for multistorageclient.providers.manifest_metadata

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations  # Enables forward references in type hints
 17
 18import io
 19import json
 20import logging
 21import os
 22from collections.abc import Iterator
 23from dataclasses import asdict, dataclass
 24from datetime import datetime, timezone
 25from typing import Any, Optional
 26
 27from ..types import MetadataProvider, ObjectMetadata, StorageProvider
 28from ..utils import create_attribute_filter_evaluator, glob, matches_attribute_filter_expression
 29
logger = logging.getLogger(__name__)


# Directory (below the dataset root) that holds manifests. New manifests are written to
# ``<base>/.msc_manifests/<UTC ISO-8601 timestamp, second precision>/``.
DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
# File name of the main manifest (index) inside each manifest folder.
MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
# Sub-directory of a manifest folder that holds the manifest part files.
MANIFEST_PARTS_CHILD_DIR = "parts"
# File name prefix of a manifest part; followed by the zero-padded part sequence number.
MANIFEST_PART_PREFIX = "msc_manifest_part"
MANIFEST_PART_SUFFIX = ".jsonl"  # Suffix for the manifest part files (JSON Lines: one object per line)
SEQUENCE_PADDING = 6  # Zero-padding width of the part sequence number (e.g., 6 for "000001")
 39
 40
@dataclass
class ManifestPartReference:
    """
    Pointer from a dataset manifest to one of its part files.
    """

    #: Location of the manifest part, relative to the main manifest.
    path: str

    @staticmethod
    def from_dict(data: dict[str, Any]) -> ManifestPartReference:
        """
        Builds a :py:class:`ManifestPartReference` from a dictionary.

        :param data: Mapping that must provide a ``path`` entry.
        :return: The parsed part reference.
        :raises ValueError: If ``path`` is absent from ``data``.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])
        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Serializes this reference into a plain dictionary (inverse of :py:meth:`from_dict`).
        """
        return dict(path=self.path)
@dataclass
class Manifest:
    """
    A data class representing a dataset manifest.

    The manifest is the index document that records the schema version and
    references the part files holding the per-object metadata.
    """

    #: Defines the version of the manifest schema.
    version: str
    #: References to manifest parts.
    parts: list[ManifestPartReference]

    @staticmethod
    def from_dict(data: dict) -> "Manifest":
        """
        Creates a Manifest instance from a dictionary (parsed from JSON).

        :param data: Parsed manifest; must contain ``version`` and ``parts``.
        :return: The parsed manifest.
        :raises ValueError: If ``version`` or ``parts`` is missing (the message names the
            missing field), or if a part entry lacks ``path``.
        """
        try:
            version = data["version"]
            raw_parts = data["parts"]
        except KeyError as e:
            # str(KeyError('x')) is "'x'", so the message names the missing field.
            raise ValueError(f"Invalid manifest data: Missing required field {e}") from e

        # ManifestPartReference.from_dict raises its own ValueError for a part without 'path'.
        parts = [ManifestPartReference.from_dict(part) for part in raw_parts]
        return Manifest(version=version, parts=parts)

    def to_json(self) -> str:
        """
        Serializes the manifest to a JSON string with keys ``version`` and ``parts``.

        :return: JSON text; each part is rendered through its ``to_dict``.
        """
        # Build the JSON-compatible dict directly instead of asdict() followed by a second
        # conversion of the parts (asdict already recursed into them).
        return json.dumps({"version": self.version, "parts": [part.to_dict() for part in self.parts]})
def _metadata_to_manifest_dict(metadata: ObjectMetadata) -> dict:
    """
    Serializes ``metadata`` into the dictionary shape stored in manifest part files.

    The manifest format names the object size ``size_bytes``; the ``content_length``
    entry produced by ``metadata.to_dict()`` is moved there (``None`` when absent).
    """
    manifest_entry = metadata.to_dict()
    # pop() runs before the assignment, so content_length is removed and re-stored as size_bytes.
    manifest_entry["size_bytes"] = manifest_entry.pop("content_length", None)
    return manifest_entry
class ManifestMetadataProvider(MetadataProvider):
    """
    Metadata provider backed by a dataset manifest.

    The manifest is read once at construction into an in-memory map from object key to
    :py:class:`ObjectMetadata`. When writable, :py:meth:`add_file` and :py:meth:`remove_file`
    stage changes that :py:meth:`commit_updates` applies and persists as a new manifest.
    """

    _storage_provider: StorageProvider
    _files: dict[str, ObjectMetadata]
    _pending_adds: dict[str, ObjectMetadata]
    _pending_removes: set[str]
    _manifest_path: str
    _writable: bool

    def __init__(self, storage_provider: StorageProvider, manifest_path: str, writable: bool = False) -> None:
        """
        Creates a :py:class:`ManifestMetadataProvider`.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path.
        :param writable: If true, allows modifications and new manifests to be written.
        """
        self._storage_provider = storage_provider
        self._files = {}
        self._pending_adds = {}
        self._pending_removes = set()
        self._manifest_path = manifest_path
        self._writable = writable

        self._load_manifest(storage_provider, self._manifest_path)

    def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
        """
        Locates and loads the manifest referenced by ``manifest_path``.

        ``manifest_path`` may be the manifest index file itself, a folder containing the index
        file, or a directory under which timestamped manifest folders live (``.msc_manifests``
        is appended when not already a path component). In the last case the lexicographically
        greatest folder is chosen. If no manifest is found, a warning is logged and the provider
        stays empty.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path
        """

        def helper_find_manifest_file(manifest_path: str) -> str:
            if storage_provider.is_file(manifest_path):
                return manifest_path

            index_path = os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)
            if storage_provider.is_file(index_path):
                return index_path

            # Now go looking and select newest manifest.
            if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
                manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)

            # Folders written by _write_manifest_files are named by UTC ISO-8601 timestamps,
            # so lexicographic order matches chronological order for them.
            candidates = sorted(storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME)))
            return candidates[-1] if candidates else ""

        resolved_manifest_path = helper_find_manifest_file(manifest_path)
        if not resolved_manifest_path:
            logger.warning(f"No manifest found at '{manifest_path}'.")
            return

        file_content = storage_provider.get_object(resolved_manifest_path)

        prefix = os.path.dirname(resolved_manifest_path)
        _, file_extension = os.path.splitext(resolved_manifest_path)
        self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])

    def _load_manifest_file(
        self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
    ) -> None:
        """
        Loads a manifest and all parts it references into ``self._files``.

        :param storage_provider: Storage provider.
        :param file_content: Manifest file content bytes.
        :param manifest_base: Manifest file base path.
        :param file_type: Manifest file type (file extension without the dot).
        :raises ValueError: If the manifest version is not ``"1"``.
        :raises NotImplementedError: If ``file_type`` is not ``"json"``.
        """
        if file_type == "json":
            manifest_dict = json.loads(file_content.decode("utf-8"))
            manifest = Manifest.from_dict(manifest_dict)

            # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
            if manifest.version != "1":
                raise ValueError(f"Manifest version {manifest.version} is not supported.")

            # Load manifest parts. Later parts overwrite earlier entries with the same key.
            for manifest_part_reference in manifest.parts:
                object_metadata: list[ObjectMetadata] = self._load_manifest_part_file(
                    storage_provider=storage_provider,
                    manifest_base=manifest_base,
                    manifest_part_reference=manifest_part_reference,
                )

                for object_metadatum in object_metadata:
                    self._files[object_metadatum.key] = object_metadatum
        else:
            raise NotImplementedError(f"Manifest file type {file_type} is not supported.")

    def _load_manifest_part_file(
        self, storage_provider: StorageProvider, manifest_base: str, manifest_part_reference: ManifestPartReference
    ) -> list[ObjectMetadata]:
        """
        Loads a manifest part.

        :param storage_provider: Storage provider.
        :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
        :param manifest_part_reference: Manifest part reference.
        :return: The object metadata entries listed in the part, in file order.
        """
        object_metadata = []

        if not os.path.isabs(manifest_part_reference.path):
            remote_path = os.path.join(manifest_base, manifest_part_reference.path)
        else:
            remote_path = manifest_part_reference.path
        manifest_part_file_content = storage_provider.get_object(remote_path)

        # The manifest part is a JSON lines file. Each line is a JSON-serialized ObjectMetadata.
        for line in io.TextIOWrapper(io.BytesIO(manifest_part_file_content), encoding="utf-8"):
            # Skip empty / whitespace-only lines, which json.loads would reject.
            if not line.strip():
                continue
            object_metadatum_dict = json.loads(line)
            # The manifest stores the size as 'size_bytes'; ObjectMetadata expects 'content_length'.
            object_metadatum_dict["content_length"] = object_metadatum_dict.pop("size_bytes")
            object_metadatum = ObjectMetadata.from_dict(object_metadatum_dict)
            object_metadata.append(object_metadatum)

        return object_metadata

    def _manifest_base_path(self) -> str:
        """
        Returns the directory under which ``.msc_manifests`` is placed for new manifests.

        If ``self._manifest_path`` has a ``.msc_manifests`` path component, the portion before it
        is used (keeping a leading ``/``); otherwise ``self._manifest_path`` itself.
        """
        base_path = self._manifest_path
        # Split on "/" to match how _load_manifest detects the manifests directory.
        base_path_parts = base_path.split("/")
        if DEFAULT_MANIFEST_BASE_DIR not in base_path_parts:
            return base_path

        manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
        manifest_base_path = os.path.join(*base_path_parts[:manifests_index]) if manifests_index > 0 else ""
        if base_path.startswith("/"):
            # The leading "" component is dropped by os.path.join, so restore the root.
            manifest_base_path = "/" + manifest_base_path
        return manifest_base_path

    def _write_manifest_files(self, storage_provider: StorageProvider, object_metadata: list[ObjectMetadata]) -> None:
        """
        Writes a new manifest (main index plus at most one part file) for ``object_metadata``.

        Files go to ``<base>/.msc_manifests/<UTC timestamp>/``. An empty ``object_metadata``
        produces an index with no parts, so that removing every object is persisted instead of
        leaving the previous manifest as the newest one.

        :param storage_provider: The storage provider to use for writing.
        :param object_metadata: objects to include in manifest.
        """
        current_time_str = datetime.now(timezone.utc).isoformat(timespec="seconds")
        manifest_folderpath = os.path.join(self._manifest_base_path(), DEFAULT_MANIFEST_BASE_DIR, current_time_str)

        parts: list[ManifestPartReference] = []
        if object_metadata:
            # We currently write only one part by default
            part_sequence_number = 1
            manifest_part_file_path = os.path.join(
                MANIFEST_PARTS_CHILD_DIR,
                f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{MANIFEST_PART_SUFFIX}",
            )

            # Write single manifest part with metadata as JSON lines (each object on a new line)
            manifest_part_content = "\n".join(
                json.dumps(_metadata_to_manifest_dict(metadata)) for metadata in object_metadata
            )
            storage_provider.put_object(
                os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content.encode("utf-8")
            )
            parts.append(ManifestPartReference(path=manifest_part_file_path))

        # Write the main manifest file last, after the part it references.
        manifest = Manifest(version="1", parts=parts)
        manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
        storage_provider.put_object(manifest_file_path, manifest.to_json().encode("utf-8"))

    def list_objects(
        self,
        path: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
        attribute_filter_expression: Optional[str] = None,
        show_attributes: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        List objects in the manifest, in lexicographic key order.

        Only committed entries are listed; pending adds/removes are not reflected.

        :param path: The path to filter objects by. A trailing ``/`` is appended when missing.
        :param start_after: The object to start after (exclusive).
        :param end_at: The object to end at (inclusive).
        :param include_directories: Whether to include directories. When true, keys below an
            immediate sub-directory of ``path`` are collapsed into one ``directory`` entry whose
            ``last_modified`` is the newest among those keys.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        :param show_attributes: This field is not used in this implementation - It will always return attributes. This is present merely to satisfy the interface.
        :raises ValueError: If ``start_after`` is not before ``end_at`` (raised on first iteration).
        """

        if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
            raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")

        if path and not path.endswith("/"):
            path = path + "/"

        # create evaluator for attribute filter expression if present
        evaluator = (
            create_attribute_filter_evaluator(attribute_filter_expression) if attribute_filter_expression else None
        )

        # Note that this is a generator, not a tuple (there's no tuple comprehension).
        keys = (
            key
            for key, obj_metadata in self._files.items()
            if key.startswith(path)
            and (start_after is None or start_after < key)
            and (end_at is None or key <= end_at)
            and (
                evaluator is None or matches_attribute_filter_expression(obj_metadata, evaluator)
            )  # filter by evaluator if present
        )

        pending_directory: Optional[ObjectMetadata] = None
        for key in sorted(keys):
            if include_directories:
                relative = key[len(path) :].lstrip("/")
                subdirectory = relative.split("/", 1)[0] if "/" in relative else None

                if subdirectory:
                    directory_name = f"{path}{subdirectory}/"
                    obj_metadata = self._files[key]

                    if pending_directory is None or pending_directory.key != directory_name:
                        if pending_directory is not None:
                            yield pending_directory
                        pending_directory = ObjectMetadata(
                            key=directory_name,
                            type="directory",
                            last_modified=obj_metadata.last_modified,
                            content_length=0,
                        )
                    else:
                        pending_directory.last_modified = max(
                            pending_directory.last_modified, obj_metadata.last_modified
                        )
                    continue  # Skip yielding this key as it's part of a directory

                # Keys sharing a directory prefix are contiguous in sorted order, so the pending
                # directory sorts before this file; emit it now to keep the output ordered.
                if pending_directory is not None:
                    yield pending_directory
                    pending_directory = None

            obj = self._files[key]
            obj.key = key  # use key without base_path
            yield obj

        if include_directories and pending_directory is not None:
            yield pending_directory

    def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
        """
        Returns the metadata recorded for ``path``.

        :param path: Object key.
        :param include_pending: If true, staged changes are honored: a pending add (including an
            overwrite of a committed entry) is returned, and a pending removal hides the entry.
        :raises FileNotFoundError: If no (visible) entry exists for ``path``.
        """
        if include_pending:
            if path in self._pending_adds:
                return self._pending_adds[path]
            if path in self._pending_removes:
                raise FileNotFoundError(f"Object {path} does not exist.")
        if path in self._files:
            return self._files[path]
        raise FileNotFoundError(f"Object {path} does not exist.")

    def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
        """
        List object keys in the manifest that match a glob pattern.

        :param pattern: The pattern to filter objects by.
        :param attribute_filter_expression: The attribute filter expression to filter objects by.
        """

        all_objects = [
            object.key for object in self.list_objects("", attribute_filter_expression=attribute_filter_expression)
        ]
        # `glob` here is the module-level helper imported from ..utils, not this method.
        return list(glob(all_objects, pattern))

    def realpath(self, path: str) -> tuple[str, bool]:
        """
        Returns ``(path, exists)``; ``exists`` reflects committed entries only (not pending adds).
        """
        exists = path in self._files
        return path, exists

    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Stages an add (or overwrite) of ``path`` for the next :py:meth:`commit_updates`.

        :raises RuntimeError: If the provider is not writable.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")
        self._pending_adds[path] = metadata
        # A re-add supersedes an earlier staged removal of the same path.
        self._pending_removes.discard(path)

    def remove_file(self, path: str) -> None:
        """
        Stages removal of ``path`` for the next :py:meth:`commit_updates`.

        A staged (uncommitted) add of ``path`` is discarded; a committed entry is marked for removal.

        :raises RuntimeError: If the provider is not writable.
        :raises FileNotFoundError: If ``path`` is neither committed nor staged for addition.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
        is_committed = path in self._files
        is_staged = path in self._pending_adds
        if not is_committed and not is_staged:
            raise FileNotFoundError(f"Object {path} does not exist.")
        if is_staged:
            # Drop the staged add so the commit does not resurrect the object.
            del self._pending_adds[path]
        if is_committed:
            self._pending_removes.add(path)

    def is_writable(self) -> bool:
        """Returns whether this provider accepts staged modifications."""
        return self._writable

    def commit_updates(self) -> None:
        """
        Applies staged adds/removes to the in-memory map and writes a new manifest.

        Does nothing when there are no staged changes. If every object has been removed, an
        empty manifest is written so the removal is persisted.
        """
        if not self._pending_adds and not self._pending_removes:
            return

        # add_file/remove_file keep a path from being in both sets, so the order is unambiguous.
        self._files.update(self._pending_adds)
        for path in self._pending_removes:
            self._files.pop(path, None)
        self._pending_adds = {}
        self._pending_removes = set()

        # Collect metadata for each object to write out in this part file.
        object_metadata = [
            ObjectMetadata(
                key=file_path,
                content_length=metadata.content_length,
                last_modified=metadata.last_modified,
                metadata=metadata.metadata,
            )
            for file_path, metadata in self._files.items()
        ]
        self._write_manifest_files(self._storage_provider, object_metadata)