Source code for multistorageclient.providers.manifest_metadata
1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import annotations # Enables forward references in type hints
17
18import json
19import logging
20import os
21from collections.abc import Iterator
22from dataclasses import asdict, dataclass
23from datetime import datetime, timezone
24from typing import Any, Optional, Union
25
26from ..types import (
27 AWARE_DATETIME_MIN,
28 MetadataProvider,
29 ObjectMetadata,
30 ResolvedPath,
31 ResolvedPathState,
32 StorageProvider,
33)
34from ..utils import create_attribute_filter_evaluator, glob, matches_attribute_filter_expression
35from .manifest_formats import ManifestFormat, get_format_handler
36from .manifest_object_metadata import ManifestObjectMetadata
37
# Logger named after this module.
logger = logging.getLogger(__name__)


# Directory under which this provider looks for and writes manifest folders.
DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
# File name of the main manifest (the index that lists the manifest parts).
MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
# Directory, next to the index file, that holds the manifest part files.
MANIFEST_PARTS_CHILD_DIR = "parts"
# File name prefix of a manifest part; the padded sequence number and the format suffix follow it.
MANIFEST_PART_PREFIX = "msc_manifest_part"
# Width the part sequence number is zero-padded to (e.g., 6 for "000001").
SEQUENCE_PADDING = 6
46
47
[docs]
@dataclass
class ManifestPartReference:
    """
    Pointer from a main manifest to one of its manifest part files.
    """

    #: Location of the manifest part, relative to the main manifest.
    path: str

    @staticmethod
    def from_dict(data: dict[str, Any]) -> ManifestPartReference:
        """
        Builds a ManifestPartReference from its dictionary form.

        :param data: Dictionary holding the reference; only the ``path`` entry is read.

        :return: The reference described by ``data``.

        :raises ValueError: If ``data`` has no ``path`` entry.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])

        # 'path' is the only required field of a part reference.
        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Returns the dictionary form of this reference (the inverse of :py:meth:`from_dict`).
        """
        return dict(path=self.path)
75
76
[docs]
@dataclass
class Manifest:
    """
    A data class representing a dataset manifest (the main file that lists the manifest parts).
    """

    #: Defines the version of the manifest schema.
    version: str
    #: References to manifest parts.
    parts: list[ManifestPartReference]
    #: Format of manifest parts (jsonl or parquet).
    format: str = "jsonl"

    @staticmethod
    def from_dict(data: dict) -> "Manifest":
        """
        Creates a Manifest instance from a dictionary (parsed from JSON).

        :param data: Dictionary with the required ``version`` and ``parts`` entries and an
            optional ``format`` entry (defaults to ``"jsonl"``).

        :return: The manifest described by ``data``.

        :raises ValueError: If ``version`` or ``parts`` is missing (the message names the missing
            field), or if a part entry is rejected by :py:meth:`ManifestPartReference.from_dict`.
        """
        # Only the required-field lookups live in the try block so that the error
        # can name exactly which field is absent.
        try:
            version = data["version"]
            raw_parts = data["parts"]
        except KeyError as e:
            raise ValueError(f"Invalid manifest data: Missing required field {e.args[0]!r}") from e

        parts = [ManifestPartReference.from_dict(part) for part in raw_parts]
        # Named part_format so the builtin format() is not shadowed.
        part_format = data.get("format", "jsonl")

        return Manifest(version=version, parts=parts, format=part_format)

    def to_json(self) -> str:
        """
        Serializes the manifest (version, parts and format) to a JSON string.
        """
        data = asdict(self)
        # Parts are serialized through their own to_dict() so the part schema is defined in one place.
        data["parts"] = [part.to_dict() for part in self.parts]
        return json.dumps(data)
