Source code for multistorageclient.providers.manifest_metadata

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations  # Enables forward references in type hints
 17
 18import io
 19import json
 20import os
 21from dataclasses import dataclass, asdict
 22from datetime import datetime, timezone
 23from typing import Any, Dict, Iterator, List, Optional, Tuple
 24
 25from ..types import MetadataProvider, ObjectMetadata, StorageProvider
 26from ..utils import glob
 27
# Directory, below the dataset base path, that holds one sub-folder per written manifest.
DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
# File name of the main manifest (the index) inside a manifest folder.
MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
# Sub-directory of a manifest folder that holds the manifest part files.
MANIFEST_PARTS_CHILD_DIR = "parts"
# File name prefix of a manifest part; the zero-padded sequence number is appended to it.
MANIFEST_PART_PREFIX = "msc_manifest_part"
MANIFEST_PART_SUFFIX = ".jsonl"  # Suffix for the manifest part files
SEQUENCE_PADDING = 6  # Define padding for the sequence number (e.g., 6 for "000001")
 34
 35
 36@dataclass
 37class ManifestPartReference:
 38    """
 39    A data class representing a reference to dataset manifest part.
 40
 41    Attributes:
 42        path (str): The path of the manifest part relative to the main manifest.
 43    """
 44
 45    path: str
 46
 47    @staticmethod
 48    def from_dict(data: Dict[str, Any]) -> ManifestPartReference:
 49        """
 50        Creates a ManifestPartReference instance from a dictionary.
 51        """
 52        # Validate that the required 'path' field is present
 53        if "path" not in data:
 54            raise ValueError("Missing required field: 'path'")
 55
 56        return ManifestPartReference(path=data["path"])
 57
 58    def to_dict(self) -> dict:
 59        """
 60        Converts ManifestPartReference instance to a dictionary.
 61        """
 62        return {
 63            "path": self.path,
 64        }
 65
 66
 67@dataclass
 68class Manifest:
 69    """
 70    A data class representing a dataset manifest.
 71
 72    Attributes:
 73        :version (str): Defines the version of the manifest schema.
 74        :parts (List[ManifestPartReference]): References to manifest parts.
 75    """
 76
 77    version: str
 78    parts: List[ManifestPartReference]
 79
 80    @staticmethod
 81    def from_dict(data: dict) -> "Manifest":
 82        """
 83        Creates a Manifest instance from a dictionary (parsed from JSON).
 84        """
 85        # Perform any necessary validation here
 86        try:
 87            version = data["version"]
 88            parts = [ManifestPartReference.from_dict(part) for part in data["parts"]]
 89        except KeyError as e:
 90            raise ValueError("Invalid manifest data: Missing required field") from e
 91
 92        return Manifest(version=version, parts=parts)
 93
 94    def to_json(self) -> str:
 95        # Convert dataclass to dict and parts to JSON-compatible format
 96        data = asdict(self)
 97        data["parts"] = [part.to_dict() for part in self.parts]
 98        return json.dumps(data)
 99
100
def _metadata_to_manifest_dict(metadata: ObjectMetadata) -> dict:
    """
    Produce the manifest representation of an ObjectMetadata instance.

    The dictionary returned by ``metadata.to_dict()`` is reused; its 'content_length' entry is
    removed and its value (``None`` when absent) is stored under 'size_bytes' instead.
    """
    fields = metadata.to_dict()
    fields["size_bytes"] = fields.pop("content_length", None)
    return fields
