# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import enum
import inspect
import json
import logging
import multiprocessing
import multiprocessing.managers
import os
import threading
from typing import Any, Literal, Optional, Union

import opentelemetry.metrics as api_metrics
import opentelemetry.trace as api_trace
import psutil

from .. import utils

# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
#
# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
# sacrifice temporal resolution to preserve other information. Which method is used depends on whether
# the expected post-hoc aggregate function is decomposable:
#
# * Decomposable aggregate functions (e.g. count, sum, min, max).
#   * Use client-side aggregation.
#   * E.g. preserve request + response counts.
# * Non-decomposable aggregate functions (e.g. average, percentile).
#   * Use decimation by an integer factor or last value.
#   * E.g. preserve the shape of the latency distribution (unlike tail sampling).
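#
# A minimal sketch of both methods (illustrative only; ``samples`` and ``factor`` are hypothetical):
#
#     # Decomposable (e.g. sum): aggregate client-side, publish one running total.
#     data_size_sum = sum(sample.data_size for sample in samples)
#
#     # Non-decomposable (e.g. percentile): decimate by an integer factor, keeping
#     # every ``factor``-th sample to preserve the distribution's shape.
#     decimated_samples = samples[::factor]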

_METRICS_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
}

_TRACE_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.traces.exporters.otlp_msal._OTLPMSALSpanExporter",
}

logger = logging.getLogger(__name__)


class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }
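
    # For example, an operation that moved ``num_bytes`` bytes might record both the raw
    # sample and the running total (a sketch; assumes a configured ``meter`` and ``attributes``):
    #
    #     data_size = meter.create_gauge(name="multistorageclient.data_size", unit="By")
    #     data_size_sum = meter.create_counter(name="multistorageclient.data_size.sum", unit="By")
    #     data_size.set(num_bytes, attributes)      # Raw sample for distributions.
    #     data_size_sum.add(num_bytes, attributes)  # Running sum for aggregates.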

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def _reinitialize_instance_locks_after_fork(self) -> None:
        """
        Reinitialize internal locks after fork to prevent deadlocks.

        This method reinitializes all internal locks. Caches are kept as they
        may still be valid, but locks must be fresh to avoid deadlocks.
        """
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None
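
    # An example ``.opentelemetry.metrics`` config for the method above (a sketch; exporter
    # and reader options vary by implementation):
    #
    #     {
    #         "exporter": {"type": "console", "options": {}},
    #         "reader": {"options": {}},
    #     }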

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Gauge name.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Counter name.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )
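
    # Usage sketch for the accessors above (hypothetical config; the result may be ``None``
    # if the exporter can't be created):
    #
    #     telemetry = Telemetry()
    #     config = {"exporter": {"type": "console"}}
    #     gauge = telemetry.gauge(config=config, name=Telemetry.GaugeName.LATENCY)
    #     if gauge is not None:
    #         gauge.set(0.25, {"operation": "get"})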

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
                    logger.error("No exporter configured! Disabling traces.")
                    return None
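
    # An example ``.opentelemetry.traces`` config for the method above (a sketch; the
    # ``endpoint`` option is specific to the OTLP HTTP span exporter):
    #
    #     {"exporter": {"type": "otlp", "options": {"endpoint": "http://localhost:4318/v1/traces"}}}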

    def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]:
        """
        Create or return an existing :py:class:`api_trace.Tracer` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_cache_lock:
            if config_json in self._tracer_cache:
                return self._tracer_cache[config_json]
            else:
                tracer_provider = self.tracer_provider(config=config)
                if tracer_provider is None:
                    return None
                else:
                    return self._tracer_cache.setdefault(
                        config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient")
                    )


# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry`.

    :return: A telemetry instance.
    """
    global _TELEMETRY
    global _TELEMETRY_LOCK

    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
        return _TELEMETRY


class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.
    * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.
    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is that it essentially re-introduces a global interpreter lock (GIL)-like
    bottleneck, with additional IPC overhead. Telemetry operations, however, should be
    lightweight, so this isn't expected to be a problem. Remote data store latency should
    still be the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work, at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    pass
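
# Usage sketch (mirrors ``_init_server`` below; the address is hypothetical, and proxy
# types only become available after the ``register`` calls further down):
#
#     manager = TelemetryManager(
#         address=("127.0.0.1", 50000),
#         ctx=multiprocessing.get_context(method="spawn"),
#     )
#     manager.start()  # Starts the manager's server process.
#     telemetry_proxy = manager.Telemetry()  # Proxy to the shared Telemetry instance.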


def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.
    """
    return ".".join([c.__module__, c.__qualname__])


# Metrics proxy object setup.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    #
    # See the sketch after this registration.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
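
# The three-step recipe above as code (a sketch; assumes ``tracer`` is a proxied
# ``Tracer``, and the span name is hypothetical):
#
#     import opentelemetry.context
#     import opentelemetry.trace
#
#     context = opentelemetry.context.get_current()
#     span = tracer.start_span(name="multistorageclient.example", context=context)
#     with opentelemetry.trace.use_span(span, end_on_exit=True):
#         ...  # Work measured by the span.
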
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)


# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()


def _reinitialize_locks_after_fork() -> None:
    """
    Reinitialize telemetry locks after fork to prevent deadlocks.

    This function is called automatically after a fork to reinitialize all locks.
    Caches and instances are kept as they may still be valid, but locks must be
    fresh to avoid deadlocks.
    """
    global _TELEMETRY, _TELEMETRY_LOCK, _TELEMETRY_PROXIES_LOCK

    _TELEMETRY_LOCK = threading.Lock()
    _TELEMETRY_PROXIES_LOCK = threading.Lock()

    # Reinitialize the LOCAL mode telemetry instance's locks if one exists.
    if _TELEMETRY is not None:
        _TELEMETRY._reinitialize_instance_locks_after_fork()


if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reinitialize_locks_after_fork)


class TelemetryMode(enum.Enum):
    """
    How to create a :py:class:`Telemetry` object.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"


def _telemetry_proxies_key(
    mode: Literal[TelemetryMode.SERVER, TelemetryMode.CLIENT], address: Union[str, tuple[str, int]]
) -> str:
    """
    Get the key for the ``_TELEMETRY_PROXIES`` dictionary.
    """
    init_options = {"mode": mode.value, "address": address}
    return json.dumps(init_options, sort_keys=True)


def _telemetry_manager_server_port(process_id: int) -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :param process_id: Process ID used to calculate the port.
    """

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the parent/child process PID by the port range length, then add the initial offset.
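    #
    # For example (a sketch): the range starts at 2**15 + 2**14 = 49152 and spans
    # 2**16 - 49152 = 16384 ports, so PID 43210 maps to 49152 + (43210 % 16384) = 59594.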
    return (2**15 + 2**14) + (process_id % ((2**16) - (2**15 + 2**14)))


def _init_server(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Start + connect to a telemetry IPC server.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    address = address or ("127.0.0.1", _telemetry_manager_server_port(process_id=psutil.Process().pid))
    telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.SERVER, address=address)

    with _TELEMETRY_PROXIES_LOCK:
        if telemetry_proxies_key in _TELEMETRY_PROXIES:
            return _TELEMETRY_PROXIES[telemetry_proxies_key]
        else:
            telemetry_manager = TelemetryManager(
                address=address,
                # Use spawn instead of the platform-specific default (may be fork) to avoid the aforementioned issues with fork.
                ctx=multiprocessing.get_context(method="spawn"),
            )

            logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.start()
                atexit.register(telemetry_manager.shutdown)
                logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
            except Exception as e:
                logger.debug(
                    f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e

            logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.connect()
                logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
            except Exception as e:
                logger.debug(
                    f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e


def _init_client(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Connect to a telemetry IPC server.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    candidate_addresses: list[Union[str, tuple[str, int]]] = []

    if address is not None:
        candidate_addresses = [address]
    else:
        current_process = psutil.Process()
        # Python processes from leaf to root.
        python_process_ancestry: list[psutil.Process] = [
            current_process,
            # Try the default telemetry manager server port for all ancestor process IDs.
            #
            # psutil is used since multiprocessing only exposes the parent process.
            #
            # We can't use `itertools.takewhile(lambda ancestor_process: ancestor_process.name() == current_process.name(), ...)`
            # to limit ourselves to ancestor Python processes by process name since some ancestors
            # may not be named `python{optional version}` (e.g. they may be named `pytest`).
            *current_process.parents(),
        ]
        # Try to connect from leaf to root.
        candidate_addresses = [
            ("127.0.0.1", _telemetry_manager_server_port(process_id=process.pid)) for process in python_process_ancestry
        ]

    for candidate_address in candidate_addresses:
        telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.CLIENT, address=candidate_address)

        with _TELEMETRY_PROXIES_LOCK:
            if telemetry_proxies_key in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[telemetry_proxies_key]
            else:
                telemetry_manager = TelemetryManager(address=candidate_address)

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                    return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
                except Exception:
                    logger.debug(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!",
                        exc_info=True,
                    )

    raise ConnectionError(f"Failed to connect to telemetry manager server at any of {candidate_addresses}!")


def init(
    mode: Optional[TelemetryMode] = None,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object. If ``None``, the default heuristic chooses a mode based on the presence of telemetry IPC servers in the process tree.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    """

    if mode is None:
        # Main process.
        if multiprocessing.parent_process() is None:
            # Daemons can't have child processes.
            #
            # Try to create a telemetry instance in local mode.
            #
            # ⚠️ Due to the global interpreter lock (GIL), this may cause CPU contention if the
            # current process is compute-intensive and a high collect and/or export frequency is used.
            if multiprocessing.current_process().daemon:
                return _init()
            # Start + connect to a telemetry IPC server.
            else:
                return _init_server(address=address)
        # Child process.
        else:
            # Connect to a telemetry IPC server.
            try:
                return _init_client(address=address)
            # No existing telemetry IPC server.
            except ConnectionError:
                # Daemons can't have child processes.
                #
                # Try to create a telemetry instance in local mode.
                #
                # ⚠️ Due to the global interpreter lock (GIL), this may cause CPU contention if the
                # current process is compute-intensive and a high collect and/or export frequency is used.
                if multiprocessing.current_process().daemon:
                    return _init()
                # Start + connect to a telemetry IPC server.
                else:
                    return _init_server(address=address)
    elif mode == TelemetryMode.LOCAL:
        return _init()
    elif mode == TelemetryMode.SERVER:
        return _init_server(address=address)
    elif mode == TelemetryMode.CLIENT:
        return _init_client(address=address)
    else:
        raise ValueError(f"Unsupported telemetry mode: {mode}")
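

# Usage sketch (assumes this module is importable as ``multistorageclient.telemetry``,
# per the exporter mappings above; the config is hypothetical):
#
#     import multistorageclient.telemetry as telemetry
#
#     t = telemetry.init(mode=telemetry.TelemetryMode.LOCAL)
#     meter = t.meter(config={"exporter": {"type": "console"}})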