Source code for multistorageclient.providers.manifest_metadata

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16from __future__ import annotations  # Enables forward references in type hints
 17
 18import io
 19import json
 20import os
 21from dataclasses import dataclass, asdict
 22from datetime import datetime, timezone
 23from typing import Any, Dict, Iterator, List, Optional, Tuple
 24
 25from ..types import MetadataProvider, ObjectMetadata, StorageProvider
 26from ..utils import glob
 27
# Directory name joined onto the manifest base path; each manifest written by
# ManifestMetadataProvider goes into its own timestamp-named sub-folder of it, and
# readers glob "<this dir>/*/<MANIFEST_INDEX_FILENAME>" to find the newest one.
DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
# File name of the main manifest (the JSON index that references the part files).
MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
# File name prefix of manifest part files, followed by a zero-padded sequence number.
MANIFEST_PART_PREFIX = "msc_manifest_part"
MANIFEST_PART_SUFFIX = ".jsonl"  # Suffix for the manifest part files (JSON lines: one object per line)
SEQUENCE_PADDING = 6  # Define padding for the sequence number (e.g., 6 for "000001")
 33
 34
 35@dataclass
 36class ManifestPartReference:
 37    """
 38    A data class representing a reference to dataset manifest part.
 39
 40    Attributes:
 41        path (str): The path of the manifest part relative to the main manifest.
 42    """
 43
 44    path: str
 45
 46    @staticmethod
 47    def from_dict(data: Dict[str, Any]) -> ManifestPartReference:
 48        """
 49        Creates a ManifestPartReference instance from a dictionary.
 50        """
 51        # Validate that the required 'path' field is present
 52        if "path" not in data:
 53            raise ValueError("Missing required field: 'path'")
 54
 55        return ManifestPartReference(path=data["path"])
 56
 57    def to_dict(self) -> dict:
 58        """
 59        Converts ManifestPartReference instance to a dictionary.
 60        """
 61        return {
 62            "path": self.path,
 63        }
 64
 65
 66@dataclass
 67class Manifest:
 68    """
 69    A data class representing a dataset manifest.
 70
 71    Attributes:
 72        :version (str): Defines the version of the manifest schema.
 73        :parts (List[ManifestPartReference]): References to manifest parts.
 74    """
 75
 76    version: str
 77    parts: List[ManifestPartReference]
 78
 79    @staticmethod
 80    def from_dict(data: dict) -> "Manifest":
 81        """
 82        Creates a Manifest instance from a dictionary (parsed from JSON).
 83        """
 84        # Perform any necessary validation here
 85        try:
 86            version = data["version"]
 87            parts = [ManifestPartReference.from_dict(part) for part in data["parts"]]
 88        except KeyError as e:
 89            raise ValueError("Invalid manifest data: Missing required field") from e
 90
 91        return Manifest(version=version, parts=parts)
 92
 93    def to_json(self) -> str:
 94        # Convert dataclass to dict and parts to JSON-compatible format
 95        data = asdict(self)
 96        data["parts"] = [part.to_dict() for part in self.parts]
 97        return json.dumps(data)
 98
 99
def _metadata_to_manifest_dict(metadata: ObjectMetadata) -> dict:
    """
    Converts object metadata into a manifest-part entry.

    The manifest format names the object size ``size_bytes`` rather than ``content_length``,
    so the latter key is removed and its value (``None`` if absent) is stored under
    ``size_bytes``, which ends up as the last key of the returned dictionary.
    """
    manifest_entry = metadata.to_dict()
    # The right-hand side (the pop) is evaluated before the new key is assigned.
    manifest_entry["size_bytes"] = manifest_entry.pop("content_length", None)
    return manifest_entry
