Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ x509-parser = "0.16.0"
[profile.release]
lto = true
codegen-units = 1
debug = true
debug = true
1 change: 1 addition & 0 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ default = ["jemallocator"]
unstable = []
logs-debug = ["sozu-lib/logs-debug", "sozu-command-lib/logs-debug"]
logs-trace = ["sozu-lib/logs-trace", "sozu-command-lib/logs-trace"]
opentelemetry = ["sozu-lib/opentelemetry"]
tolerant-http1-parser = ["sozu-lib/tolerant-http1-parser"]

[badges]
Expand Down
8 changes: 8 additions & 0 deletions command/src/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,12 @@ message InitialState {
repeated WorkerRequest requests = 1;
}

message OpenTelemetry {
required string trace_id = 1;
required string span_id = 2;
optional string parent_span_id = 3;
}

// An access log, meant to be passed to another agent
message ProtobufAccessLog {
// error message if any
Expand Down Expand Up @@ -795,6 +801,8 @@ message ProtobufAccessLog {
optional uint64 request_time = 20;
// time for the backend to respond (microseconds)
optional uint64 response_time = 21;
// OpenTelemetry tracing information
optional OpenTelemetry otel = 22;
}

message ProtobufEndpoint {
Expand Down
34 changes: 32 additions & 2 deletions command/src/logging/access_logs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{collections::BTreeMap, mem::ManuallyDrop, net::SocketAddr, time::Duration};

use rusty_ulid::Ulid;
use std::fmt::Formatter;
use std::{collections::BTreeMap, fmt, mem::ManuallyDrop, net::SocketAddr, time::Duration};

use crate::proto::command::OpenTelemetry as ProtobufOpenTelemetry;
use crate::{
logging::{LogLevel, Rfc3339Time},
proto::command::{
Expand Down Expand Up @@ -98,6 +99,26 @@
pub user_agent: Option<&'a str>,
}

#[derive(Default)]
pub struct OpenTelemetry {
pub trace_id: [u8; 32],
pub span_id: [u8; 16],
pub parent_span_id: Option<[u8; 16]>,
}

impl fmt::Debug for OpenTelemetry {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let trace_id = unsafe { std::str::from_utf8_unchecked(&self.trace_id) };
let span_id = unsafe { std::str::from_utf8_unchecked(&self.span_id) };
let parent_span_id = self
.parent_span_id
.as_ref()
.map(|id| unsafe { std::str::from_utf8_unchecked(id) })
.unwrap_or("-");
write!(f, "{trace_id} {span_id} {parent_span_id}")
}
}

/// Intermediate representation of an access log agnostic of the final format.
/// Every field is a reference to avoid capturing ownership (as a logger should).
pub struct RequestRecord<'a> {
Expand All @@ -118,6 +139,7 @@
pub request_time: Duration,
pub bytes_in: usize,
pub bytes_out: usize,
pub otel: Option<&'a OpenTelemetry>,

// added by the logger itself
pub pid: i32,
Expand All @@ -128,7 +150,7 @@
}

impl RequestRecord<'_> {
pub fn full_tags(&self) -> FullTags {

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Build bench_logger 🦀

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Build documentation

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Build bench_logger 🦀

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Build documentation

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Build Sozu 🦀

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 153 in command/src/logging/access_logs.rs

View workflow job for this annotation

GitHub Actions / Build Sozu 🦀

hiding a lifetime that's elided elsewhere is confusing
FullTags {
concatenated: self.tags.as_ref().map(|t| t.concatenated.as_str()),
user_agent: self.user_agent,
Expand Down Expand Up @@ -182,6 +204,14 @@
tag: self.tag.duplicate(),
time: self.precise_time.into(),
request_time: Some(self.request_time.as_micros() as u64),
otel: self.otel.map(|otel| ProtobufOpenTelemetry {
trace_id: std::str::from_utf8_unchecked(&otel.trace_id).duplicate(),
span_id: std::str::from_utf8_unchecked(&otel.span_id).duplicate(),
parent_span_id: otel
.parent_span_id
.as_ref()
.map(|id| std::str::from_utf8_unchecked(id).duplicate()),
}),
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions command/src/logging/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl InnerLogger {
log.tag,
],
standard: {
formats: ["{} {} {} {}/{}/{}/{}/{} {} {} [{}] {} {}{}\n"],
formats: ["{} {} {} {}/{}/{}/{}/{} {} {} [{}] {:?} {} {}{}\n"],
args: [
log.context,
log.session_address.as_string_or("-"),
Expand All @@ -289,13 +289,14 @@ impl InnerLogger {
log.bytes_in,
log.bytes_out,
log.full_tags(),
log.otel,
log.protocol,
log.endpoint,
LogMessage(log.message),
]
},
colored: {
formats: ["\x1b[;1m{}\x1b[m {} {} {}/{}/{}/{}/{} {} {} \x1b[2m[{}] \x1b[;1m{} {:#}\x1b[m{}\n"],
formats: ["\x1b[;1m{}\x1b[m {} {} {}/{}/{}/{}/{} {} {} \x1b[2m[{}] {:?} \x1b[;1m{} {:#}\x1b[m{}\n"],
args: @,
}
},
Expand Down
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ tiny_http = { workspace = true }
default = ["simd"]
logs-debug = []
logs-trace = []
opentelemetry = []
simd = ["kawa/simd"]
splice = []
tolerant-http1-parser = ["kawa/tolerant-parsing"]
unstable = []

[badges]
travis-ci = { repository = "sozu-proxy/sozu" }
159 changes: 149 additions & 10 deletions lib/src/protocol/kawa_h1/editor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,67 @@
use crate::{
pool::Checkout,
protocol::http::{parser::compare_no_case, GenericHttpStream, Method},
Protocol,
};
use rusty_ulid::Ulid;
use sozu_command_lib::logging::LogContext;
use std::{
net::{IpAddr, SocketAddr},
str::{from_utf8, from_utf8_unchecked},
};

use rusty_ulid::Ulid;
#[cfg(feature = "opentelemetry")]
fn parse_traceparent(val: &kawa::Store, buf: &[u8]) -> Option<([u8; 32], [u8; 16])> {
let val = val.data(buf);
let (version, val) = parse_hex::<2>(val)?;
if version.as_slice() != b"00" {
return None;
}
let val = skip_separator(val)?;
let (trace_id, val) = parse_hex::<32>(val)?;
let val = skip_separator(val)?;
let (parent_id, val) = parse_hex::<16>(val)?;
let val = skip_separator(val)?;
let (_, val) = parse_hex::<2>(val)?;
val.is_empty().then_some((trace_id, parent_id))
}

use crate::{
pool::Checkout,
protocol::http::{parser::compare_no_case, GenericHttpStream, Method},
Protocol,
};
#[cfg(feature = "opentelemetry")]
fn parse_hex<const N: usize>(buf: &[u8]) -> Option<([u8; N], &[u8])> {
let val: [u8; N] = buf.get(..N)?.try_into().unwrap();
val.iter()
.all(|c| c.is_ascii_hexdigit())
.then_some((val, &buf[N..]))
}

use sozu_command_lib::logging::LogContext;
#[cfg(feature = "opentelemetry")]
fn skip_separator(buf: &[u8]) -> Option<&[u8]> {
buf.first().filter(|b| **b == b'-').map(|_| &buf[1..])
}

#[cfg(feature = "opentelemetry")]
fn random_id<const N: usize>() -> [u8; N] {
use rand::Rng;
const CHARSET: &[u8] = b"0123456789abcdef";
let mut rng = rand::thread_rng();
let mut buf = [0; N];
buf.fill_with(|| {
let n = rng.gen_range(0..CHARSET.len());
CHARSET[n]
});
buf
}

#[cfg(feature = "opentelemetry")]
fn build_traceparent(trace_id: &[u8; 32], parent_id: &[u8; 16]) -> [u8; 55] {
let mut buf = [0; 55];
buf[..3].copy_from_slice(b"00-");
buf[3..35].copy_from_slice(trace_id);
buf[35] = b'-';
buf[36..52].copy_from_slice(parent_id);
buf[52..55].copy_from_slice(b"-01");
buf
}

/// This is the container used to store and use information about the session from within a Kawa parser callback
#[derive(Debug)]
Expand All @@ -37,6 +87,9 @@
// ---------- Additional optional data
pub user_agent: Option<String>,

#[cfg(feature = "opentelemetry")]
pub otel: Option<sozu_command::logging::OpenTelemetry>,

// ========== Read only
/// signals wether Kawa should write a "Connection" header with a "close" value (request and response)
pub closing: bool,
Expand Down Expand Up @@ -67,6 +120,41 @@
}

impl HttpContext {
/// Creates a new instance
pub fn new(
request_id: Ulid,
protocol: Protocol,
public_address: SocketAddr,
session_address: Option<SocketAddr>,
sticky_name: String,
) -> Self {
Self {
id: request_id,
backend_id: None,
cluster_id: None,

closing: false,
keep_alive_backend: true,
keep_alive_frontend: true,
protocol,
public_address,
session_address,
sticky_name,
sticky_session: None,
sticky_session_found: None,

method: None,
authority: None,
path: None,
status: None,
reason: None,
user_agent: None,

#[cfg(feature = "opentelemetry")]
otel: Default::default(),
}
}

/// Callback for request:
///
/// - edit headers (connection, forwarded, sticky cookie, sozu-id)
Expand All @@ -78,7 +166,7 @@
/// - sticky cookie
/// - user-agent
fn on_request_headers(&mut self, request: &mut GenericHttpStream) {
let buf = &mut request.storage.mut_buffer();
let buf = request.storage.mut_buffer();

// Captures the request line
if let kawa::StatusLine::Request {
Expand Down Expand Up @@ -135,6 +223,10 @@
let mut has_x_port = false;
let mut has_x_proto = false;
let mut has_connection = false;
#[cfg(feature = "opentelemetry")]
let mut traceparent: Option<&mut kawa::Pair> = None;
#[cfg(feature = "opentelemetry")]
let mut tracestate: Option<&mut kawa::Pair> = None;
for block in &mut request.blocks {
match block {
kawa::Block::Header(header) if !header.is_elided() => {
Expand Down Expand Up @@ -182,12 +274,48 @@
.data_opt(buf)
.and_then(|data| from_utf8(data).ok())
.map(ToOwned::to_owned);
} else {
#[cfg(feature = "opentelemetry")]
if compare_no_case(key, b"traceparent") {
if let Some(hdr) = traceparent {
hdr.elide();
}
traceparent = Some(header);
} else if compare_no_case(key, b"tracestate") {
if let Some(hdr) = tracestate {
hdr.elide();
}
tracestate = Some(header);
}
}
}
_ => {}
}
}

#[cfg(feature = "opentelemetry")]
let (otel, has_traceparent) = {
let mut otel = sozu_command_lib::logging::OpenTelemetry::default();
let tp = traceparent
.as_ref()
.and_then(|hdr| parse_traceparent(&hdr.val, buf))
.map(|(trace_id, parent_id)| (trace_id, Some(parent_id)));
// Remove tracestate if no traceparent is present
if let (None, Some(tracestate)) = (tp, tracestate) {
tracestate.elide();
}
let (trace_id, parent_id) = tp.unwrap_or_else(|| (random_id(), None));
otel.trace_id = trace_id;
otel.parent_span_id = parent_id;
otel.span_id = random_id();
// Modify header if present
if let Some(id) = &mut traceparent {
let new_val = build_traceparent(&otel.trace_id, &otel.span_id);
id.val.modify(buf, &new_val);
}
(otel, traceparent.is_some())
};

// If session_address is set:
// - append its ip address to the list of "X-Forwarded-For" if it was found, creates it if not
// - append "proto=[PROTO];for=[PEER];by=[PUBLIC]" to the list of "Forwarded" if it was found, creates it if not
Expand Down Expand Up @@ -240,6 +368,19 @@
}));
}
}

