Source code for multistorageclient.telemetry

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import enum
import inspect
import json
import logging
import multiprocessing
import multiprocessing.managers
import threading
from typing import Any, Optional, Union

import opentelemetry.metrics as api_metrics
import opentelemetry.trace as api_trace

from .. import utils

# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
#
# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
# sacrifice temporal resolution to preserve other information. Which method is used depends on whether
# the expected post-hoc aggregate function is decomposable:
#
# * Decomposable aggregate functions (e.g. count, sum, min, max).
#   * Use client-side aggregation.
#     * E.g. preserve request + response counts.
# * Non-decomposable aggregate functions (e.g. average, percentile).
#   * Use decimation by an integer factor or last value.
#     * E.g. preserve the shape of the latency distribution (unlike tail sampling).
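
# Illustrative sketch (editorial, not part of the module logic): resampling for a non-decomposable
# post-hoc aggregate such as a percentile. Decimation by an integer factor keeps the shape of the
# latency distribution while reducing volume; ``samples`` and ``factor`` are hypothetical names.
#
#   samples = [0.12, 0.34, 0.08, 0.91, 0.15, 0.22]  # Latency samples in seconds.
#   factor = 2
#   decimated = samples[::factor]  # Keep every 2nd sample -> [0.12, 0.08, 0.15].
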
_METRICS_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
}

_TRACE_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
}

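
# Illustrative sketch of the ``.opentelemetry.metrics`` config dict shape consumed below: the
# exporter ``type`` is either a key in the mapping above or a fully qualified class name, and
# ``options`` are passed to the exporter's constructor (the endpoint value here is a placeholder).
# Any ``reader.options`` are forwarded to the metric reader.
#
#   {
#       "exporter": {
#           "type": "otlp",
#           "options": {"endpoint": "http://localhost:4318/v1/metrics"},
#       },
#   }
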
logger = logging.getLogger(__name__)


class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Gauge name.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Counter name.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
Disabling traces.") 292 return None 293 294 def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]: 295 """ 296 Create or return an existing :py:class:`api_trace.Tracer` for a config. 297 298 :param config: ``.opentelemetry.traces`` config dict. 299 :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured. 300 """ 301 config_json = json.dumps(config, sort_keys=True) 302 with self._tracer_cache_lock: 303 if config_json in self._tracer_cache: 304 return self._tracer_cache[config_json] 305 else: 306 tracer_provider = self.tracer_provider(config=config) 307 if tracer_provider is None: 308 return None 309 else: 310 return self._tracer_cache.setdefault( 311 config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient") 312 )

# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry`.

    :return: A telemetry instance.
    """
    global _TELEMETRY
    global _TELEMETRY_LOCK

    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
        return _TELEMETRY

class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.

      * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.

    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is that it essentially re-introduces a global interpreter lock (GIL) with
    additional IPC overhead. Telemetry operations, however, should be lightweight, so
    this isn't expected to be a problem. Remote data store latency should still be
    the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work, at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    pass

def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.
    """
    return ".".join([c.__module__, c.__qualname__])


# Metrics proxy object setup.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)


# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()

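
# Illustrative sketch (editorial) of the span construction steps described in the ``Tracer``
# registration above, e.g. with a proxied ``Tracer`` obtained from ``Telemetry.tracer()``. The
# span name is hypothetical.
#
#   import opentelemetry.context
#   import opentelemetry.trace
#
#   span = tracer.start_span(name="copy-object", context=opentelemetry.context.get_current())
#   with opentelemetry.trace.use_span(span, end_on_exit=True):
#       ...  # Work attributed to the span.
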
class TelemetryMode(enum.Enum):
    """
    How to create a :py:class:`Telemetry` object.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"

def _telemetry_manager_server_port() -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.
    """

    # This won't work for Python process trees more than two processes deep (e.g. grandchild processes),
    # but such setups are uncommon.
    process = multiprocessing.parent_process() or multiprocessing.current_process()
    if process.pid is None:
        raise ValueError(
            "Can't calculate the default telemetry manager server port from an unstarted parent or current process!"
        )

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the parent/child process PID by the port range length, then add the initial offset.
    return (2**15 + 2**14) + (process.pid % ((2**16) - (2**15 + 2**14)))

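
# Worked example for ``_telemetry_manager_server_port`` above, assuming a parent/current PID of 12345:
#
#   (2**15 + 2**14) + (12345 % ((2**16) - (2**15 + 2**14)))
#   == 49152 + (12345 % 16384)
#   == 61497
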
def init(
    mode: TelemetryMode = TelemetryMode.SERVER if multiprocessing.parent_process() is None else TelemetryMode.CLIENT,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    """

    if mode == TelemetryMode.LOCAL:
        return _init()
    elif mode == TelemetryMode.SERVER or mode == TelemetryMode.CLIENT:
        global _TELEMETRY_PROXIES
        global _TELEMETRY_PROXIES_LOCK

        if address is None:
            address = ("127.0.0.1", _telemetry_manager_server_port())

        init_options = {"mode": mode.value, "address": address}
        init_options_json = json.dumps(init_options, sort_keys=True)

        with _TELEMETRY_PROXIES_LOCK:
            if init_options_json in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[init_options_json]
            else:
                telemetry_manager = TelemetryManager(
                    address=address,
                    authkey="multistorageclient-telemetry".encode(),
                    # Use spawn instead of the platform-specific default (may be fork) to avoid the aforementioned issues with fork.
                    ctx=multiprocessing.get_context(method="spawn"),
                )

                if mode == TelemetryMode.SERVER:
                    logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
                    try:
                        telemetry_manager.start()
                        atexit.register(telemetry_manager.shutdown)
                        logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
                    except Exception as e:
                        logger.error(
                            f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                        )
                        raise e

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                except Exception as e:
                    logger.error(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                    )
                    raise e

                return _TELEMETRY_PROXIES.setdefault(init_options_json, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
    else:
        raise ValueError(f"Unsupported telemetry mode: {mode}")
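
# Illustrative sketch (editorial, not executed on import): typical use of ``init()``. In the main
# process (no parent process) the default mode is ``TelemetryMode.SERVER``, which starts the
# telemetry manager server; spawned child processes default to ``TelemetryMode.CLIENT`` and connect
# to it, receiving proxy objects. The config and attribute values are hypothetical.
#
#   from multistorageclient import telemetry
#
#   t = telemetry.init()
#   metrics_config = {"exporter": {"type": "console"}}
#   gauge = t.gauge(config=metrics_config, name=telemetry.Telemetry.GaugeName.LATENCY)
#   if gauge is not None:
#       gauge.set(0.25, attributes={"operation": "GET"})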