Source code for multistorageclient.telemetry

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import atexit
 17import enum
 18import inspect
 19import json
 20import logging
 21import multiprocessing
 22import multiprocessing.managers
 23import threading
 24from typing import Any, Literal, Optional, Union
 25
 26import opentelemetry.metrics as api_metrics
 27import opentelemetry.trace as api_trace
 28import psutil
 29
 30from .. import utils
 31
 32# MSC telemetry prefers publishing raw samples when possible to support arbitrary post-hoc aggregations.
 33#
 34# Some setups, however, may need resampling to reduce sample volume. The resampling methods we use
 35# sacrifice temporal resolution to preserve other information. Which method is used depends on if
 36# the expected post-hoc aggregate function is decomposable:
 37#
 38# * Decomposable aggregate functions (e.g. count, sum, min, max).
 39#   * Use client-side aggregation.
 40#     * E.g. preserve request + response counts.
 41# * Non-decomposable aggregate functions (e.g. average, percentile).
 42#   * Use decimation by an integer factor or last value.
 43#     * E.g. preserve the shape of the latency distribution (unlike tail sampling).
 44
 45_METRICS_EXPORTER_MAPPING = {
 46    "console": "opentelemetry.sdk.metrics.export.ConsoleMetricExporter",
 47    "otlp": "opentelemetry.exporter.otlp.proto.http.metric_exporter.OTLPMetricExporter",
 48    # "Private" until it's decided whether this will be official.
 49    "_otlp_msal": "multistorageclient.telemetry.metrics.exporters.otlp_msal._OTLPMSALMetricExporter",
 50}
 51
 52_TRACE_EXPORTER_MAPPING = {
 53    "console": "opentelemetry.sdk.trace.export.ConsoleSpanExporter",
 54    "otlp": "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter",
 55}
 56
 57logger = logging.getLogger(__name__)
 58
 59
