@@ -37,6 +37,7 @@ use bottlecap::{
37
37
} ,
38
38
logger,
39
39
logs:: { agent:: LogsAgent , flusher:: LogsFlusher } ,
40
+ metrics:: enhanced:: lambda:: Lambda as enhanced_metrics,
40
41
otlp:: { agent:: Agent as OtlpAgent , should_enable_otlp_agent} ,
41
42
proxy:: { interceptor, should_start_proxy} ,
42
43
secrets:: decrypt,
@@ -86,9 +87,7 @@ use std::{
86
87
collections:: { HashMap , hash_map} ,
87
88
env,
88
89
io:: { Error , Result } ,
89
- os:: unix:: process:: CommandExt ,
90
90
path:: Path ,
91
- process:: Command ,
92
91
sync:: Arc ,
93
92
time:: { Duration , Instant } ,
94
93
} ;
@@ -404,14 +403,7 @@ fn load_configs(start_time: Instant) -> (AwsConfig, AwsCredentials, Arc<Config>)
404
403
let aws_credentials = AwsCredentials :: from_env ( ) ;
405
404
let lambda_directory: String =
406
405
env:: var ( "LAMBDA_TASK_ROOT" ) . unwrap_or_else ( |_| "/var/task" . to_string ( ) ) ;
407
- let config = match config:: get_config ( Path :: new ( & lambda_directory) ) {
408
- Ok ( config) => Arc :: new ( config) ,
409
- Err ( _e) => {
410
- let err = Command :: new ( "/opt/datadog-agent-go" ) . exec ( ) ;
411
- panic ! ( "Error starting the extension: {err:?}" ) ;
412
- }
413
- } ;
414
-
406
+ let config = Arc :: new ( config:: get_config ( Path :: new ( & lambda_directory) ) ) ;
415
407
( aws_config, aws_credentials, config)
416
408
}
417
409
@@ -510,12 +502,22 @@ async fn extension_loop_active(
510
502
) ;
511
503
start_dogstatsd_aggregator ( metrics_aggr_service) ;
512
504
505
+ let metrics_intake_url = create_metrics_intake_url_prefix ( config) ;
513
506
let metrics_flushers = Arc :: new ( TokioMutex :: new ( start_metrics_flushers (
514
507
Arc :: clone ( & api_key_factory) ,
515
508
& metrics_aggr_handle,
509
+ & metrics_intake_url,
516
510
config,
517
511
) ) ) ;
518
512
513
+ // Create lambda enhanced metrics instance once
514
+ let lambda_enhanced_metrics =
515
+ enhanced_metrics:: new ( metrics_aggr_handle. clone ( ) , Arc :: clone ( config) ) ;
516
+
517
+ // Send config issue metrics
518
+ let config_issues = config:: fallback ( config) ;
519
+ send_config_issue_metric ( & config_issues, & lambda_enhanced_metrics) ;
520
+
519
521
let propagator = Arc :: new ( DatadogCompositePropagator :: new ( Arc :: clone ( config) ) ) ;
520
522
// Lifecycle Invocation Processor
521
523
let invocation_processor = Arc :: new ( TokioMutex :: new ( InvocationProcessor :: new (
@@ -1018,33 +1020,33 @@ fn start_logs_agent(
1018
1020
( logs_agent_channel, logs_flusher)
1019
1021
}
1020
1022
1021
- fn start_metrics_flushers (
1022
- api_key_factory : Arc < ApiKeyFactory > ,
1023
- metrics_aggr_handle : & MetricsAggregatorHandle ,
1024
- config : & Arc < Config > ,
1025
- ) -> Vec < MetricsFlusher > {
1026
- let mut flushers = Vec :: new ( ) ;
1027
-
1028
- let metrics_intake_url = if !config. dd_url . is_empty ( ) {
1023
+ fn create_metrics_intake_url_prefix ( config : & Config ) -> MetricsIntakeUrlPrefix {
1024
+ if !config. dd_url . is_empty ( ) {
1029
1025
let dd_dd_url = DdDdUrl :: new ( config. dd_url . clone ( ) ) . expect ( "can't parse DD_DD_URL" ) ;
1030
-
1031
1026
let prefix_override = MetricsIntakeUrlPrefixOverride :: maybe_new ( None , Some ( dd_dd_url) ) ;
1032
- MetricsIntakeUrlPrefix :: new ( None , prefix_override)
1027
+ MetricsIntakeUrlPrefix :: new ( None , prefix_override) . expect ( "can't parse DD_DD_URL prefix" )
1033
1028
} else if !config. url . is_empty ( ) {
1034
1029
let dd_url = DdUrl :: new ( config. url . clone ( ) ) . expect ( "can't parse DD_URL" ) ;
1035
-
1036
1030
let prefix_override = MetricsIntakeUrlPrefixOverride :: maybe_new ( Some ( dd_url) , None ) ;
1037
- MetricsIntakeUrlPrefix :: new ( None , prefix_override)
1031
+ MetricsIntakeUrlPrefix :: new ( None , prefix_override) . expect ( "can't parse DD_URL prefix" )
1038
1032
} else {
1039
- // use site
1040
1033
let metrics_site = MetricsSite :: new ( config. site . clone ( ) ) . expect ( "can't parse site" ) ;
1041
- MetricsIntakeUrlPrefix :: new ( Some ( metrics_site) , None )
1042
- } ;
1034
+ MetricsIntakeUrlPrefix :: new ( Some ( metrics_site) , None ) . expect ( "can't parse site prefix" )
1035
+ }
1036
+ }
1037
+
1038
+ fn start_metrics_flushers (
1039
+ api_key_factory : Arc < ApiKeyFactory > ,
1040
+ metrics_aggr_handle : & MetricsAggregatorHandle ,
1041
+ metrics_intake_url : & MetricsIntakeUrlPrefix ,
1042
+ config : & Arc < Config > ,
1043
+ ) -> Vec < MetricsFlusher > {
1044
+ let mut flushers = Vec :: new ( ) ;
1043
1045
1044
1046
let flusher_config = MetricsFlusherConfig {
1045
1047
api_key_factory,
1046
1048
aggregator_handle : metrics_aggr_handle. clone ( ) ,
1047
- metrics_intake_url_prefix : metrics_intake_url. expect ( "can't parse site or override" ) ,
1049
+ metrics_intake_url_prefix : metrics_intake_url. clone ( ) ,
1048
1050
https_proxy : config. proxy_https . clone ( ) ,
1049
1051
timeout : Duration :: from_secs ( config. flush_timeout ) ,
1050
1052
retry_strategy : DsdRetryStrategy :: Immediate ( 3 ) ,
@@ -1169,6 +1171,28 @@ fn start_trace_agent(
1169
1171
)
1170
1172
}
1171
1173
1174
+ /// Sends metrics indicating issue with configuration.
1175
+ ///
1176
+ /// # Arguments
1177
+ /// * `issue_reasons` - Vector of messages describing the issue with the configurations
1178
+ /// * `lambda_enhanced_metrics` - The lambda enhanced metrics instance
1179
+ fn send_config_issue_metric ( issue_reasons : & [ String ] , lambda_enhanced_metrics : & enhanced_metrics ) {
1180
+ if issue_reasons. is_empty ( ) {
1181
+ return ;
1182
+ }
1183
+ let now = std:: time:: UNIX_EPOCH
1184
+ . elapsed ( )
1185
+ . expect ( "can't poll clock" )
1186
+ . as_secs ( )
1187
+ . try_into ( )
1188
+ . unwrap_or_default ( ) ;
1189
+
1190
+ // Setup a separate metric for each config issue reason
1191
+ for issue_reason in issue_reasons {
1192
+ lambda_enhanced_metrics. set_config_load_issue_metric ( now, issue_reason) ;
1193
+ }
1194
+ }
1195
+
1172
1196
fn start_dogstatsd_aggregator ( aggr_service : MetricsAggregatorService ) {
1173
1197
tokio:: spawn ( async move {
1174
1198
aggr_service. run ( ) . await ;
0 commit comments