Source code for multistorageclient.telemetry.metrics.readers.diperiodic_exporting

  1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
  2# SPDX-License-Identifier: Apache-2.0
  3#
  4# Licensed under the Apache License, Version 2.0 (the "License");
  5# you may not use this file except in compliance with the License.
  6# You may obtain a copy of the License at
  7#
  8# http://www.apache.org/licenses/LICENSE-2.0
  9#
 10# Unless required by applicable law or agreed to in writing, software
 11# distributed under the License is distributed on an "AS IS" BASIS,
 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13# See the License for the specific language governing permissions and
 14# limitations under the License.
 15
 16import logging
 17import math
 18import os
 19import threading
 20import time
 21import weakref
 22from typing import Optional
 23
 24import opentelemetry.sdk.environment_variables as sdk_environment_variables
 25import opentelemetry.sdk.metrics as sdk_metrics
 26import opentelemetry.sdk.metrics.export as sdk_metrics_export
 27
 28# Not OTel spec. Use 1 second to keep the data volume per export interval reasonably small.
 29DEFAULT_COLLECT_INTERVAL_MILLIS: float = 1000
 30# Not OTel spec. Use the default on :py:meth:`sdk_metrics_export.MetricReader.collect`.
 31DEFAULT_COLLECT_TIMEOUT_MILLIS: float = 10000
 32# OTel spec.
 33DEFAULT_EXPORT_INTERVAL_MILLIS: float = 60000
 34# OTel spec.
 35DEFAULT_EXPORT_TIMEOUT_MILLIS: float = 30000
 36
 37logger = logging.Logger(__name__)
 38
 39
