Source code for multistorageclient.providers.manifest_metadata
1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import annotations # Enables forward references in type hints
17
18import json
19import logging
20import os
21from collections.abc import Iterator
22from dataclasses import asdict, dataclass
23from datetime import datetime, timezone
24from typing import Any, Optional, Union
25
26from ..types import (
27 AWARE_DATETIME_MIN,
28 MAX_SYMLINK_DEPTH,
29 MetadataProvider,
30 ObjectMetadata,
31 ResolvedPath,
32 ResolvedPathState,
33 StorageProvider,
34)
35from ..utils import create_attribute_filter_evaluator, glob, matches_attribute_filter_expression
36from .manifest_formats import ManifestFormat, get_format_handler
37from .manifest_object_metadata import ManifestObjectMetadata
38
39logger = logging.getLogger(__name__)
40
41
42DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
43MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
44MANIFEST_PARTS_CHILD_DIR = "parts"
45MANIFEST_PART_PREFIX = "msc_manifest_part"
46SEQUENCE_PADDING = 6 # Define padding for the sequence number (e.g., 6 for "000001")
47
48
[docs]
@dataclass
class ManifestPartReference:
    """
    Pointer to one part file of a dataset manifest.
    """

    #: Location of the part file, relative to the main manifest.
    path: str

    @staticmethod
    def from_dict(data: dict[str, Any]) -> ManifestPartReference:
        """
        Builds a ManifestPartReference from its dictionary form.

        :param data: Mapping that must contain a ``path`` entry; other entries are ignored.
        :return: The reference described by ``data``.
        :raises ValueError: If ``data`` has no ``path`` entry.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])

        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Returns the dictionary form of this reference (a single ``path`` entry).
        """
        return dict(path=self.path)
76
77
[docs]
@dataclass
class Manifest:
    """
    Index of a dataset manifest: schema version, part references and part format.
    """

    #: Version of the manifest schema.
    version: str
    #: References to the part files that make up the manifest.
    parts: list[ManifestPartReference]
    #: Serialization format of the part files (jsonl or parquet).
    format: str = "jsonl"

    @staticmethod
    def from_dict(data: dict) -> "Manifest":
        """
        Builds a Manifest from a dictionary (typically parsed from JSON).

        ``version`` and ``parts`` are required; ``format`` falls back to ``"jsonl"``.

        :param data: Dictionary form of the manifest.
        :return: The manifest described by ``data``.
        :raises ValueError: If a required field is missing.
        """
        try:
            schema_version = data["version"]
            part_references = [ManifestPartReference.from_dict(entry) for entry in data["parts"]]
            part_format = data.get("format", "jsonl")
        except KeyError as e:
            raise ValueError("Invalid manifest data: Missing required field") from e

        return Manifest(version=schema_version, parts=part_references, format=part_format)

    def to_json(self) -> str:
        """
        Serializes the manifest to a JSON string, rendering each part through its ``to_dict``.
        """
        payload = {**asdict(self), "parts": [reference.to_dict() for reference in self.parts]}
        return json.dumps(payload)
