|
1 | | -// TODO(kate): write a middleware for request body. |
| 1 | +//! Tower middleware to instrument request bodies. |
| 2 | +
|
| 3 | +pub use super::metrics::{BodyDataMetrics, RequestBodyFamilies}; |
| 4 | + |
| 5 | +use http::{Request, Response}; |
| 6 | +use http_body::Body; |
| 7 | +use linkerd_error::Error; |
| 8 | +use linkerd_http_box::BoxBody; |
| 9 | +use linkerd_stack::{self as svc, layer::Layer, ExtractParam, NewService, Service}; |
| 10 | +use std::{future::Future, pin::Pin}; |
| 11 | + |
| 12 | +/// A [`NewService<T>`] that creates [`RecordBodyData`] services. |
| 13 | +#[derive(Clone, Debug)] |
| 14 | +pub struct NewRecordBodyData<X, N> { |
| 15 | + /// The [`ExtractParam<P, T>`] strategy for obtaining our parameters. |
| 16 | + extract: X, |
| 17 | + /// The inner [`NewService<T>`]. |
| 18 | + inner: N, |
| 19 | +} |
| 20 | + |
| 21 | +/// Tracks body frames for an inner `S`-typed [`Service`]. |
| 22 | +#[derive(Clone, Debug)] |
| 23 | +pub struct RecordBodyData<S> { |
| 24 | + /// The inner [`Service<T>`]. |
| 25 | + inner: S, |
| 26 | + /// The metrics to be affixed to the response body. |
| 27 | + metrics: BodyDataMetrics, |
| 28 | +} |
| 29 | + |
| 30 | +// === impl NewRecordBodyData === |
| 31 | + |
| 32 | +impl<X: Clone, N> NewRecordBodyData<X, N> { |
| 33 | + /// Returns a [`Layer<S>`] that tracks body chunks. |
| 34 | + /// |
| 35 | + /// This uses an `X`-typed [`ExtractParam<P, T>`] implementation to extract service parameters |
| 36 | + /// from a `T`-typed target. |
| 37 | + pub fn layer_via(extract: X) -> impl Layer<N, Service = Self> { |
| 38 | + svc::layer::mk(move |inner| Self { |
| 39 | + extract: extract.clone(), |
| 40 | + inner, |
| 41 | + }) |
| 42 | + } |
| 43 | +} |
| 44 | + |
| 45 | +impl<T, X, N> NewService<T> for NewRecordBodyData<X, N> |
| 46 | +where |
| 47 | + X: ExtractParam<BodyDataMetrics, T>, |
| 48 | + N: NewService<T>, |
| 49 | +{ |
| 50 | + type Service = RecordBodyData<N::Service>; |
| 51 | + |
| 52 | + fn new_service(&self, target: T) -> Self::Service { |
| 53 | + let Self { extract, inner } = self; |
| 54 | + |
| 55 | + let metrics = extract.extract_param(&target); |
| 56 | + let inner = inner.new_service(target); |
| 57 | + |
| 58 | + RecordBodyData { inner, metrics } |
| 59 | + } |
| 60 | +} |
| 61 | + |
| 62 | +// === impl RecordBodyData === |
| 63 | + |
| 64 | +impl<ReqB, RespB, S> Service<Request<ReqB>> for RecordBodyData<S> |
| 65 | +where |
| 66 | + S: Service<Request<ReqB>, Response = Response<RespB>>, |
| 67 | + S::Future: Send + 'static, |
| 68 | + RespB: Body + Send + 'static, |
| 69 | + RespB::Data: Send + 'static, |
| 70 | + RespB::Error: Into<Error>, |
| 71 | +{ |
| 72 | + type Response = Response<BoxBody>; |
| 73 | + type Error = S::Error; |
| 74 | + type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; |
| 75 | + |
| 76 | + #[inline] |
| 77 | + fn poll_ready( |
| 78 | + &mut self, |
| 79 | + cx: &mut std::task::Context<'_>, |
| 80 | + ) -> std::task::Poll<Result<(), Self::Error>> { |
| 81 | + self.inner.poll_ready(cx) |
| 82 | + } |
| 83 | + |
| 84 | + fn call(&mut self, req: Request<ReqB>) -> Self::Future { |
| 85 | + use futures::{FutureExt, TryFutureExt}; |
| 86 | + |
| 87 | + let Self { inner, metrics } = self; |
| 88 | + let metrics = metrics.clone(); |
| 89 | + let instrument = Box::new(|resp| Self::instrument_response(resp, metrics)); |
| 90 | + |
| 91 | + inner.call(req).map_ok(instrument).boxed() |
| 92 | + } |
| 93 | +} |
| 94 | + |
| 95 | +impl<S> RecordBodyData<S> { |
| 96 | + fn instrument_response<B>(resp: Response<B>, metrics: BodyDataMetrics) -> Response<BoxBody> |
| 97 | + where |
| 98 | + B: Body + Send + 'static, |
| 99 | + B::Data: Send + 'static, |
| 100 | + B::Error: Into<Error>, |
| 101 | + { |
| 102 | + resp.map(|b| super::body::Body::new(b, metrics)) |
| 103 | + .map(BoxBody::new) |
| 104 | + } |
| 105 | +} |
0 commit comments