diff --git a/src/metrics/log_encoder.rs b/src/metrics/log_encoder.rs
index 4d0d572996b..05f081ce4d7 100644
--- a/src/metrics/log_encoder.rs
+++ b/src/metrics/log_encoder.rs
@@ -2,7 +2,14 @@ use base64::write::EncoderWriter;
 use indexmap::IndexMap;
 use prometheus::proto::{MetricFamily, MetricType};
 use prometheus::{Encoder, Error};
+use serde::ser::SerializeSeq;
+use serde::{Serialize, Serializer as _};
+use serde_json::Serializer;
+use std::cell::Cell;
 use std::io::Write;
+use std::rc::Rc;
+
+const CHUNKS_MAX_SIZE_BYTES: usize = 5000;
 
 /// The `LogEncoder` struct encodes Prometheus metrics in the format [`crates-io-heroku-metrics`]
 /// expects metrics to be logged. This can be used to forward instance metrics to it, allowing them
@@ -27,17 +34,17 @@ impl LogEncoder {
 }
 
 impl Encoder for LogEncoder {
-    fn encode<W: Write>(
-        &self,
-        families: &[MetricFamily],
-        mut dest: &mut W,
-    ) -> prometheus::Result<()> {
+    fn encode<W: Write>(&self, families: &[MetricFamily], dest: &mut W) -> prometheus::Result<()> {
         let events = families_to_json_events(families);
 
-        dest.write_all(b"crates-io-heroku-metrics:ingest ")?;
-        let base64_dest = EncoderWriter::new(&mut dest, base64::STANDARD);
-        serde_json::to_writer(base64_dest, &events).map_err(|e| Error::Msg(e.to_string()))?;
-        dest.write_all(b"\n")?;
+        let chunks = serialize_and_split_list(events.iter(), CHUNKS_MAX_SIZE_BYTES)
+            .map_err(|e| Error::Msg(e.to_string()))?;
+
+        for chunk in chunks {
+            dest.write_all(b"crates-io-heroku-metrics:ingest ")?;
+            dest.write_all(&chunk)?;
+            dest.write_all(b"\n")?;
+        }
 
         Ok(())
     }
@@ -100,6 +107,72 @@ fn families_to_json_events(families: &[MetricFamily]) -> Vec<VectorEvent<'_>> {
     events
 }
 
+/// Serialize a list of items into multiple Base64-encoded JSON chunks.
+///
+/// Our hosting platform (Heroku) limits the size of log lines, arbitrarily splitting them once
+/// they reach a threshold. We can't let Heroku do the split, as it doesn't know where to properly
+/// do that, so we need to do the splitting ourselves.
+///
+/// This function takes an iterator of serializable items and returns the serialized version,
+/// possibly split into multiple chunks. Each chunk (except possibly the last) is *at least*
+/// `max_size_hint` long, as the function only stops serializing new items into a chunk once the
+/// size limit has been crossed after serializing an item.
+///
+/// Because of that, `max_size_hint` should be lower than the upper bound we can't cross.
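+///
+/// A sketch of the intended call pattern, mirroring the tests below (`ignore`d, as this is a
+/// private helper):
+///
+/// ```ignore
+/// let items: Vec<i32> = (0..100).collect();
+/// // A 256-byte hint yields multiple chunks, each one an independently
+/// // decodable Base64-encoded JSON array.
+/// let chunks = serialize_and_split_list(items.iter(), 256)?;
+/// for chunk in &chunks {
+///     let decoded: Vec<i32> = serde_json::from_slice(&base64::decode(chunk)?)?;
+/// }
+/// ```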
+fn serialize_and_split_list<'a, S: Serialize + 'a>(
+    items: impl Iterator<Item = &'a S>,
+    max_size_hint: usize,
+) -> Result<Vec<Vec<u8>>, serde_json::Error> {
+    let mut items = items.peekable();
+
+    let mut result = Vec::new();
+    while items.peek().is_some() {
+        let mut writer = TrackedWriter::new();
+        let written_count = writer.written_count.clone();
+        let mut serializer = Serializer::new(EncoderWriter::new(&mut writer, base64::STANDARD));
+
+        let mut seq = serializer.serialize_seq(None)?;
+        while let Some(next) = items.next() {
+            seq.serialize_element(next)?;
+            if written_count.get() >= max_size_hint {
+                break;
+            }
+        }
+        seq.end()?;
+        // Dropping the serializer (and the `EncoderWriter` inside it) releases the borrow on
+        // `writer` and flushes the final Base64 padding into the buffer.
+        drop(serializer);
+
+        result.push(writer.buffer);
+    }
+
+    Ok(result)
+}
+
+/// A `Write` wrapper that records how many bytes have been written so far.
+///
+/// The count lives behind an `Rc<Cell<_>>` so it can still be read while the writer itself is
+/// mutably borrowed by the Base64 encoder and JSON serializer.
+struct TrackedWriter {
+    buffer: Vec<u8>,
+    written_count: Rc<Cell<usize>>,
+}
+
+impl TrackedWriter {
+    fn new() -> Self {
+        Self {
+            buffer: Vec::new(),
+            written_count: Rc::new(Cell::new(0)),
+        }
+    }
+}
+
+impl Write for TrackedWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let written = self.buffer.write(buf)?;
+        self.written_count.set(self.written_count.get() + written);
+        Ok(written)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buffer.flush()
+    }
+}
+
 #[derive(Serialize, Debug, PartialEq)]
 struct VectorEvent<'a> {
     metric: VectorMetric<'a>,
@@ -349,4 +422,39 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_serialize_and_split_list_small() -> Result<(), Error> {
+        let small = (0..10).collect::<Vec<_>>();
+        let chunks = serialize_and_split_list(small.iter(), 256)?;
+
+        assert_eq!(chunks.len(), 1);
+        assert!(chunks[0].len() <= 256);
+        assert_eq!(
+            serde_json::from_slice::<Vec<i32>>(&base64::decode(&chunks[0])?)?,
+            small,
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_serialize_and_split_list_long() -> Result<(), Error> {
+        let small = (0..100).collect::<Vec<_>>();
+        let chunks = serialize_and_split_list(small.iter(), 256)?;
+
+        assert_eq!(chunks.len(), 2);
+        assert!(chunks[0].len() >= 256);
+        assert!(chunks[1].len() <= 256);
+        assert_eq!(
+            serde_json::from_slice::<Vec<i32>>(&base64::decode(&chunks[0])?)?,
+            (0..=67).collect::<Vec<_>>(),
+        );
+        assert_eq!(
+            serde_json::from_slice::<Vec<i32>>(&base64::decode(&chunks[1])?)?,
+            (68..100).collect::<Vec<_>>(),
+        );
+
+        Ok(())
+    }
 }
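
For reference, a minimal sketch (not part of the patch) of how a consumer could reassemble these
log lines, assuming only the format written by `encode` above: a `crates-io-heroku-metrics:ingest `
prefix, a Base64 payload holding one self-contained JSON array, and a trailing newline. The
function name and error type here are illustrative.

    use std::io::{BufRead, BufReader, Read};

    fn decode_ingest_lines(input: impl Read) -> Result<Vec<serde_json::Value>, Box<dyn std::error::Error>> {
        let mut events = Vec::new();
        for line in BufReader::new(input).lines() {
            let line = line?;
            // Each matching line carries one self-contained chunk, so chunks can be
            // decoded independently and concatenated in order.
            if let Some(payload) = line.strip_prefix("crates-io-heroku-metrics:ingest ") {
                let chunk: Vec<serde_json::Value> = serde_json::from_slice(&base64::decode(payload)?)?;
                events.extend(chunk);
            }
        }
        Ok(events)
    }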