109
110
[docs]
111class ManifestMetadataProvider(MetadataProvider):
112 _storage_provider: StorageProvider
113 _files: dict[str, ManifestObjectMetadata]
114 _pending_adds: dict[str, ManifestObjectMetadata]
115 _pending_removes: set[str]
116 _manifest_path: str
117 _writable: bool
118 _allow_overwrites: bool
119 _format: Union[ManifestFormat, str]
120
121 def __init__(
122 self,
123 storage_provider: StorageProvider,
124 manifest_path: str,
125 writable: bool = False,
126 allow_overwrites: bool = False,
127 manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
128 ) -> None:
129 """
130 Creates a :py:class:`ManifestMetadataProvider`.
131
132 :param storage_provider: Storage provider.
133 :param manifest_path: Main manifest file path.
134 :param writable: If true, allows modifications and new manifests to be written.
135 :param allow_overwrites: If true, allows overwriting existing files without error.
136 :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
137 """
138 self._storage_provider = storage_provider
139 self._files = {}
140 self._pending_adds = {}
141 self._pending_removes = set()
142 self._manifest_path = manifest_path
143 self._writable = writable
144 self._allow_overwrites = allow_overwrites
145 self._format = (
146 manifest_format if isinstance(manifest_format, ManifestFormat) else ManifestFormat(manifest_format)
147 )
148
149 self._load_manifest(storage_provider, self._manifest_path)
150
151 def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
152 """
153 Loads manifest.
154
155 :param storage_provider: Storage provider.
156 :param manifest_path: Main manifest file path
157 """
158
159 def helper_find_manifest_file(manifest_path: str) -> str:
160 if storage_provider.is_file(manifest_path):
161 return manifest_path
162
163 if storage_provider.is_file(os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)):
164 return os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)
165
166 # Now go looking and select newest manifest.
167 if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
168 manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)
169
170 candidates = storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME))
171 candidates = sorted(candidates)
172 return candidates[-1] if candidates else ""
173
174 resolved_manifest_path = helper_find_manifest_file(manifest_path)
175 if not resolved_manifest_path:
176 logger.warning(f"No manifest found at '{manifest_path}'.")
177 return
178
179 file_content = storage_provider.get_object(resolved_manifest_path)
180
181 prefix = os.path.dirname(resolved_manifest_path)
182 _, file_extension = os.path.splitext(resolved_manifest_path)
183 self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])
184
185 def _load_manifest_file(
186 self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
187 ) -> None:
188 """
189 Loads a manifest.
190
191 :param storage_provider: Storage provider.
192 :param file_content: Manifest file content bytes.
193 :param manifest_base: Manifest file base path.
194 :param file_type: Manifest file type.
195 """
196 if file_type == "json":
197 manifest_dict = json.loads(file_content.decode("utf-8"))
198 manifest = Manifest.from_dict(manifest_dict)
199
200 # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
201 if manifest.version != "1":
202 raise ValueError(f"Manifest version {manifest.version} is not supported.")
203
204 # Load manifest parts.
205 for manifest_part_reference in manifest.parts:
206 object_metadata_list: list[ManifestObjectMetadata] = self._load_manifest_part_file(
207 storage_provider=storage_provider,
208 manifest_base=manifest_base,
209 manifest_part_reference=manifest_part_reference,
210 manifest_format=manifest.format,
211 )
212
213 for object_metadatum in object_metadata_list:
214 self._files[object_metadatum.key] = object_metadatum
215 else:
216 raise NotImplementedError(f"Manifest file type {file_type} is not supported.")
217
218 def _load_manifest_part_file(
219 self,
220 storage_provider: StorageProvider,
221 manifest_base: str,
222 manifest_part_reference: ManifestPartReference,
223 manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
224 ) -> list[ManifestObjectMetadata]:
225 """
226 Loads a manifest part and converts to ManifestObjectMetadata.
227
228 :param storage_provider: Storage provider.
229 :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
230 :param manifest_part_reference: Manifest part reference.
231 :param manifest_format: Format of the manifest part (jsonl or parquet).
232 """
233 if not os.path.isabs(manifest_part_reference.path):
234 remote_path = os.path.join(manifest_base, manifest_part_reference.path)
235 else:
236 remote_path = manifest_part_reference.path
237 manifest_part_file_content = storage_provider.get_object(remote_path)
238
239 _, ext = os.path.splitext(remote_path)
240 detected_format = ext[1:] if ext else manifest_format
241
242 format_handler = get_format_handler(detected_format)
243 object_metadata_list = format_handler.read_part(manifest_part_file_content)
244
245 # Convert ObjectMetadata to ManifestObjectMetadata
246 # The format handler returns ObjectMetadata, but may have extra attributes set (like physical_path)
247 return [ManifestObjectMetadata.from_object_metadata(obj) for obj in object_metadata_list]
248
249 def _write_manifest_files(
250 self,
251 storage_provider: StorageProvider,
252 object_metadata: list[ManifestObjectMetadata],
253 manifest_format: Union[ManifestFormat, str] = ManifestFormat.JSONL,
254 ) -> None:
255 """
256 Writes the main manifest and its part files.
257
258 Accepts ManifestObjectMetadata which extends ObjectMetadata, so format handlers
259 (which expect ObjectMetadata) can serialize it, preserving all fields including physical_path.
260
261 :param storage_provider: The storage provider to use for writing.
262 :param object_metadata: ManifestObjectMetadata objects to include in manifest.
263 :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
264 """
265 if not object_metadata:
266 return
267
268 base_path = self._manifest_path
269 manifest_base_path = base_path
270
271 base_path_parts = base_path.split(os.sep)
272 if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
273 manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
274 if manifests_index > 0:
275 manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
276 else:
277 manifest_base_path = ""
278 if base_path.startswith(os.sep):
279 manifest_base_path = os.sep + manifest_base_path
280
281 current_time = datetime.now(timezone.utc)
282 current_time_str = current_time.isoformat(timespec="seconds")
283 manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)
284
285 format_handler = get_format_handler(manifest_format)
286 suffix = format_handler.get_file_suffix()
287
288 # We currently write only one part by default.
289 part_sequence_number = 1
290 manifest_part_file_path = os.path.join(
291 MANIFEST_PARTS_CHILD_DIR,
292 f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{suffix}",
293 )
294
295 format_value = manifest_format.value if isinstance(manifest_format, ManifestFormat) else manifest_format
296 manifest = Manifest(
297 version="1", parts=[ManifestPartReference(path=manifest_part_file_path)], format=format_value
298 )
299
300 manifest_part_content = format_handler.write_part(list(object_metadata))
301 storage_provider.put_object(os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content)
302
303 manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
304 manifest_content = manifest.to_json()
305 storage_provider.put_object(manifest_file_path, manifest_content.encode("utf-8"))
306
[docs]
307 def list_objects(
308 self,
309 path: str,
310 start_after: Optional[str] = None,
311 end_at: Optional[str] = None,
312 include_directories: bool = False,
313 attribute_filter_expression: Optional[str] = None,
314 show_attributes: bool = False,
315 ) -> Iterator[ObjectMetadata]:
316 """
317 List objects in the manifest.
318
319 :param path: The path to filter objects by.
320 :param start_after: The object to start after.
321 :param end_at: The object to end at.
322 :param include_directories: Whether to include directories.
323 :param attribute_filter_expression: The attribute filter expression to filter objects by.
324 :param show_attributes: This field is not used in this implementation - It will always return attributes. This is present merely to satisfy the interface.
325 """
326
327 if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
328 raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")
329
330 if path and not path.endswith("/"):
331 path = path + "/"
332
333 # create evaluator for attribute filter expression if present
334 evaluator = (
335 create_attribute_filter_evaluator(attribute_filter_expression) if attribute_filter_expression else None
336 )
337
338 # Note that this is a generator, not a tuple (there's no tuple comprehension).
339 keys = (
340 key
341 for key, obj_metadata in self._files.items()
342 if key.startswith(path)
343 and (start_after is None or start_after < key)
344 and (end_at is None or key <= end_at)
345 and (
346 evaluator is None or matches_attribute_filter_expression(obj_metadata, evaluator)
347 ) # filter by evaluator if present
348 )
349
350 pending_directory: Optional[ObjectMetadata] = None
351 for key in sorted(keys):
352 if include_directories:
353 relative = key[len(path) :].lstrip("/")
354 subdirectory = relative.split("/", 1)[0] if "/" in relative else None
355
356 if subdirectory:
357 directory_name = f"{path}{subdirectory}/"
358
359 if pending_directory and pending_directory.key != directory_name:
360 yield pending_directory
361
362 obj_metadata = self.get_object_metadata(key)
363 if not pending_directory or pending_directory.key != directory_name:
364 pending_directory = ObjectMetadata(
365 key=directory_name,
366 type="directory",
367 last_modified=obj_metadata.last_modified,
368 content_length=0,
369 )
370 else:
371 pending_directory.last_modified = max(
372 pending_directory.last_modified, obj_metadata.last_modified
373 )
374 continue # Skip yielding this key as it's part of a directory
375
376 yield self._files[key]
377
378 if include_directories and pending_directory:
379 yield pending_directory
380
[docs]
381 def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
382 if path in self._files:
383 if include_pending and path in self._pending_removes:
384 raise FileNotFoundError(f"Object {path} does not exist.")
385 else:
386 # Return ManifestObjectMetadata directly (it extends ObjectMetadata)
387 return self._files[path]
388 elif include_pending and path in self._pending_adds:
389 return self._pending_adds[path]
390 else:
391 return self._get_directory_metadata(path, include_pending)
392
393 def _get_directory_metadata(self, path: str, include_pending: bool) -> ObjectMetadata:
394 dir_prefix = path.rstrip("/") + "/"
395
396 has_committed = any(
397 k.startswith(dir_prefix) and (not include_pending or k not in self._pending_removes) for k in self._files
398 )
399 has_pending = include_pending and any(k.startswith(dir_prefix) for k in self._pending_adds)
400
401 if not has_committed and not has_pending:
402 raise FileNotFoundError(f"Object {path} does not exist.")
403
404 return ObjectMetadata(
405 key=dir_prefix,
406 type="directory",
407 content_length=0,
408 last_modified=AWARE_DATETIME_MIN,
409 )
410
[docs]
411 def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
412 """
413 List objects in the manifest.
414
415 :param pattern: The pattern to filter objects by.
416 :param attribute_filter_expression: The attribute filter expression to filter objects by.
417 """
418
419 all_objects = [
420 object.key for object in self.list_objects("", attribute_filter_expression=attribute_filter_expression)
421 ]
422 return [key for key in glob(all_objects, pattern)]
423
[docs]
424 def realpath(self, logical_path: str) -> ResolvedPath:
425 """
426 Resolves a logical path to its physical storage path if the object exists.
427 Only checks committed files, not pending changes.
428 Follows ``symlink_target`` chains in the manifest with depth limit and cycle detection.
429
430 :param logical_path: The user-facing logical path
431
432 :return: ResolvedPath with exists=True if found, exists=False otherwise
433 """
434 visited: set[str] = set()
435 current = logical_path
436 for _ in range(MAX_SYMLINK_DEPTH):
437 manifest_obj = self._files.get(current)
438 if not manifest_obj:
439 return ResolvedPath(physical_path=current, state=ResolvedPathState.UNTRACKED, profile=None)
440 if not manifest_obj.symlink_target:
441 assert manifest_obj.physical_path is not None
442 return ResolvedPath(
443 physical_path=manifest_obj.physical_path, state=ResolvedPathState.EXISTS, profile=None
444 )
445 if current in visited:
446 raise ValueError(f"Symlink cycle detected at: {current}")
447 visited.add(current)
448 current = ObjectMetadata.resolve_symlink_target(current, manifest_obj.symlink_target)
449 raise ValueError(f"Too many levels of symlinks (>{MAX_SYMLINK_DEPTH}): {logical_path}")
450
[docs]
451 def generate_physical_path(self, logical_path: str, for_overwrite: bool = False) -> ResolvedPath:
452 """
453 Generates a physical storage path for a new object or for overwriting an existing object.
454
455 For now, this simply returns the logical path (no path rewriting).
456 In the future, this could generate unique paths for overwrites.
457
458 :param logical_path: The user-facing logical path
459 :param for_overwrite: When ``True``, generate a path for overwriting an existing object.
460
461 :return: The physical storage path to use for writing
462 """
463 # For now, physical path = logical path
464 # Future enhancement: generate unique paths for overwrites
465 # if for_overwrite and self._allow_overwrites:
466 # return f"{logical_path}-{uuid.uuid4().hex}"
467 return ResolvedPath(physical_path=logical_path, state=ResolvedPathState.UNTRACKED, profile=None)
468
[docs]
469 def add_file(self, path: str, metadata: ObjectMetadata) -> None:
470 if not self.is_writable():
471 raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")
472
473 # Check if file already exists in committed state and overwrites are not allowed
474 # Pending files can always be overwritten
475 if not self._allow_overwrites and path in self._files:
476 raise FileExistsError(f"File {path} already exists and overwrites are not allowed.")
477
478 # Handle two cases:
479 # 1. If metadata is already a ManifestObjectMetadata, use it directly
480 # 2. Otherwise, create one assuming metadata.key contains the physical path
481 if isinstance(metadata, ManifestObjectMetadata):
482 # Already has proper logical/physical path separation
483 manifest_metadata = metadata
484 # Ensure the logical path matches what was requested
485 if manifest_metadata.key != path:
486 raise ValueError(f"Logical path mismatch: expected {path}, got {manifest_metadata.key}")
487 else:
488 # For backward compatibility, create a ManifestObjectMetadata from ObjectMetadata
489 manifest_metadata = ManifestObjectMetadata(
490 key=path, # Logical path (user-facing)
491 content_length=metadata.content_length,
492 last_modified=metadata.last_modified,
493 content_type=metadata.content_type,
494 etag=metadata.etag,
495 metadata=metadata.metadata,
496 type=metadata.type,
497 physical_path=metadata.key, # Physical path (from storage provider)
498 )
499
500 # TODO: Time travel is not supported - we do not rename file paths when overwriting.
501 # When a file is overwritten, the manifest points to the new version of the file at the same location
502 # without maintaining history of previous versions.
503 self._pending_adds[path] = manifest_metadata
504
[docs]
505 def remove_file(self, path: str) -> None:
506 if not self.is_writable():
507 raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
508 if path not in self._files:
509 raise FileNotFoundError(f"Object {path} does not exist.")
510 self._pending_removes.add(path)
511
514
517
520
[docs]
521 def commit_updates(self) -> None:
522 if not self._pending_adds and not self._pending_removes:
523 return
524
525 if self._pending_adds:
526 self._files.update(self._pending_adds)
527 self._pending_adds = {}
528
529 for path in self._pending_removes:
530 self._files.pop(path)
531 self._pending_removes = set()
532
533 # Serialize ManifestObjectMetadata directly
534 # to_dict() will include all fields including physical_path
535 object_metadata = list(self._files.values())
536 self._write_manifest_files(self._storage_provider, object_metadata, manifest_format=self._format)