class Telemetry:
    """
    Provides telemetry resources.

    Instances shouldn't be copied between processes. Not fork-safe or pickleable.

    Instances can be shared between processes by registering with a :py:class:`multiprocessing.managers.BaseManager` and using proxy objects.
    """

    # Metrics are named `multistorageclient.{property}(.{aggregation})?`.
    #
    # For example:
    #
    # - multistorageclient.data_size
    #   - Gauge for data size per individual operation.
    #   - For distributions (e.g. post-hoc histograms + heatmaps).
    # - multistorageclient.data_size.sum
    #   - Counter (sum) for data size across all operations.
    #   - For aggregates (e.g. post-hoc data rate calculations).

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class GaugeName(enum.Enum):
        LATENCY = "multistorageclient.latency"
        DATA_SIZE = "multistorageclient.data_size"
        DATA_RATE = "multistorageclient.data_rate"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _GAUGE_UNIT_MAPPING: dict[GaugeName, str] = {
        # Seconds.
        GaugeName.LATENCY: "s",
        # Bytes.
        GaugeName.DATA_SIZE: "By",
        # Bytes/second.
        GaugeName.DATA_RATE: "By/s",
    }

    # https://opentelemetry.io/docs/specs/semconv/general/naming#metrics
    class CounterName(enum.Enum):
        REQUEST_SUM = "multistorageclient.request.sum"
        RESPONSE_SUM = "multistorageclient.response.sum"
        DATA_SIZE_SUM = "multistorageclient.data_size.sum"

    # https://opentelemetry.io/docs/specs/semconv/general/metrics#units
    _COUNTER_UNIT_MAPPING: dict[CounterName, str] = {
        # Unitless.
        CounterName.REQUEST_SUM: "{request}",
        # Unitless.
        CounterName.RESPONSE_SUM: "{response}",
        # Bytes.
        CounterName.DATA_SIZE_SUM: "By",
    }

    # All caches are keyed by the config serialized as a sorted JSON string,
    # since dictionaries can't be hashed. Each cache has its own lock; the
    # lock acquisition order is always leaf -> parent (e.g. gauge -> meter ->
    # meter provider), never the reverse, so no deadlock is possible.

    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter provider.
    _meter_provider_cache: dict[str, api_metrics.MeterProvider]
    _meter_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to meter.
    _meter_cache: dict[str, api_metrics.Meter]
    _meter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to gauge name to gauge.
    _gauge_cache: dict[str, dict[GaugeName, api_metrics._Gauge]]
    _gauge_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to counter name to counter.
    _counter_cache: dict[str, dict[CounterName, api_metrics.Counter]]
    _counter_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer provider.
    _tracer_provider_cache: dict[str, api_trace.TracerProvider]
    _tracer_provider_cache_lock: threading.Lock
    # Map of config as a sorted JSON string (since dictionaries can't be hashed) to tracer.
    _tracer_cache: dict[str, api_trace.Tracer]
    _tracer_cache_lock: threading.Lock

    def __init__(self):
        """Initialize empty caches and their per-cache locks."""
        self._meter_provider_cache = {}
        self._meter_provider_cache_lock = threading.Lock()
        self._meter_cache = {}
        self._meter_cache_lock = threading.Lock()
        self._gauge_cache = {}
        self._gauge_cache_lock = threading.Lock()
        self._counter_cache = {}
        self._counter_cache_lock = threading.Lock()
        self._tracer_provider_cache = {}
        self._tracer_provider_cache_lock = threading.Lock()
        self._tracer_cache = {}
        self._tracer_cache_lock = threading.Lock()

    def meter_provider(self, config: dict[str, Any]) -> Optional[api_metrics.MeterProvider]:
        """
        Create or return an existing :py:class:`api_metrics.MeterProvider` for a config.

        Failures (missing exporter config, missing SDK/exporter imports) are logged
        and return ``None`` rather than raising; failed configs are NOT cached, so
        a later call with the same config retries.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.MeterProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_provider_cache_lock:
            if config_json in self._meter_provider_cache:
                return self._meter_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # SDK imports are deferred so the API-only dependency footprint
                        # stays minimal when metrics are unconfigured.
                        import opentelemetry.sdk.metrics as sdk_metrics
                        import opentelemetry.sdk.metrics.export as sdk_metrics_export

                        from .metrics.readers.diperiodic_exporting import DiperiodicExportingMetricReader

                        # Resolve the exporter alias (or treat the value as a fully
                        # qualified class name) and instantiate it with its options.
                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _METRICS_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_metrics_export.MetricExporter = cls(**exporter_options)

                        reader_options = config.get("reader", {}).get("options", {})
                        reader: sdk_metrics_export.MetricReader = DiperiodicExportingMetricReader(
                            **reader_options, exporter=exporter
                        )

                        return self._meter_provider_cache.setdefault(
                            config_json, sdk_metrics.MeterProvider(metric_readers=[reader])
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling metrics.", exc_info=True
                        )
                        return None
                else:
                    # Don't return a no-op meter provider to avoid unnecessary overhead.
                    logger.error("No exporter configured! Disabling metrics.")
                    return None

    def meter(self, config: dict[str, Any]) -> Optional[api_metrics.Meter]:
        """
        Create or return an existing :py:class:`api_metrics.Meter` for a config.

        :param config: ``.opentelemetry.metrics`` config dict.
        :return: A :py:class:`api_metrics.Meter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._meter_cache_lock:
            if config_json in self._meter_cache:
                return self._meter_cache[config_json]
            else:
                meter_provider = self.meter_provider(config=config)
                if meter_provider is None:
                    return None
                else:
                    return self._meter_cache.setdefault(
                        config_json, meter_provider.get_meter(name="multistorageclient")
                    )

    def gauge(self, config: dict[str, Any], name: GaugeName) -> Optional[api_metrics._Gauge]:
        """
        Create or return an existing :py:class:`api_metrics.Gauge` for a config and gauge name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Which gauge to create or return.
        :return: A :py:class:`api_metrics.Gauge` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._gauge_cache_lock:
            if config_json in self._gauge_cache and name in self._gauge_cache[config_json]:
                return self._gauge_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._gauge_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_gauge(name=name.value, unit=Telemetry._GAUGE_UNIT_MAPPING.get(name, "")),
                    )

    def counter(self, config: dict[str, Any], name: CounterName) -> Optional[api_metrics.Counter]:
        """
        Create or return an existing :py:class:`api_metrics.Counter` for a config and counter name.

        :param config: ``.opentelemetry.metrics`` config dict.
        :param name: Which counter to create or return.
        :return: A :py:class:`api_metrics.Counter` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._counter_cache_lock:
            if config_json in self._counter_cache and name in self._counter_cache[config_json]:
                return self._counter_cache[config_json][name]
            else:
                meter = self.meter(config=config)
                if meter is None:
                    return None
                else:
                    return self._counter_cache.setdefault(config_json, {}).setdefault(
                        name,
                        meter.create_counter(name=name.value, unit=Telemetry._COUNTER_UNIT_MAPPING.get(name, "")),
                    )

    def tracer_provider(self, config: dict[str, Any]) -> Optional[api_trace.TracerProvider]:
        """
        Create or return an existing :py:class:`api_trace.TracerProvider` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.TracerProvider` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_provider_cache_lock:
            if config_json in self._tracer_provider_cache:
                return self._tracer_provider_cache[config_json]
            else:
                if "exporter" in config:
                    try:
                        # SDK imports are deferred; see :py:meth:`meter_provider`.
                        import opentelemetry.sdk.trace as sdk_trace
                        import opentelemetry.sdk.trace.export as sdk_trace_export
                        import opentelemetry.sdk.trace.sampling as sdk_trace_sampling

                        exporter_type: str = config["exporter"]["type"]
                        exporter_fully_qualified_name = _TRACE_EXPORTER_MAPPING.get(exporter_type, exporter_type)
                        exporter_module_name, exporter_class_name = exporter_fully_qualified_name.rsplit(".", 1)
                        cls = utils.import_class(exporter_class_name, exporter_module_name)
                        exporter_options = config["exporter"].get("options", {})
                        exporter: sdk_trace_export.SpanExporter = cls(**exporter_options)

                        processor: sdk_trace.SpanProcessor = sdk_trace.SynchronousMultiSpanProcessor()
                        processor.add_span_processor(sdk_trace_export.BatchSpanProcessor(span_exporter=exporter))

                        # TODO: Add sampler to configuration schema.
                        sampler: sdk_trace_sampling.Sampler = sdk_trace_sampling.ALWAYS_ON

                        return self._tracer_provider_cache.setdefault(
                            config_json,
                            sdk_trace.TracerProvider(active_span_processor=processor, sampler=sampler),
                        )
                    except (AttributeError, ImportError):
                        logger.error(
                            "Failed to import OpenTelemetry Python SDK or exporter! Disabling traces.", exc_info=True
                        )
                        return None
                else:
                    logger.error("No exporter configured! Disabling traces.")
                    return None

    def tracer(self, config: dict[str, Any]) -> Optional[api_trace.Tracer]:
        """
        Create or return an existing :py:class:`api_trace.Tracer` for a config.

        :param config: ``.opentelemetry.traces`` config dict.
        :return: A :py:class:`api_trace.Tracer` or ``None`` if no valid exporter is configured.
        """
        config_json = json.dumps(config, sort_keys=True)
        with self._tracer_cache_lock:
            if config_json in self._tracer_cache:
                return self._tracer_cache[config_json]
            else:
                tracer_provider = self.tracer_provider(config=config)
                if tracer_provider is None:
                    return None
                else:
                    return self._tracer_cache.setdefault(
                        config_json, tracer_provider.get_tracer(instrumenting_module_name="multistorageclient")
                    )