108
109
[docs]
110class ManifestMetadataProvider(MetadataProvider):
111 _storage_provider: StorageProvider
112 _files: dict[str, ManifestObjectMetadata]
113 _pending_adds: dict[str, ManifestObjectMetadata]
114 _pending_removes: set[str]
115 _manifest_path: str
116 _writable: bool
117 _allow_overwrites: bool
118 _format: Union[ManifestFormat, str]
119
def __init__(
    self,
    storage_provider: StorageProvider,
    manifest_path: str,
    writable: bool = False,
    allow_overwrites: bool = False,
    manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
) -> None:
    """
    Creates a :py:class:`ManifestMetadataProvider` and loads the manifest found at ``manifest_path``.

    :param storage_provider: Storage provider used to read and write manifest files.
    :param manifest_path: Main manifest file path.
    :param writable: If true, allows modifications and new manifests to be written.
    :param allow_overwrites: If true, allows overwriting existing files without error.
    :param manifest_format: Format for manifest parts, given either as a ``ManifestFormat`` member
        or as a value accepted by the ``ManifestFormat`` constructor. Defaults to ManifestFormat.JSONL.
    """
    # Collaborators and configuration.
    self._storage_provider = storage_provider
    self._manifest_path = manifest_path
    self._writable = writable
    self._allow_overwrites = allow_overwrites

    # Committed entries, plus the changes staged until commit_updates() is called.
    self._files = {}
    self._pending_adds = {}
    self._pending_removes = set()

    # Store the format as a ManifestFormat member regardless of how it was passed in.
    if not isinstance(manifest_format, ManifestFormat):
        manifest_format = ManifestFormat(manifest_format)
    self._format = manifest_format

    self._load_manifest(storage_provider, self._manifest_path)
149
def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
    """
    Locates the main manifest for ``manifest_path`` and loads it.

    The manifest is looked up in this order: ``manifest_path`` itself if it is a file, then the
    index file directly inside ``manifest_path``, then the last (in sorted order) index file found
    one level below the manifest base directory. If nothing is found a warning is logged and
    nothing is loaded.

    :param storage_provider: Storage provider.
    :param manifest_path: Main manifest file path
    """

    def locate_index_file(candidate_root: str) -> str:
        # 1. The path already points at a manifest file.
        if storage_provider.is_file(candidate_root):
            return candidate_root

        # 2. The path is a folder that directly contains the index file.
        direct_index = os.path.join(candidate_root, MANIFEST_INDEX_FILENAME)
        if storage_provider.is_file(direct_index):
            return direct_index

        # 3. Search the manifest base directory, appending it unless the path already goes through it.
        search_root = candidate_root
        if DEFAULT_MANIFEST_BASE_DIR not in candidate_root.split("/"):
            search_root = os.path.join(candidate_root, DEFAULT_MANIFEST_BASE_DIR)

        found = sorted(storage_provider.glob(os.path.join(search_root, "*", MANIFEST_INDEX_FILENAME)))
        if not found:
            return ""
        # Manifest folders written by this provider are named by timestamp, so the last one is the newest.
        return found[-1]

    index_path = locate_index_file(manifest_path)
    if not index_path:
        logger.warning(f"No manifest found at '{manifest_path}'.")
        return

    index_bytes = storage_provider.get_object(index_path)

    # Part paths are resolved against the folder of the index file; the file type is its
    # extension without the leading dot.
    index_folder = os.path.dirname(index_path)
    extension = os.path.splitext(index_path)[1]
    self._load_manifest_file(storage_provider, index_bytes, index_folder, extension[1:])
183
184 def _load_manifest_file(
185 self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
186 ) -> None:
187 """
188 Loads a manifest.
189
190 :param storage_provider: Storage provider.
191 :param file_content: Manifest file content bytes.
192 :param manifest_base: Manifest file base path.
193 :param file_type: Manifest file type.
194 """
195 if file_type == "json":
196 manifest_dict = json.loads(file_content.decode("utf-8"))
197 manifest = Manifest.from_dict(manifest_dict)
198
199 # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
200 if manifest.version != "1":
201 raise ValueError(f"Manifest version {manifest.version} is not supported.")
202
203 # Load manifest parts.
204 for manifest_part_reference in manifest.parts:
205 object_metadata_list: list[ManifestObjectMetadata] = self._load_manifest_part_file(
206 storage_provider=storage_provider,
207 manifest_base=manifest_base,
208 manifest_part_reference=manifest_part_reference,
209 manifest_format=manifest.format,
210 )
211
212 for object_metadatum in object_metadata_list:
213 self._files[object_metadatum.key] = object_metadatum
214 else:
215 raise NotImplementedError(f"Manifest file type {file_type} is not supported.")
216
def _load_manifest_part_file(
    self,
    storage_provider: StorageProvider,
    manifest_base: str,
    manifest_part_reference: ManifestPartReference,
    manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
) -> list[ManifestObjectMetadata]:
    """
    Reads one manifest part and returns its entries as ManifestObjectMetadata.

    :param storage_provider: Storage provider.
    :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
    :param manifest_part_reference: Manifest part reference.
    :param manifest_format: Format of the manifest part (jsonl or parquet). Only used when the
        part path has no file extension.

    :return: The entries of the part, in the order the format handler returned them.
    """
    # Relative part paths are resolved against the manifest folder; absolute ones are used as is.
    part_path = manifest_part_reference.path
    remote_path = part_path if os.path.isabs(part_path) else os.path.join(manifest_base, part_path)
    part_bytes = storage_provider.get_object(remote_path)

    # The file extension, when there is one, takes precedence over the declared format.
    extension = os.path.splitext(remote_path)[1]
    part_format = extension[1:] if extension else manifest_format

    handler = get_format_handler(part_format)

    # The handler yields ObjectMetadata (possibly carrying extra attributes such as physical_path);
    # convert every entry to ManifestObjectMetadata.
    return list(map(ManifestObjectMetadata.from_object_metadata, handler.read_part(part_bytes)))