110
111
class ManifestMetadataProvider(MetadataProvider):
    """
    :py:class:`MetadataProvider` backed by an MSC manifest.

    On construction the manifest (a JSON index file referencing JSON-lines part files) is loaded
    into an in-memory map of object key to :py:class:`ObjectMetadata`. If the provider is writable,
    :py:meth:`add_file` / :py:meth:`remove_file` stage changes that :py:meth:`commit_updates`
    applies to the in-memory map and then persists as a new manifest in a timestamp-named folder.
    """

    _storage_provider: StorageProvider
    _files: Dict[str, ObjectMetadata]
    _pending_adds: Dict[str, ObjectMetadata]
    _pending_removes: list[str]
    _manifest_path: str
    _writable: bool

    def __init__(self, storage_provider: StorageProvider, manifest_path: str, writable: bool = False) -> None:
        """
        Creates a :py:class:`ManifestMetadataProvider`.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path.
        :param writable: If true, allows modifications and new manifests to be written.
        """
        self._storage_provider = storage_provider
        self._files = {}
        self._pending_adds = {}
        self._pending_removes = []
        self._manifest_path = manifest_path
        self._writable = writable

        self._load_manifest(storage_provider, self._manifest_path)

    def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
        """
        Loads manifest.

        The main manifest is resolved in this order:

        1. ``manifest_path`` itself, if it is a file;
        2. ``manifest_path/msc_manifest_index.json``;
        3. the lexicographically greatest ``*/msc_manifest_index.json`` under the ``.msc_manifests``
           directory (appended to ``manifest_path`` unless it already contains that component).

        If none is found, nothing is loaded and the provider starts empty.

        :param storage_provider: Storage provider.
        :param manifest_path: Main manifest file path
        """

        def helper_find_manifest_file(manifest_path: str) -> str:
            if storage_provider.is_file(manifest_path):
                return manifest_path

            index_path = os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)
            if storage_provider.is_file(index_path):
                return index_path

            # Now go looking and select newest manifest. Folder names written by this class are UTC
            # ISO-8601 timestamps, so (assuming all candidates share that format) the lexicographically
            # greatest path is the most recent one.
            if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
                manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)

            candidates = sorted(storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME)))
            return candidates[-1] if candidates else ""

        manifest_path = helper_find_manifest_file(manifest_path)
        if not manifest_path:
            return

        file_content = storage_provider.get_object(manifest_path)

        prefix = os.path.dirname(manifest_path)
        _, file_extension = os.path.splitext(manifest_path)
        # Strip the leading "." from the extension to get the file type (e.g. "json").
        self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])

    def _load_manifest_file(
        self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
    ) -> None:
        """
        Loads a manifest.

        :param storage_provider: Storage provider.
        :param file_content: Manifest file content bytes.
        :param manifest_base: Manifest file base path.
        :param file_type: Manifest file type (the main manifest's extension without the dot).
        :raises NotImplementedError: If ``file_type`` is not ``"json"``.
        :raises ValueError: If the manifest is malformed or its version is not ``"1"``.
        """
        if file_type != "json":
            raise NotImplementedError(f"Manifest file type {file_type} is not supported.")

        manifest = Manifest.from_dict(json.loads(file_content.decode("utf-8")))

        # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
        if manifest.version != "1":
            raise ValueError(f"Manifest version {manifest.version} is not supported.")

        # Load manifest parts. An entry in a later part overwrites an earlier entry with the same key.
        for manifest_part_reference in manifest.parts:
            object_metadata: List[ObjectMetadata] = self._load_manifest_part_file(
                storage_provider=storage_provider,
                manifest_base=manifest_base,
                manifest_part_reference=manifest_part_reference,
            )

            for object_metadatum in object_metadata:
                self._files[object_metadatum.key] = object_metadatum

    def _load_manifest_part_file(
        self, storage_provider: StorageProvider, manifest_base: str, manifest_part_reference: ManifestPartReference
    ) -> List[ObjectMetadata]:
        """
        Loads a manifest part.

        :param storage_provider: Storage provider.
        :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
        :param manifest_part_reference: Manifest part reference. Absolute paths are used as-is.
        :return: The object metadata entries listed in the part, in file order.
        """
        object_metadata = []

        if not os.path.isabs(manifest_part_reference.path):
            remote_path = os.path.join(manifest_base, manifest_part_reference.path)
        else:
            remote_path = manifest_part_reference.path
        manifest_part_file_content = storage_provider.get_object(remote_path)

        # The manifest part is a JSON lines file. Each line is a JSON-serialized ObjectMetadata whose
        # size is stored as "size_bytes" instead of "content_length".
        for line in io.TextIOWrapper(io.BytesIO(manifest_part_file_content), encoding="utf-8"):
            # Skip blank lines (e.g. a stray empty line); json.loads would reject them.
            if not line.strip():
                continue
            object_metadatum_dict = json.loads(line)
            object_metadatum_dict["content_length"] = object_metadatum_dict.pop("size_bytes")
            object_metadatum = ObjectMetadata.from_dict(object_metadatum_dict)
            object_metadata.append(object_metadatum)

        return object_metadata

    def _write_manifest_files(self, storage_provider: StorageProvider, object_metadata: List[ObjectMetadata]) -> None:
        """
        Writes the main manifest and its part files.

        Output goes to ``<base>/.msc_manifests/<UTC timestamp>/``, where ``<base>`` is the portion of
        the configured manifest path before any ``.msc_manifests`` component (or the whole path if it
        has none). The folder contains ``msc_manifest_index.json`` and a single part file
        ``parts/msc_manifest_part000001.jsonl``.

        :param storage_provider: The storage provider to use for writing.
        :param object_metadata: Objects to include in the manifest.
        """
        base_path = self._manifest_path
        manifest_base_path = base_path

        # Split on "/" to match how _load_manifest inspects path components.
        base_path_parts = base_path.split("/")
        if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
            manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
            if manifests_index > 0:
                manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
            else:
                manifest_base_path = ""
            # os.path.join drops the empty leading component of an absolute path; restore the root.
            if base_path.startswith("/"):
                manifest_base_path = "/" + manifest_base_path

        current_time = datetime.now(timezone.utc)
        current_time_str = current_time.isoformat(timespec="seconds")
        manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)
        # We currently write only one part by default
        part_sequence_number = 1
        manifest_part_file_path = os.path.join(
            "parts", f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{MANIFEST_PART_SUFFIX}"
        )

        manifest = Manifest(version="1", parts=[ManifestPartReference(path=manifest_part_file_path)])

        # Write single manifest part with metadata as JSON lines (each object on a new line)
        manifest_part_content = "\n".join(
            [json.dumps(_metadata_to_manifest_dict(metadata)) for metadata in object_metadata]
        )
        storage_provider.put_object(
            os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content.encode("utf-8")
        )

        # Write the main manifest file last, after the part it references.
        manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
        manifest_content = manifest.to_json()
        storage_provider.put_object(manifest_file_path, manifest_content.encode("utf-8"))

    def list_objects(
        self, prefix: str, start_after: Optional[str] = None, end_at: Optional[str] = None
    ) -> Iterator[ObjectMetadata]:
        """
        Yields metadata for manifest objects whose key starts with ``prefix``, in lexicographic key order.

        :param prefix: Key prefix to match.
        :param start_after: If given, only keys strictly greater than this are yielded.
        :param end_at: If given, only keys less than or equal to this are yielded.
        :raises ValueError: If both bounds are given and ``start_after`` is not before ``end_at``.
        """
        if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
            raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")

        # Note that this is a generator, not a tuple (there's no tuple comprehension).
        keys = (
            key
            for key in self._files
            if key.startswith(prefix)
            and (start_after is None or start_after < key)
            and (end_at is None or key <= end_at)
        )

        # Dictionaries don't guarantee lexicographical order.
        for key in sorted(keys):
            metadata = self._files[key]
            # Report the recorded size/mtime (previously fabricated as 0 / "now"). The key comes from
            # the map, mirroring commit_updates, since metadata passed to add_file may carry another key.
            yield ObjectMetadata(key=key, content_length=metadata.content_length, last_modified=metadata.last_modified)

    def get_object_metadata(self, path: str) -> ObjectMetadata:
        """
        Returns the stored metadata for ``path``.

        :raises FileNotFoundError: If ``path`` is not in the manifest.
        """
        metadata = self._files.get(path, None)
        if metadata is None:
            raise FileNotFoundError(f"Object {path} does not exist.")
        return metadata

    def glob(self, pattern: str) -> List[str]:
        """
        Returns the manifest keys matching ``pattern``, matched against all keys in sorted order.
        """
        all_keys = [metadata.key for metadata in self.list_objects("")]
        return list(glob(all_keys, pattern))

    def realpath(self, path: str) -> Tuple[str, bool]:
        """
        Returns ``path`` unchanged together with whether it exists in the manifest.
        """
        exists = path in self._files
        return path, exists

    def add_file(self, path: str, metadata: ObjectMetadata) -> None:
        """
        Stages ``path`` to be added (or replaced) on the next :py:meth:`commit_updates`.

        :raises RuntimeError: If the provider is not writable.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")
        self._pending_adds[path] = metadata

    def remove_file(self, path: str) -> None:
        """
        Stages ``path`` for removal on the next :py:meth:`commit_updates`.

        Only already-committed paths can be removed; staged-but-uncommitted adds are not visible here.

        :raises RuntimeError: If the provider is not writable.
        :raises FileNotFoundError: If ``path`` is not in the committed manifest.
        """
        if not self.is_writable():
            raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
        if path not in self._files:
            raise FileNotFoundError(f"Object {path} does not exist.")
        self._pending_removes.append(path)

    def is_writable(self) -> bool:
        """
        Returns whether this provider accepts modifications.
        """
        return self._writable

    def commit_updates(self) -> None:
        """
        Applies staged adds and removes, then writes a new manifest with the resulting objects.

        Adds are applied before removes, so a path that is both added and removed in the same batch
        ends up removed. Does nothing when no changes are staged.
        """
        if not self._pending_adds and not self._pending_removes:
            return

        if self._pending_adds:
            self._files.update(self._pending_adds)
            self._pending_adds = {}

        for path in self._pending_removes:
            # The same path may have been staged more than once; tolerate it rather than raise KeyError.
            self._files.pop(path, None)
        # Clear staged removals so a later commit does not replay them (which used to raise KeyError
        # or silently drop a file that had been re-added since).
        self._pending_removes = []

        # Collect metadata for each object to write out in this part file.
        object_metadata = [
            ObjectMetadata(key=file_path, content_length=metadata.content_length, last_modified=metadata.last_modified)
            for file_path, metadata in self._files.items()
        ]

        self._write_manifest_files(self._storage_provider, object_metadata)