# Process-wide singleton :py:class:`Telemetry` (e.g. local, manager).
#
# A manager's server processes shouldn't be forked, so this should be safe.
_TELEMETRY: Optional[Telemetry] = None
_TELEMETRY_LOCK = threading.Lock()


def _init() -> Telemetry:
    """
    Return the process-wide :py:class:`Telemetry`, creating it on first call.

    Creation is guarded by a module-level lock, so concurrent callers all
    receive the same instance.

    :return: A telemetry instance.
    """
    global _TELEMETRY

    with _TELEMETRY_LOCK:
        if _TELEMETRY is None:
            _TELEMETRY = Telemetry()
        return _TELEMETRY
class TelemetryManager(multiprocessing.managers.BaseManager):
    """
    A :py:class:`multiprocessing.managers.BaseManager` for telemetry resources.

    The OpenTelemetry Python SDK isn't fork-safe since telemetry sample buffers can be duplicated.

    In addition, Python ≤3.12 doesn't call exit handlers for forked processes.
    This causes the OpenTelemetry Python SDK to not flush telemetry before exiting.

    * https://github.com/open-telemetry/opentelemetry-python/issues/4215
    * https://github.com/open-telemetry/opentelemetry-python/issues/3307

    Forking is multiprocessing's default start method for non-macOS POSIX systems until Python 3.14.

    * https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods

    To fully support multiprocessing, resampling + publishing is handled by
    a single process that's (ideally) a child of (i.e. directly under) the main process. This:

    * Relieves other processes of this work.
    * Avoids issues with duplicate samples when forking and unpublished samples when exiting forks.
    * Allows cross-process resampling.
    * Reuses a single connection pool to telemetry backends.

    The downside is it essentially re-introduces global interpreter lock (GIL) with
    additional IPC overhead. Telemetry operations, however, should be lightweight so
    this isn't expected to be a problem. Remote data store latency should still be
    the primary throughput limiter for storage clients.

    :py:class:`multiprocessing.managers.BaseManager` is used for this since it creates
    a separate server process for shared objects.

    Telemetry resources are provided as
    `proxy objects <https://docs.python.org/3/library/multiprocessing.html#proxy-objects>`_
    for location transparency.

    The documentation isn't particularly detailed, but others have written comprehensively on this:

    * https://zpz.github.io/blog/python-mp-manager-1
    * https://zpz.github.io/blog/python-mp-manager-2
    * https://zpz.github.io/blog/python-mp-manager-3

    By specification, metric and tracer providers must call shutdown on any
    underlying metric readers + span processors + exporters.

    * https://opentelemetry.io/docs/specs/otel/metrics/sdk#shutdown
    * https://opentelemetry.io/docs/specs/otel/trace/sdk#shutdown

    In the OpenTelemetry Python SDK, provider shutdown is called automatically
    by exit handlers (when they work at least). Consequently, clients should:

    * Only receive proxy objects.

      * Enables metric reader + span processor + exporter re-use across processes.

    * Never call shutdown on the proxy objects.

      * The shutdown exit handler is registered on the manager's server process.
      * ⚠️ We expect a finite number of providers (i.e. no dynamic configs) so we don't leak them.
    """

    # All behavior comes from ``BaseManager`` plus the ``register`` calls below.
    pass
