1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import atexit
17import enum
18import inspect
19import json
20import logging
21import multiprocessing
22import multiprocessing.managers
23import threading
24from typing import Any, Optional, Union
25
26import opentelemetry.metrics as api_metrics
27import opentelemetry.trace as api_trace
28
29from .. import utils
30
31# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
32#
33# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
34# sacrifice temporal resolution to preserve other information. Which method is used depends on if
35# the expected post-hoc aggregate function is decomposable:
36#
37# * Decomposable aggregate functions (e.g. count, sum, min, max).
38# * Use client-side aggregation.
39# * E.g. preserve request + response counts.
40# * Non-decomposable aggregate functions (e.g. average, percentile).
41# * Use decimation by an integer factor or last value.
42# * E.g. preserve the shape of the latency distribution (unlike tail sampling).
43
# Short exporter aliases → fully qualified exporter class names.
# Unknown aliases fall through unchanged so configs may name a class directly.
_METRICS_EXPORTER_MAPPING: dict[str, str] = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
}

_TRACE_EXPORTER_MAPPING: dict[str, str] = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
}

# Use logging.getLogger() rather than instantiating logging.Logger directly:
# a directly-constructed Logger is not registered with the logging manager and has
# no parent, so its records never propagate to handlers configured on the root
# logger (only the last-resort stderr handler would see warnings and above).
logger = logging.getLogger(__name__)
57
58
class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.

    Every resource is cached by its config's sorted JSON serialization, so repeated
    calls with an equal config return the same provider/meter/instrument/tracer.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self) -> None:
        # One lock per cache. Lock acquisition always nests leaf → provider
        # (e.g. gauge → meter → meter provider), so the per-cache locks can't
        # deadlock against each other.
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        Failures (missing exporter config, SDK/exporter import errors) return ``None``
        and are NOT cached, so a later call retries provider creation.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # SDK imports are deferred so metrics degrade gracefully when
                        # only the OpenTelemetry API (not the SDK) is installed.
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        # Resolve the exporter alias (or fully qualified name) to a class.
                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                # Acquires the meter provider cache lock while holding the meter cache lock.
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Gauge name.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    # Unknown names get an empty unit string (shouldn't happen for GaugeName members).
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Counter name.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    # Unknown names get an empty unit string (shouldn't happen for CounterName members).
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        Failures (missing exporter config, SDK/exporter import errors) return ``None``
        and are NOT cached, so a later call retries provider creation.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # SDK imports are deferred so traces degrade gracefully when
                        # only the OpenTelemetry API (not the SDK) is installed.
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        # Resolve the exporter alias (or fully qualified name) to a class.
                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op tracer provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling traces.")
                    return None

    def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]:
        """
        Create or return an existing :py:class:`api_trace.Tracer` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_cache_lock:
            if config_json in self._tracer_cache:
                return self._tracer_cache[config_json]
            else:
                # Acquires the tracer provider cache lock while holding the tracer cache lock.
                tracer_provider = self.tracer_provider(config=config)
                if tracer_provider is None:
                    return None
                else:
                    return self._tracer_cache.setdefault(
                        config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient")
                    )
313
314
# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
# Guards lazy creation of ``_TELEMETRY`` across threads.
_TELEMETRY_LOCK: threading.Lock = threading.Lock()
320
321
322def _init() -> Telemetry:
323 """
324 Create or return an existing :py:class:`Telemetry`.
325
326 :return: A telemetry instance.
327 """
328 global _TELEMETRY
329 global _TELEMETRY_LOCK
330
331 with _TELEMETRY_LOCK:
332 if _TELEMETRY is None:
333 _TELEMETRY = Telemetry()
334 return _TELEMETRY
335
336
class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.

    * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.

    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is it essentially re-introduces global interpreter lock (GIL) with
    additional IPC overhead. Telemetry operations, however, should be lightweight so
    this isn't expected to be a problem. Remote data store latency should still be
    the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work at least). Consequently, clients should:

    * Only receive proxy objects.

    * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

    * The shutdown exit handler is registered on the manager's server process.
    * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    # No custom behavior; shareable types are attached via ``TelemetryManager.register`` calls.
    pass
401
402
def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.
    """
    return f"{c.__module__}.{c.__qualname__}"
410
411
# Metrics proxy object setup.
#
# ``method_to_typeid`` makes the listed factory methods return proxies instead of
# pickled copies, keeping the real instruments on the manager's server process.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)
469
470
# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK: threading.Lock = threading.Lock()
477
478
class TelemetryMode(enum.Enum):
    """
    How to create a :py:class:`Telemetry` object.

    Consumed by :py:func:`init`.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"
490
491
def _telemetry_manager_server_port() -> int:
    """
    Compute the default telemetry manager server port.

    The port is derived from a PID to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :raises ValueError: If the parent/current process hasn't started (no PID yet).
    """

    # Prefer the parent so a parent and its children derive the same port.
    # (This won't work with 2+ high Python processes trees, but such setups are uncommon.)
    owner = multiprocessing.parent_process() or multiprocessing.current_process()
    pid = owner.pid
    if pid is None:
        raise ValueError(
            "Can't calculate the default telemetry manager server port from an unstarted parent or current process!"
        )

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the PID by the range length, then add the range's start offset.
    range_start = 2**15 + 2**14
    range_length = 2**16 - range_start
    return range_start + (pid % range_length)
516
517
def init(
    mode: Optional[TelemetryMode] = None,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object. Defaults to
        :py:const:`TelemetryMode.SERVER` in the main process and :py:const:`TelemetryMode.CLIENT`
        in child processes, decided at call time.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    :raises ValueError: If the mode is unsupported.
    """

    if mode is None:
        # Decide at call time instead of baking the choice into the signature default.
        # A signature default is evaluated once at import, so a forked child inheriting
        # this module would keep the parent's SERVER default instead of becoming a CLIENT.
        mode = TelemetryMode.SERVER if multiprocessing.parent_process() is None else TelemetryMode.CLIENT

    if mode == TelemetryMode.LOCAL:
        return _init()
    elif mode == TelemetryMode.SERVER or mode == TelemetryMode.CLIENT:
        global _TELEMETRY_PROXIES
        global _TELEMETRY_PROXIES_LOCK

        if address is None:
            # Deterministic per-process-tree default (see _telemetry_manager_server_port).
            address = ("127.0.0.1", _telemetry_manager_server_port())

        # Proxies are cached per (mode, address) pair.
        init_options = {"mode": mode.value, "address": address}
        init_options_json = json.dumps(init_options, sort_keys=True)

        with _TELEMETRY_PROXIES_LOCK:
            if init_options_json in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[init_options_json]
            else:
                telemetry_manager = TelemetryManager(
                    address=address,
                    authkey="multistorageclient-telemetry".encode(),
                    # Use spawn instead of the platform-specific default (may be fork) to avoid aforementioned issues with fork.
                    ctx=multiprocessing.get_context(method="spawn"),
                )

                if mode == TelemetryMode.SERVER:
                    logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
                    try:
                        telemetry_manager.start()
                        # Flush + stop the server's telemetry before interpreter exit.
                        atexit.register(telemetry_manager.shutdown)
                        logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
                    except Exception:
                        logger.error(
                            f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                        )
                        # Bare raise preserves the original traceback.
                        raise

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                except Exception:
                    logger.error(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                    )
                    raise

                return _TELEMETRY_PROXIES.setdefault(init_options_json, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
    else:
        raise ValueError(f"Unsupported telemetry mode: {mode}")