Source code for multistorageclient.generators.manifest_metadata
1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import json
17from concurrent.futures import ThreadPoolExecutor
18from typing import Optional
19
20from multistorageclient.types import ObjectMetadata
21from multistorageclient.utils import calculate_worker_processes_and_threads
22
23from .. import StorageClient
24from ..providers.manifest_formats import ManifestFormat
25from ..providers.manifest_metadata import DEFAULT_MANIFEST_BASE_DIR, ManifestMetadataProvider
26
27
[docs]
class ManifestMetadataGenerator:
    """
    Builds the file metadata manifest consumed by a :py:class:`multistorageclient.providers.ManifestMetadataProvider`.
    """

    @staticmethod
    def _generate_manifest_part_body(object_metadata: list[ObjectMetadata]) -> bytes:
        """
        Serializes object metadata into a UTF-8 encoded, newline-delimited JSON body.

        Each line is the JSON form of ``metadata.to_dict()`` with an extra ``size_bytes`` key whose
        value is taken from ``content_length``. An empty input list produces ``b""``.

        :param object_metadata: Metadata entries to serialize, one JSON line per entry.
        :return: The encoded manifest part body.
        :raises KeyError: If an entry's dictionary form has no ``content_length`` key.
        """
        lines = []
        for entry in object_metadata:
            fields = entry.to_dict()
            # The copy is taken before the pop, so the serialized record carries both
            # "content_length" and "size_bytes".
            record = {**fields}
            record["size_bytes"] = fields.pop("content_length")
            lines.append(json.dumps(record))
        body = "\n".join(lines)
        return body.encode(encoding="utf-8")

    @staticmethod
    def generate_and_write_manifest(
        data_storage_client: StorageClient,
        manifest_storage_client: StorageClient,
        partition_keys: Optional[list[str]] = None,
        manifest_format: ManifestFormat = ManifestFormat.JSONL,
        allow_overwrites: bool = True,
    ) -> None:
        """
        Lists every data object and writes a file metadata manifest describing them.

        The data storage client's base path should point at the root of the data objects
        (e.g. ``my-bucket/my-data-prefix``).

        The manifest storage client's base path should point at the root of the manifest objects
        (e.g. ``my-bucket/my-manifest-prefix``).

        Manifest objects are written through the manifest storage client using this layout
        (the number of manifest parts varies)::

            .msc_manifests/
            ├── msc_manifest_index.json
            └── parts/
                ├── msc_manifest_part000001.jsonl (or .parquet)
                ├── ...
                └── msc_manifest_part999999.jsonl (or .parquet)

        Objects whose key contains the manifest base directory as a path segment are skipped,
        so existing manifest files are never tracked.

        :param data_storage_client: Storage client for reading data objects.
        :param manifest_storage_client: Storage client for writing manifest objects.
        :param partition_keys: Optional list of keys used as listing boundaries. When given, each
            range between consecutive keys is listed on a worker thread.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        :param allow_overwrites: Whether existing files in the manifest may be overwritten.
            Defaults to True for backwards compatibility.
        :raises ValueError: If either storage client has no ``_storage_provider``.
        """
        source_provider = data_storage_client._storage_provider
        target_provider = manifest_storage_client._storage_provider

        if source_provider is None or target_provider is None:
            raise ValueError(
                "ManifestMetadataGenerator requires SingleStorageClient with _storage_provider. "
                "CompositeStorageClient (multi-backend) is not supported."
            )

        manifest_provider = ManifestMetadataProvider(
            storage_provider=target_provider,
            manifest_path="",
            writable=True,
            manifest_format=manifest_format,
            allow_overwrites=allow_overwrites,
        )

        def track_listing(**range_kwargs) -> None:
            # Shared by the partitioned and unpartitioned paths: list one range and register
            # every object that is not itself a manifest file.
            listing = source_provider.list_objects(path="", **range_kwargs, show_attributes=True)
            for entry in listing:
                if DEFAULT_MANIFEST_BASE_DIR in entry.key.split("/"):
                    continue  # Do not track manifest files
                manifest_provider.add_file(path=entry.key, metadata=entry)

        if partition_keys is None:
            track_listing()
        else:
            _, thread_count = calculate_worker_processes_and_threads()

            # Consecutive (start_after, end_at) pairs: the first range starts at "" and the
            # last range is open-ended (None).
            range_starts = [""] + partition_keys
            range_ends = partition_keys + [None]
            ranges = list(zip(range_starts, range_ends))

            with ThreadPoolExecutor(max_workers=thread_count) as pool:
                pending = [
                    pool.submit(track_listing, start_after=start_after, end_at=end_at)
                    for start_after, end_at in ranges
                ]
                # Wait in submission order; result() re-raises any worker exception.
                for task in pending:
                    task.result()

        manifest_provider.commit_updates()