Source code for multistorageclient.generators.manifest_metadata

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import json
 17from concurrent.futures import ThreadPoolExecutor
 18from typing import Optional
 19
 20from multistorageclient.types import ObjectMetadata
 21from multistorageclient.utils import calculate_worker_processes_and_threads
 22
 23from .. import StorageClient
 24from ..providers.manifest_formats import ManifestFormat
 25from ..providers.manifest_metadata import DEFAULT_MANIFEST_BASE_DIR, ManifestMetadataProvider
 26
 27
class ManifestMetadataGenerator:
    """
    Generates a file metadata manifest for use with a :py:class:`multistorageclient.providers.ManifestMetadataProvider`.
    """

    @staticmethod
    def _generate_manifest_part_body(object_metadata: list[ObjectMetadata]) -> bytes:
        """
        Serializes object metadata into a JSON Lines manifest part body.

        Each entry's ``to_dict()`` output is written as one JSON object per line, with the
        ``content_length`` field renamed to ``size_bytes``.

        :param object_metadata: Object metadata to serialize.
        :return: UTF-8 encoded JSON Lines (lines joined by ``\\n``, no trailing newline).
        """
        lines = []
        for metadata in object_metadata:
            metadata_dict = metadata.to_dict()
            # Pop before building the output dict. In ``{**d, "size_bytes": d.pop(...)}`` the
            # ``**d`` unpack is evaluated first, so the "removed" key would still be copied in
            # and both ``content_length`` and ``size_bytes`` would be emitted.
            size_bytes = metadata_dict.pop("content_length")
            lines.append(json.dumps({**metadata_dict, "size_bytes": size_bytes}))
        return "\n".join(lines).encode(encoding="utf-8")

    @staticmethod
    def _is_manifest_object(key: str) -> bool:
        """
        Returns whether ``key`` has a ``DEFAULT_MANIFEST_BASE_DIR`` path component.

        Such objects are manifest files themselves and are never tracked in the manifest.
        """
        return DEFAULT_MANIFEST_BASE_DIR in key.split("/")

    @staticmethod
    def generate_and_write_manifest(
        data_storage_client: StorageClient,
        manifest_storage_client: StorageClient,
        partition_keys: Optional[list[str]] = None,
        manifest_format: ManifestFormat = ManifestFormat.JSONL,
        allow_overwrites: bool = True,
    ) -> None:
        """
        Generates a file metadata manifest.

        The data storage client's base path should be set to the root path for data objects (e.g. ``my-bucket/my-data-prefix``).

        The manifest storage client's base path should be set to the root path for manifest objects (e.g. ``my-bucket/my-manifest-prefix``).

        The following manifest objects will be written with the manifest storage client (with the total number of manifest parts being variable)::

            .msc_manifests/
            ├── msc_manifest_index.json
            └── parts/
                ├── msc_manifest_part000001.jsonl (or .parquet)
                ├── ...
                └── msc_manifest_part999999.jsonl (or .parquet)

        Objects whose key contains a manifest directory path component are skipped.

        :param data_storage_client: Storage client for reading data objects.
        :param manifest_storage_client: Storage client for writing manifest objects.
        :param partition_keys: Optional list of keys to partition the listing operation. If provided, objects will be listed concurrently using these keys as boundaries. The keys are sorted and de-duplicated first, so they may be given in any order.
        :param manifest_format: Format for manifest parts. Defaults to ManifestFormat.JSONL.
        :param allow_overwrites: Whether to allow overwriting existing files in the manifest. Defaults to True for backwards compatibility.
        """
        data_storage_provider = data_storage_client._storage_provider
        manifest_storage_provider = manifest_storage_client._storage_provider

        manifest_metadata_provider = ManifestMetadataProvider(
            storage_provider=manifest_storage_provider,
            manifest_path="",
            writable=True,
            manifest_format=manifest_format,
            allow_overwrites=allow_overwrites,
        )

        if partition_keys is not None:
            _, num_worker_threads = calculate_worker_processes_and_threads()

            # Consecutive boundaries only form disjoint, gap-free ranges when the keys are
            # ordered. Unsorted keys would create overlapping ranges (objects listed and added
            # twice, which may be rejected when overwrites are disallowed) plus empty ranges.
            # Python's code-point string order matches UTF-8 byte order.
            sorted_keys = sorted(set(partition_keys))
            # Presumably each range is (start_after, end_at], i.e. start_after exclusive and
            # end_at inclusive -- NOTE(review): confirm against the provider's list_objects.
            boundaries = list(zip([""] + sorted_keys, sorted_keys + [None]))

            def list_partition(boundary: tuple[str, Optional[str]]) -> list[ObjectMetadata]:
                # Lists one key range, dropping manifest files.
                start_after, end_at = boundary
                return [
                    object_metadata
                    for object_metadata in data_storage_provider.list_objects(
                        path="", start_after=start_after, end_at=end_at, show_attributes=True
                    )
                    if not ManifestMetadataGenerator._is_manifest_object(object_metadata.key)
                ]

            # Listing runs concurrently, but files are registered from this thread only. This
            # avoids relying on ``add_file`` being thread-safe and adds files in partition order.
            # ``executor.map`` yields results in submission order and re-raises worker errors.
            with ThreadPoolExecutor(max_workers=min(num_worker_threads, len(boundaries))) as executor:
                for partition_objects in executor.map(list_partition, boundaries):
                    for object_metadata in partition_objects:
                        manifest_metadata_provider.add_file(path=object_metadata.key, metadata=object_metadata)
        else:
            for object_metadata in data_storage_provider.list_objects(path="", show_attributes=True):
                if not ManifestMetadataGenerator._is_manifest_object(object_metadata.key):
                    manifest_metadata_provider.add_file(path=object_metadata.key, metadata=object_metadata)

        manifest_metadata_provider.commit_updates()