Source code for sdp.processors.inference.quality_estimation.pymarian

# Copyright (c) 2025, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import json
from tqdm import tqdm
import termplotlib as tpl
import numpy as np

from sdp.logging import logger
from sdp.processors.base_processor import BaseProcessor


[docs] class CometoidWMTQualityEstimation(BaseProcessor): """ A processor for estimating translation quality using pretrained COMET-like models based on MarianNMT and the pymarian Evaluator. This processor evaluates the quality of source-target text pairs (bitext) using COMETOID-style quality estimation and appends the resulting score to each dataset entry. Args: source_text_field (str): The key in the data entry containing the source (original) text. target_text_field (str): The key in the data entry containing the target (translated) text. model_name_or_path (str): Hugging Face model name or path to local model checkpoint. vocab_path (str, optional): Path to the vocabulary file. If None and model is from HF, it will be downloaded. save_model_to (str, optional): Directory to download and cache the model and vocab. mini_batch (int): Mini-batch size for evaluation. maxi_batch (int): Maxi-batch size for evaluation. output_field (str): The name of the field where the quality score will be saved in the output manifest. device_type (str): Device type to use: 'cpu' or 'gpu'. num_devices (int): Number of CPU threads or GPU devices to use. Use -1 to use all available. chunksize (int): Number of lines to process in each chunk. Returns: A manifest file where each entry has an added key (`output_field`) with the computed score. .. note:: This processor uses MarianNMT models fine-tuned for quality estimation. See https://marian-nmt.github.io/. Make sure to install `pymarian` before using this processor: pip install pymarian """ # Mapping of supported model aliases to Hugging Face repo paths MODEL_NAME_TO_HF_PATH = { "cometoid-wmt23": "marian-nmt/cometoid22-wmt23", "cometoid-wmt23-mqm": "marian-nmt/cometoid22-wmt23", } # Marian evaluation arguments depending on device MARIAN_GPU_ARGS = "-w 8000 -d {device_indicies}" MARIAN_CPU_ARGS = "-w 2000 --cpu-threads {num_threads}" def __init__(self, source_text_field: str, target_text_field: str, model_name_or_path: str, vocab_path: str = None, save_model_to: str = None, mini_batch: int = 16, maxi_batch: int = 96, output_field: str = 'cometoid_score', device_type: str = 'cpu', num_devices: int = -1, chunksize = 5000, **kwargs, ): super().__init__(**kwargs) self.source_text_field = source_text_field self.target_text_field = target_text_field self.model_name_or_path = model_name_or_path self.vocab_path = vocab_path self.save_model_to = save_model_to self.device_type = device_type self.max_workers = num_devices self.mini_batch = mini_batch self.maxi_batch = maxi_batch self.output_field = output_field self.model = None self.chunksize = chunksize self.number_of_entries = 0 def load_model(self): try: from pymarian import Evaluator except ImportError: raise ImportError("`pymarian` is not installed. Please install it using `pip install pymarian`.") from huggingface_hub import hf_hub_download """ Load the model and vocabulary from Hugging Face if necessary. Assemble command-line arguments for launching pymarian Evaluator. Depending on the device (CPU/GPU), configure parallelism parameters. """ repo_id = None if self.model_name_or_path in self.MODEL_NAME_TO_HF_PATH: repo_id = self.MODEL_NAME_TO_HF_PATH[self.model_name_or_path] self.model_name_or_path = hf_hub_download(repo_id, filename="checkpoints/marian.model.bin", local_dir = self.save_model_to) if not os.path.exists(self.model_name_or_path): raise ValueError(f'`model_name_or_path`: model name is not valid or model path does not exist ({self.model_name_or_path}).') if not self.vocab_path and repo_id is not None: self.vocab_path = hf_hub_download(repo_id=repo_id, filename="vocab.spm", local_dir = self.save_model_to) if not os.path.exists(self.vocab_path): raise FileNotFoundError(f'`vocab_path`: path does not exist ({self.vocab_path}).') marian_args = f"-m {self.model_name_or_path} -v {self.vocab_path} {self.vocab_path} --like comet-qe" if self.device_type == "cpu": max_available_cpus = os.cpu_count() if self.max_workers == -1 or self.max_workers > max_available_cpus: self.max_workers = max_available_cpus cpu_args = self.MARIAN_CPU_ARGS.format(num_threads = self.max_workers) marian_args += f' {cpu_args}' else: try: import torch if torch.cuda.is_available(): max_available_gpus = torch.cuda.device_count() if self.max_workers == -1 or self.max_workers > max_available_gpus: self.max_workers = max_available_gpus except Exception: pass device_indicies = ' '.join([str(i) for i in range(self.max_workers)]) gpu_args = self.MARIAN_GPU_ARGS.format(device_indicies = device_indicies) marian_args += f' {gpu_args}' marian_args += f' --mini-batch {self.mini_batch} --maxi-batch {self.maxi_batch}' self.model = Evaluator(marian_args) def _chunk_manifest(self): """Splits the manifest into smaller chunks defined by ``in_memory_chunksize``.""" manifest_chunk = [] for idx, data_entry in enumerate(self.read_manifest(), 1): manifest_chunk.append(data_entry) if idx % self.chunksize == 0: yield manifest_chunk manifest_chunk = [] if manifest_chunk: yield manifest_chunk def read_manifest(self): """Reading the input manifest file. .. note:: This function should be overridden in the "initial" class creating manifest to read from the original source of data. """ if not self.input_manifest_file: raise NotImplementedError("Override this method if no input manifest file is used") with open(self.input_manifest_file, "rt", encoding="utf8") as fin: for line in fin: yield json.loads(line) def process(self): """ Process the entire manifest in chunks. For each pair of texts (source–target), compute the translation quality score. Save the resulting scores in output_manifest_file. """ self.load_model() os.makedirs(os.path.dirname(self.output_manifest_file), exist_ok=True) metrics = [] with open(self.output_manifest_file, "wt", encoding="utf8") as fout: for manifest_chunk in self._chunk_manifest(): entries = [] bitext_pairs = [] for data_entry in manifest_chunk: src = str(data_entry[self.source_text_field]).replace('\t', ' ') tgt = str(data_entry[self.target_text_field]).replace('\t', ' ') bitext_pairs.append(f'{src}\t{tgt}') entries.append(data_entry) scores = self.model.evaluate(bitext_pairs) for entry, score in tqdm(zip(entries, scores)): metrics.append(score) entry[self.output_field] = score json.dump(entry, fout, ensure_ascii=False) self.number_of_entries += 1 fout.write("\n") self.finalize(metrics) def finalize(self, metrics): """ Print statistics about the quality scores: histogram, min, max, mean, median. Use termplotlib to render the histogram directly in the terminal. """ logger.info("Total number of entries after processing: %d", self.number_of_entries) logger.info("Histogram of scores:") bins = np.arange(0, 1.1, 0.1) hist, bin_edges = np.histogram(metrics, bins=bins) labels = [] for i in range(len(bin_edges) - 1): left = f"{bin_edges[i]:.1f}" right = f"{bin_edges[i+1]:.1f}" if i < len(bin_edges) - 2: labels.append(f"[{left}{right})") else: labels.append(f"[{left}{right}]") fig = tpl.figure() fig.barh(hist, labels) fig.show() logger.info(f"Min score: {np.min(metrics):.4f}") logger.info(f"Max score: {np.max(metrics):.4f}") logger.info(f"Mean score: {np.mean(metrics):.4f}") logger.info(f"Median score: {np.median(metrics):.4f}")