Skip to content

Commit a5dd302

Browse files
committed
feat(http/prom): add status middleware
this commit introduces a new submodule to `linkerd-http-prom`. this submodule includes a `NewRecordStatusCode<N>` middleware that records Prometheus metrics counting response status codes for HTTP and gRPC traffic. Signed-off-by: katelyn martin <[email protected]>
1 parent 215ab04 commit a5dd302

File tree

7 files changed

+841
-4
lines changed

7 files changed

+841
-4
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1836,10 +1836,12 @@ dependencies = [
18361836
"futures",
18371837
"http",
18381838
"http-body",
1839+
"http-body-util",
18391840
"linkerd-error",
18401841
"linkerd-http-body-eos",
18411842
"linkerd-http-box",
18421843
"linkerd-metrics",
1844+
"linkerd-mock-http-body",
18431845
"linkerd-stack",
18441846
"pin-project",
18451847
"prometheus-client",

linkerd/http/prom/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@ test-util = []
1414

1515
[dependencies]
1616
bytes = { workspace = true }
17-
futures = { version = "0.3", default-features = false }
17+
futures = { version = "0.3", default-features = false, features = ["executor"] }
1818
http = { workspace = true }
1919
http-body = { workspace = true }
20+
http-body-util = { workspace = true }
2021
pin-project = "1"
2122
prometheus-client = { workspace = true }
2223
thiserror = "2"
@@ -28,3 +29,6 @@ linkerd-http-body-eos = { path = "../body-eos" }
2829
linkerd-http-box = { path = "../box" }
2930
linkerd-metrics = { path = "../../metrics" }
3031
linkerd-stack = { path = "../../stack" }
32+
33+
[dev-dependencies]
34+
linkerd-mock-http-body = { path = "../../mock/http-body" }

linkerd/http/prom/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@
44
pub mod body_data;
55
pub mod count_reqs;
66
pub mod record_response;
7+
pub mod status;
78
pub mod stream_label;

linkerd/http/prom/src/record_response.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub struct Params<L: MkStreamLabel, M> {
3232

3333
#[derive(Clone, Debug, thiserror::Error)]
3434
#[error("request was cancelled before completion")]
35-
pub struct RequestCancelled(());
35+
pub struct RequestCancelled(pub ());
3636

3737
/// Instruments an `N`-typed [`svc::NewService<T>`] with metrics.
3838
///

linkerd/http/prom/src/status.rs

Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
//! A tower middleware for counting response status codes.
2+
3+
use crate::{
4+
record_response::RequestCancelled,
5+
stream_label::{LabelSet, MkStreamLabel, StreamLabel},
6+
};
7+
use http::{Request, Response};
8+
use http_body::Body;
9+
use linkerd_error::Error;
10+
use linkerd_http_body_eos::{BodyWithEosFn, EosRef};
11+
use linkerd_metrics::prom::{Counter, Family, Registry};
12+
use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service};
13+
use pin_project::pin_project;
14+
use std::{
15+
future::Future,
16+
hash::Hash,
17+
marker::PhantomData,
18+
pin::Pin,
19+
task::{Context, Poll},
20+
};
21+
22+
#[cfg(test)]
23+
mod tests;
24+
25+
/// Wraps a [`NewService<T>`] with response status code metrics.
26+
pub struct NewRecordStatusCode<N, X, ML, L> {
27+
/// The inner wrapped service generator.
28+
mk_svc: N,
29+
/// Extracts an `ML`-typed [`MkStreamLabel`] from targets.
30+
extract: X,
31+
/// Marker indicating the `ML`-typed [`MkStreamLabel`] extracted from targets.
32+
labeler: PhantomData<ML>,
33+
/// Marker indicating the labels for the [`Family<L>`] extracted from targets.
34+
label: PhantomData<L>,
35+
}
36+
37+
/// Wraps a [`Service<T>`] with response status code metrics.
38+
#[derive(Clone)]
39+
pub struct RecordStatusCode<S, ML, L> {
40+
/// The inner wrapped service.
41+
svc: S,
42+
/// A [`MkStreamLabel`] implementation to label the service's traffic.
43+
///
44+
/// This generates an `L`-typed label set to acquire a [`Counter`] from the metrics family.
45+
mk_stream_label: ML,
46+
/// The [`Family`] of Prometheus [`Counter`]s tracking status codes.
47+
metrics: StatusMetrics<L>,
48+
}
49+
50+
/// A [`Future`] returned by a [`RecordStatusCode<S>`] service.
51+
#[pin_project(project = RecordStatusFutureProj)]
52+
pub enum RecordStatusFuture<F, SL, L> {
53+
/// A transparent [`RecordStatusFuture`].
54+
///
55+
/// This means that the [`MkStreamLabel`] did not emit a [`StreamLabel`], and that the
56+
/// request/response pair did not need to be recorded.
57+
Passthru(#[pin] F),
58+
/// An instrumented [`RecordStatusFuture`].
59+
///
60+
/// This will use the `SL`-typed [`StreamLabel`] type to label the traffic.
61+
Instrumented {
62+
/// The inner wrapped future.
63+
#[pin]
64+
fut: F,
65+
stream_label: Option<SL>,
66+
/// A [`Family`] of labeled counters.
67+
metrics: StatusMetrics<L>,
68+
},
69+
}
70+
71+
/// Parameters for [`NewRecordStatusCode<S, ML, L>`] services.
72+
pub struct Params<ML, L> {
73+
/// A [`MkStreamLabel`] implementation to label the service's traffic.
74+
///
75+
/// This generates an `L`-typed label set to acquire a [`Counter`] from the metrics family.
76+
pub mk_stream_label: ML,
77+
/// The [`Family`] of Prometheus [`Counter`]s tracking status codes.
78+
pub metrics: StatusMetrics<L>,
79+
}
80+
81+
/// Prometheus metrics for [`NewRecordStatusCode<N, X, ML, L>`].
82+
#[derive(Clone, Debug)]
83+
pub struct StatusMetrics<L> {
84+
counters: CounterFamily<L>,
85+
}
86+
87+
/// A [`Family`] of labeled counters.
88+
type CounterFamily<L> = Family<L, Counter>;
89+
90+
/// A [`Body`] returned by [`RecordStatusFuture`].
91+
pub type RecordStatusBody<B> = http_body_util::Either<InstrumentedBody<B>, B>;
92+
93+
/// A [`Body`] that will invoke a closure upon reaching the end of the stream.
94+
type InstrumentedBody<B> = BodyWithEosFn<B, EosCallback>;
95+
96+
/// A boxed callback used by [`InstrumentedBody<B>`] to inspect the end of the stream.
97+
type EosCallback = Box<dyn FnOnce(EosRef<'_, Error>) + Send>;
98+
99+
// === impl NewRecordStatusCode ===
100+
101+
impl<N, X, ML, L> NewRecordStatusCode<N, X, ML, L>
102+
where
103+
X: Clone,
104+
{
105+
/// Returns a [`Layer`] that can be applied to an inner [`NewService<T>`].
106+
pub fn layer_via(extract: X) -> impl Layer<N, Service = Self> {
107+
svc::layer::mk(move |inner| Self {
108+
mk_svc: inner,
109+
extract: extract.clone(),
110+
labeler: PhantomData,
111+
label: PhantomData,
112+
})
113+
}
114+
115+
/// A helper to confirm that this type can be used as a [`NewService<T>`].
116+
///
117+
/// This helps provide friendlier error messages.
118+
pub fn check_new_service<T>(self) -> Self
119+
where
120+
N: NewService<T>,
121+
X: ExtractParam<Params<ML, L>, T>,
122+
{
123+
self
124+
}
125+
}
126+
127+
impl<N, T, L, ML, X> NewService<T> for NewRecordStatusCode<N, X, ML, L>
128+
where
129+
N: NewService<T>,
130+
X: ExtractParam<Params<ML, L>, T>,
131+
{
132+
type Service = RecordStatusCode<N::Service, ML, L>;
133+
134+
fn new_service(&self, target: T) -> Self::Service {
135+
let Self {
136+
mk_svc,
137+
extract,
138+
labeler: _,
139+
label: _,
140+
} = self;
141+
142+
// Extract a stream labeler and a family of counters.
143+
let Params {
144+
mk_stream_label,
145+
metrics,
146+
} = extract.extract_param(&target);
147+
let svc = mk_svc.new_service(target);
148+
149+
RecordStatusCode {
150+
svc,
151+
mk_stream_label,
152+
metrics,
153+
}
154+
}
155+
}
156+
157+
// === impl RecordStatusCode ===
158+
159+
impl<S, ML, L, ReqB, RspB> Service<Request<ReqB>> for RecordStatusCode<S, ML, L>
160+
where
161+
S: Service<Request<ReqB>, Response = Response<RspB>, Error = Error>,
162+
S::Error: Into<Error>,
163+
ML: MkStreamLabel<StatusLabels = L>,
164+
RspB: Body<Error = Error>,
165+
L: Clone + Hash + Eq + Send + Sync + 'static,
166+
{
167+
type Response = Response<RecordStatusBody<RspB>>;
168+
type Error = Error;
169+
type Future = RecordStatusFuture<S::Future, ML::StreamLabel, ML::StatusLabels>;
170+
171+
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
172+
let Self {
173+
svc,
174+
mk_stream_label: _,
175+
metrics: _,
176+
} = self;
177+
178+
svc.poll_ready(cx).map_err(Into::into)
179+
}
180+
181+
fn call(&mut self, req: http::Request<ReqB>) -> Self::Future {
182+
let Self {
183+
svc,
184+
mk_stream_label,
185+
metrics,
186+
} = self;
187+
188+
if let stream_label @ Some(_) = mk_stream_label.mk_stream_labeler(&req) {
189+
// If this request should be recorded, return an instrumented future.
190+
let fut = svc.call(req);
191+
let metrics = metrics.clone();
192+
RecordStatusFuture::Instrumented {
193+
fut,
194+
stream_label,
195+
metrics,
196+
}
197+
} else {
198+
let fut = svc.call(req);
199+
RecordStatusFuture::Passthru(fut)
200+
}
201+
}
202+
}
203+
204+
// === impl RecordStatusFuture ===
205+
206+
impl<F, B, SL, L> Future for RecordStatusFuture<F, SL, L>
207+
where
208+
F: Future<Output = Result<Response<B>, Error>>,
209+
B: Body<Error = Error>,
210+
SL: StreamLabel<StatusLabels = L>,
211+
L: Clone + Hash + Eq + Send + Sync + 'static,
212+
{
213+
type Output = Result<Response<RecordStatusBody<B>>, Error>;
214+
215+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216+
use http_body_util::Either::{Left as Enabled, Right as Disabled};
217+
218+
match self.project() {
219+
// If this is a transparent future, poll the future and return the response.
220+
RecordStatusFutureProj::Passthru(fut) => fut.poll(cx).map_ok(|rsp| rsp.map(Disabled)),
221+
// If this reponse should be recorded, instrument the response when it is ready.
222+
RecordStatusFutureProj::Instrumented {
223+
fut,
224+
stream_label,
225+
metrics,
226+
} => {
227+
// Wait for the inner future. Once ready, take the labeler and metric family.
228+
let rsp = futures::ready!(fut.poll(cx));
229+
let metrics = metrics.clone();
230+
let mut stream_label = stream_label.take().expect("futures only yield once");
231+
232+
let rsp = match rsp {
233+
Ok(rsp) => {
234+
// Observe the start of the response, and then instrument the response
235+
// body with a callback that will record the outcome once finished.
236+
stream_label.init_response(&rsp);
237+
let on_eos =
238+
move |eos: EosRef<'_>| Self::on_eos(eos, stream_label, metrics);
239+
let instrument = |body| InstrumentedBody::new(body, Box::new(on_eos));
240+
Ok(rsp.map(instrument).map(Enabled))
241+
}
242+
Err(err) => {
243+
// Record an error if there is no response.
244+
Self::on_eos(EosRef::Error(&err), stream_label, metrics);
245+
Err(err)
246+
}
247+
};
248+
249+
Poll::Ready(rsp)
250+
}
251+
}
252+
}
253+
}
254+
255+
impl<F, SL, L> RecordStatusFuture<F, SL, L>
256+
where
257+
SL: StreamLabel<StatusLabels = L>,
258+
L: Clone + Hash + Eq + Send + Sync + 'static,
259+
{
260+
fn on_eos(eos: EosRef<'_, Error>, mut stream_label: SL, metrics: StatusMetrics<L>) {
261+
let ugh = RequestCancelled(()).into(); // XXX(kate)
262+
263+
stream_label.end_response(match eos {
264+
EosRef::None => Ok(None),
265+
EosRef::Trailers(trls) => Ok(Some(trls)),
266+
EosRef::Error(error) => Err(error),
267+
EosRef::Cancelled => Err(&ugh),
268+
});
269+
270+
let labels = stream_label.status_labels();
271+
let counter = metrics.metric(&labels);
272+
counter.inc();
273+
}
274+
}
275+
276+
// === impl StatusMetrics ===
277+
278+
impl<L> Default for StatusMetrics<L>
279+
where
280+
L: LabelSet,
281+
{
282+
fn default() -> Self {
283+
Self {
284+
counters: Default::default(),
285+
}
286+
}
287+
}
288+
289+
impl<L> StatusMetrics<L>
290+
where
291+
L: LabelSet,
292+
{
293+
/// Registers a new [`StatusMetrics<L>`] with the given metrics registry.
294+
pub fn register(registry: &mut Registry, help: &'static str) -> Self {
295+
let counters = Family::default();
296+
297+
registry.register("statuses", help, counters.clone());
298+
299+
Self { counters }
300+
}
301+
}
302+
303+
impl<L> StatusMetrics<L>
304+
where
305+
L: Clone + Hash + Eq,
306+
{
307+
pub fn metric(&self, labels: &L) -> Counter {
308+
let Self { counters } = self;
309+
310+
counters.get_or_create(labels).to_owned()
311+
}
312+
}

0 commit comments

Comments
 (0)