Source code for multistorageclient.providers.manifest_metadata
1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import annotations # Enables forward references in type hints
17
18import io
19import json
20import logging
21import os
22from collections.abc import Iterator
23from dataclasses import asdict, dataclass
24from datetime import datetime, timezone
25from typing import Any, Optional
26
27from ..types import MetadataProvider, ObjectMetadata, StorageProvider
28from ..utils import glob
29
logger = logging.getLogger(__name__)


# On-storage layout produced by ManifestMetadataProvider._write_manifest_files:
#   <base>/.msc_manifests/<UTC ISO-8601 timestamp>/msc_manifest_index.json
#   <base>/.msc_manifests/<UTC ISO-8601 timestamp>/parts/msc_manifest_part000001.jsonl
DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
MANIFEST_PARTS_CHILD_DIR = "parts"
MANIFEST_PART_PREFIX = "msc_manifest_part"
# Extension of the part files, which are JSON lines documents.
MANIFEST_PART_SUFFIX = ".jsonl"
# Zero-padded width of a part's sequence number (6 renders 1 as "000001").
SEQUENCE_PADDING = 6
39
40
[docs]
@dataclass
class ManifestPartReference:
    """
    Pointer from the main manifest to one of its part files.
    """

    #: Location of the part file, resolved against the directory that holds the main manifest.
    path: str

    @staticmethod
    def from_dict(data: dict[str, Any]) -> ManifestPartReference:
        """
        Builds a :py:class:`ManifestPartReference` from its dictionary form.

        :param data: Mapping that must contain a ``path`` entry.
        :return: The reconstructed reference.
        :raises ValueError: If ``path`` is absent from ``data``.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])
        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Serializes this reference into a JSON-compatible dictionary.
        """
        return dict(path=self.path)
68
69
[docs]
@dataclass
class Manifest:
    """
    Top-level dataset manifest: a schema version plus references to its part files.
    """

    #: Manifest schema version string.
    version: str
    #: References to the part files holding the object metadata.
    parts: list[ManifestPartReference]

    @staticmethod
    def from_dict(data: dict) -> "Manifest":
        """
        Builds a :py:class:`Manifest` from a dictionary, typically one decoded from JSON.

        :param data: Mapping with ``version`` and ``parts`` entries.
        :return: The reconstructed manifest.
        :raises ValueError: If ``version`` or ``parts`` (or a part's ``path``) is missing.
        """
        try:
            schema_version = data["version"]
            part_refs = [ManifestPartReference.from_dict(entry) for entry in data["parts"]]
        except KeyError as err:
            raise ValueError("Invalid manifest data: Missing required field") from err

        return Manifest(version=schema_version, parts=part_refs)

    def to_json(self) -> str:
        """
        Serializes the manifest to a JSON string.
        """
        payload = asdict(self)
        # Let each part control its own serialized form.
        payload["parts"] = [ref.to_dict() for ref in self.parts]
        return json.dumps(payload)
100
101
def _metadata_to_manifest_dict(metadata: ObjectMetadata) -> dict:
    """
    Serializes object metadata into the dictionary layout stored in manifest parts.

    The manifest format names the object size ``size_bytes``, so the ``content_length``
    entry of ``metadata.to_dict()`` is renamed (``size_bytes`` is ``None`` when
    ``content_length`` is absent).

    :param metadata: Object metadata to serialize.
    :return: Manifest-format dictionary.
    """
    manifest_entry = metadata.to_dict()
    manifest_entry["size_bytes"] = manifest_entry.pop("content_length", None)
    return manifest_entry
