Source code for multistorageclient.providers.manifest_metadata
1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from __future__ import annotations # Enables forward references in type hints
17
18import io
19import json
20import os
21from dataclasses import dataclass, asdict
22from datetime import datetime, timezone
23from typing import Any, Dict, Iterator, List, Optional, Tuple
24
25from ..types import MetadataProvider, ObjectMetadata, StorageProvider
26from ..utils import glob
27
# Directory (under the dataset base path) that holds one timestamped sub-folder per written manifest.
DEFAULT_MANIFEST_BASE_DIR = ".msc_manifests"
# File name of the main manifest (index) inside a manifest folder.
MANIFEST_INDEX_FILENAME = "msc_manifest_index.json"
# File name prefix for manifest part files.
MANIFEST_PART_PREFIX = "msc_manifest_part"
MANIFEST_PART_SUFFIX = ".jsonl"  # Suffix for the manifest part files (JSON Lines: one record per line)
SEQUENCE_PADDING = 6  # Zero-padding width for the part sequence number (e.g., 6 for "000001")
33
34
@dataclass
class ManifestPartReference:
    """
    Pointer from a manifest index to one of its part files.

    Attributes:
        path (str): Location of the part file. A relative path is interpreted relative to the
            directory containing the main manifest; an absolute path is used as-is.
    """

    path: str

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> ManifestPartReference:
        """
        Builds a reference from its dictionary form.

        :param data: Mapping that must contain a ``path`` entry.
        :return: The corresponding :py:class:`ManifestPartReference`.
        :raises ValueError: If ``path`` is absent from ``data``.
        """
        if "path" in data:
            return ManifestPartReference(path=data["path"])
        raise ValueError("Missing required field: 'path'")

    def to_dict(self) -> dict:
        """
        Returns the dictionary form of this reference (the inverse of :py:meth:`from_dict`).
        """
        return dict(path=self.path)
63 }
64
65
@dataclass
class Manifest:
    """
    Main manifest (index): a schema version plus references to the manifest part files.

    Attributes:
        :version (str): Version of the manifest schema.
        :parts (List[ManifestPartReference]): References to the manifest part files.
    """

    version: str
    parts: List[ManifestPartReference]

    @staticmethod
    def from_dict(data: dict) -> "Manifest":
        """
        Builds a :py:class:`Manifest` from its parsed-JSON dictionary form.

        :param data: Mapping with ``version`` and ``parts`` entries; each element of ``parts`` is
            converted with :py:meth:`ManifestPartReference.from_dict`.
        :return: The parsed manifest.
        :raises ValueError: If ``version`` or ``parts`` is missing (or a part lacks ``path``).
        """
        try:
            schema_version = data["version"]
            part_refs = [ManifestPartReference.from_dict(entry) for entry in data["parts"]]
        except KeyError as missing:
            raise ValueError("Invalid manifest data: Missing required field") from missing

        return Manifest(version=schema_version, parts=part_refs)

    def to_json(self) -> str:
        """
        Serializes this manifest to a JSON string with ``version`` and ``parts`` keys.
        """
        # Start from the dataclass fields, then replace parts with each reference's to_dict() form.
        serializable = {**asdict(self), "parts": [ref.to_dict() for ref in self.parts]}
        return json.dumps(serializable)
