# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import enum
import inspect
import json
import logging
import multiprocessing
import multiprocessing.managers
import multiprocessing.process
import threading
from typing import Any, Optional, Union

import opentelemetry.metrics as api_metrics
import opentelemetry.trace as api_trace

from .. import utils

# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
#
# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
# sacrifice temporal resolution to preserve other information. Which method is used depends on whether
# the expected post-hoc aggregate function is decomposable:
#
# * Decomposable aggregate functions (e.g. count, sum, min, max).
#   * Use client-side aggregation.
#   * E.g. preserve request + response counts.
# * Non-decomposable aggregate functions (e.g. average, percentile).
#   * Use decimation by an integer factor or last value.
#   * E.g. preserve the shape of the latency distribution (unlike tail sampling).
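#
# For illustration only (hypothetical sample values): given latency samples [3, 1, 4, 1, 5],
# decomposable aggregates can be folded client-side with no loss for those aggregates
# (count=5, sum=14, min=1, max=5), whereas a non-decomposable p99 is better served by
# decimation by a factor of 2 (keep every 2nd sample: [3, 4, 5]), which preserves the
# distribution's shape at reduced temporal resolution.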

_METRICS_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
}

_TRACE_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
}

logger = logging.getLogger(__name__)


class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
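        # For illustration, the rough config shape this method expects, based on the keys
        # read below (a hypothetical sketch; exporter/reader option names depend on the
        # concrete exporter and reader classes):
        #
        #   {
        #       "exporter": {"type": "console", "options": {}},
        #       "reader": {"options": {}},
        #   }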
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Gauge name.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Counter name.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
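        # As with metrics, the expected config shape follows from the keys read below
        # (illustrative only; exact exporter options depend on the exporter class):
        #
        #   {
        #       "exporter": {"type": "otlp", "options": {}},
        #   }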
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
                    logger.error("No exporter configured! Disabling traces.")
                    return None

    def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]:
        """
        Create or return an existing :py:class:`api_trace.Tracer` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_cache_lock:
            if config_json in self._tracer_cache:
                return self._tracer_cache[config_json]
            else:
                tracer_provider = self.tracer_provider(config=config)
                if tracer_provider is None:
                    return None
                else:
                    return self._tracer_cache.setdefault(
                        config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient")
                    )

# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry`.

    :return: A telemetry instance.
    """
    global _TELEMETRY
    global _TELEMETRY_LOCK

    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
        return _TELEMETRY


class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.
    * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.
    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is that it essentially re-introduces a global interpreter lock (GIL) with
    additional IPC overhead. Telemetry operations, however, should be lightweight, so
    this isn't expected to be a problem. Remote data store latency should still be
    the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work, at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    pass


def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.
    """
    return ".".join([c.__module__, c.__qualname__])


# Metrics proxy object setup.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
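
# A minimal sketch of that three-step pattern on the client side (hypothetical usage;
# ``tracer`` would be a proxy obtained via ``Telemetry.tracer()``):
#
#   import opentelemetry.context
#   import opentelemetry.trace
#
#   context = opentelemetry.context.get_current()
#   span = tracer.start_span(name="operation", context=context)
#   with opentelemetry.trace.use_span(span, end_on_exit=True):
#       ...  # Work attributed to the span.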
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)


# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (it may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()


class TelemetryMode(enum.Enum):
    """
    How to create a :py:class:`Telemetry` object.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"


def _telemetry_manager_server_port(process: multiprocessing.process.BaseProcess) -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :param process: Process whose PID is used to calculate the port.
    """

    if process.pid is None:
        raise ValueError("Can't calculate the default telemetry manager server port from an unstarted process!")

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the parent/child process PID by the port range length, then add the initial offset.
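    #
    # For example (hypothetical PID): 2**15 + 2**14 = 49152 and 2**16 - 49152 = 16384,
    # so a process with PID 12345 maps to port 49152 + (12345 % 16384) = 61497.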
    return (2**15 + 2**14) + (process.pid % ((2**16) - (2**15 + 2**14)))


def init(
    mode: TelemetryMode = TelemetryMode.SERVER if multiprocessing.parent_process() is None else TelemetryMode.CLIENT,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object. Defaults to :py:const:`TelemetryMode.SERVER` if the current process is the main Python process, or :py:const:`TelemetryMode.CLIENT` if it's a child process.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    """

    if mode == TelemetryMode.LOCAL:
        return _init()
    elif mode == TelemetryMode.SERVER or mode == TelemetryMode.CLIENT:
        global _TELEMETRY_PROXIES
        global _TELEMETRY_PROXIES_LOCK

        if address is None:
            process = multiprocessing.current_process()
            if mode == TelemetryMode.CLIENT:
                # If this is a child process, try to use the parent process's default telemetry manager server port instead.
                #
                # This won't work for deeper process trees (e.g. grandchild processes), but such setups seem uncommon.
                #
                # TODO: Consider a service discovery propagation mechanism so grandchild processes can discover grandparent process IDs.
                process = multiprocessing.parent_process() or process

            address = ("127.0.0.1", _telemetry_manager_server_port(process=process))

        init_options = {"mode": mode.value, "address": address}
        init_options_json = json.dumps(init_options, sort_keys=True)

        with _TELEMETRY_PROXIES_LOCK:
            if init_options_json in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[init_options_json]
            else:
                telemetry_manager = TelemetryManager(
                    address=address,
                    # Use spawn instead of the platform-specific default (which may be fork) to avoid the aforementioned issues with fork.
                    ctx=multiprocessing.get_context(method="spawn"),
                )

                if mode == TelemetryMode.SERVER:
                    logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
                    try:
                        telemetry_manager.start()
                        atexit.register(telemetry_manager.shutdown)
                        logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
                    except Exception as e:
                        logger.error(
                            f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                        )
                        raise e

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                except Exception as e:
                    logger.error(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                    )
                    raise e

                return _TELEMETRY_PROXIES.setdefault(init_options_json, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
    else:
        raise ValueError(f"Unsupported telemetry mode: {mode}")
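

# Example usage (a hypothetical sketch, not part of this module's API surface): with the
# default mode, the main process starts a telemetry manager server and gets back a
# ``Telemetry`` proxy, while child processes connect to the parent's server and reuse
# its exporters.
#
#   telemetry = init()
#   meter = telemetry.meter(config={"exporter": {"type": "console"}})
#   tracer = telemetry.tracer(config={"exporter": {"type": "console"}})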