Skip to content

Commit a96e83a

Browse files
authored
fix(lifecycle): spawn tasks on listener (#796)
# What? Make the Lifecycle Listener for Universal Instrumentation spawn tasks to process data. # Motivation There is no need to wait for the processing to finish before continuing; see also [SVLS-7464](https://datadoghq.atlassian.net/browse/SVLS-7464). # Notes Still trying to figure out how to handle invocation end. [SVLS-7464]: https://datadoghq.atlassian.net/browse/SVLS-7464?atlOrigin=eyJpIjoiNWRkNTljNzYxNjVmNDY3MDlhMDU5Y2ZhYzA5YTRkZjUiLCJwIjoiZ2l0aHViLWNvbS1KU1cifQ
1 parent 38e8e60 commit a96e83a

File tree

9 files changed

+554
-458
lines changed

9 files changed

+554
-458
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
#![deny(missing_copy_implementations)]
1010
#![deny(missing_debug_implementations)]
1111

12-
use bottlecap::traces::trace_processor::SendingTraceProcessor;
1312
#[cfg(not(target_env = "msvc"))]
1413
use tikv_jemallocator::Jemalloc;
1514

@@ -21,8 +20,9 @@ use bottlecap::{
2120
DOGSTATSD_PORT, EXTENSION_ACCEPT_FEATURE_HEADER, EXTENSION_FEATURES, EXTENSION_HOST,
2221
EXTENSION_HOST_IP, EXTENSION_ID_HEADER, EXTENSION_NAME, EXTENSION_NAME_HEADER, EXTENSION_ROUTE,
2322
LAMBDA_RUNTIME_SLUG, TELEMETRY_PORT,
24-
appsec::processor::Error::FeatureDisabled as AppSecFeatureDisabled,
25-
appsec::processor::Processor as AppSecProcessor,
23+
appsec::processor::{
24+
Error::FeatureDisabled as AppSecFeatureDisabled, Processor as AppSecProcessor,
25+
},
2626
base_url,
2727
config::{
2828
self, Config,
@@ -50,14 +50,15 @@ use bottlecap::{
5050
listener::TelemetryListener,
5151
},
5252
traces::{
53+
propagation::DatadogCompositePropagator,
5354
proxy_aggregator,
5455
proxy_flusher::Flusher as ProxyFlusher,
5556
stats_aggregator::StatsAggregator,
5657
stats_flusher::{self, StatsFlusher},
5758
stats_processor, trace_agent,
5859
trace_aggregator::{self, SendDataBuilderInfo},
5960
trace_flusher::{self, ServerlessTraceFlusher, TraceFlusher},
60-
trace_processor,
61+
trace_processor::{self, SendingTraceProcessor},
6162
},
6263
};
6364
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
@@ -514,12 +515,15 @@ async fn extension_loop_active(
514515
&metrics_aggr_handle,
515516
config,
516517
)));
518+
519+
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(config)));
517520
// Lifecycle Invocation Processor
518521
let invocation_processor = Arc::new(TokioMutex::new(InvocationProcessor::new(
519522
Arc::clone(&tags_provider),
520523
Arc::clone(config),
521524
Arc::clone(&aws_config),
522525
metrics_aggr_handle.clone(),
526+
Arc::clone(&propagator),
523527
)));
524528
// AppSec processor (if enabled)
525529
let appsec_processor = match AppSecProcessor::new(config) {
@@ -555,11 +559,12 @@ async fn extension_loop_active(
555559
aws_config,
556560
&invocation_processor,
557561
appsec_processor.as_ref(),
562+
Arc::clone(&propagator),
558563
);
559564

560-
let lifecycle_listener = LifecycleListener {
561-
invocation_processor: Arc::clone(&invocation_processor),
562-
};
565+
let lifecycle_listener =
566+
LifecycleListener::new(Arc::clone(&invocation_processor), Arc::clone(&propagator));
567+
let lifecycle_listener_shutdown_token = lifecycle_listener.get_shutdown_token();
563568
// TODO(astuyve): deprioritize this task after the first request
564569
tokio::spawn(async move {
565570
let res = lifecycle_listener.start().await;
@@ -801,6 +806,7 @@ async fn extension_loop_active(
801806
trace_agent_shutdown_token.cancel();
802807
dogstatsd_cancel_token.cancel();
803808
telemetry_listener_cancel_token.cancel();
809+
lifecycle_listener_shutdown_token.cancel();
804810

805811
// gotta lock here
806812
let mut locked_metrics = metrics_flushers.lock().await;
@@ -1237,6 +1243,7 @@ fn start_api_runtime_proxy(
12371243
aws_config: Arc<AwsConfig>,
12381244
invocation_processor: &Arc<TokioMutex<InvocationProcessor>>,
12391245
appsec_processor: Option<&Arc<TokioMutex<AppSecProcessor>>>,
1246+
propagator: Arc<DatadogCompositePropagator>,
12401247
) -> Option<CancellationToken> {
12411248
if !should_start_proxy(config, Arc::clone(&aws_config)) {
12421249
debug!("Skipping API runtime proxy, no LWA proxy or datadog wrapper found");
@@ -1245,5 +1252,11 @@ fn start_api_runtime_proxy(
12451252

12461253
let invocation_processor = invocation_processor.clone();
12471254
let appsec_processor = appsec_processor.map(Arc::clone);
1248-
interceptor::start(aws_config, invocation_processor, appsec_processor).ok()
1255+
interceptor::start(
1256+
aws_config,
1257+
invocation_processor,
1258+
appsec_processor,
1259+
propagator,
1260+
)
1261+
.ok()
12491262
}

bottlecap/src/http.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
use crate::config;
22
use axum::{
33
extract::{FromRequest, Request},
4-
http::{self, StatusCode},
4+
http::{self, HeaderMap, StatusCode},
55
response::{IntoResponse, Response},
66
};
77
use bytes::Bytes;
88
use core::time::Duration;
99
use datadog_fips::reqwest_adapter::create_reqwest_client_builder;
10-
use std::error::Error;
1110
use std::sync::Arc;
11+
use std::{collections::HashMap, error::Error};
1212
use tracing::error;
1313

1414
#[must_use]
@@ -68,3 +68,16 @@ pub async fn extract_request_body(
6868

6969
Ok((parts, bytes))
7070
}
71+
72+
#[must_use]
73+
pub fn headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {
74+
headers
75+
.iter()
76+
.map(|(k, v)| {
77+
(
78+
k.as_str().to_string(),
79+
v.to_str().unwrap_or_default().to_string(),
80+
)
81+
})
82+
.collect()
83+
}

bottlecap/src/lifecycle/invocation/context.rs

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use datadog_trace_protobuf::pb::Span;
11+
use serde_json::Value;
1112
use tracing::debug;
1213

1314
#[derive(Debug, Clone, PartialEq)]
@@ -127,7 +128,7 @@ pub struct ContextBuffer {
127128

128129
struct UniversalInstrumentationData {
129130
headers: HashMap<String, String>,
130-
payload: Vec<u8>,
131+
payload_value: Value,
131132
}
132133

133134
impl Default for ContextBuffer {
@@ -227,12 +228,14 @@ impl ContextBuffer {
227228
pub fn pair_invoke_event(
228229
&mut self,
229230
request_id: &str,
230-
) -> Option<(HashMap<String, String>, Vec<u8>)> {
231-
if let Some(UniversalInstrumentationData { headers, payload }) =
232-
self.universal_instrumentation_start_events.pop_front()
231+
) -> Option<(HashMap<String, String>, Value)> {
232+
if let Some(UniversalInstrumentationData {
233+
headers,
234+
payload_value,
235+
}) = self.universal_instrumentation_start_events.pop_front()
233236
{
234237
// Bad scenario, we found an `UniversalInstrumentationStart`
235-
Some((headers, payload))
238+
Some((headers, payload_value))
236239
} else {
237240
// `UniversalInstrumentationStart` event hasn't occurred yet, this is good,
238241
// push the Invoke event to the queue and return `None`
@@ -248,7 +251,7 @@ impl ContextBuffer {
248251
pub fn pair_universal_instrumentation_start_event(
249252
&mut self,
250253
headers: &HashMap<String, String>,
251-
payload: &[u8],
254+
payload_value: &Value,
252255
) -> Option<String> {
253256
if let Some(request_id) = self.invoke_events_request_ids.pop_front() {
254257
// Good scenario, we found a pending `Invoke` event
@@ -259,7 +262,7 @@ impl ContextBuffer {
259262
self.universal_instrumentation_start_events
260263
.push_back(UniversalInstrumentationData {
261264
headers: headers.clone(),
262-
payload: payload.to_vec(),
265+
payload_value: payload_value.clone(),
263266
});
264267
None
265268
}
@@ -271,7 +274,7 @@ impl ContextBuffer {
271274
pub fn pair_universal_instrumentation_end_event(
272275
&mut self,
273276
headers: &HashMap<String, String>,
274-
payload: &[u8],
277+
payload_value: &Value,
275278
) -> Option<String> {
276279
if let Some(request_id) = self.platform_runtime_done_events_request_ids.pop_front() {
277280
// Bad scenario, we found a `PlatformRuntimeDone`
@@ -282,7 +285,7 @@ impl ContextBuffer {
282285
self.universal_instrumentation_end_events
283286
.push_back(UniversalInstrumentationData {
284287
headers: headers.clone(),
285-
payload: payload.to_vec(),
288+
payload_value: payload_value.clone(),
286289
});
287290
None
288291
}
@@ -294,12 +297,14 @@ impl ContextBuffer {
294297
pub fn pair_platform_runtime_done_event(
295298
&mut self,
296299
request_id: &str,
297-
) -> Option<(HashMap<String, String>, Vec<u8>)> {
298-
if let Some(UniversalInstrumentationData { headers, payload }) =
299-
self.universal_instrumentation_end_events.pop_front()
300+
) -> Option<(HashMap<String, String>, Value)> {
301+
if let Some(UniversalInstrumentationData {
302+
headers,
303+
payload_value,
304+
}) = self.universal_instrumentation_end_events.pop_front()
300305
{
301306
// Good scenario, we found an `UniversalInstrumentationEnd`
302-
Some((headers, payload))
307+
Some((headers, payload_value))
303308
} else {
304309
// `UniversalInstrumentationEnd` hasn't occurred yet, this is bad,
305310
// push the `PlatformRuntimeDone` event to the queue and return `None`
@@ -425,6 +430,7 @@ impl ContextBuffer {
425430
#[allow(clippy::unwrap_used)]
426431
mod tests {
427432
use crate::proc::{CPUData, NetworkData};
433+
use serde_json::json;
428434
use std::collections::HashMap;
429435
use tokio::sync::watch;
430436

@@ -619,14 +625,14 @@ mod tests {
619625
let request_id = "test-request-1".to_string();
620626
let mut headers = HashMap::new();
621627
headers.insert("test-header".to_string(), "test-value".to_string());
622-
let payload = vec![1, 2, 3];
628+
let payload = json!({ "test": "payload" });
623629

624630
// Test case 1: UniversalInstrumentationStart arrives before Invoke
625631
buffer
626632
.universal_instrumentation_start_events
627633
.push_back(UniversalInstrumentationData {
628634
headers: headers.clone(),
629-
payload: payload.clone(),
635+
payload_value: payload.clone(),
630636
});
631637

632638
// When Invoke arrives, it should pair with the existing UniversalInstrumentationStart
@@ -653,7 +659,7 @@ mod tests {
653659
let request_id = "test-request-1".to_string();
654660
let mut headers = HashMap::new();
655661
headers.insert("test-header".to_string(), "test-value".to_string());
656-
let payload = vec![1, 2, 3];
662+
let payload = json!({ "test": "payload" });
657663

658664
// Test case 1: Invoke arrives before UniversalInstrumentationStart
659665
buffer
@@ -674,7 +680,7 @@ mod tests {
674680
.front()
675681
.unwrap();
676682
assert_eq!(stored_event.headers, headers);
677-
assert_eq!(stored_event.payload, payload);
683+
assert_eq!(stored_event.payload_value, payload);
678684
}
679685

680686
#[test]
@@ -683,7 +689,7 @@ mod tests {
683689
let request_id = "test-request-1".to_string();
684690
let mut headers = HashMap::new();
685691
headers.insert("test-header".to_string(), "test-value".to_string());
686-
let payload = vec![1, 2, 3];
692+
let payload = json!({ "test": "payload" });
687693

688694
// Test case 1: PlatformRuntimeDone arrives before UniversalInstrumentationEnd
689695
buffer
@@ -701,7 +707,7 @@ mod tests {
701707
assert_eq!(buffer.universal_instrumentation_end_events.len(), 1);
702708
let stored_event = buffer.universal_instrumentation_end_events.front().unwrap();
703709
assert_eq!(stored_event.headers, headers);
704-
assert_eq!(stored_event.payload, payload);
710+
assert_eq!(stored_event.payload_value, payload);
705711
}
706712

707713
#[test]
@@ -710,14 +716,14 @@ mod tests {
710716
let request_id = "test-request-1".to_string();
711717
let mut headers = HashMap::new();
712718
headers.insert("test-header".to_string(), "test-value".to_string());
713-
let payload = vec![1, 2, 3];
719+
let payload = json!({ "test": "payload" });
714720

715721
// Test case 1: UniversalInstrumentationEnd arrives before PlatformRuntimeDone
716722
buffer
717723
.universal_instrumentation_end_events
718724
.push_back(UniversalInstrumentationData {
719725
headers: headers.clone(),
720-
payload: payload.clone(),
726+
payload_value: payload.clone(),
721727
});
722728

723729
// When PlatformRuntimeDone arrives, it should pair with the existing UniversalInstrumentationEnd
@@ -747,15 +753,15 @@ mod tests {
747753
let request_id = "test-request-1".to_string();
748754
let mut headers = HashMap::new();
749755
headers.insert("test-header".to_string(), "test-value".to_string());
750-
let payload = vec![1, 2, 3];
756+
let payload = json!({ "test": "payload" });
751757

752758
// Test the complete flow with events arriving in different orders
753759
// Case 1: Events arrive in reverse order
754760
buffer
755761
.universal_instrumentation_end_events
756762
.push_back(UniversalInstrumentationData {
757763
headers: headers.clone(),
758-
payload: payload.clone(),
764+
payload_value: payload.clone(),
759765
});
760766
buffer
761767
.platform_runtime_done_events_request_ids

0 commit comments

Comments
 (0)