Commit d179227

Auto merge of #3690 - pietroalbini:max-metrics-size, r=jtgeibel
Split logged instance metrics into multiple lines when necessary

Heroku limits log lines to 10,000 bytes; when a line reaches that size, the platform automatically splits it into multiple lines, which breaks the tooling that reads them. To avoid that problem when outputting metrics, this commit performs the splitting on the application side once a line grows past 5,000 bytes. The threshold is only 5,000 bytes because the newline is inserted *after* the item that pushed the line over it, so the resulting lines can end up somewhat longer than 5,000 bytes.

Fixes #3689.
2 parents bbcc4ce + 7604682 commit d179227
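
For reference, every line the encoder emits is the literal prefix `crates-io-heroku-metrics:ingest ` followed by a standard-Base64 payload that decodes to a JSON array of metric events, so each chunk can be decoded on its own and the arrays concatenated to rebuild the full set. The sketch below is not part of the commit: `collect_events` is a hypothetical consumer-side helper, written against the same `base64::decode` and `serde_json::from_slice` calls the commit's own tests use.

use serde_json::Value;

/// Hypothetical consumer-side helper (not part of this commit): recover the
/// metric events carried by a batch of log lines written by `LogEncoder`.
fn collect_events(log: &str) -> Result<Vec<Value>, Box<dyn std::error::Error>> {
    let mut events = Vec::new();
    for line in log.lines() {
        // Only encoder output carries the ingest prefix; ignore everything else.
        if let Some(payload) = line.strip_prefix("crates-io-heroku-metrics:ingest ") {
            // Every chunk is a self-contained Base64-encoded JSON array, so the
            // chunks can be decoded independently and simply concatenated.
            let decoded = base64::decode(payload)?;
            let chunk: Vec<Value> = serde_json::from_slice(&decoded)?;
            events.extend(chunk);
        }
    }
    Ok(events)
}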

File tree: 1 file changed (+117 −9 lines)

src/metrics/log_encoder.rs

Lines changed: 117 additions & 9 deletions
@@ -2,7 +2,14 @@ use base64::write::EncoderWriter;
 use indexmap::IndexMap;
 use prometheus::proto::{MetricFamily, MetricType};
 use prometheus::{Encoder, Error};
+use serde::ser::SerializeSeq;
+use serde::{Serialize, Serializer as _};
+use serde_json::Serializer;
+use std::cell::Cell;
 use std::io::Write;
+use std::rc::Rc;
+
+const CHUNKS_MAX_SIZE_BYTES: usize = 5000;

 /// The `LogEncoder` struct encodes Prometheus metrics in the format [`crates-io-heroku-metrics`]
 /// expects metrics to be logged. This can be used to forward instance metrics to it, allowing them
@@ -27,17 +34,17 @@ impl LogEncoder {
 }

 impl Encoder for LogEncoder {
-    fn encode<W: Write>(
-        &self,
-        families: &[MetricFamily],
-        mut dest: &mut W,
-    ) -> prometheus::Result<()> {
+    fn encode<W: Write>(&self, families: &[MetricFamily], dest: &mut W) -> prometheus::Result<()> {
         let events = families_to_json_events(families);

-        dest.write_all(b"crates-io-heroku-metrics:ingest ")?;
-        let base64_dest = EncoderWriter::new(&mut dest, base64::STANDARD);
-        serde_json::to_writer(base64_dest, &events).map_err(|e| Error::Msg(e.to_string()))?;
-        dest.write_all(b"\n")?;
+        let chunks = serialize_and_split_list(events.iter(), CHUNKS_MAX_SIZE_BYTES)
+            .map_err(|e| Error::Msg(e.to_string()))?;
+
+        for chunk in chunks {
+            dest.write_all(b"crates-io-heroku-metrics:ingest ")?;
+            dest.write_all(&chunk)?;
+            dest.write_all(b"\n")?;
+        }

         Ok(())
     }
@@ -100,6 +107,72 @@ fn families_to_json_events(families: &[MetricFamily]) -> Vec<VectorEvent<'_>> {
     events
 }

+/// Serialize a list of items into multiple Base64-encoded JSON chunks.
+///
+/// Our hosting platform (Heroku) limits the size of log lines, arbitrarily splitting them once
+/// they reach a threshold. We can't let Heroku do the split as it doesn't know where to properly
+/// do that, so we need to do the splitting ourselves.
+///
+/// This function takes an iterator of serializable items and returns the serialized version,
+/// possibly split into multiple chunks. Each chunk is *at least* `max_size_hint` long, as the
+/// function stops serializing new items in the same chunk only when the size limit is reached
+/// after serializing an item.
+///
+/// Because of that `max_size_hint` should be lower than the upper bound we can't cross.
+fn serialize_and_split_list<'a, S: Serialize + 'a>(
+    items: impl Iterator<Item = &'a S>,
+    max_size_hint: usize,
+) -> Result<Vec<Vec<u8>>, serde_json::Error> {
+    let mut items = items.peekable();
+
+    let mut result = Vec::new();
+    while items.peek().is_some() {
+        let mut writer = TrackedWriter::new();
+        let written_count = writer.written_count.clone();
+        let mut serializer = Serializer::new(EncoderWriter::new(&mut writer, base64::STANDARD));
+
+        let mut seq = serializer.serialize_seq(None)?;
+        while let Some(next) = items.next() {
+            seq.serialize_element(next)?;
+            if written_count.get() >= max_size_hint {
+                break;
+            }
+        }
+        seq.end()?;
+        drop(serializer);
+
+        result.push(writer.buffer);
+    }
+
+    Ok(result)
+}
+
+struct TrackedWriter {
+    buffer: Vec<u8>,
+    written_count: Rc<Cell<usize>>,
+}
+
+impl TrackedWriter {
+    fn new() -> Self {
+        Self {
+            buffer: Vec::new(),
+            written_count: Rc::new(Cell::new(0)),
+        }
+    }
+}
+
+impl Write for TrackedWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        let written = self.buffer.write(buf)?;
+        self.written_count.set(self.written_count.get() + written);
+        Ok(written)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buffer.flush()
+    }
+}
+
 #[derive(Serialize, Debug, PartialEq)]
 struct VectorEvent<'a> {
     metric: VectorMetric<'a>,
@@ -349,4 +422,39 @@ mod tests {

         Ok(())
     }
+
+    #[test]
+    fn test_serialize_and_split_list_small() -> Result<(), Error> {
+        let small = (0..10).collect::<Vec<_>>();
+        let chunks = serialize_and_split_list(small.iter(), 256)?;
+
+        assert_eq!(chunks.len(), 1);
+        assert!(chunks[0].len() <= 256);
+        assert_eq!(
+            serde_json::from_slice::<Vec<usize>>(&base64::decode(&chunks[0])?)?,
+            small,
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_serialize_and_split_list_long() -> Result<(), Error> {
+        let small = (0..100).collect::<Vec<_>>();
+        let chunks = serialize_and_split_list(small.iter(), 256)?;
+
+        assert_eq!(chunks.len(), 2);
+        assert!(chunks[0].len() >= 256);
+        assert!(chunks[1].len() <= 256);
+        assert_eq!(
+            serde_json::from_slice::<Vec<usize>>(&base64::decode(&chunks[0])?)?,
+            (0..=67).collect::<Vec<_>>(),
+        );
+        assert_eq!(
+            serde_json::from_slice::<Vec<usize>>(&base64::decode(&chunks[1])?)?,
+            (68..100).collect::<Vec<_>>(),
+        );
+
+        Ok(())
+    }
 }

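For context, `LogEncoder` implements the `prometheus` crate's `Encoder` trait, so it is driven like any other encoder: gather the `MetricFamily` snapshot from a registry and pass it to `encode` together with a writer. The snippet below is illustrative rather than repository code; it assumes an already-constructed `LogEncoder` (its construction is outside this diff).

use prometheus::{Encoder, Registry};

// Illustrative driver, assuming `encoder` was constructed elsewhere.
fn log_instance_metrics(encoder: &LogEncoder, registry: &Registry) -> prometheus::Result<()> {
    // `Registry::gather` snapshots every registered metric as `MetricFamily` values,
    // which is exactly the input `Encoder::encode` expects.
    let families = registry.gather();
    // One "crates-io-heroku-metrics:ingest <base64>" line is written per chunk.
    encoder.encode(&families, &mut std::io::stdout())
}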