247
def _write_manifest_files(
    self,
    storage_provider: StorageProvider,
    object_metadata: list[ManifestObjectMetadata],
    manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
) -> None:
    """
    Writes a new manifest (one part file plus the main index file) into a fresh timestamped folder.

    The entries are ManifestObjectMetadata, which extends ObjectMetadata, so the format handlers
    (which expect ObjectMetadata) can serialize them while keeping every field, physical_path included.

    :param storage_provider: The storage provider to use for writing.
    :param object_metadata: ManifestObjectMetadata objects to include in manifest.
    :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
    """
    # NOTE(review): nothing is written for an empty list, so a commit that removes the last
    # entry does not appear to produce a new manifest - confirm this is intended.
    if not object_metadata:
        return

    # Work out the folder that contains the manifest base directory: everything in the
    # configured path before the base directory segment, or the configured path itself
    # when it does not go through the base directory.
    configured_path = self._manifest_path
    segments = configured_path.split(os.sep)
    try:
        marker_at = segments.index(DEFAULT_MANIFEST_BASE_DIR)
    except ValueError:
        dataset_root = configured_path
    else:
        dataset_root = os.path.join(*segments[:marker_at]) if marker_at > 0 else ""
        # Splitting dropped the leading separator of an absolute path; put it back.
        if configured_path.startswith(os.sep):
            dataset_root = os.sep + dataset_root

    # Each write goes into its own folder named by the current UTC time (second resolution).
    timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds")
    manifest_folder = os.path.join(dataset_root, DEFAULT_MANIFEST_BASE_DIR, timestamp)

    handler = get_format_handler(manifest_format)
    part_suffix = handler.get_file_suffix()

    # We currently write only one part by default.
    part_sequence_number = 1
    part_file_name = f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{part_suffix}"
    # Stored in the index relative to the manifest folder.
    part_relative_path = os.path.join(MANIFEST_PARTS_CHILD_DIR, part_file_name)

    if isinstance(manifest_format, ManifestFormat):
        format_value = manifest_format.value
    else:
        format_value = manifest_format
    manifest = Manifest(
        version="1",
        parts=[ManifestPartReference(path=part_relative_path)],
        format=format_value,
    )

    # Write the part before the index that references it.
    part_bytes = handler.write_part(list(object_metadata))
    storage_provider.put_object(os.path.join(manifest_folder, part_relative_path), part_bytes)

    index_path = os.path.join(manifest_folder, MANIFEST_INDEX_FILENAME)
    index_json = manifest.to_json()
    storage_provider.put_object(index_path, index_json.encode("utf-8"))