97 return json.dumps(data)
98
99
100def _metadata_to_manifest_dict(metadata: ObjectMetadata) -> dict:
101 """
102 Convert an ObjectMetadata instance to a dictionary suitable with manifest format,
103 replacing 'content_length' with 'size_bytes' and removing 'content_length'.
104 """
105 metadata_dict = metadata.to_dict()
106 # Pop out content_length, store it in size_bytes
107 size_bytes = metadata_dict.pop("content_length", None)
108 metadata_dict["size_bytes"] = size_bytes
109 return metadata_dict
110
111
[docs]
112class ManifestMetadataProvider(MetadataProvider):
113 _storage_provider: StorageProvider
114 _files: Dict[str, ObjectMetadata]
115 _pending_adds: Dict[str, ObjectMetadata]
116 _pending_removes: list[str]
117 _manifest_path: str
118 _writable: bool
119
120 def __init__(self, storage_provider: StorageProvider, manifest_path: str, writable: bool = False) -> None:
121 """
122 Creates a :py:class:`ManifestMetadataProvider`.
123
124 :param storage_provider: Storage provider.
125 :param manifest_path: Main manifest file path.
126 :param writable: If true, allows modifications and new manifests to be written.
127 """
128 self._storage_provider = storage_provider
129 self._files = {}
130 self._pending_adds = {}
131 self._pending_removes = []
132 self._manifest_path = manifest_path
133 self._writable = writable
134
135 self._load_manifest(storage_provider, self._manifest_path)
136
137 def _load_manifest(self, storage_provider: StorageProvider, manifest_path: str) -> None:
138 """
139 Loads manifest.
140
141 :param storage_provider: Storage provider.
142 :param manifest_path: Main manifest file path
143 """
144
145 def helper_find_manifest_file(manifest_path: str) -> str:
146 if storage_provider.is_file(manifest_path):
147 return manifest_path
148
149 if storage_provider.is_file(os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)):
150 return os.path.join(manifest_path, MANIFEST_INDEX_FILENAME)
151
152 # Now go looking and select newest manifest.
153 if DEFAULT_MANIFEST_BASE_DIR not in manifest_path.split("/"):
154 manifest_path = os.path.join(manifest_path, DEFAULT_MANIFEST_BASE_DIR)
155
156 candidates = storage_provider.glob(os.path.join(manifest_path, "*", MANIFEST_INDEX_FILENAME))
157 candidates = sorted(candidates)
158 return candidates[-1] if candidates else ""
159
160 manifest_path = helper_find_manifest_file(manifest_path)
161 if not manifest_path:
162 return
163
164 file_content = storage_provider.get_object(manifest_path)
165
166 prefix = os.path.dirname(manifest_path)
167 _, file_extension = os.path.splitext(manifest_path)
168 self._load_manifest_file(storage_provider, file_content, prefix, file_extension[1:])
169
170 def _load_manifest_file(
171 self, storage_provider: StorageProvider, file_content: bytes, manifest_base: str, file_type: str
172 ) -> None:
173 """
174 Loads a manifest.
175
176 :param storage_provider: Storage provider.
177 :param file_content: Manifest file content bytes.
178 :param manifest_base: Manifest file base path.
179 :param file_type: Manifest file type.
180 """
181 if file_type == "json":
182 manifest_dict = json.loads(file_content.decode("utf-8"))
183 manifest = Manifest.from_dict(manifest_dict)
184
185 # Check manifest version. Not needed once we make the manifest model use sum types/discriminated unions.
186 if manifest.version != "1":
187 raise ValueError(f"Manifest version {manifest.version} is not supported.")
188
189 # Load manifest parts.
190 for manifest_part_reference in manifest.parts:
191 object_metadata: List[ObjectMetadata] = self._load_manifest_part_file(
192 storage_provider=storage_provider,
193 manifest_base=manifest_base,
194 manifest_part_reference=manifest_part_reference,
195 )
196
197 for object_metadatum in object_metadata:
198 self._files[object_metadatum.key] = object_metadatum
199 else:
200 raise NotImplementedError(f"Manifest file type {file_type} is not supported.")
201
202 def _load_manifest_part_file(
203 self, storage_provider: StorageProvider, manifest_base: str, manifest_part_reference: ManifestPartReference
204 ) -> List[ObjectMetadata]:
205 """
206 Loads a manifest part.
207
208 :param storage_provider: Storage provider.
209 :param manifest_base: Manifest file base path. Prepend to manifest part reference paths.
210 :param manifest_part_reference: Manifest part reference.
211 """
212 object_metadata = []
213
214 if not os.path.isabs(manifest_part_reference.path):
215 remote_path = os.path.join(manifest_base, manifest_part_reference.path)
216 else:
217 remote_path = manifest_part_reference.path
218 manifest_part_file_content = storage_provider.get_object(remote_path)
219
220 # The manifest part is a JSON lines file. Each line is a JSON-serialized ObjectMetadata.
221 for line in io.TextIOWrapper(io.BytesIO(manifest_part_file_content), encoding="utf-8"):
222 object_metadatum_dict = json.loads(line)
223 object_metadatum_dict["content_length"] = object_metadatum_dict.pop("size_bytes")
224 object_metadatum = ObjectMetadata.from_dict(object_metadatum_dict)
225 object_metadata.append(object_metadatum)
226
227 return object_metadata
228
229 def _write_manifest_files(self, storage_provider: StorageProvider, object_metadata: List[ObjectMetadata]) -> None:
230 """
231 Writes the main manifest and its part files.
232
233 Args:
234 storage_provider (StorageProvider): The storage provider to use for writing.
235 object_metadata (List[ObjectMetadata]): objects to include in manifest.
236 """
237
238 def helper_write_file_to_storage(storage_provider: StorageProvider, path: str, content: str) -> None:
239 # Convert content to bytes and write it to the storage provider
240 storage_provider.put_object(path, content.encode("utf-8"))
241
242 base_path = self._manifest_path
243 manifest_base_path = base_path
244
245 base_path_parts = base_path.split(os.sep)
246 if DEFAULT_MANIFEST_BASE_DIR in base_path_parts:
247 manifests_index = base_path_parts.index(DEFAULT_MANIFEST_BASE_DIR)
248 if manifests_index > 0:
249 manifest_base_path = os.path.join(*base_path_parts[:manifests_index])
250 else:
251 manifest_base_path = ""
252 if base_path.startswith(os.sep):
253 manifest_base_path = os.sep + manifest_base_path
254
255 current_time = datetime.now(timezone.utc)
256 current_time_str = current_time.isoformat(timespec="seconds")
257 manifest_folderpath = os.path.join(manifest_base_path, DEFAULT_MANIFEST_BASE_DIR, current_time_str)
258 # We currently write only one part by default
259 part_sequence_number = 1
260 manifest_part_file_path = os.path.join(
261 "parts", f"{MANIFEST_PART_PREFIX}{part_sequence_number:0{SEQUENCE_PADDING}}{MANIFEST_PART_SUFFIX}"
262 )
263
264 manifest = Manifest(version="1", parts=[ManifestPartReference(path=manifest_part_file_path)])
265
266 # Write single manifest part with metadata as JSON lines (each object on a new line)
267 manifest_part_content = "\n".join(
268 [json.dumps(_metadata_to_manifest_dict(metadata)) for metadata in object_metadata]
269 )
270 storage_provider.put_object(
271 os.path.join(manifest_folderpath, manifest_part_file_path), manifest_part_content.encode("utf-8")
272 )
273
274 # Write the main manifest file
275 manifest_file_path = os.path.join(manifest_folderpath, MANIFEST_INDEX_FILENAME)
276 manifest_content = manifest.to_json()
277 storage_provider.put_object(manifest_file_path, manifest_content.encode("utf-8"))
278
[docs]
279 def list_objects(
280 self, prefix: str, start_after: Optional[str] = None, end_at: Optional[str] = None
281 ) -> Iterator[ObjectMetadata]:
282 if (start_after is not None) and (end_at is not None) and not (start_after < end_at):
283 raise ValueError(f"start_after ({start_after}) must be before end_at ({end_at})!")
284
285 # Note that this is a generator, not a tuple (there's no tuple comprehension).
286 keys = (
287 key
288 for key in self._files
289 if key.startswith(prefix)
290 and (start_after is None or start_after < key)
291 and (end_at is None or key <= end_at)
292 )
293
294 # Dictionaries don't guarantee lexicographical order.
295 for key in sorted(keys):
296 yield ObjectMetadata(key=key, content_length=0, last_modified=datetime.now(timezone.utc))
297
[docs]
298 def get_object_metadata(self, path: str) -> ObjectMetadata:
299 metadata = self._files.get(path, None)
300 if metadata is None:
301 raise FileNotFoundError(f"Object {path} does not exist.")
302 return metadata
303
[docs]
304 def glob(self, pattern: str) -> List[str]:
305 all_objects = [object.key for object in self.list_objects("")]
306 return [key for key in glob(all_objects, pattern)]
307
[docs]
308 def realpath(self, path: str) -> Tuple[str, bool]:
309 exists = path in self._files
310 return path, exists
311
[docs]
312 def add_file(self, path: str, metadata: ObjectMetadata) -> None:
313 if not self.is_writable():
314 raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to add {path}.")
315 self._pending_adds[path] = metadata
316
[docs]
317 def remove_file(self, path: str) -> None:
318 if not self.is_writable():
319 raise RuntimeError(f"Manifest update support not enabled in configuration. Attempted to remove {path}.")
320 if path not in self._files:
321 raise FileNotFoundError(f"Object {path} does not exist.")
322 self._pending_removes.append(path)
323
326
[docs]
327 def commit_updates(self) -> None:
328 if not self._pending_adds and not self._pending_removes:
329 return
330
331 if self._pending_adds:
332 self._files.update(self._pending_adds)
333 self._pending_adds = {}
334
335 for path in self._pending_removes:
336 self._files.pop(path)
337
338 # Collect metadata for each object to write out in this part file.
339 object_metadata = [
340 ObjectMetadata(key=file_path, content_length=metadata.content_length, last_modified=metadata.last_modified)
341 for file_path, metadata in self._files.items()
342 ]
343
344 self._write_manifest_files(self._storage_provider, object_metadata)