#[cfg(feature = "opentelemetry")]
{
if !has_traceparent {
let val = build_traceparent(&otel.trace_id, &otel.span_id);
request.push_block(kawa::Block::Header(kawa::Pair {
key: kawa::Store::Static(b"traceparent"),
val: kawa::Store::from_slice(&val),
}));
}
self.otel = Some(otel);
}

if !has_x_port {
request.push_block(kawa::Block::Header(kawa::Pair {
key: kawa::Store::Static(b"X-Forwarded-Port"),
Expand All @@ -252,15 +393,13 @@
val: kawa::Store::Static(proto.as_bytes()),
}));
}

// Create a "Connection" header in case it was not found and closing it set
if !has_connection && self.closing {
request.push_block(kawa::Block::Header(kawa::Pair {
key: kawa::Store::Static(b"Connection"),
val: kawa::Store::Static(b"close"),
}));
}

// Create a custom "Sozu-Id" header
request.push_block(kawa::Block::Header(kawa::Pair {
key: kawa::Store::Static(b"Sozu-Id"),
Expand Down Expand Up @@ -344,7 +483,7 @@
self.user_agent = None;
}

pub fn log_context(&self) -> LogContext {

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Test (false, beta)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Test (false, stable)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Test (nightly, true)

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Build documentation

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Build documentation

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Build Sozu 🦀

hiding a lifetime that's elided elsewhere is confusing

Check warning on line 486 in lib/src/protocol/kawa_h1/editor.rs

View workflow job for this annotation

GitHub Actions / Build Sozu 🦀

hiding a lifetime that's elided elsewhere is confusing
LogContext {
request_id: self.id,
cluster_id: self.cluster_id.as_deref(),
Expand Down
Loading
Loading