Source code for multistorageclient.telemetry

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import enum
 18import inspect
 19import json
 20import logging
 21import multiprocessing
 22import multiprocessing.managers
 23import os
 24import threading
 25from typing import Any, Literal, Optional, Union
 26
 27import opentelemetry.metrics as api_metrics
 28import opentelemetry.trace as api_trace
 29import psutil
 30
 31from .. import utils
 32
 33# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
 34#
 35# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
 36# sacrifice temporal resolution to preserve other information. Which method is used depends on if
 37# the expected post-hoc aggregate function is decomposable:
 38#
 39# * Decomposable aggregate functions (e.g. count, sum, min, max).
 40#   * Use client-side aggregation.
 41#     * E.g. preserve request + response counts.
 42# * Non-decomposable aggregate functions (e.g. average, percentile).
 43#   * Use decimation by an integer factor or last value.
 44#     * E.g. preserve the shape of the latency distribution (unlike tail sampling).
 45
# Maps short metric exporter type names (as written in config files) to fully qualified
# exporter class names. Unrecognized types are used verbatim as fully qualified names
# (see the ``.get(exporter_type, exporter_type)`` lookup in ``Telemetry.meter_provider``).
_METRICS_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
    "_otlp_mtls_vault": "multistorageclient.telemetry.metrics.exporters.otlp_mtls_vault._OTLPmTLSVaultMetricExporter",
}
 53
# Maps short span exporter type names (as written in config files) to fully qualified
# exporter class names. Unrecognized types are used verbatim as fully qualified names
# (see the ``.get(exporter_type, exporter_type)`` lookup in ``Telemetry.tracer_provider``).
_TRACE_EXPORTER_MAPPING = {
    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
    # "Private" until it's decided whether this will be official.
    "_otlp_msal": "multistorageclient.telemetry.traces.exporters.otlp_msal._OTLPMSALSpanExporter",
}

logger = logging.getLogger(__name__)
 62
 63
class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        # Start with empty caches; each cache has a dedicated lock so unrelated
        # telemetry resource types don't contend with each other.
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def _reinitialize_instance_locks_after_fork(self) -> None:
        """
        Reinitialize internal locks after fork to prevent deadlocks.

        This method reinitializes all internal locks. Caches are kept as they
        may still be valid, but locks must be fresh to avoid deadlocks.
        """
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # SDK imports are deferred so the API-only (no-op) path works
                        # when the OpenTelemetry SDK isn't installed.
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        # Resolve the exporter class from the short name mapping,
                        # falling back to treating the type as a fully qualified name.
                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {}).copy()
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Gauge name.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Counter name.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # SDK imports are deferred so the API-only (no-op) path works
                        # when the OpenTelemetry SDK isn't installed.
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        # NOTE(review): unlike meter_provider, the options dict isn't copied
                        # before use — confirm whether that asymmetry is intentional.
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
                    logger.error("No exporter configured! Disabling traces.")
                    return None

    def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]:
        """
        Create or return an existing :py:class:`api_trace.Tracer` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_cache_lock:
            if config_json in self._tracer_cache:
                return self._tracer_cache[config_json]
            else:
                tracer_provider = self.tracer_provider(config=config)
                if tracer_provider is None:
                    return None
                else:
                    return self._tracer_cache.setdefault(
                        config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient")
                    )
# To share a single :py:class:`Telemetry` within a process (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry`.

    Lazily constructs the process-wide singleton under ``_TELEMETRY_LOCK`` so
    concurrent first calls can't build two instances.

    :return: A telemetry instance.
    """
    global _TELEMETRY

    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
        return _TELEMETRY
class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.
    * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.
    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is it essentially re-introduces global interpreter lock (GIL) with
    additional IPC overhead. Telemetry operations, however, should be lightweight so
    this isn't expected to be a problem. Remote data store latency should still be
    the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    # All behavior comes from ``TelemetryManager.register(...)`` calls below.
    pass
def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    For :py:class:`multiprocessing.Manager` type IDs.

    :param c: Class to name.
    :return: ``{module}.{qualified name}`` for the class.
    """
    return ".".join([c.__module__, c.__qualname__])


# Metrics proxy object setup.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    # Methods that must return proxies (not pickled copies) to their callers.
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)


# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()


def _reinitialize_locks_after_fork() -> None:
    """
    Reinitialize telemetry locks after fork to prevent deadlocks.

    This function is called automatically after a fork to reinitialize all locks.
    Caches and instances are kept as they may still be valid, but locks must be
    fresh to avoid deadlocks.
    """
    global _TELEMETRY, _TELEMETRY_LOCK, _TELEMETRY_PROXIES_LOCK

    _TELEMETRY_LOCK = threading.Lock()
    _TELEMETRY_PROXIES_LOCK = threading.Lock()

    # Reinitialize LOCAL mode telemetry instance locks if it exists
    if _TELEMETRY is not None:
        _TELEMETRY._reinitialize_instance_locks_after_fork()


# ``os.register_at_fork`` only exists on platforms that support fork (not Windows).
if hasattr(os, "register_at_fork"):
    os.register_at_fork(after_in_child=_reinitialize_locks_after_fork)
class TelemetryMode(enum.Enum):
    """
    How to create a :py:class:`Telemetry` object.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"
def _telemetry_proxies_key(
    mode: Literal[TelemetryMode.SERVER, TelemetryMode.CLIENT], address: Union[str, tuple[str, int]]
) -> str:
    """
    Get the key for the _TELEMETRY_PROXIES dictionary.

    :param mode: Telemetry mode the proxy object is cached under.
    :param address: Telemetry IPC server address.
    :return: Init options as a sorted JSON string (since dictionaries can't be hashed).
    """
    init_options = {"mode": mode.value, "address": address}
    return json.dumps(init_options, sort_keys=True)