class DiperiodicExportingMetricReader(sdk_metrics_export.MetricReader):
    """
    :py:class:`opentelemetry.sdk.metrics.export.MetricReader` that collects + exports metrics on separate user-configurable time intervals.
    This is in contrast with :py:class:`opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` which couples them with a 1 minute default.

    The metrics collection interval limits the temporal resolution. Most metric backends have 1 millisecond or finer temporal resolution.

    Two daemon threads drive the reader: a collect daemon that calls :py:meth:`collect` every collect interval
    (accumulating the results in a collect buffer), and an export daemon that moves the collect buffer into an
    export buffer and hands it to the exporter every export interval.
    """

    #: Collect buffer.
    _collect_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _collect_metrics_data_lock: threading.Lock
    #: Export buffer.
    _export_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _export_metrics_data_lock: threading.Lock

    _exporter: sdk_metrics_export.MetricExporter
    _collect_interval_millis: float
    _collect_timeout_millis: float
    _export_interval_millis: float
    _export_timeout_millis: float

    _shutdown_event: threading.Event
    _shutdown_event_lock: threading.Lock
    _collect_daemon: Optional[threading.Thread]
    _export_daemon: Optional[threading.Thread]

    def __init__(
        self,
        exporter: sdk_metrics_export.MetricExporter,
        collect_interval_millis: Optional[float] = None,
        collect_timeout_millis: Optional[float] = None,
        export_interval_millis: Optional[float] = None,
        export_timeout_millis: Optional[float] = None,
    ):
        """
        :param exporter: Metrics exporter.
        :param collect_interval_millis: Collect interval in milliseconds. Defaults to ``DEFAULT_COLLECT_INTERVAL_MILLIS``.
        :param collect_timeout_millis: Collect timeout in milliseconds. Defaults to ``DEFAULT_COLLECT_TIMEOUT_MILLIS``.
        :param export_interval_millis: Export interval in milliseconds. Defaults to ``OTEL_METRIC_EXPORT_INTERVAL`` if set
            and parseable as a float, else ``DEFAULT_EXPORT_INTERVAL_MILLIS``.
        :param export_timeout_millis: Export timeout in milliseconds. Defaults to ``OTEL_METRIC_EXPORT_TIMEOUT`` if set
            and parseable as a float, else ``DEFAULT_EXPORT_TIMEOUT_MILLIS``.
        :raises ValueError: If the collect or export interval isn't in (0, infinity).
        """

        # Defer to the exporter for aggregation and temporality configurations.
        super().__init__(
            preferred_aggregation=exporter._preferred_aggregation, preferred_temporality=exporter._preferred_temporality
        )

        self._collect_metrics_data = None
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data = None
        self._export_metrics_data_lock = threading.Lock()

        self._exporter = exporter
        if collect_interval_millis is None:
            # OTEL_METRIC_COLLECT_INTERVAL isn't an official OTel SDK environment variable (yet).
            collect_interval_millis = DEFAULT_COLLECT_INTERVAL_MILLIS
        if collect_timeout_millis is None:
            # OTEL_METRIC_COLLECT_TIMEOUT isn't an official OTel SDK environment variable (yet).
            collect_timeout_millis = DEFAULT_COLLECT_TIMEOUT_MILLIS
        if export_interval_millis is None:
            try:
                export_interval_millis = float(
                    os.environ.get(
                        sdk_environment_variables.OTEL_METRIC_EXPORT_INTERVAL, DEFAULT_EXPORT_INTERVAL_MILLIS
                    )
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export interval. Using default of {DEFAULT_EXPORT_INTERVAL_MILLIS}."
                )
                export_interval_millis = DEFAULT_EXPORT_INTERVAL_MILLIS
        if export_timeout_millis is None:
            try:
                export_timeout_millis = float(
                    os.environ.get(sdk_environment_variables.OTEL_METRIC_EXPORT_TIMEOUT, DEFAULT_EXPORT_TIMEOUT_MILLIS)
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export timeout. Using default of {DEFAULT_EXPORT_TIMEOUT_MILLIS}."
                )
                export_timeout_millis = DEFAULT_EXPORT_TIMEOUT_MILLIS
        self._collect_interval_millis = collect_interval_millis
        self._collect_timeout_millis = collect_timeout_millis
        self._export_interval_millis = export_interval_millis
        self._export_timeout_millis = export_timeout_millis

        self._shutdown_event = threading.Event()
        self._shutdown_event_lock = threading.Lock()
        self._collect_daemon = None
        self._export_daemon = None

        # Chained comparisons are False for NaN, so NaN intervals are rejected too.
        if not (0 < self._collect_interval_millis < math.inf and 0 < self._export_interval_millis < math.inf):
            raise ValueError("Collect and export intervals must be in (0, infinity).")

        self._init_daemons()
        if hasattr(os, "register_at_fork"):
            # Fork hooks can't be unregistered, so hold only a weak reference to this reader and
            # dereference it when the hook fires (not now), letting the reader be garbage collected.
            weak_at_fork_reinit = weakref.WeakMethod(self._at_fork_reinit)

            def _after_in_child() -> None:
                at_fork_reinit = weak_at_fork_reinit()
                if at_fork_reinit is not None:
                    at_fork_reinit()

            os.register_at_fork(after_in_child=_after_in_child)

    @staticmethod
    def _remaining_millis(deadline_ns: float) -> float:
        """
        Milliseconds left until ``deadline_ns`` (a :py:func:`time.time_ns` timestamp), clamped at 0.
        """
        return max(0.0, (deadline_ns - time.time_ns()) / 10**6)

    def _init_daemons(self) -> None:
        """
        Empty both buffers and start the collect + export daemon threads.
        """
        # Empty the buffers. Prevents duplicate metrics when forking.
        with self._collect_metrics_data_lock, self._export_metrics_data_lock:
            self._collect_metrics_data, self._export_metrics_data = None, None

        # Create the collect daemon.
        self._collect_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._collect_daemon", target=self._collect_daemon_target, daemon=True
        )
        self._collect_daemon.start()

        # Create the export daemon.
        self._export_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._export_daemon", target=self._export_daemon_target, daemon=True
        )
        self._export_daemon.start()

    def _at_fork_reinit(self) -> None:
        """
        Restart the daemons in a forked child process.

        Only the forking thread survives in the child. A lock held by one of the parent's daemon threads at
        fork time (e.g. the export buffer lock during a slow export) would never be released, so the locks are
        replaced rather than acquired.
        """
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data_lock = threading.Lock()
        self._shutdown_event_lock = threading.Lock()
        # Don't resurrect a reader that was shut down before the fork.
        if self._shutdown_event.is_set():
            return
        self._shutdown_event = threading.Event()
        self._init_daemons()

    def _collect_daemon_target(self) -> None:
        """
        Collect every collect interval until the shutdown event is set.
        """
        while not self._shutdown_event.wait(timeout=self._collect_interval_millis / 10**3):
            self._collect_iteration()

    def _export_daemon_target(self) -> None:
        """
        Export every export interval until the shutdown event is set, then do one final collect + export.
        """
        while not self._shutdown_event.wait(timeout=self._export_interval_millis / 10**3):
            self._export_iteration()
        # Final collect + export.
        self._collect_iteration()
        self._export_iteration()

    # :py:class:`sdk_metrics_export.MetricReader._collect` already exists. Using another name.
    def _collect_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Collect metrics once, logging (not raising) any failure.

        :param timeout_millis: Collect timeout in milliseconds. ``None`` uses the configured collect timeout.
        """
        if timeout_millis is None:
            timeout_millis = self._collect_timeout_millis
        try:
            # Inherited from :py:class:``sdk_metrics_export.MetricReader``.
            self.collect(timeout_millis=timeout_millis)
        except sdk_metrics.MetricsTimeoutError:
            logger.warning("Metrics collection timed out.", exc_info=True)
        except Exception:
            logger.exception("Exception while collecting metrics.")

    # Called by :py:meth:`sdk_metrics_export.MetricReader.collect`.
    def _receive_metrics(
        self, metrics_data: sdk_metrics_export.MetricsData, timeout_millis: float = 0, **kwargs
    ) -> None:
        """
        Append the collected resource metrics to the collect buffer.

        :param metrics_data: Newly collected metrics.
        :param timeout_millis: Unused; accepted for interface compatibility.
        """
        with self._collect_metrics_data_lock:
            self._collect_metrics_data = sdk_metrics_export.MetricsData(
                resource_metrics=(
                    *(() if self._collect_metrics_data is None else self._collect_metrics_data.resource_metrics),
                    *metrics_data.resource_metrics,
                )
            )

    def _export_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Move the collect buffer into the export buffer and export it, logging (not raising) any failure.

        :param timeout_millis: Export timeout in milliseconds. ``None`` uses the configured export timeout.
        """
        if timeout_millis is None:
            timeout_millis = self._export_timeout_millis
        with self._export_metrics_data_lock:
            with self._collect_metrics_data_lock:
                # Rotate the collect + export buffers.
                #
                # We don't merge the collect buffer into the export buffer to prevent infinite accumulation.
                self._collect_metrics_data, self._export_metrics_data = None, self._collect_metrics_data

            if self._export_metrics_data is not None:
                try:
                    # Export.
                    self._exporter.export(metrics_data=self._export_metrics_data, timeout_millis=timeout_millis)
                except sdk_metrics.MetricsTimeoutError:
                    logger.warning("Metrics export timed out.", exc_info=True)
                except Exception:
                    logger.exception("Exception while exporting metrics.")
                finally:
                    # Immediately empty the export buffer for garbage collection.
                    self._export_metrics_data = None

    def force_flush(
        self, timeout_millis: float = DEFAULT_COLLECT_TIMEOUT_MILLIS + DEFAULT_EXPORT_TIMEOUT_MILLIS
    ) -> bool:
        """
        Collect, export, and flush the exporter now, sharing one ``timeout_millis`` budget across the three steps.

        :param timeout_millis: Total time budget in milliseconds.
        :return: ``True``. Export failures are logged by the export step rather than reported here.
        """
        deadline_ns = time.time_ns() + (timeout_millis * 10**6)

        # Calls :py:meth:`sdk_metrics_export.MetricReader.collect`.
        super().force_flush(timeout_millis=self._remaining_millis(deadline_ns))
        self._export_iteration(timeout_millis=self._remaining_millis(deadline_ns))
        self._exporter.force_flush(timeout_millis=self._remaining_millis(deadline_ns))
        return True

    def shutdown(
        self, timeout_millis: float = DEFAULT_COLLECT_TIMEOUT_MILLIS + DEFAULT_EXPORT_TIMEOUT_MILLIS, **kwargs
    ) -> None:
        """
        Stop the daemons, wait for the final collect + export, then shut down the exporter.

        Subsequent calls are no-ops.

        :param timeout_millis: Total time budget in milliseconds for joining the daemons and shutting down the exporter.
        """
        deadline_ns = time.time_ns() + (timeout_millis * 10**6)

        with self._shutdown_event_lock:
            if self._shutdown_event.is_set():
                return
            # Signal first: the daemons only leave their wait loops once the event is set, and the export
            # daemon then performs the final collect + export before the exporter is shut down below.
            self._shutdown_event.set()
            if self._collect_daemon is not None:
                self._collect_daemon.join(timeout=self._remaining_millis(deadline_ns) / 10**3)
            if self._export_daemon is not None:
                self._export_daemon.join(timeout=self._remaining_millis(deadline_ns) / 10**3)
            self._exporter.shutdown(timeout_millis=self._remaining_millis(deadline_ns))