305
[docs]
def list_objects(
    self,
    path: str,
    start_after: Optional[str] = None,
    end_at: Optional[str] = None,
    include_directories: bool = False,
    attribute_filter_expression: Optional[str] = None,
    show_attributes: bool = False,
) -> Iterator[ObjectMetadata]:
    """
    List objects in the manifest, in ascending key order.

    Only committed entries are listed. This is a generator, so argument validation happens
    when iteration starts.

    :param path: The path to filter objects by. A trailing ``/`` is appended when missing; an
        empty path matches every object.
    :param start_after: The object to start after (exclusive).
    :param end_at: The object to end at (inclusive).
    :param include_directories: Whether to include directories. When true, objects below a
        sub-directory of ``path`` are collapsed into one directory entry whose ``last_modified``
        is the latest ``last_modified`` of the objects it stands for.
    :param attribute_filter_expression: The attribute filter expression to filter objects by.
    :param show_attributes: This field is not used in this implementation - It will always return attributes. This is present merely to satisfy the interface.

    :raises ValueError: If both bounds are given and ``start_after`` is not before ``end_at``.
    """

    if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
        raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")

    if path and not path.endswith("/"):
        path = path + "/"

    # create evaluator for attribute filter expression if present
    evaluator = (
        create_attribute_filter_evaluator(attribute_filter_expression) if attribute_filter_expression else None
    )

    keys = sorted(
        key
        for key, obj_metadata in self._files.items()
        if key.startswith(path)
        and (start_after is None or start_after < key)
        and (end_at is None or key <= end_at)
        and (evaluator is None or matches_attribute_filter_expression(obj_metadata, evaluator))
    )

    if not include_directories:
        for key in keys:
            yield self._files[key]
        return

    # Keys sharing a sub-directory prefix are adjacent in sorted order, so one pending
    # directory entry is enough: it is emitted as soon as a key outside of it shows up.
    pending_directory: Optional[ObjectMetadata] = None
    for key in keys:
        relative = key[len(path) :].lstrip("/")
        subdirectory = relative.split("/", 1)[0] if "/" in relative else None

        if subdirectory:
            directory_name = f"{path}{subdirectory}/"
            obj_metadata = self._files[key]

            if pending_directory and pending_directory.key == directory_name:
                # Same directory as the previous key: only track the latest modification time.
                pending_directory.last_modified = max(pending_directory.last_modified, obj_metadata.last_modified)
            else:
                if pending_directory:
                    yield pending_directory
                pending_directory = ObjectMetadata(
                    key=directory_name,
                    type="directory",
                    last_modified=obj_metadata.last_modified,
                    content_length=0,
                )
            continue  # Skip yielding this key as it's part of a directory

        # A file directly under `path`. Emit the pending directory first: its key sorts before
        # this file, and yielding the file first would break the ascending order of the results.
        if pending_directory:
            yield pending_directory
            pending_directory = None

        yield self._files[key]

    if pending_directory:
        yield pending_directory
379
[docs]
def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
    """
    Returns the metadata stored for ``path``.

    A committed entry is looked up first, then (only with ``include_pending``) an entry staged
    for addition, and finally ``path`` is treated as a directory.

    :param path: Logical path of the object or directory.
    :param include_pending: Whether staged additions and removals are taken into account.

    :return: The metadata of the object, or directory metadata when ``path`` is a directory.

    :raises FileNotFoundError: If ``include_pending`` is set and the committed entry is staged
        for removal, or if ``path`` is neither a known object nor a directory.
    """
    if path not in self._files:
        if include_pending and path in self._pending_adds:
            return self._pending_adds[path]
        # Not an object we know about; it may still be a directory prefix.
        return self._get_directory_metadata(path, include_pending)

    # A committed entry that is staged for removal counts as already gone.
    if include_pending and path in self._pending_removes:
        raise FileNotFoundError(f"Object {path} does not exist.")

    # The stored ManifestObjectMetadata is returned directly (it extends ObjectMetadata).
    return self._files[path]
391
392 def _get_directory_metadata(self, path: str, include_pending: bool) -> ObjectMetadata:
393 dir_prefix = path.rstrip("/") + "/"
394
395 has_committed = any(
396 k.startswith(dir_prefix) and (not include_pending or k not in self._pending_removes) for k in self._files
397 )
398 has_pending = include_pending and any(k.startswith(dir_prefix) for k in self._pending_adds)
399
400 if not has_committed and not has_pending:
401 raise FileNotFoundError(f"Object {path} does not exist.")
402
403 return ObjectMetadata(
404 key=dir_prefix,
405 type="directory",
406 content_length=0,
407 last_modified=AWARE_DATETIME_MIN,
408 )
409
[docs]
def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
    """
    Returns the keys of the objects in the manifest that match a glob pattern.

    :param pattern: The pattern to filter objects by.
    :param attribute_filter_expression: The attribute filter expression to filter objects by.
    """
    listing = self.list_objects("", attribute_filter_expression=attribute_filter_expression)
    candidate_keys = [entry.key for entry in listing]

    # Pattern matching is delegated to the glob helper imported at the top of this module.
    return list(glob(candidate_keys, pattern))
