Source code for sdp.processors.toloka.create_pool

# Copyright (c) 2024, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import json
import os

from sdp.logging import logger
from sdp.processors.base_processor import BaseParallelProcessor, DataEntry

# The `toloka` package is treated as an optional dependency: instead of failing
# at import time, record whether the import succeeded in TOLOKA_AVAILABLE so
# code further down can check the flag before touching the Toloka API.
try:
    import toloka.client
    import toloka.client.project.template_builder
    TOLOKA_AVAILABLE = True
except ImportError:
    # Keep the name `toloka` bound (to None) so later references to it do not
    # raise NameError; callers are expected to consult TOLOKA_AVAILABLE first.
    TOLOKA_AVAILABLE = False
    toloka = None
    


class CreateTolokaPool(BaseParallelProcessor):
    """Creates a Toloka pool for a given project based on user-provided configurations.

    This class connects to Toloka, creates a new pool for the project referenced
    by each input entry, and attaches quality control rules for worker submissions.

    The Toloka credentials are read from the environment at construction time:
    ``TOLOKA_API_KEY`` and ``TOLOKA_PLATFORM`` must both be set.

    Args:
        lang (str): The language filter for the pool. Default: 'HY'.
        **kwargs: Additional keyword arguments to be passed to the base class `BaseParallelProcessor`.

    Returns:
        A newly created pool on the Toloka platform, configured and ready for task assignment.

    Raises:
        ValueError: If ``TOLOKA_API_KEY`` or ``TOLOKA_PLATFORM`` is not set in the environment.
    """

    def __init__(
        self,
        lang: str = 'HY',
        **kwargs,
    ):
        """
        Constructs the necessary attributes for the CreateTolokaPool class.

        Parameters:
        ----------
        lang : str, optional
            The language filter for the pool. Defaults to 'HY'.

        Raises:
        ------
        ValueError
            If the TOLOKA_API_KEY or TOLOKA_PLATFORM environment variable is not set.
        """
        super().__init__(**kwargs)

        self.API_KEY = os.getenv('TOLOKA_API_KEY')
        if not self.API_KEY:
            raise ValueError("TOLOKA_API_KEY environment variable is not set")

        self.platform = os.getenv('TOLOKA_PLATFORM')
        if not self.platform:
            raise ValueError("TOLOKA_PLATFORM environment variable is not set")

        # Project ID will be read from the input manifest file in process_dataset_entry
        self.project_id = None
        self.lang = lang
        self.toloka_available = TOLOKA_AVAILABLE

    def process_dataset_entry(self, data_entry):
        """
        Creates a new Toloka pool based on the provided dataset entry.

        This method retrieves the project ID from the dataset entry and uses Toloka's API
        to create a new pool for the specified project and returns the pool details.

        Parameters:
        ----------
        data_entry : dict
            A dictionary containing the data entry information, which should include project_id.

        Returns:
        -------
        list
            A list containing a DataEntry object with the new pool ID if successful,
            or an empty list if Toloka is unavailable, project_id is missing,
            or pool creation failed.
        """
        if not self.toloka_available:
            logger.warning("Toloka is currently not supported. CreatePool processor functionality will be limited.")
            # Without the toloka package the module-level name `toloka` is None,
            # so nothing below can work; stop here instead of failing on None.
            return []

        # Get project_id from the data entry
        project_id = data_entry.get("project_id")
        if not project_id:
            logger.error("No project_id found in data entry")
            return []

        # Naive UTC timestamp, equivalent to the deprecated datetime.utcnow().
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

        try:
            toloka_client = toloka.client.TolokaClient(self.API_KEY, self.platform)

            new_pool = toloka.client.Pool(
                project_id=project_id,
                private_name='Voice recording',
                may_contain_adult_content=False,
                will_expire=now_utc + datetime.timedelta(days=365),
                reward_per_assignment=0.01,
                assignment_max_duration_seconds=60 * 10,
                auto_accept_solutions=False,
                auto_accept_period_day=14,
                filter=(
                    (toloka.client.filter.Languages.in_(self.lang))
                    & (toloka.client.filter.ClientType == 'TOLOKA_APP')
                ),
            )

            new_pool.set_mixer_config(real_tasks_count=5)
            self.setup_quality_control(new_pool)

            new_pool = toloka_client.create_pool(new_pool)
        except Exception as e:
            # Best-effort boundary: a failed pool creation drops this entry
            # rather than aborting the whole processor run.
            logger.error(f"Failed to create a new pool in Toloka: {e}")
            return []

        # Built outside the try block so that an error while packaging the
        # result is never misreported as a Toloka pool-creation failure.
        return [DataEntry(data={"pool_id": new_pool.id})]

    def setup_quality_control(self, pool):
        """
        Sets up quality control rules for the Toloka pool to ensure high-quality task results.

        Two rules are added: a one-day pool-level restriction for workers who skip
        task suites in a row, and a permanent restriction across all projects for
        workers with too many fast submissions.

        Parameters:
        ----------
        pool : toloka.client.Pool
            The pool object for which quality control rules will be set up.
        """
        # Control for skipped tasks in a row
        pool.quality_control.add_action(
            collector=toloka.client.collectors.SkippedInRowAssignments(),
            conditions=[toloka.client.conditions.SkippedInRowCount >= 2],
            action=toloka.client.actions.RestrictionV2(
                scope='POOL',
                duration=1,
                duration_unit='DAYS',
                private_comment='Skips too many task suites in a row',
            ),
        )

        # Control for fast responses that might indicate fraud
        pool.quality_control.add_action(
            collector=toloka.client.collectors.AssignmentSubmitTime(
                history_size=10,
                fast_submit_threshold_seconds=60,
            ),
            conditions=[toloka.client.conditions.FastSubmittedCount >= 5],
            action=toloka.client.actions.RestrictionV2(
                scope='ALL_PROJECTS',
                duration_unit='PERMANENT',
                private_comment='Fast responses',
            ),
        )