Source code for multistorageclient.telemetry.metrics.readers.diperiodic_exporting

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17import math
 18import os
 19import threading
 20import time
 21import weakref
 22from typing import Optional
 23
 24import opentelemetry.sdk.environment_variables as sdk_environment_variables
 25import opentelemetry.sdk.metrics as sdk_metrics
 26import opentelemetry.sdk.metrics.export as sdk_metrics_export
 27
 28# Not OTel spec. Use 1 second to keep the data volume per export interval reasonably small.
 29DEFAULT_COLLECT_INTERVAL_MILLIS: float = 1000
 30# Not OTel spec. Use the default on :py:meth:`sdk_metrics_export.MetricReader.collect`.
 31DEFAULT_COLLECT_TIMEOUT_MILLIS: float = 10000
 32# OTel spec.
 33DEFAULT_EXPORT_INTERVAL_MILLIS: float = 60000
 34# OTel spec.
 35DEFAULT_EXPORT_TIMEOUT_MILLIS: float = 30000
 36
 37logger = logging.getLogger(__name__)
 38
 39
class DiperiodicExportingMetricReader(sdk_metrics_export.MetricReader):
    """
    :py:class:`opentelemetry.sdk.metrics.export.MetricReader` that collects + exports metrics on separate user-configurable time intervals.
    This is in contrast with :py:class:`opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` which couples them with a 1 minute default.

    The metrics collection interval limits the temporal resolution. Most metric backends have 1 millisecond or finer temporal resolution.

    Two daemon threads do the work:

    * A collect daemon calls :py:meth:`collect` once per collect interval and appends the result to the collect buffer.
    * An export daemon, once per export interval, moves the collect buffer into the export buffer and hands it to the
      exporter. When the reader shuts down, it performs one final collect + export before exiting.
    """

    #: Collect buffer.
    _collect_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _collect_metrics_data_lock: threading.Lock
    #: Export buffer.
    _export_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _export_metrics_data_lock: threading.Lock

    _exporter: sdk_metrics_export.MetricExporter
    _collect_interval_millis: float
    _collect_timeout_millis: float
    _export_interval_millis: float
    _export_timeout_millis: float

    _shutdown_event: threading.Event
    _shutdown_event_lock: threading.Lock
    _collect_daemon: Optional[threading.Thread]
    _export_daemon: Optional[threading.Thread]

    def __init__(
        self,
        exporter: sdk_metrics_export.MetricExporter,
        collect_interval_millis: Optional[float] = None,
        collect_timeout_millis: Optional[float] = None,
        export_interval_millis: Optional[float] = None,
        export_timeout_millis: Optional[float] = None,
    ):
        """
        :param exporter: Metrics exporter.
        :param collect_interval_millis: Collect interval in milliseconds. Defaults to ``DEFAULT_COLLECT_INTERVAL_MILLIS``.
        :param collect_timeout_millis: Collect timeout in milliseconds. Defaults to ``DEFAULT_COLLECT_TIMEOUT_MILLIS``.
        :param export_interval_millis: Export interval in milliseconds. Defaults to the ``OTEL_METRIC_EXPORT_INTERVAL``
            environment variable. If that is unset or not a number, ``DEFAULT_EXPORT_INTERVAL_MILLIS`` is used.
        :param export_timeout_millis: Export timeout in milliseconds. Defaults to the ``OTEL_METRIC_EXPORT_TIMEOUT``
            environment variable. If that is unset or not a number, ``DEFAULT_EXPORT_TIMEOUT_MILLIS`` is used.
        :raises ValueError: If the collect or export interval is not in (0, infinity).
        """

        # Defer to the exporter for aggregation and temporality configurations.
        super().__init__(
            preferred_aggregation=exporter._preferred_aggregation, preferred_temporality=exporter._preferred_temporality
        )

        self._collect_metrics_data = None
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data = None
        self._export_metrics_data_lock = threading.Lock()

        self._exporter = exporter
        if collect_interval_millis is None:
            # OTEL_METRIC_COLLECT_INTERVAL isn't an official OTel SDK environment variable (yet).
            collect_interval_millis = DEFAULT_COLLECT_INTERVAL_MILLIS
        if collect_timeout_millis is None:
            # OTEL_METRIC_COLLECT_TIMEOUT isn't an official OTel SDK environment variable (yet).
            collect_timeout_millis = DEFAULT_COLLECT_TIMEOUT_MILLIS
        if export_interval_millis is None:
            try:
                export_interval_millis = float(
                    os.environ.get(
                        sdk_environment_variables.OTEL_METRIC_EXPORT_INTERVAL, DEFAULT_EXPORT_INTERVAL_MILLIS
                    )
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export interval. Using default of {DEFAULT_EXPORT_INTERVAL_MILLIS}."
                )
                export_interval_millis = DEFAULT_EXPORT_INTERVAL_MILLIS
        if export_timeout_millis is None:
            try:
                export_timeout_millis = float(
                    os.environ.get(sdk_environment_variables.OTEL_METRIC_EXPORT_TIMEOUT, DEFAULT_EXPORT_TIMEOUT_MILLIS)
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export timeout. Using default of {DEFAULT_EXPORT_TIMEOUT_MILLIS}."
                )
                export_timeout_millis = DEFAULT_EXPORT_TIMEOUT_MILLIS
        self._collect_interval_millis = collect_interval_millis
        self._collect_timeout_millis = collect_timeout_millis
        self._export_interval_millis = export_interval_millis
        self._export_timeout_millis = export_timeout_millis

        self._shutdown_event = threading.Event()
        self._shutdown_event_lock = threading.Lock()
        self._collect_daemon = None
        self._export_daemon = None
        if (
            self._collect_interval_millis > 0
            and self._collect_interval_millis < math.inf
            and self._export_interval_millis > 0
            and self._export_interval_millis < math.inf
        ):
            self._init_daemons()
            if hasattr(os, "register_at_fork"):
                # Fork hooks can't be unregistered, so the hook holds only a weak reference. Otherwise it would
                # keep this reader (and its exporter) alive for the rest of the process.
                weak_at_fork_reinit = weakref.WeakMethod(self._at_fork_reinit)

                def _after_in_child() -> None:
                    at_fork_reinit = weak_at_fork_reinit()
                    if at_fork_reinit is not None:
                        at_fork_reinit()

                os.register_at_fork(after_in_child=_after_in_child)
        else:
            raise ValueError("Collect and export intervals must be in (0, infinity).")

    def _init_daemons(self) -> None:
        """
        Empty both buffers, then start the collect and export daemon threads.
        """
        # Empty the buffers. Prevents duplicate metrics when forking.
        with self._collect_metrics_data_lock, self._export_metrics_data_lock:
            self._collect_metrics_data, self._export_metrics_data = None, None

        # Create the collect daemon.
        self._collect_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._collect_daemon", target=self._collect_daemon_target, daemon=True
        )
        self._collect_daemon.start()

        # Create the export daemon.
        self._export_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._export_daemon", target=self._export_daemon_target, daemon=True
        )
        self._export_daemon.start()

    def _at_fork_reinit(self) -> None:
        """
        Restart the daemons in a forked child process.

        Only the forking thread survives a fork. A lock held by a parent daemon at fork time would therefore never be
        released in the child; for example, the export buffer lock is held for the whole duration of an export. To
        avoid a deadlock, the locks are replaced with fresh ones instead of being acquired.
        """
        if self._shutdown_event.is_set():
            # The parent already shut this reader down; don't restart it (and export again) in the child.
            return
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data_lock = threading.Lock()
        self._shutdown_event_lock = threading.Lock()
        self._init_daemons()

    def _collect_daemon_target(self) -> None:
        while not self._shutdown_event.wait(timeout=self._collect_interval_millis / 10**3):
            self._collect_iteration()

    def _export_daemon_target(self) -> None:
        while not self._shutdown_event.wait(timeout=self._export_interval_millis / 10**3):
            self._export_iteration()
        # Final collect + export.
        self._collect_iteration()
        self._export_iteration()

    @staticmethod
    def _remaining_millis(deadline_ns: float) -> float:
        """
        Milliseconds left until ``deadline_ns`` (a :py:func:`time.monotonic_ns` timestamp), clamped at 0.
        """
        return max(0.0, (deadline_ns - time.monotonic_ns()) / 10**6)

    # :py:meth:`sdk_metrics_export.MetricReader._collect` is reserved. Using another name.
    def _collect_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Run one collection and log (rather than raise) any failure.

        :param timeout_millis: Collect timeout in milliseconds. ``None`` means the configured collect timeout.
        """
        if timeout_millis is None:
            timeout_millis = self._collect_timeout_millis
        try:
            # Only set when registered on a :py:class:`sdk_metrics.MeterProvider` which calls
            # :py:meth:`sdk_metrics_export.MetricReader._set_collect_callback`.
            if self._collect is not None:
                # Inherited from :py:class:``sdk_metrics_export.MetricReader``.
                self.collect(timeout_millis=timeout_millis)
        except sdk_metrics.MetricsTimeoutError:
            logger.warning("Metrics collection timed out.", exc_info=True)
        except Exception:
            logger.exception("Exception while collecting metrics.")

    # Called by :py:meth:`sdk_metrics_export.MetricReader.collect`.
    def _receive_metrics(
        self, metrics_data: sdk_metrics_export.MetricsData, timeout_millis: float = 0, **kwargs
    ) -> None:
        """
        Append the resource metrics of one collection to the collect buffer.
        """
        with self._collect_metrics_data_lock:
            self._collect_metrics_data = sdk_metrics_export.MetricsData(
                resource_metrics=(
                    *(() if self._collect_metrics_data is None else self._collect_metrics_data.resource_metrics),
                    *metrics_data.resource_metrics,
                )
            )

    def _export_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Move the collect buffer into the export buffer and export it. Failures are logged, not raised.

        :param timeout_millis: Export timeout in milliseconds. ``None`` means the configured export timeout.
        """
        if timeout_millis is None:
            timeout_millis = self._export_timeout_millis
        with self._export_metrics_data_lock:
            with self._collect_metrics_data_lock:
                # Rotate the collect + export buffers.
                #
                # We don't merge the collect buffer into the export buffer to prevent infinite accumulation.
                self._collect_metrics_data, self._export_metrics_data = None, self._collect_metrics_data

            if self._export_metrics_data is None:
                return
            try:
                self._exporter.export(metrics_data=self._export_metrics_data, timeout_millis=timeout_millis)
            except sdk_metrics.MetricsTimeoutError:
                logger.warning("Metrics export timed out.", exc_info=True)
            except Exception:
                logger.exception("Exception while exporting metrics.")
            finally:
                # Immediately empty the export buffer for garbage collection.
                self._export_metrics_data = None

    def force_flush(
        self, timeout_millis: float = DEFAULT_COLLECT_TIMEOUT_MILLIS + DEFAULT_EXPORT_TIMEOUT_MILLIS
    ) -> bool:
        """
        Collect now, export everything buffered, then flush the exporter.

        All three steps share a single deadline ``timeout_millis`` after the call. Each step receives whatever time is
        left, clamped at 0.

        :param timeout_millis: Overall timeout in milliseconds.
        :return: Always ``True``.
        """
        # Monotonic clock: wall-clock adjustments must not stretch or shrink the deadline.
        deadline_ns = time.monotonic_ns() + timeout_millis * 10**6

        # Calls :py:meth:`sdk_metrics_export.MetricReader.collect`.
        super().force_flush(timeout_millis=self._remaining_millis(deadline_ns))
        self._export_iteration(timeout_millis=self._remaining_millis(deadline_ns))
        self._exporter.force_flush(timeout_millis=self._remaining_millis(deadline_ns))
        return True

    def shutdown(
        self, timeout_millis: float = DEFAULT_COLLECT_TIMEOUT_MILLIS + DEFAULT_EXPORT_TIMEOUT_MILLIS, **kwargs
    ) -> None:
        """
        Stop both daemons, then shut the exporter down.

        The export daemon performs one final collect + export on its way out. Only the first call does any work;
        later calls return immediately.

        :param timeout_millis: Overall timeout in milliseconds, shared by the daemon joins and the exporter shutdown.
        """
        deadline_ns = time.monotonic_ns() + timeout_millis * 10**6

        with self._shutdown_event_lock:
            if self._shutdown_event.is_set():
                return
            # Signal first: the daemons only leave their wait loops once the event is set. Joining before setting it
            # would just block until the timeout expired.
            self._shutdown_event.set()
            if self._collect_daemon is not None:
                self._collect_daemon.join(timeout=self._remaining_millis(deadline_ns) / 10**3)
            if self._export_daemon is not None:
                # Wait for the final collect + export so it runs before the exporter is shut down.
                self._export_daemon.join(timeout=self._remaining_millis(deadline_ns) / 10**3)
            self._exporter.shutdown(timeout_millis=self._remaining_millis(deadline_ns))