422
[docs]
def realpath(self, logical_path: str) -> ResolvedPath:
    """
    Resolves a logical path to the physical storage path recorded in the manifest.

    Pending additions and removals are ignored; only committed entries are consulted.

    :param logical_path: The user-facing logical path

    :return: ResolvedPath in state ``EXISTS`` carrying the recorded physical path when the object
        is committed; otherwise ResolvedPath in state ``UNTRACKED`` carrying ``logical_path`` itself.
    """
    committed_entry = self._files.get(logical_path)
    if not committed_entry:
        # Unknown to the manifest: hand the logical path back unchanged.
        return ResolvedPath(physical_path=logical_path, state=ResolvedPathState.UNTRACKED, profile=None)

    assert committed_entry.physical_path is not None
    return ResolvedPath(
        physical_path=committed_entry.physical_path,
        state=ResolvedPathState.EXISTS,
        profile=None,
    )
438
[docs]
def generate_physical_path(self, logical_path: str, for_overwrite: bool = False) -> ResolvedPath:
    """
    Chooses the physical storage path to write a new or overwritten object to.

    No path rewriting is done yet: the physical path is always the logical path and
    ``for_overwrite`` has no effect. A possible future enhancement is to generate a unique
    path for overwrites.

    :param logical_path: The user-facing logical path
    :param for_overwrite: When ``True``, generate a path for overwriting an existing object.

    :return: ResolvedPath in state ``UNTRACKED`` whose physical path equals ``logical_path``.
    """
    # For now, physical path = logical path.
    resolved = ResolvedPath(physical_path=logical_path, state=ResolvedPathState.UNTRACKED, profile=None)
    return resolved
456
[docs]
def add_file(self, path: str, metadata: ObjectMetadata) -> None:
    """
    Stages an object for addition to the manifest; it becomes visible after ``commit_updates()``.

    :param path: Logical (user-facing) path of the object.
    :param metadata: Metadata of the object. A ManifestObjectMetadata is stored as is; for a plain
        ObjectMetadata, ``metadata.key`` is taken to be the physical path.

    :raises RuntimeError: If the provider is not writable.
    :raises FileExistsError: If ``path`` is already committed and overwrites are not allowed.
    :raises ValueError: If a ManifestObjectMetadata is passed whose key differs from ``path``.
    """
    if not self.is_writable():
        raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")

    # Only committed entries block an add; an entry that is merely staged can always be replaced.
    if path in self._files and not self._allow_overwrites:
        raise FileExistsError(f"File {path} already exists and overwrites are not allowed.")

    if not isinstance(metadata, ManifestObjectMetadata):
        # Backward compatibility: wrap a plain ObjectMetadata, keeping its key as the physical path.
        staged_entry = ManifestObjectMetadata(
            key=path,  # Logical path (user-facing)
            content_length=metadata.content_length,
            last_modified=metadata.last_modified,
            content_type=metadata.content_type,
            etag=metadata.etag,
            metadata=metadata.metadata,
            type=metadata.type,
            physical_path=metadata.key,  # Physical path (from storage provider)
        )
    elif metadata.key != path:
        # Logical and physical paths are already separated; the logical one must match the request.
        raise ValueError(f"Logical path mismatch: expected {path}, got {metadata.key}")
    else:
        staged_entry = metadata

    # TODO: Time travel is not supported - we do not rename file paths when overwriting.
    # An overwritten file is referenced at the same location and no history of previous
    # versions is kept.
    self._pending_adds[path] = staged_entry
492
[docs]
def remove_file(self, path: str) -> None:
    """
    Stages a committed object for removal from the manifest; it disappears after ``commit_updates()``.

    :param path: Logical path of the object to remove.

    :raises RuntimeError: If the provider is not writable.
    :raises FileNotFoundError: If ``path`` is not a committed entry.
    """
    if not self.is_writable():
        raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")

    if path in self._files:
        self._pending_removes.add(path)
        return

    raise FileNotFoundError(f"Object {path} does not exist.")
499
502
505
508
[docs]
def commit_updates(self) -> None:
    """
    Applies the staged additions and removals and writes a new manifest.

    Additions are applied before removals, so a path staged for both ends up removed. Nothing
    happens when no change is staged.
    """
    staged_adds = self._pending_adds
    staged_removes = self._pending_removes
    if not (staged_adds or staged_removes):
        return

    if staged_adds:
        self._files.update(staged_adds)
        self._pending_adds = {}

    for removed_path in staged_removes:
        self._files.pop(removed_path)
    self._pending_removes = set()

    # The entries are ManifestObjectMetadata and are handed to the writer as they are.
    committed_entries = list(self._files.values())
    self._write_manifest_files(self._storage_provider, committed_entries, manifest_format=self._format)