def _fully_qualified_name(c: type[Any]) -> str:
    """
    Return the fully qualified name for a class (e.g. ``module.Class``).

    Used as :py:class:`multiprocessing.Manager` type IDs.
    """
    return f"{c.__module__}.{c.__qualname__}"


# Metrics proxy object setup.
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics._Gauge))
TelemetryManager.register(typeid=_fully_qualified_name(api_metrics.Counter))
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.Meter),
    # Instruments created by a proxied meter are returned as proxies too.
    method_to_typeid={
        api_metrics.Meter.create_gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        api_metrics.Meter.create_counter.__name__: _fully_qualified_name(api_metrics.Counter),
    },
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_metrics.MeterProvider),
    method_to_typeid={api_metrics.MeterProvider.get_meter.__name__: _fully_qualified_name(api_metrics.Meter)},
)

# Traces proxy object setup.
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Span),
    # Non-public methods (i.e. ones starting with a ``_``) are omitted by default.
    #
    # We need ``__enter__`` and ``__exit__`` so the ``Span`` can be used as a ``ContextManager``.
    exposed=[name for name, _ in inspect.getmembers(api_trace.Span, predicate=inspect.isfunction)],
    method_to_typeid={api_trace.Span.__enter__.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.Tracer),
    # Can't proxy ``Tracer.start_as_current_span`` since it returns a generator (not pickleable)
    # and tries to use the process-local global context (in this case, the manager's server process).
    #
    # Instead, spans should be constructed by:
    #
    # 1. Calling ``opentelemetry.context.get_current()`` to get the process-local global context (pickleable).
    # 2. Creating a new span with the process-local global context.
    # 3. Calling ``opentelemetry.trace.use_span()`` with the span and ``end_on_exit=True``.
    method_to_typeid={api_trace.Tracer.start_span.__name__: _fully_qualified_name(api_trace.Span)},
)
TelemetryManager.register(
    typeid=_fully_qualified_name(api_trace.TracerProvider),
    method_to_typeid={api_trace.TracerProvider.get_tracer.__name__: _fully_qualified_name(api_trace.Tracer)},
)

