diff --git a/Cargo.toml b/Cargo.toml index 2ba5bb57a..0809f5213 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,4 +53,4 @@ x509-parser = "0.16.0" [profile.release] lto = true codegen-units = 1 -debug = true +debug = true \ No newline at end of file diff --git a/bin/Cargo.toml b/bin/Cargo.toml index 15844b3b3..16ed535b7 100644 --- a/bin/Cargo.toml +++ b/bin/Cargo.toml @@ -53,6 +53,7 @@ default = ["jemallocator"] unstable = [] logs-debug = ["sozu-lib/logs-debug", "sozu-command-lib/logs-debug"] logs-trace = ["sozu-lib/logs-trace", "sozu-command-lib/logs-trace"] +opentelemetry = ["sozu-lib/opentelemetry"] tolerant-http1-parser = ["sozu-lib/tolerant-http1-parser"] [badges] diff --git a/command/src/command.proto b/command/src/command.proto index ad93b9e34..31b13424b 100644 --- a/command/src/command.proto +++ b/command/src/command.proto @@ -753,6 +753,12 @@ message InitialState { repeated WorkerRequest requests = 1; } +message OpenTelemetry { + required string trace_id = 1; + required string span_id = 2; + optional string parent_span_id = 3; +} + // An access log, meant to be passed to another agent message ProtobufAccessLog { // error message if any @@ -795,6 +801,8 @@ message ProtobufAccessLog { optional uint64 request_time = 20; // time for the backend to respond (microseconds) optional uint64 response_time = 21; + // OpenTelemetry tracing information + optional OpenTelemetry otel = 22; } message ProtobufEndpoint { diff --git a/command/src/logging/access_logs.rs b/command/src/logging/access_logs.rs index 0e57c0cdb..78bad747c 100644 --- a/command/src/logging/access_logs.rs +++ b/command/src/logging/access_logs.rs @@ -1,7 +1,8 @@ -use std::{collections::BTreeMap, mem::ManuallyDrop, net::SocketAddr, time::Duration}; - use rusty_ulid::Ulid; +use std::fmt::Formatter; +use std::{collections::BTreeMap, fmt, mem::ManuallyDrop, net::SocketAddr, time::Duration}; +use crate::proto::command::OpenTelemetry as ProtobufOpenTelemetry; use crate::{ logging::{LogLevel, Rfc3339Time}, proto::command::{ @@ -98,6 +99,26 @@ pub struct FullTags<'a> { pub user_agent: Option<&'a str>, } +#[derive(Default)] +pub struct OpenTelemetry { + pub trace_id: [u8; 32], + pub span_id: [u8; 16], + pub parent_span_id: Option<[u8; 16]>, +} + +impl fmt::Debug for OpenTelemetry { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + let trace_id = unsafe { std::str::from_utf8_unchecked(&self.trace_id) }; + let span_id = unsafe { std::str::from_utf8_unchecked(&self.span_id) }; + let parent_span_id = self + .parent_span_id + .as_ref() + .map(|id| unsafe { std::str::from_utf8_unchecked(id) }) + .unwrap_or("-"); + write!(f, "{trace_id} {span_id} {parent_span_id}") + } +} + /// Intermediate representation of an access log agnostic of the final format. /// Every field is a reference to avoid capturing ownership (as a logger should). pub struct RequestRecord<'a> { @@ -118,6 +139,7 @@ pub struct RequestRecord<'a> { pub request_time: Duration, pub bytes_in: usize, pub bytes_out: usize, + pub otel: Option<&'a OpenTelemetry>, // added by the logger itself pub pid: i32, @@ -182,6 +204,14 @@ impl RequestRecord<'_> { tag: self.tag.duplicate(), time: self.precise_time.into(), request_time: Some(self.request_time.as_micros() as u64), + otel: self.otel.map(|otel| ProtobufOpenTelemetry { + trace_id: std::str::from_utf8_unchecked(&otel.trace_id).duplicate(), + span_id: std::str::from_utf8_unchecked(&otel.span_id).duplicate(), + parent_span_id: otel + .parent_span_id + .as_ref() + .map(|id| std::str::from_utf8_unchecked(id).duplicate()), + }), }) } } diff --git a/command/src/logging/logs.rs b/command/src/logging/logs.rs index d0b231ebb..6d0b5f006 100644 --- a/command/src/logging/logs.rs +++ b/command/src/logging/logs.rs @@ -276,7 +276,7 @@ impl InnerLogger { log.tag, ], standard: { - formats: ["{} {} {} {}/{}/{}/{}/{} {} {} [{}] {} {}{}\n"], + formats: ["{} {} {} {}/{}/{}/{}/{} {} {} [{}] {:?} {} {}{}\n"], args: [ log.context, log.session_address.as_string_or("-"), @@ -289,13 +289,14 @@ impl InnerLogger { log.bytes_in, log.bytes_out, log.full_tags(), + log.otel, log.protocol, log.endpoint, LogMessage(log.message), ] }, colored: { - formats: ["\x1b[;1m{}\x1b[m {} {} {}/{}/{}/{}/{} {} {} \x1b[2m[{}] \x1b[;1m{} {:#}\x1b[m{}\n"], + formats: ["\x1b[;1m{}\x1b[m {} {} {}/{}/{}/{}/{} {} {} \x1b[2m[{}] {:?} \x1b[;1m{} {:#}\x1b[m{}\n"], args: @, } }, diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 74e0cb931..038e8ddeb 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -68,10 +68,10 @@ tiny_http = { workspace = true } default = ["simd"] logs-debug = [] logs-trace = [] +opentelemetry = [] simd = ["kawa/simd"] splice = [] tolerant-http1-parser = ["kawa/tolerant-parsing"] -unstable = [] [badges] travis-ci = { repository = "sozu-proxy/sozu" } diff --git a/lib/src/protocol/kawa_h1/editor.rs b/lib/src/protocol/kawa_h1/editor.rs index 202bab059..a94dc049d 100644 --- a/lib/src/protocol/kawa_h1/editor.rs +++ b/lib/src/protocol/kawa_h1/editor.rs @@ -1,17 +1,67 @@ +use crate::{ + pool::Checkout, + protocol::http::{parser::compare_no_case, GenericHttpStream, Method}, + Protocol, +}; +use rusty_ulid::Ulid; +use sozu_command_lib::logging::LogContext; use std::{ net::{IpAddr, SocketAddr}, str::{from_utf8, from_utf8_unchecked}, }; -use rusty_ulid::Ulid; +#[cfg(feature = "opentelemetry")] +fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> { + let val = val.data(buf); + let (version, val) = parse_hex::<2>(val)?; + if version.as_slice() != b"00" { + return None; + } + let val = skip_separator(val)?; + let (trace_id, val) = parse_hex::<32>(val)?; + let val = skip_separator(val)?; + let (parent_id, val) = parse_hex::<16>(val)?; + let val = skip_separator(val)?; + let (_, val) = parse_hex::<2>(val)?; + val.is_empty().then_some((trace_id, parent_id)) +} -use crate::{ - pool::Checkout, - protocol::http::{parser::compare_no_case, GenericHttpStream, Method}, - Protocol, -}; +#[cfg(feature = "opentelemetry")] +fn parse_hex(buf: &[u8]) -> Option<([u8; N], &[u8])> { + let val: [u8; N] = buf.get(..N)?.try_into().unwrap(); + val.iter() + .all(|c| c.is_ascii_hexdigit()) + .then_some((val, &buf[N..])) +} -use sozu_command_lib::logging::LogContext; +#[cfg(feature = "opentelemetry")] +fn skip_separator(buf: &[u8]) -> Option<&[u8]> { + buf.first().filter(|b| **b == b'-').map(|_| &buf[1..]) +} + +#[cfg(feature = "opentelemetry")] +fn random_id() -> [u8; N] { + use rand::Rng; + const CHARSET: &[u8] = b"0123456789abcdef"; + let mut rng = rand::thread_rng(); + let mut buf = [0; N]; + buf.fill_with(|| { + let n = rng.gen_range(0..CHARSET.len()); + CHARSET[n] + }); + buf +} + +#[cfg(feature = "opentelemetry")] +fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] { + let mut buf = [0; 55]; + buf[..3].copy_from_slice(b"00-"); + buf[3..35].copy_from_slice(trace_id); + buf[35] = b'-'; + buf[36..52].copy_from_slice(parent_id); + buf[52..55].copy_from_slice(b"-01"); + buf +} /// This is the container used to store and use information about the session from within a Kawa parser callback #[derive(Debug)] @@ -37,6 +87,9 @@ pub struct HttpContext { // ---------- Additional optional data pub user_agent: Option, + #[cfg(feature = "opentelemetry")] + pub otel: Option, + // ========== Read only /// signals wether Kawa should write a "Connection" header with a "close" value (request and response) pub closing: bool, @@ -67,6 +120,41 @@ impl kawa::h1::ParserCallbacks for HttpContext { } impl HttpContext { + /// Creates a new instance + pub fn new( + request_id: Ulid, + protocol: Protocol, + public_address: SocketAddr, + session_address: Option, + sticky_name: String, + ) -> Self { + Self { + id: request_id, + backend_id: None, + cluster_id: None, + + closing: false, + keep_alive_backend: true, + keep_alive_frontend: true, + protocol, + public_address, + session_address, + sticky_name, + sticky_session: None, + sticky_session_found: None, + + method: None, + authority: None, + path: None, + status: None, + reason: None, + user_agent: None, + + #[cfg(feature = "opentelemetry")] + otel: Default::default(), + } + } + /// Callback for request: /// /// - edit headers (connection, forwarded, sticky cookie, sozu-id) @@ -78,7 +166,7 @@ impl HttpContext { /// - sticky cookie /// - user-agent fn on_request_headers(&mut self, request: &mut GenericHttpStream) { - let buf = &mut request.storage.mut_buffer(); + let buf = request.storage.mut_buffer(); // Captures the request line if let kawa::StatusLine::Request { @@ -135,6 +223,10 @@ impl HttpContext { let mut has_x_port = false; let mut has_x_proto = false; let mut has_connection = false; + #[cfg(feature = "opentelemetry")] + let mut traceparent: Option<&mut kawa::Pair> = None; + #[cfg(feature = "opentelemetry")] + let mut tracestate: Option<&mut kawa::Pair> = None; for block in &mut request.blocks { match block { kawa::Block::Header(header) if !header.is_elided() => { @@ -182,12 +274,48 @@ impl HttpContext { .data_opt(buf) .and_then(|data| from_utf8(data).ok()) .map(ToOwned::to_owned); + } else { + #[cfg(feature = "opentelemetry")] + if compare_no_case(key, b"traceparent") { + if let Some(hdr) = traceparent { + hdr.elide(); + } + traceparent = Some(header); + } else if compare_no_case(key, b"tracestate") { + if let Some(hdr) = tracestate { + hdr.elide(); + } + tracestate = Some(header); + } } } _ => {} } } + #[cfg(feature = "opentelemetry")] + let (otel, has_traceparent) = { + let mut otel = sozu_command_lib::logging::OpenTelemetry::default(); + let tp = traceparent + .as_ref() + .and_then(|hdr| parse_traceparent(&hdr.val, buf)) + .map(|(trace_id, parent_id)| (trace_id, Some(parent_id))); + // Remove tracestate if no traceparent is present + if let (None, Some(tracestate)) = (tp, tracestate) { + tracestate.elide(); + } + let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None)); + otel.trace_id = trace_id; + otel.parent_span_id = parent_id; + otel.span_id = random_id(); + // Modify header if present + if let Some(id) = &mut traceparent { + let new_val = build_traceparent(&otel.trace_id, &otel.span_id); + id.val.modify(buf, &new_val); + } + (otel, traceparent.is_some()) + }; + // If session_address is set: // - append its ip address to the list of "X-Forwarded-For" if it was found, creates it if not // - append "proto=[PROTO];for=[PEER];by=[PUBLIC]" to the list of "Forwarded" if it was found, creates it if not @@ -240,6 +368,19 @@ impl HttpContext { })); } } + + #[cfg(feature = "opentelemetry")] + { + if !has_traceparent { + let val = build_traceparent(&otel.trace_id, &otel.span_id); + request.push_block(kawa::Block::Header(kawa::Pair { + key: kawa::Store::Static(b"traceparent"), + val: kawa::Store::from_slice(&val), + })); + } + self.otel = Some(otel); + } + if !has_x_port { request.push_block(kawa::Block::Header(kawa::Pair { key: kawa::Store::Static(b"X-Forwarded-Port"), @@ -252,7 +393,6 @@ impl HttpContext { val: kawa::Store::Static(proto.as_bytes()), })); } - // Create a "Connection" header in case it was not found and closing it set if !has_connection && self.closing { request.push_block(kawa::Block::Header(kawa::Pair { @@ -260,7 +400,6 @@ impl HttpContext { val: kawa::Store::Static(b"close"), })); } - // Create a custom "Sozu-Id" header request.push_block(kawa::Block::Header(kawa::Pair { key: kawa::Store::Static(b"Sozu-Id"), diff --git a/lib/src/protocol/kawa_h1/mod.rs b/lib/src/protocol/kawa_h1/mod.rs index 270ca7e7e..6be9d1498 100644 --- a/lib/src/protocol/kawa_h1/mod.rs +++ b/lib/src/protocol/kawa_h1/mod.rs @@ -241,28 +241,13 @@ impl Http Http Http Http Pipe { request_time: metrics.request_time(), bytes_in: metrics.bin, bytes_out: metrics.bout, - user_agent: None + user_agent: None, + otel: None, ); } diff --git a/lib/src/tcp.rs b/lib/src/tcp.rs index ea6955e86..93cb79ae4 100644 --- a/lib/src/tcp.rs +++ b/lib/src/tcp.rs @@ -232,7 +232,8 @@ impl TcpSession { response_time: self.metrics.backend_response_time(), request_time: self.metrics.request_time(), bytes_in: self.metrics.bin, - bytes_out: self.metrics.bout + bytes_out: self.metrics.bout, + otel: None, ); }