111
112
class ManifestMetadataProvider(MetadataProvider):
    """
    Metadata provider that answers metadata queries from manifest files.

    The manifest is read once, at construction time, into an in-memory mapping of object key to
    :py:class:`ObjectMetadata`. Additions and removals are only staged in memory until
    :py:meth:`commit_updates` writes a new manifest through the storage provider.
    """

    _storage_provider: StorageProvider
    _files: Dict[str, ObjectMetadata]
    _pending_adds: Dict[str, ObjectMetadata]
    _pending_removes: list[str]
    _manifest_path: str
    _writable: bool

    def __init__(self, storage_provider: StorageProvider, manifest_path: str, writable: bool = False) -> None:
        """
        Creates a :py:class:`ManifestMetadataProvider`.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path.
        :param writable: If true, allows modifications and new manifests to be written.
        """
        self._storage_provider = storage_provider
        self._files = {}
        self._pending_adds = {}
        self._pending_removes = []
        self._manifest_path = manifest_path
        self._writable = writable

        self._load_manifest(storage_provider, self._manifest_path)

    def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
        """
        Loads manifest.

        Nothing is loaded (and no error is raised) when no manifest file can be located.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path
        """

        def helper_find_manifest_file(manifest_path: str) -> str:
            # 1. The path is the manifest file itself.
            if storage_provider.is_file(manifest_path):
                return manifest_path

            # 2. The path is a manifest folder containing the index file.
            index_path = os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)
            if storage_provider.is_file(index_path):
                return index_path

            # 3. Now go looking and select newest manifest.
            if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
                manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)

            candidates = storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME))
            # Manifest folders are named after their creation timestamp, so the greatest path is the newest.
            return max(candidates, default="")

        manifest_path = helper_find_manifest_file(manifest_path)
        if not manifest_path:
            return

        file_content = storage_provider.get_object(manifest_path)

        prefix = os.path.dirname(manifest_path)
        _, file_extension = os.path.splitext(manifest_path)
        # Drop the leading "." of the extension to obtain the file type.
        self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])

    def _load_manifest_file(
        self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
    ) -> None:
        """
        Loads a manifest.

        :param storage_provider: Storage provider.
        :param file_content: Manifest file content bytes.
        :param manifest_base: Manifest file base path.
        :param file_type: Manifest file type.
        :raises ValueError: If the manifest is malformed or its version is not supported.
        :raises NotImplementedError: If the manifest file type is not ``json``.
        """
        if file_type != "json":
            raise NotImplementedError(f"Manifest file type {file_type} is not supported.")

        manifest_dict = json.loads(file_content.decode("utf-8"))
        manifest = Manifest.from_dict(manifest_dict)

        # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
        if manifest.version != "1":
            raise ValueError(f"Manifest version {manifest.version} is not supported.")

        # Load manifest parts.
        for manifest_part_reference in manifest.parts:
            object_metadata: List[ObjectMetadata] = self._load_manifest_part_file(
                storage_provider=storage_provider,
                manifest_base=manifest_base,
                manifest_part_reference=manifest_part_reference,
            )

            for object_metadatum in object_metadata:
                self._files[object_metadatum.key] = object_metadatum

    def _load_manifest_part_file(
        self, storage_provider: StorageProvider, manifest_base: str, manifest_part_reference: ManifestPartReference
    ) -> List[ObjectMetadata]:
        """
        Loads a manifest part.

        :param storage_provider: Storage provider.
        :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
        :param manifest_part_reference: Manifest part reference.
        :return: The object metadata listed in the manifest part, in file order.
        """
        object_metadata = []

        if not os.path.isabs(manifest_part_reference.path):
            remote_path = os.path.join(manifest_base, manifest_part_reference.path)
        else:
            remote_path = manifest_part_reference.path
        manifest_part_file_content = storage_provider.get_object(remote_path)

        # The manifest part is a JSON lines file. Each line is a JSON-serialized ObjectMetadata.
        for line in io.TextIOWrapper(io.BytesIO(manifest_part_file_content), encoding="utf-8"):
            # Tolerate blank lines (e.g. a trailing newline) instead of failing to parse them.
            if not line.strip():
                continue
            object_metadatum_dict = json.loads(line)
            # The manifest stores the size as 'size_bytes'; ObjectMetadata calls it 'content_length'.
            object_metadatum_dict["content_length"] = object_metadatum_dict.pop("size_bytes")
            object_metadatum = ObjectMetadata.from_dict(object_metadatum_dict)
            object_metadata.append(object_metadatum)

        return object_metadata

    def _write_manifest_files(self, storage_provider: StorageProvider, object_metadata: List[ObjectMetadata]) -> None:
        """
        Writes the main manifest and its part files.

        The files go into a new folder named after the current UTC time, below
        ``DEFAULT_MANIFEST_BASE_DIR``. The part file is written before the index that refers to it.

        :param storage_provider: The storage provider to use for writing.
        :param object_metadata: Objects to include in manifest.
        """

        def helper_write_file_to_storage(storage_provider: StorageProvider, path: str, content: str) -> None:
            # Convert content to bytes and write it to the storage provider
            storage_provider.put_object(path, content.encode("utf-8"))

        base_path = self._manifest_path
        manifest_base_path = base_path

        # If the configured path already points inside a manifests directory, write next to the
        # existing manifests rather than nesting a new manifests directory inside it.
        base_path_parts = base_path.split(os.sep)
        if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
            manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
            if manifests_index > 0:
                manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
            else:
                manifest_base_path = ""
            # Splitting dropped the leading separator of an absolute path; restore it.
            if base_path.startswith(os.sep):
                manifest_base_path = os.sep + manifest_base_path

        current_time = datetime.now(timezone.utc)
        current_time_str = current_time.isoformat(timespec="seconds")
        manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)
        # We currently write only one part by default
        part_sequence_number = 1
        manifest_part_file_path = os.path.join(
            MANIFEST_PARTS_CHILD_DIR,
            f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{MANIFEST_PART_SUFFIX}",
        )

        manifest = Manifest(version="1", parts=[ManifestPartReference(path=manifest_part_file_path)])

        # Write single manifest part with metadata as JSON lines (each object on a new line)
        manifest_part_content = "\n".join(
            json.dumps(_metadata_to_manifest_dict(metadata)) for metadata in object_metadata
        )
        helper_write_file_to_storage(
            storage_provider, os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content
        )

        # Write the main manifest file
        manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
        helper_write_file_to_storage(storage_provider, manifest_file_path, manifest.to_json())

    def list_objects(
        self,
        prefix: str,
        start_after: Optional[str] = None,
        end_at: Optional[str] = None,
        include_directories: bool = False,
    ) -> Iterator[ObjectMetadata]:
        """
        Lists the committed objects whose key starts with ``prefix``, in ascending key order.

        With ``include_directories``, keys that contain a "/" after the prefix are not yielded
        individually. They are collapsed into one ``directory`` entry per first path component
        after the prefix. The entry's ``last_modified`` is the latest of the collapsed objects.
        The prefix is matched literally, so pass a trailing "/" to list the children of a directory.

        :param prefix: Key prefix to match.
        :param start_after: Only keys strictly greater than this are returned.
        :param end_at: Only keys less than or equal to this are returned.
        :param include_directories: Collapse nested keys into directory entries.
        :raises ValueError: If ``start_after`` is not before ``end_at``.
        """
        if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
            raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")

        matching_keys = sorted(
            key
            for key in self._files
            if key.startswith(prefix)
            and (start_after is None or start_after < key)
            and (end_at is None or key <= end_at)
        )

        pending_directory: Optional[ObjectMetadata] = None
        for key in matching_keys:
            if include_directories:
                # Directories are determined relative to the prefix, not to the full key.
                relative_key = key[len(prefix) :]
                subdirectory = relative_key.split("/", 1)[0] if "/" in relative_key else None

                if subdirectory:
                    directory_name = f"{prefix}{subdirectory}/"
                    obj_metadata = self.get_object_metadata(key)

                    if pending_directory is not None and pending_directory.key == directory_name:
                        pending_directory.last_modified = max(
                            pending_directory.last_modified, obj_metadata.last_modified
                        )
                    else:
                        # Entering a new directory: the previous one is complete.
                        if pending_directory is not None:
                            yield pending_directory
                        pending_directory = ObjectMetadata(
                            key=directory_name,
                            type="directory",
                            last_modified=obj_metadata.last_modified,
                            content_length=0,
                        )
                    continue  # Skip yielding this key as it's part of a directory

                # Keys are sorted, so a plain file means the pending directory is complete.
                # Emit it first to keep the output in key order.
                if pending_directory is not None:
                    yield pending_directory
                    pending_directory = None

            obj = self._files[key]
            obj.key = key  # use key without base_path
            yield obj

        if pending_directory is not None:
            yield pending_directory

    def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
        """
        Returns the metadata of an object.

        :param path: Object key.
        :param include_pending: If true, staged changes are taken into account. A staged add
            (or overwrite) is returned and a staged removal hides the object.
        :raises FileNotFoundError: If the object does not exist.
        """
        if include_pending:
            if path in self._pending_adds:
                return self._pending_adds[path]
            if path in self._pending_removes:
                raise FileNotFoundError(f"Object {path} does not exist.")

        if path in self._files:
            return self._files[path]

        raise FileNotFoundError(f"Object {path} does not exist.")

    def glob(self, pattern: str) -> List[str]:
        """
        Returns the keys of the committed objects that match ``pattern``.
        """
        all_objects = [metadata.key for metadata in self.list_objects("")]
        return list(glob(all_objects, pattern))

    def realpath(self, path: str) -> Tuple[str, bool]:
        """
        Returns ``path`` unchanged, together with whether it is a committed object.
        """
        exists = path in self._files
        return path, exists

    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Stages an object to be added (or overwritten) by the next :py:meth:`commit_updates`.

        :raises RuntimeError: If the provider is not writable.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")
        self._pending_adds[path] = metadata
        # The latest staged operation wins: an earlier staged removal must not drop this add at commit.
        if path in self._pending_removes:
            self._pending_removes = [pending for pending in self._pending_removes if pending != path]

    def remove_file(self, path: str) -> None:
        """
        Stages an object to be removed by the next :py:meth:`commit_updates`.

        Removing an object that is only staged for addition cancels that addition.

        :raises RuntimeError: If the provider is not writable.
        :raises FileNotFoundError: If the object is neither committed nor staged for addition.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
        if path not in self._files:
            if path in self._pending_adds:
                del self._pending_adds[path]
                return
            raise FileNotFoundError(f"Object {path} does not exist.")
        # The latest staged operation wins: discard any staged overwrite of this object.
        self._pending_adds.pop(path, None)
        if path not in self._pending_removes:
            self._pending_removes.append(path)

    def is_writable(self) -> bool:
        """
        Returns whether staging and committing changes is allowed.
        """
        return self._writable

    def commit_updates(self) -> None:
        """
        Applies the staged additions and removals and writes the result as a new manifest.

        Does nothing when no change is staged. The in-memory state is only updated after the
        manifest has been written, so a failed write leaves the staged changes in place.
        """
        if not self._pending_adds and not self._pending_removes:
            return

        updated_files = {**self._files, **self._pending_adds}
        for path in self._pending_removes:
            # Tolerate a path staged for removal more than once.
            updated_files.pop(path, None)

        # Collect metadata for each object to write out in this part file.
        object_metadata = [
            ObjectMetadata(key=file_path, content_length=metadata.content_length, last_modified=metadata.last_modified)
            for file_path, metadata in updated_files.items()
        ]

        self._write_manifest_files(self._storage_provider, object_metadata)

        self._files = updated_files
        self._pending_adds = {}
        self._pending_removes = []