Source code for multistorageclient.telemetry

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import enum
 18import inspect
 19import json
 20import logging
 21import multiprocessing
 22import multiprocessing.managers
 23import multiprocessing.process
 24import threading
 25from typing import Any, Optional, Union
 26
 27import opentelemetry.metrics as api_metrics
 28import opentelemetry.trace as api_trace
 29
 30from .. import utils
 31
 32# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
 33#
 34# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
 35# sacrifice temporal resolution to preserve other information. Which method is used depends on if
 36# the expected post-hoc aggregate function is decomposable:
 37#
 38# * Decomposable aggregate functions (e.g. count, sum, min, max).
 39#   * Use client-side aggregation.
 40#     * E.g. preserve request + response counts.
 41# * Non-decomposable aggregate functions (e.g. average, percentile).
 42#   * Use decimation by an integer factor or last value.
 43#     * E.g. preserve the shape of the latency distribution (unlike tail sampling).
 44
# Maps short exporter type names (usable in config files) to fully qualified exporter class names.
#
# Unmapped types are passed through as fully qualified class names (see the provider methods below).
_METRICS_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
}

# Same mapping scheme as above, for trace span exporters.
_TRACE_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
}

logger = logging.getLogger(__name__)
 58
 59
class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        # Each cache has its own lock. Locks are always acquired in leaf-to-root order
        # (gauge/counter → meter → meter provider; tracer → tracer provider), so the
        # ordering is consistent and can't deadlock.
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        # Dictionaries can't be hashed, so a sorted JSON string is the cache key.
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # SDK imports are deferred so metrics degrade gracefully (to None)
                        # when the OpenTelemetry Python SDK isn't installed.
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        exporter_type: str = config["exporter"]["type"]
                        # Unmapped exporter types are treated as fully qualified class names.
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        # NOTE: failures aren't cached, so every call retries (and logs) again.
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                # Acquires the meter provider cache lock while holding the meter cache lock.
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Gauge name.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    # Unknown names fall back to an empty (unitless) unit string.
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Counter name.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # Deferred SDK imports — see meter_provider for rationale.
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
                    logger.error("No exporter configured! Disabling traces.")
                    return None

    def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]:
        """
        Create or return an existing :py:class:`api_trace.Tracer` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_cache_lock:
            if config_json in self._tracer_cache:
                return self._tracer_cache[config_json]
            else:
                tracer_provider = self.tracer_provider(config=config)
                if tracer_provider is None:
                    return None
                else:
                    return self._tracer_cache.setdefault(
                        config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient")
                    )
# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Create or return the process-wide :py:class:`Telemetry` singleton.

    :return: A telemetry instance.
    """
    global _TELEMETRY

    # Serialize creation so concurrent first callers end up sharing one instance.
    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
    return _TELEMETRY
class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.

      * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.

    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is it essentially re-introduces global interpreter lock (GIL) with
    additional IPC overhead. Telemetry operations, however, should be lightweight so
    this isn't expected to be a problem. Remote data store latency should still be
    the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    # All shared types are registered on this class below (module import time).
    pass
def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.
    """
    return ".".join([c.__module__, c.__qualname__])


# Metrics proxy object setup.
#
# Registration order matters only in that leaf types (gauge, counter) must exist
# before the types whose methods return them reference their type IDs.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    # Methods listed here return proxies instead of pickled copies.
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)


# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()
class TelemetryMode(enum.Enum):
    """
    Strategy for obtaining a :py:class:`Telemetry` object in :py:func:`init`.
    """

    #: Everything stays in-process (not fork-safe).
    LOCAL = "local"
    #: Start a telemetry IPC server, then connect to it.
    SERVER = "server"
    #: Connect to an already-running telemetry IPC server.
    CLIENT = "client"
def _telemetry_manager_server_port(process: multiprocessing.process.BaseProcess) -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :param process: Process whose PID is used to calculate the port.
    :raises ValueError: If the process hasn't been started (no PID yet).
    """

    if process.pid is None:
        raise ValueError("Can't calculate the default telemetry manager server port from an unstarted process!")

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    range_start = 2**15 + 2**14  # 49152, first port of the dynamic/private range.
    range_length = 2**16 - range_start  # 16384 ports in the range.

    # Fold the PID into the range: modulo by the range length, then add the initial offset.
    return range_start + (process.pid % range_length)
def init(
    mode: TelemetryMode = TelemetryMode.SERVER if multiprocessing.parent_process() is None else TelemetryMode.CLIENT,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    .. note::
        The ``mode`` default is evaluated once per process at import time:
        main processes default to :py:const:`TelemetryMode.SERVER` and
        child processes to :py:const:`TelemetryMode.CLIENT`.

    :param mode: How to create a :py:class:`Telemetry` object. Defaults to :py:const:`TelemetryMode.SERVER`/:py:const:`TelemetryMode.CLIENT` if the current process is a main/child Python process.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    :raises ValueError: If the telemetry mode is unsupported.
    """

    if mode == TelemetryMode.LOCAL:
        return _init()
    elif mode == TelemetryMode.SERVER or mode == TelemetryMode.CLIENT:
        global _TELEMETRY_PROXIES

        if address is None:
            process = multiprocessing.current_process()
            if mode == TelemetryMode.CLIENT:
                # If this is a child process, try to use the parent process's default telemetry manager server port instead.
                #
                # This won't work with 2+ high Python process trees, but such setups seem uncommon.
                #
                # TODO: Consider a service discovery propagation mechanism so grandchild processes can discover grandparent process IDs.
                process = multiprocessing.parent_process() or process

            address = ("127.0.0.1", _telemetry_manager_server_port(process=process))

        # Proxies are cached per (mode, address) pair; dicts aren't hashable, so key on sorted JSON.
        init_options = {"mode": mode.value, "address": address}
        init_options_json = json.dumps(init_options, sort_keys=True)

        with _TELEMETRY_PROXIES_LOCK:
            if init_options_json in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[init_options_json]
            else:
                telemetry_manager = TelemetryManager(
                    address=address,
                    # Use spawn instead of the platform-specific default (may be fork) to avoid aforementioned issues with fork.
                    ctx=multiprocessing.get_context(method="spawn"),
                )

                if mode == TelemetryMode.SERVER:
                    logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
                    try:
                        telemetry_manager.start()
                        # Stop (and thereby flush) the manager's server process on interpreter exit.
                        atexit.register(telemetry_manager.shutdown)
                        logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
                    except Exception:
                        logger.error(
                            f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                        )
                        # Bare raise preserves the original traceback.
                        raise

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                except Exception:
                    logger.error(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                    )
                    raise

                return _TELEMETRY_PROXIES.setdefault(init_options_json, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
    else:
        raise ValueError(f"Unsupported telemetry mode: {mode}")