Source code for multistorageclient.telemetry.metrics.readers.diperiodic_exporting
1# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2# SPDX-License-Identifier: Apache-2.0
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import logging
17import math
18import os
19import threading
20import time
21import weakref
22from typing import Optional
23
24import opentelemetry.sdk.environment_variables as sdk_environment_variables
25import opentelemetry.sdk.metrics as sdk_metrics
26import opentelemetry.sdk.metrics.export as sdk_metrics_export
27
# Not part of the OTel spec. 1 second keeps the amount of data buffered per export interval reasonably small.
DEFAULT_COLLECT_INTERVAL_MILLIS: float = 1_000
# Not part of the OTel spec. Mirrors the default timeout of :py:meth:`sdk_metrics_export.MetricReader.collect`.
DEFAULT_COLLECT_TIMEOUT_MILLIS: float = 10_000
# Default taken from the OTel spec.
DEFAULT_EXPORT_INTERVAL_MILLIS: float = 60_000
# Default taken from the OTel spec.
DEFAULT_EXPORT_TIMEOUT_MILLIS: float = 30_000

# Module-level logger.
logger = logging.getLogger(name=__name__)
38
39
[docs]
class DiperiodicExportingMetricReader(sdk_metrics_export.MetricReader):
    """
    :py:class:`opentelemetry.sdk.metrics.export.MetricReader` that collects + exports metrics on separate user-configurable time intervals.
    This is in contrast with :py:class:`opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` which couples them with a 1 minute default.

    The metrics collection interval limits the temporal resolution. Most metric backends have 1 millisecond or finer temporal resolution.

    Two daemon threads drive the reader. A collect daemon appends every collection to a collect buffer. An export
    daemon periodically moves the collect buffer into an export buffer and hands that buffer to the exporter.
    """

    #: Collect buffer.
    _collect_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _collect_metrics_data_lock: threading.Lock
    #: Export buffer.
    _export_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _export_metrics_data_lock: threading.Lock

    _exporter: sdk_metrics_export.MetricExporter
    _collect_interval_millis: float
    _collect_timeout_millis: float
    _export_interval_millis: float
    _export_timeout_millis: float

    _shutdown_event: threading.Event
    _shutdown_event_lock: threading.Lock
    _collect_daemon: Optional[threading.Thread]
    _export_daemon: Optional[threading.Thread]

    def __init__(
        self,
        exporter: sdk_metrics_export.MetricExporter,
        collect_interval_millis: Optional[float] = None,
        collect_timeout_millis: Optional[float] = None,
        export_interval_millis: Optional[float] = None,
        export_timeout_millis: Optional[float] = None,
    ):
        """
        :param exporter: Metrics exporter.
        :param collect_interval_millis: Collect interval in milliseconds. Defaults to ``DEFAULT_COLLECT_INTERVAL_MILLIS``.
        :param collect_timeout_millis: Collect timeout in milliseconds. Defaults to ``DEFAULT_COLLECT_TIMEOUT_MILLIS``.
        :param export_interval_millis: Export interval in milliseconds. Read from the OTel export interval environment
            variable when unset, falling back to ``DEFAULT_EXPORT_INTERVAL_MILLIS`` if it is missing or invalid.
        :param export_timeout_millis: Export timeout in milliseconds. Read from the OTel export timeout environment
            variable when unset, falling back to ``DEFAULT_EXPORT_TIMEOUT_MILLIS`` if it is missing or invalid.
        :raises ValueError: If the collect or export interval is not in (0, infinity).
        """

        # Defer to the exporter for aggregation and temporality configurations.
        super().__init__(
            preferred_aggregation=exporter._preferred_aggregation, preferred_temporality=exporter._preferred_temporality
        )

        self._collect_metrics_data = None
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data = None
        self._export_metrics_data_lock = threading.Lock()

        self._exporter = exporter
        if collect_interval_millis is None:
            # OTEL_METRIC_COLLECT_INTERVAL isn't an official OTel SDK environment variable (yet).
            collect_interval_millis = DEFAULT_COLLECT_INTERVAL_MILLIS
        if collect_timeout_millis is None:
            # OTEL_METRIC_COLLECT_TIMEOUT isn't an official OTel SDK environment variable (yet).
            collect_timeout_millis = DEFAULT_COLLECT_TIMEOUT_MILLIS
        if export_interval_millis is None:
            try:
                export_interval_millis = float(
                    os.environ.get(
                        sdk_environment_variables.OTEL_METRIC_EXPORT_INTERVAL, DEFAULT_EXPORT_INTERVAL_MILLIS
                    )
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export interval. Using default of {DEFAULT_EXPORT_INTERVAL_MILLIS}."
                )
                export_interval_millis = DEFAULT_EXPORT_INTERVAL_MILLIS
        if export_timeout_millis is None:
            try:
                export_timeout_millis = float(
                    os.environ.get(sdk_environment_variables.OTEL_METRIC_EXPORT_TIMEOUT, DEFAULT_EXPORT_TIMEOUT_MILLIS)
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export timeout. Using default of {DEFAULT_EXPORT_TIMEOUT_MILLIS}."
                )
                export_timeout_millis = DEFAULT_EXPORT_TIMEOUT_MILLIS
        self._collect_interval_millis = collect_interval_millis
        self._collect_timeout_millis = collect_timeout_millis
        self._export_interval_millis = export_interval_millis
        self._export_timeout_millis = export_timeout_millis

        self._shutdown_event = threading.Event()
        self._shutdown_event_lock = threading.Lock()
        self._collect_daemon = None
        self._export_daemon = None

        # Chained comparisons are False for NaN, so NaN intervals are rejected too.
        if not (
            0 < self._collect_interval_millis < math.inf and 0 < self._export_interval_millis < math.inf
        ):
            raise ValueError("Collect and export intervals must be in (0, infinity).")

        self._init_daemons()
        if hasattr(os, "register_at_fork"):
            # Keep only a weak reference in the fork hook so it doesn't keep this reader alive forever.
            # (Calling the WeakMethod here would hand register_at_fork a strong bound method.)
            weak_at_fork_reinit = weakref.WeakMethod(self._at_fork_reinit)

            def _after_in_child() -> None:
                at_fork_reinit = weak_at_fork_reinit()
                # The reader may already have been garbage collected.
                if at_fork_reinit is not None:
                    at_fork_reinit()

            os.register_at_fork(after_in_child=_after_in_child)

    def _at_fork_reinit(self) -> None:
        """
        Re-initialise the reader in a forked child process.

        Only the forking thread survives in the child. Any lock that a parent daemon held at fork time would therefore
        stay locked forever, so the locks are replaced before the buffers are touched and the daemons restarted.
        """
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data_lock = threading.Lock()
        self._shutdown_event_lock = threading.Lock()
        self._init_daemons()

    def _init_daemons(self) -> None:
        """
        Empty both buffers and start the collect + export daemon threads.

        Called from :py:meth:`__init__` and, in a forked child, from :py:meth:`_at_fork_reinit`.
        """
        # Empty the buffers. Prevents duplicate metrics when forking.
        # Lock order (export, then collect) matches _export_iteration to avoid a lock-order inversion.
        with self._export_metrics_data_lock, self._collect_metrics_data_lock:
            self._collect_metrics_data, self._export_metrics_data = None, None

        # Create the collect daemon.
        self._collect_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._collect_daemon", target=self._collect_daemon_target, daemon=True
        )
        self._collect_daemon.start()

        # Create the export daemon.
        self._export_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._export_daemon", target=self._export_daemon_target, daemon=True
        )
        self._export_daemon.start()

    def _collect_daemon_target(self) -> None:
        """Collect once per collect interval until the shutdown event is set."""
        while not self._shutdown_event.wait(timeout=self._collect_interval_millis / 10**3):
            self._collect_iteration()

    def _export_daemon_target(self) -> None:
        """Export once per export interval until the shutdown event is set, then do a final collect + export."""
        while not self._shutdown_event.wait(timeout=self._export_interval_millis / 10**3):
            self._export_iteration()
        # Final collect + export.
        self._collect_iteration()
        self._export_iteration()

    # :py:meth:`sdk_metrics_export.MetricReader._collect` is reserved. Using another name.
    def _collect_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Run one collection, logging (not raising) any failure.

        :param timeout_millis: Collect timeout in milliseconds. ``None`` uses the configured collect timeout.
        """
        # `is None` rather than `or` so an explicit 0 isn't silently replaced by the default.
        if timeout_millis is None:
            timeout_millis = self._collect_timeout_millis
        try:
            # Only set when registered on a :py:class:`sdk_metrics.MeterProvider` which calls
            # :py:meth:`sdk_metrics_export.MetricReader._set_collect_callback`.
            if self._collect is not None:
                # Inherited from :py:class:``sdk_metrics_export.MetricReader``.
                self.collect(timeout_millis=timeout_millis)
        except sdk_metrics.MetricsTimeoutError:
            logger.warning("Metrics collection timed out.", exc_info=True)
        except Exception:
            logger.exception("Exception while collecting metrics.")

    # Called by :py:meth:`sdk_metrics_export.MetricReader.collect`.
    def _receive_metrics(
        self, metrics_data: sdk_metrics_export.MetricsData, timeout_millis: float = 0, **kwargs
    ) -> None:
        """Append the resource metrics of ``metrics_data`` to the collect buffer."""
        with self._collect_metrics_data_lock:
            self._collect_metrics_data = sdk_metrics_export.MetricsData(
                resource_metrics=(
                    *(() if self._collect_metrics_data is None else self._collect_metrics_data.resource_metrics),
                    *metrics_data.resource_metrics,
                )
            )

    @staticmethod
    def _count_data_points(metrics_data: sdk_metrics_export.MetricsData) -> int:
        """Count the data points across all resource, scope, and metric entries in ``metrics_data``."""
        return sum(
            # getattr keeps this safe to call from an exception handler even for unexpected metric data types.
            len(getattr(metric.data, "data_points", ()))
            for resource_metrics in metrics_data.resource_metrics
            for scope_metrics in resource_metrics.scope_metrics
            for metric in scope_metrics.metrics
        )

    def _export_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Move the collect buffer into the export buffer and export it, logging (not raising) any failure.

        :param timeout_millis: Export timeout in milliseconds. ``None`` uses the configured export timeout.
        """
        # `is None` rather than `or` so an exhausted (0) budget isn't replaced by the full default.
        if timeout_millis is None:
            timeout_millis = self._export_timeout_millis

        with self._export_metrics_data_lock:
            with self._collect_metrics_data_lock:
                # Rotate the collect + export buffers.
                #
                # We don't merge the collect buffer into the export buffer to prevent infinite accumulation.
                self._collect_metrics_data, self._export_metrics_data = None, self._collect_metrics_data

            if self._export_metrics_data is None:
                return

            try:
                # Export.
                self._exporter.export(metrics_data=self._export_metrics_data, timeout_millis=timeout_millis)
            except sdk_metrics.MetricsTimeoutError:
                logger.warning(
                    f"Metrics export timed out. {self._count_data_points(self._export_metrics_data)} data points lost.",
                    exc_info=True,
                )
            except Exception:
                logger.exception(
                    f"Exception while exporting metrics. {self._count_data_points(self._export_metrics_data)} data points lost."
                )
            finally:
                # Immediately empty the export buffer for garbage collection.
                self._export_metrics_data = None

    @staticmethod
    def _remaining_millis(deadline_ns: float) -> float:
        """Milliseconds left until ``deadline_ns`` (a :py:func:`time.time_ns` timestamp), clamped at 0."""
        return max(0.0, (deadline_ns - time.time_ns()) / 10**6)

    def force_flush(
        self, timeout_millis: float = DEFAULT_COLLECT_TIMEOUT_MILLIS + DEFAULT_EXPORT_TIMEOUT_MILLIS
    ) -> bool:
        """
        Collect, export, and flush the exporter, sharing one overall time budget across all three steps.

        :param timeout_millis: Total time budget in milliseconds.
        :return: Always ``True``.
        """
        deadline_ns = time.time_ns() + (timeout_millis * 10**6)

        # Calls :py:meth:`sdk_metrics_export.MetricReader.collect`.
        super().force_flush(timeout_millis=self._remaining_millis(deadline_ns))
        self._export_iteration(timeout_millis=self._remaining_millis(deadline_ns))
        self._exporter.force_flush(timeout_millis=self._remaining_millis(deadline_ns))
        return True

    def shutdown(
        self, timeout_millis: float = DEFAULT_COLLECT_TIMEOUT_MILLIS + DEFAULT_EXPORT_TIMEOUT_MILLIS, **kwargs
    ) -> None:
        """
        Stop both daemons and shut down the exporter. Calls after the first one do nothing.

        The export daemon performs a final collect + export before it exits. That happens before the exporter is shut down.

        :param timeout_millis: Total time budget in milliseconds for joining the daemons and shutting down the exporter.
        """
        deadline_ns = time.time_ns() + (timeout_millis * 10**6)

        with self._shutdown_event_lock:
            if self._shutdown_event.is_set():
                return
            # Signal first: the daemons only leave their wait loops once the event is set.
            # Joining before setting would block until the deadline.
            self._shutdown_event.set()
            # Join the collect daemon first, then the export daemon (which does the final collect + export).
            for daemon in (self._collect_daemon, self._export_daemon):
                if daemon is not None:
                    daemon.join(timeout=self._remaining_millis(deadline_ns) / 10**3)
            self._exporter.shutdown(timeout_millis=self._remaining_millis(deadline_ns))