112
113
[docs]
114class ManifestMetadataProvider(MetadataProvider):
115 _storage_provider: StorageProvider
116 _files: dict[str, ObjectMetadata]
117 _pending_adds: dict[str, ObjectMetadata]
118 _pending_removes: set[str]
119 _manifest_path: str
120 _writable: bool
121
122 def __init__(self, storage_provider: StorageProvider, manifest_path: str, writable: bool = False) -> None:
123 """
124 Creates a :py:class:`ManifestMetadataProvider`.
125
126 :param storage_provider: Storage provider.
127 :param manifest_path: Main manifest file path.
128 :param writable: If true, allows modifications and new manifests to be written.
129 """
130 self._storage_provider = storage_provider
131 self._files = {}
132 self._pending_adds = {}
133 self._pending_removes = set()
134 self._manifest_path = manifest_path
135 self._writable = writable
136
137 self._load_manifest(storage_provider, self._manifest_path)
138
139 def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
140 """
141 Loads manifest.
142
143 :param storage_provider: Storage provider.
144 :param manifest_path: Main manifest file path
145 """
146
147 def helper_find_manifest_file(manifest_path: str) -> str:
148 if storage_provider.is_file(manifest_path):
149 return manifest_path
150
151 if storage_provider.is_file(os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)):
152 return os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)
153
154 # Now go looking and select newest manifest.
155 if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
156 manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)
157
158 candidates = storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME))
159 candidates = sorted(candidates)
160 return candidates[-1] if candidates else ""
161
162 resolved_manifest_path = helper_find_manifest_file(manifest_path)
163 if not resolved_manifest_path:
164 logger.warning(f"No manifest found at '{manifest_path}'.")
165 return
166
167 file_content = storage_provider.get_object(resolved_manifest_path)
168
169 prefix = os.path.dirname(resolved_manifest_path)
170 _, file_extension = os.path.splitext(resolved_manifest_path)
171 self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])
172
173 def _load_manifest_file(
174 self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
175 ) -> None:
176 """
177 Loads a manifest.
178
179 :param storage_provider: Storage provider.
180 :param file_content: Manifest file content bytes.
181 :param manifest_base: Manifest file base path.
182 :param file_type: Manifest file type.
183 """
184 if file_type == "json":
185 manifest_dict = json.loads(file_content.decode("utf-8"))
186 manifest = Manifest.from_dict(manifest_dict)
187
188 # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
189 if manifest.version != "1":
190 raise ValueError(f"Manifest version {manifest.version} is not supported.")
191
192 # Load manifest parts.
193 for manifest_part_reference in manifest.parts:
194 object_metadata: list[ObjectMetadata] = self._load_manifest_part_file(
195 storage_provider=storage_provider,
196 manifest_base=manifest_base,
197 manifest_part_reference=manifest_part_reference,
198 )
199
200 for object_metadatum in object_metadata:
201 self._files[object_metadatum.key] = object_metadatum
202 else:
203 raise NotImplementedError(f"Manifest file type {file_type} is not supported.")
204
205 def _load_manifest_part_file(
206 self, storage_provider: StorageProvider, manifest_base: str, manifest_part_reference: ManifestPartReference
207 ) -> list[ObjectMetadata]:
208 """
209 Loads a manifest part.
210
211 :param storage_provider: Storage provider.
212 :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
213 :param manifest_part_reference: Manifest part reference.
214 """
215 object_metadata = []
216
217 if not os.path.isabs(manifest_part_reference.path):
218 remote_path = os.path.join(manifest_base, manifest_part_reference.path)
219 else:
220 remote_path = manifest_part_reference.path
221 manifest_part_file_content = storage_provider.get_object(remote_path)
222
223 # The manifest part is a JSON lines file. Each line is a JSON-serialized ObjectMetadata.
224 for line in io.TextIOWrapper(io.BytesIO(manifest_part_file_content), encoding="utf-8"):
225 object_metadatum_dict = json.loads(line)
226 object_metadatum_dict["content_length"] = object_metadatum_dict.pop("size_bytes")
227 object_metadatum = ObjectMetadata.from_dict(object_metadatum_dict)
228 object_metadata.append(object_metadatum)
229
230 return object_metadata
231
232 def _write_manifest_files(self, storage_provider: StorageProvider, object_metadata: list[ObjectMetadata]) -> None:
233 """
234 Writes the main manifest and its part files.
235
236 :param storage_provider: The storage provider to use for writing.
237 :param object_metadata: objects to include in manifest.
238 """
239 if not object_metadata:
240 return
241
242 base_path = self._manifest_path
243 manifest_base_path = base_path
244
245 base_path_parts = base_path.split(os.sep)
246 if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
247 manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
248 if manifests_index > 0:
249 manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
250 else:
251 manifest_base_path = ""
252 if base_path.startswith(os.sep):
253 manifest_base_path = os.sep + manifest_base_path
254
255 current_time = datetime.now(timezone.utc)
256 current_time_str = current_time.isoformat(timespec="seconds")
257 manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)
258 # We currently write only one part by default
259 part_sequence_number = 1
260 manifest_part_file_path = os.path.join(
261 MANIFEST_PARTS_CHILD_DIR,
262 f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{MANIFEST_PART_SUFFIX}",
263 )
264
265 manifest = Manifest(version="1", parts=[ManifestPartReference(path=manifest_part_file_path)])
266
267 # Write single manifest part with metadata as JSON lines (each object on a new line)
268 manifest_part_content = "\n".join(
269 [json.dumps(_metadata_to_manifest_dict(metadata)) for metadata in object_metadata]
270 )
271 storage_provider.put_object(
272 os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content.encode("utf-8")
273 )
274
275 # Write the main manifest file
276 manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
277 manifest_content = manifest.to_json()
278 storage_provider.put_object(manifest_file_path, manifest_content.encode("utf-8"))
279
[docs]
280 def list_objects(
281 self,
282 prefix: str,
283 start_after: Optional[str] = None,
284 end_at: Optional[str] = None,
285 include_directories: bool = False,
286 attribute_filter_expression: Optional[str] = None,
287 ) -> Iterator[ObjectMetadata]:
288 if attribute_filter_expression:
289 raise NotImplementedError("Attribute filter expressions are not supported for manifest metadata provider.")
290
291 if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
292 raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")
293
294 if prefix and not prefix.endswith("/"):
295 prefix = prefix + "/"
296
297 # Note that this is a generator, not a tuple (there's no tuple comprehension).
298 keys = (
299 key
300 for key in self._files
301 if key.startswith(prefix)
302 and (start_after is None or start_after < key)
303 and (end_at is None or key <= end_at)
304 )
305
306 pending_directory: Optional[ObjectMetadata] = None
307 for key in sorted(keys):
308 if include_directories:
309 relative = key[len(prefix) :].lstrip("/")
310 subdirectory = relative.split("/", 1)[0] if "/" in relative else None
311
312 if subdirectory:
313 directory_name = f"{prefix}{subdirectory}/"
314
315 if pending_directory and pending_directory.key != directory_name:
316 yield pending_directory
317
318 obj_metadata = self.get_object_metadata(key)
319 if not pending_directory or pending_directory.key != directory_name:
320 pending_directory = ObjectMetadata(
321 key=directory_name,
322 type="directory",
323 last_modified=obj_metadata.last_modified,
324 content_length=0,
325 )
326 else:
327 pending_directory.last_modified = max(
328 pending_directory.last_modified, obj_metadata.last_modified
329 )
330 continue # Skip yielding this key as it's part of a directory
331
332 obj = self._files[key]
333 obj.key = key # use key without base_path
334 yield obj
335
336 if include_directories and pending_directory:
337 yield pending_directory
338
[docs]
339 def get_object_metadata(self, path: str, include_pending: bool = False) -> ObjectMetadata:
340 if path in self._files:
341 if include_pending and path in self._pending_removes:
342 raise FileNotFoundError(f"Object {path} does not exist.")
343 else:
344 return self._files[path]
345 elif include_pending and path in self._pending_adds:
346 return self._pending_adds[path]
347 else:
348 raise FileNotFoundError(f"Object {path} does not exist.")
349
[docs]
350 def glob(self, pattern: str, attribute_filter_expression: Optional[str] = None) -> list[str]:
351 if attribute_filter_expression:
352 raise NotImplementedError("Attribute filter expressions are not supported for manifest metadata provider.")
353
354 all_objects = [object.key for object in self.list_objects("")]
355 return [key for key in glob(all_objects, pattern)]
356
[docs]
357 def realpath(self, path: str) -> tuple[str, bool]:
358 exists = path in self._files
359 return path, exists
360
[docs]
361 def add_file(self, path: str, metadata: ObjectMetadata) -> None:
362 if not self.is_writable():
363 raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")
364 self._pending_adds[path] = metadata
365
[docs]
366 def remove_file(self, path: str) -> None:
367 if not self.is_writable():
368 raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
369 if path not in self._files:
370 raise FileNotFoundError(f"Object {path} does not exist.")
371 self._pending_removes.add(path)
372
375
[docs]
376 def commit_updates(self) -> None:
377 if not self._pending_adds and not self._pending_removes:
378 return
379
380 if self._pending_adds:
381 self._files.update(self._pending_adds)
382 self._pending_adds = {}
383
384 for path in self._pending_removes:
385 self._files.pop(path)
386 self._pending_removes = set()
387
388 # Collect metadata for each object to write out in this part file.
389 object_metadata = [
390 ObjectMetadata(key=file_path, content_length=metadata.content_length, last_modified=metadata.last_modified)
391 for file_path, metadata in self._files.items()
392 ]
393 self._write_manifest_files(self._storage_provider, object_metadata)