Source code for multistorageclient.telemetry.metrics.readers.diperiodic_exporting
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import math
import os
import threading
import time
import weakref
from typing import Optional

import opentelemetry.sdk.environment_variables as sdk_environment_variables
import opentelemetry.sdk.metrics as sdk_metrics
import opentelemetry.sdk.metrics.export as sdk_metrics_export

# Not OTel spec. Use 1 second to keep the data volume per export interval reasonably small.
DEFAULT_COLLECT_INTERVAL_MILLIS: float = 1000
# Not OTel spec. Use the default on :py:meth:`sdk_metrics_export.MetricReader.collect`.
DEFAULT_COLLECT_TIMEOUT_MILLIS: float = 10000
# OTel spec.
DEFAULT_EXPORT_INTERVAL_MILLIS: float = 60000
# OTel spec.
DEFAULT_EXPORT_TIMEOUT_MILLIS: float = 30000

# Obtain the logger through the logging manager instead of instantiating
# ``logging.Logger`` directly, so it is part of the logger hierarchy and
# application-level handler/level configuration applies to it.
logger = logging.getLogger(__name__)
class DiperiodicExportingMetricReader(sdk_metrics_export.MetricReader):
    """
    :py:class:`opentelemetry.sdk.metrics.export.MetricReader` that collects + exports metrics on separate user-configurable time intervals.
    This is in contrast with :py:class:`opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader` which couples them with a 1 minute default.

    The metrics collection interval limits the temporal resolution. Most metric backends have 1 millisecond or finer temporal resolution.

    Two daemon threads are started on construction: one collects into a collect buffer,
    the other rotates the collect buffer into an export buffer and hands it to the exporter.
    """

    #: Collect buffer.
    _collect_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _collect_metrics_data_lock: threading.Lock
    #: Export buffer.
    _export_metrics_data: Optional[sdk_metrics_export.MetricsData]
    _export_metrics_data_lock: threading.Lock

    _exporter: sdk_metrics_export.MetricExporter
    _collect_interval_millis: float
    _collect_timeout_millis: float
    _export_interval_millis: float
    _export_timeout_millis: float

    _shutdown_event: threading.Event
    _shutdown_event_lock: threading.Lock
    _collect_daemon: Optional[threading.Thread]
    _export_daemon: Optional[threading.Thread]

    def __init__(
        self,
        exporter: sdk_metrics_export.MetricExporter,
        collect_interval_millis: Optional[float] = None,
        collect_timeout_millis: Optional[float] = None,
        export_interval_millis: Optional[float] = None,
        export_timeout_millis: Optional[float] = None,
    ):
        """
        :param exporter: Metrics exporter.
        :param collect_interval_millis: Collect interval in milliseconds.
        :param collect_timeout_millis: Collect timeout in milliseconds.
        :param export_interval_millis: Export interval in milliseconds.
        :param export_timeout_millis: Export timeout in milliseconds.
        :raises ValueError: If the collect or export interval is not in (0, infinity).
        """

        # Defer to the exporter for aggregation and temporality configurations.
        super().__init__(
            preferred_aggregation=exporter._preferred_aggregation,
            preferred_temporality=exporter._preferred_temporality,
        )

        self._collect_metrics_data = None
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data = None
        self._export_metrics_data_lock = threading.Lock()

        self._exporter = exporter
        if collect_interval_millis is None:
            # OTEL_METRIC_COLLECT_INTERVAL isn't an official OTel SDK environment variable (yet).
            collect_interval_millis = DEFAULT_COLLECT_INTERVAL_MILLIS
        if collect_timeout_millis is None:
            # OTEL_METRIC_COLLECT_TIMEOUT isn't an official OTel SDK environment variable (yet).
            collect_timeout_millis = DEFAULT_COLLECT_TIMEOUT_MILLIS
        if export_interval_millis is None:
            try:
                export_interval_millis = float(
                    os.environ.get(
                        sdk_environment_variables.OTEL_METRIC_EXPORT_INTERVAL, DEFAULT_EXPORT_INTERVAL_MILLIS
                    )
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export interval. Using default of {DEFAULT_EXPORT_INTERVAL_MILLIS}."
                )
                export_interval_millis = DEFAULT_EXPORT_INTERVAL_MILLIS
        if export_timeout_millis is None:
            try:
                export_timeout_millis = float(
                    os.environ.get(sdk_environment_variables.OTEL_METRIC_EXPORT_TIMEOUT, DEFAULT_EXPORT_TIMEOUT_MILLIS)
                )
            except ValueError:
                logger.warning(
                    f"Found invalid value for export timeout. Using default of {DEFAULT_EXPORT_TIMEOUT_MILLIS}."
                )
                export_timeout_millis = DEFAULT_EXPORT_TIMEOUT_MILLIS
        self._collect_interval_millis = collect_interval_millis
        self._collect_timeout_millis = collect_timeout_millis
        self._export_interval_millis = export_interval_millis
        self._export_timeout_millis = export_timeout_millis

        self._shutdown_event = threading.Event()
        self._shutdown_event_lock = threading.Lock()
        self._collect_daemon = None
        self._export_daemon = None

        # The chained comparisons are also False for NaN, so NaN intervals are rejected too.
        if not (0 < self._collect_interval_millis < math.inf and 0 < self._export_interval_millis < math.inf):
            raise ValueError("Collect and export intervals must be in (0, infinity).")

        self._init_daemons()
        if hasattr(os, "register_at_fork"):
            # Threads don't survive a fork, so the child has to restart the daemons.
            #
            # Fork hooks can't be unregistered. Hold only a weak reference to the bound method and
            # dereference it at fork time, so the hook doesn't keep this reader alive forever and
            # doesn't restart daemons for a reader which was already garbage collected.
            weak_reinit = weakref.WeakMethod(self._reinit_after_fork)

            def _after_fork_in_child() -> None:
                reinit = weak_reinit()
                if reinit is not None:
                    reinit()

            os.register_at_fork(after_in_child=_after_fork_in_child)

    def _reinit_after_fork(self) -> None:
        """
        Re-initialize the reader in a forked child process.
        """
        # Only the forking thread exists in the child. A buffer lock which a parent daemon thread
        # held at fork time would never be released there, so start from fresh locks.
        self._collect_metrics_data_lock = threading.Lock()
        self._export_metrics_data_lock = threading.Lock()
        self._init_daemons()

    def _init_daemons(self) -> None:
        """
        Empty both buffers and start the collect + export daemon threads.
        """
        # Empty the buffers. Prevents duplicate metrics when forking.
        #
        # Locks are taken in the same order as in ``_export_iteration`` (export, then collect).
        with self._export_metrics_data_lock, self._collect_metrics_data_lock:
            self._collect_metrics_data, self._export_metrics_data = None, None

        # Create the collect daemon.
        self._collect_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._collect_daemon",
            target=self._collect_daemon_target,
            daemon=True,
        )
        self._collect_daemon.start()

        # Create the export daemon.
        self._export_daemon = threading.Thread(
            name="OtelDiperiodicExportingMetricReader._export_daemon",
            target=self._export_daemon_target,
            daemon=True,
        )
        self._export_daemon.start()

    def _collect_daemon_target(self) -> None:
        """
        Collect once per collect interval until the shutdown event is set.
        """
        # ``Event.wait`` returns ``False`` on timeout (keep looping) and ``True`` once the event is set.
        while not self._shutdown_event.wait(timeout=self._collect_interval_millis / 10**3):
            self._collect_iteration()

    def _export_daemon_target(self) -> None:
        """
        Export once per export interval until the shutdown event is set, then do a last collect + export.
        """
        while not self._shutdown_event.wait(timeout=self._export_interval_millis / 10**3):
            self._export_iteration()
        # Final collect + export.
        self._collect_iteration()
        self._export_iteration()

    # :py:class:`sdk_metrics_export.MetricReader._collect` already exists. Using another name.
    def _collect_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Run one collection. Timeouts and other exceptions are logged, not raised.

        :param timeout_millis: Collect timeout in milliseconds. Falsy values fall back to the configured collect timeout.
        """
        try:
            # Inherited from :py:class:``sdk_metrics_export.MetricReader``.
            self.collect(timeout_millis=timeout_millis or self._collect_timeout_millis)
        except sdk_metrics.MetricsTimeoutError:
            logger.warning("Metrics collection timed out.", exc_info=True)
        except Exception:
            logger.exception("Exception while collecting metrics.")

    # Called by :py:meth:`sdk_metrics_export.MetricReader.collect`.
    def _receive_metrics(
        self, metrics_data: sdk_metrics_export.MetricsData, timeout_millis: float = 0, **kwargs
    ) -> None:
        """
        Append the received resource metrics to the collect buffer.

        :param metrics_data: Newly collected metrics data.
        :param timeout_millis: Unused.
        """
        with self._collect_metrics_data_lock:
            self._collect_metrics_data = sdk_metrics_export.MetricsData(
                resource_metrics=(
                    *(() if self._collect_metrics_data is None else self._collect_metrics_data.resource_metrics),
                    *metrics_data.resource_metrics,
                )
            )

    def _export_iteration(self, timeout_millis: Optional[float] = None) -> None:
        """
        Rotate the collect buffer into the export buffer and export it. Timeouts and other exceptions are logged, not raised.

        :param timeout_millis: Export timeout in milliseconds. Falsy values fall back to the configured export timeout.
        """
        with self._export_metrics_data_lock:
            with self._collect_metrics_data_lock:
                # Rotate the collect + export buffers.
                #
                # We don't merge the collect buffer into the export buffer to prevent infinite accumulation.
                self._collect_metrics_data, self._export_metrics_data = None, self._collect_metrics_data

            if self._export_metrics_data is not None:
                try:
                    # Export.
                    self._exporter.export(
                        metrics_data=self._export_metrics_data,
                        timeout_millis=timeout_millis or self._export_timeout_millis,
                    )
                except sdk_metrics.MetricsTimeoutError:
                    logger.warning("Metrics export timed out.", exc_info=True)
                except Exception:
                    logger.exception("Exception while exporting metrics.")
                finally:
                    # Immediately empty the export buffer for garbage collection.
                    self._export_metrics_data = None