11use super :: { backend:: metrics as backend, retry} ;
2+ use crate :: { ParentRef , RouteRef } ;
23use linkerd_app_core:: {
34 metrics:: prom:: { self , EncodeLabelSetMut } ,
45 svc,
56} ;
6- use linkerd_http_prom:: record_response:: { self , StreamLabel } ;
7+ use linkerd_http_prom:: {
8+ body_data:: request:: { BodyDataMetrics , NewRecordBodyData , RequestBodyFamilies } ,
9+ record_response:: { self , StreamLabel } ,
10+ } ;
711
812pub use linkerd_http_prom:: record_response:: MkStreamLabel ;
913
@@ -23,6 +27,7 @@ pub struct RouteMetrics<R: StreamLabel, B: StreamLabel> {
2327 pub ( super ) retry : retry:: RouteRetryMetrics ,
2428 pub ( super ) requests : RequestMetrics < R > ,
2529 pub ( super ) backend : backend:: RouteBackendMetrics < B > ,
30+ pub ( super ) body_data : RequestBodyFamilies < labels:: Route > ,
2631}
2732
2833pub type HttpRouteMetrics = RouteMetrics < LabelHttpRouteRsp , LabelHttpRouteBackendRsp > ;
@@ -56,13 +61,26 @@ pub type NewRecordDuration<T, M, N> =
5661#[ derive( Clone , Debug ) ]
5762pub struct ExtractRecordDurationParams < M > ( pub M ) ;
5863
64+ #[ derive( Clone , Debug ) ]
65+ pub struct ExtractRecordBodyDataParams ( RequestBodyFamilies < labels:: Route > ) ;
66+
5967pub fn layer < T , N > (
6068 metrics : & RequestMetrics < T :: StreamLabel > ,
61- ) -> impl svc:: Layer < N , Service = NewRecordDuration < T , RequestMetrics < T :: StreamLabel > , N > > + Clone
69+ body_data : & RequestBodyFamilies < labels:: Route > ,
70+ ) -> impl svc:: Layer <
71+ N ,
72+ Service = NewRecordBodyData <
73+ ExtractRecordBodyDataParams ,
74+ NewRecordDuration < T , RequestMetrics < T :: StreamLabel > , N > ,
75+ > ,
76+ >
6277where
6378 T : Clone + MkStreamLabel ,
6479{
65- NewRecordDuration :: layer_via ( ExtractRecordDurationParams ( metrics. clone ( ) ) )
80+ let record = NewRecordDuration :: layer_via ( ExtractRecordDurationParams ( metrics. clone ( ) ) ) ;
81+ let body_data = NewRecordBodyData :: layer_via ( ExtractRecordBodyDataParams ( body_data. clone ( ) ) ) ;
82+
83+ svc:: layers ( ) . push ( record) . push ( body_data)
6684}
6785
6886// === impl RouteMetrics ===
@@ -89,6 +107,7 @@ impl<R: StreamLabel, B: StreamLabel> Default for RouteMetrics<R, B> {
89107 requests : Default :: default ( ) ,
90108 backend : Default :: default ( ) ,
91109 retry : Default :: default ( ) ,
110+ body_data : Default :: default ( ) ,
92111 }
93112 }
94113}
@@ -99,6 +118,7 @@ impl<R: StreamLabel, B: StreamLabel> Clone for RouteMetrics<R, B> {
99118 requests : self . requests . clone ( ) ,
100119 backend : self . backend . clone ( ) ,
101120 retry : self . retry . clone ( ) ,
121+ body_data : self . body_data . clone ( ) ,
102122 }
103123 }
104124}
@@ -113,11 +133,13 @@ impl<R: StreamLabel, B: StreamLabel> RouteMetrics<R, B> {
113133 ) ;
114134
115135 let retry = retry:: RouteRetryMetrics :: register ( reg. sub_registry_with_prefix ( "retry" ) ) ;
136+ let body_data = RequestBodyFamilies :: register ( reg) ;
116137
117138 Self {
118139 requests,
119140 backend,
120141 retry,
142+ body_data,
121143 }
122144 }
123145
@@ -147,6 +169,27 @@ where
147169 }
148170}
149171
172+ // === impl ExtractRecordBodyDataParams ===
173+
174+ impl < T > svc:: ExtractParam < BodyDataMetrics , T > for ExtractRecordBodyDataParams
175+ where
176+ T : svc:: Param < ParentRef > + svc:: Param < RouteRef > ,
177+ {
178+ fn extract_param ( & self , target : & T ) -> BodyDataMetrics {
179+ let Self ( families) = self ;
180+
181+ #[ allow( unreachable_code) ] // XXX(kate); remove this once `todo!()` is addressed.
182+ let labels = {
183+ let parent = target. param ( ) ;
184+ let route = target. param ( ) ;
185+ let uri = todo ! ( ) ;
186+ labels:: Route :: new ( parent, route, uri)
187+ } ;
188+
189+ families. get ( & labels)
190+ }
191+ }
192+
150193// === impl LabelHttpRsp ===
151194
152195impl < P > From < P > for LabelHttpRsp < P > {
0 commit comments