Source code for multistorageclient.telemetry

# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import atexit
import enum
import inspect
import json
import logging
import multiprocessing
import multiprocessing.managers
import os
import threading
from typing import Any, Literal, Optional, Union

import opentelemetry.metrics as api_metrics
import opentelemetry.trace as api_trace
import psutil

from .. import utils

# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
#
# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
# sacrifice temporal resolution to preserve other information. Which method is used depends on whether
# the expected post-hoc aggregate function is decomposable:
#
# * Decomposable aggregate functions (e.g. count, sum, min, max).
#   * Use client-side aggregation.
#     * E.g. preserve request + response counts.
# * Non-decomposable aggregate functions (e.g. average, percentile).
#   * Use decimation by an integer factor or last value.
#     * E.g. preserve the shape of the latency distribution (unlike tail sampling).

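# A minimal illustrative sketch of the two strategies above (not used by this module;
# ``requests`` and ``latencies`` are hypothetical sample collections and the decimation
# factor of 10 is an arbitrary assumption):
#
#     # Client-side aggregation for decomposable aggregates (e.g. sum).
#     request_count = sum(1 for _ in requests)
#
#     # Decimation by an integer factor for non-decomposable aggregates (e.g. percentiles):
#     # keep every 10th sample to preserve the shape of the latency distribution.
#     decimated_latencies = latencies[::10]
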
_METRICS_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
}

_TRACE_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.traces.exporters.otlp_msal._OTLPMSALSpanExporter",
}

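# Illustrative shape of a ``.opentelemetry.metrics`` config consumed below. The endpoint is a
# placeholder and the reader options depend on ``DiperiodicExportingMetricReader``:
#
#     {
#         "exporter": {
#             "type": "otlp",  # a key from the mapping above, or a fully qualified class name
#             "options": {"endpoint": "http://localhost:4318/v1/metrics"},
#         },
#         "reader": {"options": {}},
#     }
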
logger = logging.getLogger(__name__)

class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def _reinitialize_instance_locks_after_fork(self) -> None:
        """
        Reinitialize internal locks after fork to prevent deadlocks.

        This method reinitializes all internal locks. Caches are kept as they
        may still be valid, but locks must be fresh to avoid deadlocks.
        """
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
212 """ 213 config_json = json.dumps(config, sort_keys=True) 214 with self._meter_cache_lock: 215 if config_json in self._meter_cache: 216 return self._meter_cache[config_json] 217 else: 218 meter_provider = self.meter_provider(config=config) 219 if meter_provider is None: 220 return None 221 else: 222 return self._meter_cache.setdefault( 223 config_json, meter_provider.get_meter(name="multistorageclient") 224 ) 225 226 def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]: 227 """ 228 Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name. 229 230 :param config: ``.opentelemetry.metrics`` config dict. 231 :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured. 232 """ 233 config_json = json.dumps(config, sort_keys=True) 234 with self._gauge_cache_lock: 235 if config_json in self._gauge_cache and name in self._gauge_cache[config_json]: 236 return self._gauge_cache[config_json][name] 237 else: 238 meter = self.meter(config=config) 239 if meter is None: 240 return None 241 else: 242 return self._gauge_cache.setdefault(config_json, {}).setdefault( 243 name, 244 meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")), 245 ) 246 247 def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]: 248 """ 249 Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name. 250 251 :param config: ``.opentelemetry.metrics`` config dict. 252 :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured. 253 """ 254 config_json = json.dumps(config, sort_keys=True) 255 with self._counter_cache_lock: 256 if config_json in self._counter_cache and name in self._counter_cache[config_json]: 257 return self._counter_cache[config_json][name] 258 else: 259 meter = self.meter(config=config) 260 if meter is None: 261 return None 262 else: 263 return self._counter_cache.setdefault(config_json, {}).setdefault( 264 name, 265 meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")), 266 ) 267 268 def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]: 269 """ 270 Create or return an existing :py:class:`api_trace.TracerProvider` for a config. 271 272 :param config: ``.opentelemetry.traces`` config dict. 273 :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured. 
274 """ 275 config_json = json.dumps(config, sort_keys=True) 276 with self._tracer_provider_cache_lock: 277 if config_json in self._tracer_provider_cache: 278 return self._tracer_provider_cache[config_json] 279 else: 280 if "exporter" in config: 281 try: 282 import opentelemetry.sdk.trace as sdk_trace 283 import opentelemetry.sdk.trace.export as sdk_trace_export 284 import opentelemetry.sdk.trace.sampling as sdk_trace_sampling 285 286 exporter_type: str = config["exporter"]["type"] 287 exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type) 288 exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1) 289 cls = utils.import_class(exporter_class_name, exporter_module_name) 290 exporter_options = config["exporter"].get("options", {}) 291 exporter: sdk_trace_export.SpanExporter = cls(**exporter_options) 292 293 processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor() 294 processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter)) 295 296 # TODO: Add sampler to configuration schema. 297 sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON 298 299 return self._tracer_provider_cache.setdefault( 300 config_json, 301 sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler), 302 ) 303 except (AttributeError, ImportError): 304 logger.error( 305 "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True 306 ) 307 return None 308 else: 309 logger.error("No exporter configured! Disabling traces.") 310 return None 311 312 def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]: 313 """ 314 Create or return an existing :py:class:`api_trace.Tracer` for a config. 315 316 :param config: ``.opentelemetry.traces`` config dict. 317 :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured. 318 """ 319 config_json = json.dumps(config, sort_keys=True) 320 with self._tracer_cache_lock: 321 if config_json in self._tracer_cache: 322 return self._tracer_cache[config_json] 323 else: 324 tracer_provider = self.tracer_provider(config=config) 325 if tracer_provider is None: 326 return None 327 else: 328 return self._tracer_cache.setdefault( 329 config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient") 330 )


# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry`.

    :return: A telemetry instance.
    """
    global _TELEMETRY
    global _TELEMETRY_LOCK

    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
        return _TELEMETRY


class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.

    * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.

    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is that it essentially re-introduces a global interpreter lock (GIL) with
    additional IPC overhead. Telemetry operations, however, should be lightweight so
    this isn't expected to be a problem. Remote data store latency should still be
    the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    pass


def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.
    """
    return ".".join([c.__module__, c.__qualname__])


# Metrics proxy object setup.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)


# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()


def _reinitialize_locks_after_fork() -> None:
    """
    Reinitialize telemetry locks after fork to prevent deadlocks.

    This function is called automatically after a fork to reinitialize all locks.
    Caches and instances are kept as they may still be valid, but locks must be
    fresh to avoid deadlocks.
    """
    global _TELEMETRY, _TELEMETRY_LOCK, _TELEMETRY_PROXIES_LOCK

    _TELEMETRY_LOCK = threading.Lock()
    _TELEMETRY_PROXIES_LOCK = threading.Lock()

    # Reinitialize the LOCAL mode telemetry instance's locks if one exists.
    if _TELEMETRY is not None:
        _TELEMETRY._reinitialize_instance_locks_after_fork()


if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reinitialize_locks_after_fork)


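# A minimal sketch of the span pattern described in the ``Tracer`` proxy registration above
# (``tracer`` is a hypothetical ``Tracer`` or ``Tracer`` proxy; the span name is illustrative):
#
#     import opentelemetry.context
#     import opentelemetry.trace
#
#     span = tracer.start_span(name="example-operation", context=opentelemetry.context.get_current())
#     with opentelemetry.trace.use_span(span, end_on_exit=True):
#         ...  # traced work

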
class TelemetryMode(enum.Enum):
    """
    How to create a :py:class:`Telemetry` object.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"


def _telemetry_proxies_key(
    mode: Literal[TelemetryMode.SERVER, TelemetryMode.CLIENT], address: Union[str, tuple[str, int]]
) -> str:
    """
    Get the key for the _TELEMETRY_PROXIES dictionary.
    """
    init_options = {"mode": mode.value, "address": address}
    return json.dumps(init_options, sort_keys=True)


def _telemetry_manager_server_port(process_id: int) -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :param process_id: Process ID used to calculate the port.
    """

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the parent/child process PID by the port range length, then add the initial offset.
    return (2**15 + 2**14) + (process_id % ((2**16) - (2**15 + 2**14)))


def _init_server(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Start + connect to a telemetry IPC server.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    address = address or ("127.0.0.1", _telemetry_manager_server_port(process_id=psutil.Process().pid))
    telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.SERVER, address=address)

    with _TELEMETRY_PROXIES_LOCK:
        if telemetry_proxies_key in _TELEMETRY_PROXIES:
            return _TELEMETRY_PROXIES[telemetry_proxies_key]
        else:
            telemetry_manager = TelemetryManager(
                address=address,
                # Use spawn instead of the platform-specific default (may be fork) to avoid aforementioned issues with fork.
                ctx=multiprocessing.get_context(method="spawn"),
            )

            logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.start()
                atexit.register(telemetry_manager.shutdown)
                logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
            except Exception as e:
                logger.debug(
                    f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e

            logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.connect()
                logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
            except Exception as e:
                logger.debug(
                    f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e


def _init_client(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Connect to a telemetry IPC server.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    candidate_addresses: list[Union[str, tuple[str, int]]] = []

    if address is not None:
        candidate_addresses = [address]
    else:
        current_process = psutil.Process()
        # Python processes from leaf to root.
        python_process_ancestry: list[psutil.Process] = [
            current_process,
            # Try the default telemetry manager server port for all ancestor process IDs.
            #
            # psutil is used since multiprocessing only exposes the parent process.
            #
            # We can't use `itertools.takewhile(lambda ancestor_process: ancestor_process.name() == current_process.name(), ...)`
            # to limit ourselves to ancestor Python processes by process name since some may not be named
            # `python{optional version}` in some cases (e.g. may be named `pytest`).
            *current_process.parents(),
        ]
        # Try to connect from leaf to root.
        candidate_addresses = [
            ("127.0.0.1", _telemetry_manager_server_port(process_id=process.pid)) for process in python_process_ancestry
        ]

    for candidate_address in candidate_addresses:
        telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.CLIENT, address=candidate_address)

        with _TELEMETRY_PROXIES_LOCK:
            if telemetry_proxies_key in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[telemetry_proxies_key]
            else:
                telemetry_manager = TelemetryManager(address=candidate_address)

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                    return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
                except Exception:
                    logger.debug(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!",
                        exc_info=True,
                    )

    raise ConnectionError(f"Failed to connect to telemetry manager server at any of {candidate_addresses}!")


def init(
    mode: Optional[TelemetryMode] = None,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object. If ``None``, the default heuristic chooses a mode based on the presence of telemetry IPC servers in the process tree.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    """

    if mode is None:
        # Main process.
        if multiprocessing.parent_process() is None:
            # Daemons can't have child processes.
            #
            # Try to create a telemetry instance in local mode.
            #
            # ⚠️ This may cause CPU contention if the current process is compute-intensive
            # and a high collect and/or export frequency is used, due to the global interpreter lock (GIL).
            if multiprocessing.current_process().daemon:
                return _init()
            # Start + connect to a telemetry IPC server.
            else:
                return _init_server(address=address)
        # Child process.
        else:
            # Connect to a telemetry IPC server.
            try:
                return _init_client(address=address)
            # No existing telemetry IPC server.
            except ConnectionError:
                # Daemons can't have child processes.
                #
                # Try to create a telemetry instance in local mode.
                #
                # ⚠️ This may cause CPU contention if the current process is compute-intensive
                # and a high collect and/or export frequency is used, due to the global interpreter lock (GIL).
                if multiprocessing.current_process().daemon:
                    return _init()
                # Start + connect to a telemetry IPC server.
                else:
                    return _init_server(address=address)
    elif mode == TelemetryMode.LOCAL:
        return _init()
    elif mode == TelemetryMode.SERVER:
        return _init_server(address=address)
    elif mode == TelemetryMode.CLIENT:
        return _init_client(address=address)
    else:
        raise ValueError(f"Unsupported telemetry mode: {mode}")
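

# A minimal end-to-end usage sketch (the ``console`` exporter config and the attribute
# values are illustrative assumptions):
#
#     telemetry = init()  # LOCAL, SERVER, or CLIENT is chosen by the heuristic above
#     metrics_config = {"exporter": {"type": "console"}}
#     request_counter = telemetry.counter(config=metrics_config, name=Telemetry.CounterName.REQUEST_SUM)
#     if request_counter is not None:
#         request_counter.add(1, attributes={"operation": "PUT"})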