@@ -71,19 +71,15 @@ def __init__(
         timeout: int = 30,
         tls_config: Dict = None,
         proxies: Dict = None,
+        resources_as_labels: bool = True,
     ):
         self.endpoint = endpoint
         self.basic_auth = basic_auth
         self.headers = headers
         self.timeout = timeout
         self.tls_config = tls_config
         self.proxies = proxies
-
-        self.converter_map = {
-            Sum: self._convert_from_sum,
-            Histogram: self._convert_from_histogram,
-            Gauge: self._convert_from_gauge,
-        }
+        self.resources_as_labels = resources_as_labels


     @property
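
For context, a minimal sketch of how the new `resources_as_labels` switch might be used when constructing the exporter. The import path, endpoint, and header values are assumptions based on the prometheus-remote-write exporter package, not taken from this diff:

    # Sketch only: import path, endpoint, and header values are assumptions.
    from opentelemetry.exporter.prometheus_remote_write import (
        PrometheusRemoteWriteMetricsExporter,
    )

    exporter = PrometheusRemoteWriteMetricsExporter(
        endpoint="http://localhost:9090/api/v1/write",
        headers={"X-Scope-OrgID": "demo"},
        timeout=30,
        # New in this change: keep resource attributes out of the emitted labels.
        resources_as_labels=False,
    )
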
@@ -166,71 +162,73 @@ def headers(self, headers: Dict):
         self._headers = headers

     def export(
-        self, export_records
+        self, metrics_data: MetricsData
     ) -> MetricExportResult:
-        if not export_records:
-            return MetricsExportResult.SUCCESS
-        timeseries = self._convert_to_timeseries(export_records)
+        if not metrics_data:
+            return MetricExportResult.SUCCESS
+        timeseries = self._translate_data(metrics_data)
         if not timeseries:
             logger.error(
                 "All records contain unsupported aggregators, export aborted"
             )
-            return MetricsExportResult.FAILURE
+            return MetricExportResult.FAILURE
         message = self._build_message(timeseries)
         headers = self._build_headers()
         return self._send_message(message, headers)

     def shutdown(self) -> None:
         pass

-    def _translate_data(self, data: MetricsData):
+    def _translate_data(self, data: MetricsData) -> Sequence[TimeSeries]:
         rw_timeseries = []

         for resource_metrics in data.resource_metrics:
             resource = resource_metrics.resource
             # OTLP Data model suggests combining some attrs into job/instance
             # Should we do that here?
-            resource_labels = self._get_resource_labels(resource.attributes)
+            if self.resources_as_labels:
+                resource_labels = [(n, str(v)) for n, v in resource.attributes.items()]
+            else:
+                resource_labels = []
             # Scope name/version probably not too useful from a labeling perspective
             for scope_metrics in resource_metrics.scope_metrics:
                 for metric in scope_metrics.metrics:
                     rw_timeseries.extend(self._parse_metric(metric, resource_labels))
-
-    def _get_resource_labels(self, attrs):
-        """ Converts Resource Attributes to Prometheus Labels based on
-        OTLP Metric Data Model's recommendations on Resource Attributes
-        """
-        return [(n, str(v)) for n, v in resource.attributes.items()]
+        return rw_timeseries

     def _parse_metric(self, metric: Metric, resource_labels: Sequence) -> Sequence[TimeSeries]:
         """
         Parses the Metric & lower objects, then converts the output into
         OM TimeSeries. Returns a List of TimeSeries objects based on one Metric
         """
+
+
+        # Create the metric name, will be a label later
+        if metric.unit:
+            # Prom. naming guidelines add unit to the name
+            name = f"{metric.name}_{metric.unit}"
+        else:
+            name = metric.name
+
         # datapoints have attributes associated with them. these would be sent
         # to RW as different metrics: name & labels is a unique time series
         sample_sets = defaultdict(list)
         if isinstance(metric.data, (Gauge, Sum)):
             for dp in metric.data.data_points:
-                attrs, sample = self._parse_data_point(dp)
+                attrs, sample = self._parse_data_point(dp, name)
                 sample_sets[attrs].append(sample)
-        elif isinstance(metric.data, (HistogramType)):
-            raise NotImplementedError("Coming sooN!")
+        elif isinstance(metric.data, Histogram):
+            for dp in metric.data.data_points:
+                dp_result = self._parse_histogram_data_point(dp, name)
+                for attrs, sample in dp_result:
+                    sample_sets[attrs].append(sample)
         else:
             logger.warn("Unsupported Metric Type: %s", type(metric.data))
             return []

-        # Create the metric name, will be a label later
-        if metric.unit:
-            # Prom. naming guidelines add unit to the name
-            name = f"{metric.name}_{metric.unit}"
-        else:
-            name = metric.name
-
         timeseries = []
         for labels, samples in sample_sets.items():
             ts = TimeSeries()
-            ts.labels.append(self._label("__name__", name))
             for label_name, label_value in chain(resource_labels, labels):
                 # Previous implementation did not str() the names...
                 ts.labels.append(self._label(label_name, str(label_value)))
@@ -239,23 +237,61 @@ def _parse_metric(self, metric: Metric, resource_labels: Sequence) -> Sequence[T
             timeseries.append(ts)
         return timeseries

-    def _sample(self, value, timestamp: int):
+    def _sample(self, value: int, timestamp: int) -> Sample:
         sample = Sample()
         sample.value = value
         sample.timestamp = timestamp
         return sample

-    def _label(self, name: str, value: str):
+    def _label(self, name: str, value: str) -> Label:
         label = Label()
         label.name = name
         label.value = value
         return label

-    def _parse_data_point(self, data_point):
+    def _parse_histogram_data_point(self, data_point, name):
+
+        # if (len(data_point.explicit_bounds) + 1) != len(data_point.bucket_counts):
+        #     raise ValueError("Number of buckets must be 1 more than the explicit bounds!")
+
+        sample_attr_pairs = []
+
+        base_attrs = [(n, v) for n, v in data_point.attributes.items()]
+        timestamp = data_point.time_unix_nano // 1_000_000
+
+
+        def handle_bucket(value, bound=None, name_override=None):
+            # Metric level attributes + the bucket boundary attribute + name
+            ts_attrs = base_attrs.copy()
+            ts_attrs.append(("__name__", name_override or name))
+            if bound:
+                ts_attrs.append(("le", str(bound)))
+            # Value is count of values in each bucket
+            ts_sample = (value, timestamp)
+            return tuple(ts_attrs), ts_sample

-        attrs = tuple(data_point.attributes.items())
-        # TODO: Optimize? create Sample here
-        # remote write time is in milliseconds
+        for bound_pos, bound in enumerate(data_point.explicit_bounds):
+            sample_attr_pairs.append(
+                handle_bucket(data_point.bucket_counts[bound_pos], bound)
+            )
+
+        # Add the last label for implicit +Inf bucket
+        sample_attr_pairs.append(
+            handle_bucket(data_point.bucket_counts[-1], bound="+Inf")
+        )
+
+        # Lastly, add series for count & sum
+        sample_attr_pairs.append(
+            handle_bucket(data_point.sum, name_override=f"{name}_sum")
+        )
+        sample_attr_pairs.append(
+            handle_bucket(data_point.count, name_override=f"{name}_count")
+        )
+        return sample_attr_pairs
+
+    def _parse_data_point(self, data_point, name=None):
+
+        attrs = tuple(data_point.attributes.items()) + (("__name__", name),)
         sample = (data_point.value, (data_point.time_unix_nano // 1_000_000))
         return attrs, sample

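As a side note on the fan-out implemented by `_parse_histogram_data_point` above: each histogram data point becomes one (attrs, sample) pair per explicit bound, an implicit "+Inf" bucket taken from the last entry of `bucket_counts`, and separate `_sum` / `_count` series. A minimal standalone sketch of that fan-out (made-up metric name, attributes, and values; not exporter code):

    # Standalone illustration only; all names and values here are made up.
    name = "request_latency_ms"
    dp = {
        "attributes": {"route": "/api"},
        "explicit_bounds": [0.5, 1.0],
        "bucket_counts": [3, 5, 2],  # one more entry than bounds; the last is the +Inf bucket
        "sum": 6.2,
        "count": 10,
        "time_unix_nano": 1_700_000_000_000_000_000,
    }

    timestamp = dp["time_unix_nano"] // 1_000_000  # remote write timestamps are milliseconds
    base_attrs = list(dp["attributes"].items())
    pairs = []
    for pos, bound in enumerate(dp["explicit_bounds"]):
        attrs = tuple(base_attrs + [("__name__", name), ("le", str(bound))])
        pairs.append((attrs, (dp["bucket_counts"][pos], timestamp)))
    pairs.append((tuple(base_attrs + [("__name__", name), ("le", "+Inf")]), (dp["bucket_counts"][-1], timestamp)))
    pairs.append((tuple(base_attrs + [("__name__", f"{name}_sum")]), (dp["sum"], timestamp)))
    pairs.append((tuple(base_attrs + [("__name__", f"{name}_count")]), (dp["count"], timestamp)))
    # pairs now holds one (attrs, sample) entry per bucket plus the _sum and _count series
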
@@ -275,27 +311,17 @@ def _convert_to_timeseries(
             )
         return timeseries

-    def _convert_from_sum(
-        self, sum_record
-    ) -> Sequence[TimeSeries]:
-        return [
-            self._create_timeseries(
-                sum_record,
-                sum_record.instrument.name + "_sum",
-                sum_record.aggregator.checkpoint,
-            )
-        ]
-
-    def _convert_from_gauge(self, gauge_record):
-        raise NotImplementedError("Do this")
-
     def _convert_from_histogram(
-        self, histogram_record
+        self, histogram: Histogram,
     ) -> Sequence[TimeSeries]:
-        timeseries = []
-        for bound in histogram_record.aggregator.checkpoint.keys():
+        sample_sets = defaultdict(list)
+
+        base_attrs = [self._label(n, v) for n, v in histogram.attributes]
+        for bound in histogram.explicit_bounds:
             bound_str = "+Inf" if bound == float("inf") else str(bound)
-            value = histogram_record.aggregator.checkpoint[bound]
+            # General attributes apply
+            ts_attrs = base_attrs.copy() + [self._label("le", bound_str)]
+            sample_sets[tuple(ts_attrs)].append(sample)
             timeseries.append(
                 self._create_timeseries(
                     histogram_record,
@@ -411,5 +437,5 @@ def _send_message(
             response.raise_for_status()
         except requests.exceptions.RequestException as err:
             logger.error("Export POST request failed with reason: %s", err)
-            return MetricsExportResult.FAILURE
-        return MetricsExportResult.SUCCESS
+            return MetricExportResult.FAILURE
+        return MetricExportResult.SUCCESS
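
Finally, a hedged sketch of wiring the exporter into the SDK metrics pipeline so that `export()` receives `MetricsData`. The exporter import path is the same assumption as above; the reader and provider calls are the standard opentelemetry-sdk API:

    # Pipeline sketch; exporter import path is an assumption, SDK calls are standard.
    from opentelemetry import metrics
    from opentelemetry.exporter.prometheus_remote_write import (
        PrometheusRemoteWriteMetricsExporter,
    )
    from opentelemetry.sdk.metrics import MeterProvider
    from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

    exporter = PrometheusRemoteWriteMetricsExporter(
        endpoint="http://localhost:9090/api/v1/write"
    )
    reader = PeriodicExportingMetricReader(exporter, export_interval_millis=15_000)
    metrics.set_meter_provider(MeterProvider(metric_readers=[reader]))

    meter = metrics.get_meter("remote-write-demo")
    latency = meter.create_histogram("request_latency", unit="ms")
    latency.record(42, attributes={"route": "/api"})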