# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import enum
import inspect
import json
import logging
import multiprocessing
import multiprocessing.managers
import os
import threading
from typing import Any, Literal, Optional, Union

import opentelemetry.metrics as api_metrics
import opentelemetry.trace as api_trace
import psutil

from .. import utils

# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
#
# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
# sacrifice temporal resolution to preserve other information. Which method is used depends on whether
# the expected post-hoc aggregate function is decomposable:
#
# * Decomposable aggregate functions (e.g. count, sum, min, max).
#   * Use client-side aggregation.
#   * E.g. preserve request + response counts.
# * Non-decomposable aggregate functions (e.g. average, percentile).
#   * Use decimation by an integer factor or last value.
#   * E.g. preserve the shape of the latency distribution (unlike tail sampling).

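# Illustrative sketch (not part of the module API): the two resampling strategies described
# above, applied to hypothetical batches of raw samples.
def _example_client_side_sum(data_sizes: list[int]) -> int:
    # Decomposable aggregate (sum): partial results can be merged later without losing accuracy.
    return sum(data_sizes)


def _example_decimate(latencies: list[float], factor: int) -> list[float]:
    # Non-decomposable aggregates (e.g. percentiles): keep every ``factor``-th raw sample so the
    # shape of the latency distribution is preserved, at reduced temporal resolution.
    return latencies[::factor]
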
_METRICS_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
    "_otlp_mtls_vault": "multistorageclient.telemetry.metrics.exporters.otlp_mtls_vault._OTLPmTLSVaultMetricExporter",
}

_TRACE_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.traces.exporters.otlp_msal._OTLPMSALSpanExporter",
}
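
# Illustrative sketch: the shape of an ``.opentelemetry.metrics`` config dict as consumed by
# ``Telemetry.meter_provider`` below. The option values here are hypothetical; exporter ``type``
# keys are either entries in the mappings above or fully qualified class names.
_EXAMPLE_METRICS_CONFIG: dict = {
    "exporter": {
        "type": "console",
        "options": {},
    },
    "reader": {
        "options": {},
    },
}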

logger = logging.getLogger(__name__)


class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def _reinitialize_instance_locks_after_fork(self) -> None:
        """
        Reinitialize internal locks after fork to prevent deadlocks.

        This method reinitializes all internal locks. Caches are kept as they
        may still be valid, but locks must be fresh to avoid deadlocks.
        """
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {}).copy()
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Gauge name.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Counter name.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
                    logger.error("No exporter configured! Disabling traces.")
                    return None

    def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]:
        """
        Create or return an existing :py:class:`api_trace.Tracer` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_cache_lock:
            if config_json in self._tracer_cache:
                return self._tracer_cache[config_json]
            else:
                tracer_provider = self.tracer_provider(config=config)
                if tracer_provider is None:
                    return None
                else:
                    return self._tracer_cache.setdefault(
                        config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient")
                    )

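# Illustrative sketch (not part of the module API): recording a latency sample with a
# process-local ``Telemetry`` instance. The config and attribute values are hypothetical.
def _example_record_latency_locally() -> None:
    telemetry = Telemetry()
    metrics_config = {"exporter": {"type": "console"}}
    gauge = telemetry.gauge(config=metrics_config, name=Telemetry.GaugeName.LATENCY)
    if gauge is not None:
        gauge.set(0.125, attributes={"operation": "get_object"})
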

# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry`.

    :return: A telemetry instance.
    """
    global _TELEMETRY
    global _TELEMETRY_LOCK

    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
        return _TELEMETRY


class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.

    * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.

    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is that it essentially re-introduces a global interpreter lock (GIL)-like
    bottleneck with additional IPC overhead. Telemetry operations, however, should be
    lightweight enough that this isn't expected to be a problem. Remote data store latency
    should still be the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    pass


def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.
    """
    return ".".join([c.__module__, c.__qualname__])


# Metrics proxy object setup.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
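

# Illustrative sketch (not part of the module API): constructing a span through a ``Tracer``
# proxy using the three steps above. The span name and traced work are hypothetical.
def _example_start_proxied_span(tracer: api_trace.Tracer) -> None:
    import opentelemetry.context as api_context

    # 1. Capture this process's global context (pickleable, unlike a generator).
    context = api_context.get_current()
    # 2. Create the span against that context (possibly via the manager's server process).
    span = tracer.start_span(name="example-operation", context=context)
    # 3. Activate it locally and end it when the block exits.
    with api_trace.use_span(span, end_on_exit=True):
        pass  # Traced work goes here.
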
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)

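# Illustrative sketch (not part of the module API): the flow described in the ``TelemetryManager``
# docstring, spelled out with the manager used directly. ``_init_server`` and ``_init_client``
# below are the real implementations; the address here is hypothetical.
def _example_manager_flow() -> None:
    manager = TelemetryManager(
        address=("127.0.0.1", 54321),
        ctx=multiprocessing.get_context(method="spawn"),
    )
    manager.start()  # Providers live (and shut down) in the manager's server process.
    telemetry_proxy = manager.Telemetry()  # pyright: ignore [reportAttributeAccessIssue]
    # Hold and share only proxies like ``telemetry_proxy``; never call shutdown through them.
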

# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()


def _reinitialize_locks_after_fork() -> None:
    """
    Reinitialize telemetry locks after fork to prevent deadlocks.

    This function is called automatically after a fork to reinitialize all locks.
    Caches and instances are kept as they may still be valid, but locks must be
    fresh to avoid deadlocks.
    """
    global _TELEMETRY, _TELEMETRY_LOCK, _TELEMETRY_PROXIES_LOCK

    _TELEMETRY_LOCK = threading.Lock()
    _TELEMETRY_PROXIES_LOCK = threading.Lock()

    # Reinitialize LOCAL mode telemetry instance locks if it exists.
    if _TELEMETRY is not None:
        _TELEMETRY._reinitialize_instance_locks_after_fork()


if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reinitialize_locks_after_fork)


class TelemetryMode(enum.Enum):
    """
    How to create a :py:class:`Telemetry` object.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"


def _telemetry_proxies_key(
    mode: Literal[TelemetryMode.SERVER, TelemetryMode.CLIENT], address: Union[str, tuple[str, int]]
) -> str:
    """
    Get the key for the ``_TELEMETRY_PROXIES`` dictionary.
    """
    init_options = {"mode": mode.value, "address": address}
    return json.dumps(init_options, sort_keys=True)


def _telemetry_manager_server_port(process_id: int) -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :param process_id: Process ID used to calculate the port.
    """

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the parent/child process PID by the port range length, then add the initial offset.
    return (2**15 + 2**14) + (process_id % ((2**16) - (2**15 + 2**14)))

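# Worked example (illustrative): for a PID of 12345, the port is
# (2**15 + 2**14) + (12345 % (2**16 - (2**15 + 2**14))) == 49152 + (12345 % 16384) == 61497,
# which falls inside the dynamic/private port range [49152, 65535].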

def _init_server(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Start + connect to a telemetry IPC server.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    address = address or ("127.0.0.1", _telemetry_manager_server_port(process_id=psutil.Process().pid))
    telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.SERVER, address=address)

    with _TELEMETRY_PROXIES_LOCK:
        if telemetry_proxies_key in _TELEMETRY_PROXIES:
            return _TELEMETRY_PROXIES[telemetry_proxies_key]
        else:
            telemetry_manager = TelemetryManager(
                address=address,
                # Use spawn instead of the platform-specific default (may be fork) to avoid aforementioned issues with fork.
                ctx=multiprocessing.get_context(method="spawn"),
            )

            logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.start()
                atexit.register(telemetry_manager.shutdown)
                logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
            except Exception as e:
                logger.debug(
                    f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e

            logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.connect()
                logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
            except Exception as e:
                logger.debug(
                    f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e


def _init_client(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Connect to a telemetry IPC server.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    candidate_addresses: list[Union[str, tuple[str, int]]] = []

    if address is not None:
        candidate_addresses = [address]
    else:
        current_process = psutil.Process()
        # Python processes from leaf to root.
        python_process_ancestry: list[psutil.Process] = [
            current_process,
            # Try the default telemetry manager server port for all ancestor process IDs.
            #
            # psutil is used since multiprocessing only exposes the parent process.
            #
            # We can't use `itertools.takewhile(lambda ancestor_process: ancestor_process.name() == current_process.name(), ...)`
            # to limit ourselves to ancestor Python processes by process name since some may not be named
            # `python{optional version}` in some cases (e.g. may be named `pytest`).
            *current_process.parents(),
        ]
        # Try to connect from leaf to root.
        candidate_addresses = [
            ("127.0.0.1", _telemetry_manager_server_port(process_id=process.pid)) for process in python_process_ancestry
        ]

    for candidate_address in candidate_addresses:
        telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.CLIENT, address=candidate_address)

        with _TELEMETRY_PROXIES_LOCK:
            if telemetry_proxies_key in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[telemetry_proxies_key]
            else:
                telemetry_manager = TelemetryManager(address=candidate_address)

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                    return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
                except Exception:
                    logger.debug(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!",
                        exc_info=True,
                    )

    raise ConnectionError(f"Failed to connect to telemetry manager server at any of {candidate_addresses}!")


def init(
    mode: Optional[TelemetryMode] = None,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object. If ``None``, the default heuristic chooses a mode based on the presence of telemetry IPC servers in the process tree.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    """

    if mode is None:
        # Main process.
        if multiprocessing.parent_process() is None:
            # Daemons can't have child processes.
            #
            # Try to create a telemetry instance in local mode.
            #
            # ⚠️ This may cause CPU contention if the current process is compute-intensive
            # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
            if multiprocessing.current_process().daemon:
                return _init()
            # Start + connect to a telemetry IPC server.
            else:
                return _init_server(address=address)
        # Child process.
        else:
            # Connect to a telemetry IPC server.
            try:
                return _init_client(address=address)
            # No existing telemetry IPC server.
            except ConnectionError:
                # Daemons can't have child processes.
                #
                # Try to create a telemetry instance in local mode.
                #
                # ⚠️ This may cause CPU contention if the current process is compute-intensive
                # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
                if multiprocessing.current_process().daemon:
                    return _init()
                # Start + connect to a telemetry IPC server.
                else:
                    return _init_server(address=address)
    elif mode == TelemetryMode.LOCAL:
        return _init()
    elif mode == TelemetryMode.SERVER:
        return _init_server(address=address)
    elif mode == TelemetryMode.CLIENT:
        return _init_client(address=address)
    else:
        raise ValueError(f"Unsupported telemetry mode: {mode}")
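

# Illustrative usage sketch (not part of the module API): a typical caller lets ``init`` pick the
# mode, then records a request count. The config and attribute values are hypothetical.
def _example_count_request() -> None:
    telemetry = init()
    metrics_config = {"exporter": {"type": "console"}}
    counter = telemetry.counter(config=metrics_config, name=Telemetry.CounterName.REQUEST_SUM)
    if counter is not None:
        counter.add(1, attributes={"operation": "get_object"})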