# Telemetry proxy object setup.
#
# This should be the only registered type with a ``callable``.
# It's the only type we create directly with a ``TelemetryManager``.
TelemetryManager.register(
    typeid=Telemetry.__name__,
    callable=_init,
    method_to_typeid={
        Telemetry.meter_provider.__name__: _fully_qualified_name(api_metrics.MeterProvider),
        Telemetry.meter.__name__: _fully_qualified_name(api_metrics.Meter),
        Telemetry.gauge.__name__: _fully_qualified_name(api_metrics._Gauge),
        Telemetry.counter.__name__: _fully_qualified_name(api_metrics.Counter),
        Telemetry.tracer_provider.__name__: _fully_qualified_name(api_trace.TracerProvider),
        Telemetry.tracer.__name__: _fully_qualified_name(api_trace.Tracer),
    },
)


# Map of init options as a sorted JSON string (since dictionaries can't be hashed) to telemetry proxy.
_TELEMETRY_PROXIES: dict[str, Telemetry] = {}
# To share :py:class:`Telemetry` proxy objects within a process (e.g. client, server).
#
# Forking isn't expected to happen while this is held (may lead to a deadlock).
_TELEMETRY_PROXIES_LOCK = threading.Lock()
class TelemetryMode(enum.Enum):
    """
    Strategy for creating a :py:class:`Telemetry` object.
    """

    #: Keep everything local to the process (not fork-safe).
    LOCAL = "local"
    #: Start + connect to a telemetry IPC server.
    SERVER = "server"
    #: Connect to a telemetry IPC server.
    CLIENT = "client"
def _telemetry_proxies_key(
    mode: Literal[TelemetryMode.SERVER, TelemetryMode.CLIENT], address: Union[str, tuple[str, int]]
) -> str:
    """
    Build the ``_TELEMETRY_PROXIES`` cache key for a (mode, address) pair.
    """
    return json.dumps({"mode": mode.value, "address": address}, sort_keys=True)


def _telemetry_manager_server_port(process_id: int) -> int:
    """
    Get the default telemetry manager server port.

    This is PID-based to:

    * Avoid collisions between multiple independent Python interpreters running on the same machine.
    * Let child processes deterministically find their parent's telemetry manager server.

    :param process_id: Process ID used to calculate the port.
    """
    # Stay within the dynamic/private/ephemeral port range (49152-65535).
    #
    # https://www.rfc-editor.org/rfc/rfc6335.html#section-6
    # https://en.wikipedia.org/wiki/List_of_TCP_and_UDP_port_numbers#Dynamic,_private_or_ephemeral_ports
    #
    # Modulo the parent/child process PID by the port range length, then add the initial offset.
    range_start = 2**15 + 2**14
    range_length = 2**16 - range_start
    return range_start + (process_id % range_length)