def _telemetry_manager_server_port(process_id: int) -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :param process_id: Process ID used to calculate the port.
    :return: A port in the dynamic/private/ephemeral range (49152-65535).
    """

    # Use the dynamic/private/ephemeral port range.
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the parent/child process PID by the port range length, then add the initial offset.
    return (2**15 + 2**14) + (process_id % ((2**16) - (2**15 + 2**14)))


def _init_server(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Start + connect to a telemetry IPC server.

    :param address: Telemetry IPC server address. Defaults to loopback with a PID-based port.
    :return: A :py:class:`Telemetry` proxy object backed by the started server.
    :raises Exception: If the server can't be started or connected to.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    address = address or ("127.0.0.1", _telemetry_manager_server_port(process_id=psutil.Process().pid))
    telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.SERVER, address=address)

    with _TELEMETRY_PROXIES_LOCK:
        if telemetry_proxies_key in _TELEMETRY_PROXIES:
            return _TELEMETRY_PROXIES[telemetry_proxies_key]
        else:
            telemetry_manager = TelemetryManager(
                address=address,
                # Use spawn instead of the platform-specific default (may be fork) to avoid aforementioned issues with fork.
                ctx=multiprocessing.get_context(method="spawn"),
            )

            logger.debug(f"Creating telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.start()
                # Flush + stop the server when this (owning) process exits.
                atexit.register(telemetry_manager.shutdown)
                logger.debug(f"Started telemetry manager server at {telemetry_manager.address}.")
            except Exception as e:
                logger.debug(
                    f"Failed to create telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e

            logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
            try:
                telemetry_manager.connect()
                logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
            except Exception as e:
                logger.debug(
                    f"Failed to connect to telemetry manager server at {telemetry_manager.address}!", exc_info=True
                )
                raise e


def _init_client(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Connect to a telemetry IPC server.

    :param address: Telemetry IPC server address. If ``None``, try the PID-based default
        port for this process and each of its ancestors, from leaf to root.
    :return: A :py:class:`Telemetry` proxy object.
    :raises ConnectionError: If no candidate telemetry IPC server can be connected to.
    """
    global _TELEMETRY_PROXIES
    global _TELEMETRY_PROXIES_LOCK

    candidate_addresses: list[Union[str, tuple[str, int]]] = []

    if address is not None:
        candidate_addresses = [address]
    else:
        current_process = psutil.Process()
        # Python processes from leaf to root.
        python_process_ancestry: list[psutil.Process] = [
            current_process,
            # Try the default telemetry manager server port for all ancestor process IDs.
            #
            # psutil is used since multiprocessing only exposes the parent process.
            #
            # We can't use `itertools.takewhile(lambda ancestor_process: ancestor_process.name() == current_process.name(), ...)`
            # to limit ourselves to ancestor Python processes by process name since some may not be named
            # `python{optional version}` in some cases (e.g. may be named `pytest`).
            *current_process.parents(),
        ]
        # Try to connect from leaf to root.
        candidate_addresses = [
            ("127.0.0.1", _telemetry_manager_server_port(process_id=process.pid)) for process in python_process_ancestry
        ]

    for candidate_address in candidate_addresses:
        telemetry_proxies_key = _telemetry_proxies_key(mode=TelemetryMode.CLIENT, address=candidate_address)

        with _TELEMETRY_PROXIES_LOCK:
            if telemetry_proxies_key in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[telemetry_proxies_key]
            else:
                telemetry_manager = TelemetryManager(address=candidate_address)

                logger.debug(f"Connecting to telemetry manager server at {telemetry_manager.address}.")
                try:
                    telemetry_manager.connect()
                    logger.debug(f"Connected to telemetry manager server at {telemetry_manager.address}.")
                    return _TELEMETRY_PROXIES.setdefault(telemetry_proxies_key, telemetry_manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
                except Exception:
                    # Best-effort probe; fall through to the next candidate address.
                    logger.debug(
                        f"Failed to connect to telemetry manager server at {telemetry_manager.address}!",
                        exc_info=True,
                    )

    raise ConnectionError(f"Failed to connect to telemetry manager server at any of {candidate_addresses}!")
def init(
    mode: Optional[TelemetryMode] = None,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object. If ``None``, the default heuristic chooses a mode based on the presence of telemetry IPC servers in the process tree.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    :raises ValueError: If ``mode`` isn't ``None`` or a supported :py:class:`TelemetryMode`.
    """

    # Explicit modes dispatch directly.
    if mode == TelemetryMode.LOCAL:
        return _init()
    if mode == TelemetryMode.SERVER:
        return _init_server(address=address)
    if mode == TelemetryMode.CLIENT:
        return _init_client(address=address)
    if mode is not None:
        raise ValueError(f"Unsupported telemetry mode: {mode}")

    # No explicit mode — choose one heuristically from the process tree.
    #
    # Daemons can't have child processes, so they fall back to a process-local
    # telemetry instance instead of starting a telemetry IPC server.
    #
    # ⚠️ Local mode may cause CPU contention if the current process is compute-intensive
    # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
    if multiprocessing.parent_process() is None:
        # Main process: local mode for daemons, otherwise start + connect to a telemetry IPC server.
        return _init() if multiprocessing.current_process().daemon else _init_server(address=address)

    # Child process: connect to an existing telemetry IPC server if one exists.
    try:
        return _init_client(address=address)
    except ConnectionError:
        # No existing telemetry IPC server — same fallback as the main process.
        return _init() if multiprocessing.current_process().daemon else _init_server(address=address)