Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 51 additions & 27 deletions bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use bottlecap::{
},
logger,
logs::{agent::LogsAgent, flusher::LogsFlusher},
metrics::enhanced::lambda::Lambda as enhanced_metrics,
otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
proxy::{interceptor, should_start_proxy},
secrets::decrypt,
Expand Down Expand Up @@ -86,9 +87,7 @@ use std::{
collections::{HashMap, hash_map},
env,
io::{Error, Result},
os::unix::process::CommandExt,
path::Path,
process::Command,
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -404,14 +403,7 @@ fn load_configs(start_time: Instant) -> (AwsConfig, AwsCredentials, Arc<Config>)
let aws_credentials = AwsCredentials::from_env();
let lambda_directory: String =
env::var("LAMBDA_TASK_ROOT").unwrap_or_else(|_| "/var/task".to_string());
let config = match config::get_config(Path::new(&lambda_directory)) {
Ok(config) => Arc::new(config),
Err(_e) => {
let err = Command::new("/opt/datadog-agent-go").exec();
panic!("Error starting the extension: {err:?}");
}
};

let config = Arc::new(config::get_config(Path::new(&lambda_directory)));
(aws_config, aws_credentials, config)
}

Expand Down Expand Up @@ -510,19 +502,29 @@ async fn extension_loop_active(
);
start_dogstatsd_aggregator(metrics_aggr_service);

let metrics_intake_url = create_metrics_intake_url_prefix(config);
let metrics_flushers = Arc::new(TokioMutex::new(start_metrics_flushers(
Arc::clone(&api_key_factory),
&metrics_aggr_handle,
&metrics_intake_url,
config,
)));

// Create lambda enhanced metrics instance once
let lambda_enhanced_metrics =
enhanced_metrics::new(metrics_aggr_handle.clone(), Arc::clone(config));

// Send config issue metrics
let config_issues = config::inspect_config(config);
send_config_issue_metric(&config_issues, &lambda_enhanced_metrics);

let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(config)));
// Lifecycle Invocation Processor
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
Arc::clone(&tags_provider),
Arc::clone(config),
Arc::clone(&aws_config),
metrics_aggr_handle.clone(),
lambda_enhanced_metrics,
Arc::clone(&propagator),
)));
// AppSec processor (if enabled)
Expand Down Expand Up @@ -1018,33 +1020,33 @@ fn start_logs_agent(
(logs_agent_channel, logs_flusher)
}

fn start_metrics_flushers(
api_key_factory: Arc<ApiKeyFactory>,
metrics_aggr_handle: &MetricsAggregatorHandle,
config: &Arc<Config>,
) -> Vec<MetricsFlusher> {
let mut flushers = Vec::new();

let metrics_intake_url = if !config.dd_url.is_empty() {
fn create_metrics_intake_url_prefix(config: &Config) -> MetricsIntakeUrlPrefix {
if !config.dd_url.is_empty() {
let dd_dd_url = DdDdUrl::new(config.dd_url.clone()).expect("can't parse DD_DD_URL");

let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(None, Some(dd_dd_url));
MetricsIntakeUrlPrefix::new(None, prefix_override)
MetricsIntakeUrlPrefix::new(None, prefix_override).expect("can't parse DD_DD_URL prefix")
} else if !config.url.is_empty() {
let dd_url = DdUrl::new(config.url.clone()).expect("can't parse DD_URL");

let prefix_override = MetricsIntakeUrlPrefixOverride::maybe_new(Some(dd_url), None);
MetricsIntakeUrlPrefix::new(None, prefix_override)
MetricsIntakeUrlPrefix::new(None, prefix_override).expect("can't parse DD_URL prefix")
} else {
// use site
let metrics_site = MetricsSite::new(config.site.clone()).expect("can't parse site");
MetricsIntakeUrlPrefix::new(Some(metrics_site), None)
};
MetricsIntakeUrlPrefix::new(Some(metrics_site), None).expect("can't parse site prefix")
}
}

fn start_metrics_flushers(
api_key_factory: Arc<ApiKeyFactory>,
metrics_aggr_handle: &MetricsAggregatorHandle,
metrics_intake_url: &MetricsIntakeUrlPrefix,
config: &Arc<Config>,
) -> Vec<MetricsFlusher> {
let mut flushers = Vec::new();

let flusher_config = MetricsFlusherConfig {
api_key_factory,
aggregator_handle: metrics_aggr_handle.clone(),
metrics_intake_url_prefix: metrics_intake_url.expect("can't parse site or override"),
metrics_intake_url_prefix: metrics_intake_url.clone(),
https_proxy: config.proxy_https.clone(),
timeout: Duration::from_secs(config.flush_timeout),
retry_strategy: DsdRetryStrategy::Immediate(3),
Expand Down Expand Up @@ -1169,6 +1171,28 @@ fn start_trace_agent(
)
}

/// Sends metrics indicating issue with configuration.
///
/// # Arguments
/// * `issue_reasons` - Vector of messages describing the issue with the configurations
/// * `lambda_enhanced_metrics` - The lambda enhanced metrics instance
fn send_config_issue_metric(issue_reasons: &[String], lambda_enhanced_metrics: &enhanced_metrics) {
if issue_reasons.is_empty() {
return;
}
let now = std::time::UNIX_EPOCH
.elapsed()
.expect("can't poll clock")
.as_secs()
.try_into()
.unwrap_or_default();

// Setup a separate metric for each config issue reason
for issue_reason in issue_reasons {
lambda_enhanced_metrics.set_config_load_issue_metric(now, issue_reason);
}
}

fn start_dogstatsd_aggregator(aggr_service: MetricsAggregatorService) {
tokio::spawn(async move {
aggr_service.run().await;
Expand Down
Loading
Loading