def _init_server(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Start + connect to a telemetry IPC server and return a telemetry proxy.

    :param address: Server address; defaults to localhost on this process's PID-derived port.
    :raises Exception: If the server can't be started or connected to.
    """
    address = address or ("127.0.0.1", _telemetry_manager_server_port(process_id=psutil.Process().pid))
    cache_key = _telemetry_proxies_key(mode=TelemetryMode.SERVER, address=address)

    with _TELEMETRY_PROXIES_LOCK:
        if cache_key in _TELEMETRY_PROXIES:
            return _TELEMETRY_PROXIES[cache_key]

        # Use spawn instead of the platform-specific default (may be fork) to
        # avoid the fork issues described on :py:class:`TelemetryManager`.
        manager = TelemetryManager(address=address, ctx=multiprocessing.get_context(method="spawn"))

        logger.debug(f"Creating telemetry manager server at {manager.address}.")
        try:
            manager.start()
            # Flush + shut the server down when this process exits.
            atexit.register(manager.shutdown)
            logger.debug(f"Started telemetry manager server at {manager.address}.")
        except Exception:
            logger.debug(f"Failed to create telemetry manager server at {manager.address}!", exc_info=True)
            raise

        logger.debug(f"Connecting to telemetry manager server at {manager.address}.")
        try:
            manager.connect()
            logger.debug(f"Connected to telemetry manager server at {manager.address}.")
            return _TELEMETRY_PROXIES.setdefault(cache_key, manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
        except Exception:
            logger.debug(f"Failed to connect to telemetry manager server at {manager.address}!", exc_info=True)
            raise


def _init_client(address: Optional[Union[str, tuple[str, int]]] = None) -> Telemetry:
    """
    Connect to a telemetry IPC server and return a telemetry proxy.

    :param address: Server address; if ``None``, probe the PID-derived default port of
        this process and each of its ancestors, from leaf to root.
    :raises ConnectionError: If no candidate address accepts a connection.
    """
    candidates: list[Union[str, tuple[str, int]]]

    if address is not None:
        candidates = [address]
    else:
        # Try the default telemetry manager server port for this process and all
        # ancestor process IDs, from leaf to root.
        #
        # psutil is used since multiprocessing only exposes the parent process.
        #
        # We can't use `itertools.takewhile(lambda ancestor_process: ancestor_process.name() == current_process.name(), ...)`
        # to limit ourselves to ancestor Python processes by process name since some may not be named
        # `python{optional version}` in some cases (e.g. may be named `pytest`).
        current_process = psutil.Process()
        ancestry: list[psutil.Process] = [current_process, *current_process.parents()]
        candidates = [
            ("127.0.0.1", _telemetry_manager_server_port(process_id=process.pid)) for process in ancestry
        ]

    for candidate in candidates:
        cache_key = _telemetry_proxies_key(mode=TelemetryMode.CLIENT, address=candidate)

        with _TELEMETRY_PROXIES_LOCK:
            if cache_key in _TELEMETRY_PROXIES:
                return _TELEMETRY_PROXIES[cache_key]

            manager = TelemetryManager(address=candidate)

            logger.debug(f"Connecting to telemetry manager server at {manager.address}.")
            try:
                manager.connect()
                logger.debug(f"Connected to telemetry manager server at {manager.address}.")
                return _TELEMETRY_PROXIES.setdefault(cache_key, manager.Telemetry())  # pyright: ignore [reportAttributeAccessIssue]
            except Exception:
                # Best-effort probe: log and move on to the next candidate.
                logger.debug(
                    f"Failed to connect to telemetry manager server at {manager.address}!",
                    exc_info=True,
                )

    raise ConnectionError(f"Failed to connect to telemetry manager server at any of {candidates}!")
def init(
    mode: Optional[TelemetryMode] = None,
    address: Optional[Union[str, tuple[str, int]]] = None,
) -> Telemetry:
    """
    Create or return an existing :py:class:`Telemetry` instance or :py:class:`Telemetry` proxy object.

    :param mode: How to create a :py:class:`Telemetry` object. If ``None``, the default heuristic chooses a mode based on the presence of telemetry IPC servers in the process tree.
    :param address: Telemetry IPC server address. Passed directly to a :py:class:`multiprocessing.managers.BaseManager`. Ignored if the mode is :py:const:`TelemetryMode.LOCAL`.
    :return: A telemetry instance.
    """

    # Explicit modes.
    if mode == TelemetryMode.LOCAL:
        return _init()
    if mode == TelemetryMode.SERVER:
        return _init_server(address=address)
    if mode == TelemetryMode.CLIENT:
        return _init_client(address=address)
    if mode is not None:
        raise ValueError(f"Unsupported telemetry mode: {mode}")

    # Heuristic (mode is None).
    #
    # Child processes first try to reach an existing telemetry IPC server in the
    # process tree; main processes (and children that found no server) fall through.
    if multiprocessing.parent_process() is not None:
        try:
            return _init_client(address=address)
        except ConnectionError:
            # No existing telemetry IPC server; fall through.
            pass

    # Daemons can't have child processes.
    #
    # Try to create a telemetry instance in local mode.
    #
    # ⚠️ This may cause CPU contention if the current process is compute-intensive
    # and a high collect and/or export frequency is used due to global interpreter lock (GIL).
    if multiprocessing.current_process().daemon:
        return _init()

    # Start + connect to a telemetry IPC server.
    return _init_server(address=address)