Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Config {
.to_layer::<classify::Response, _, Permitted>(),
)
.push(classify::NewClassify::layer_default())
.push_map_target(|(permit, http)| Permitted { permit, http })
.push_map_target(Permitted::from)
.push(inbound::policy::NewHttpPolicy::layer(
metrics.http_authz.clone(),
))
Expand Down Expand Up @@ -281,6 +281,17 @@ impl Param<metrics::EndpointLabels> for Permitted {
}
}

impl From<inbound::policy::Permitted<Http>> for Permitted {
fn from(
inbound::policy::Permitted { permit, target }: inbound::policy::Permitted<Http>,
) -> Self {
Self {
permit,
http: target,
}
}
}

// === TlsParams ===

impl<T> ExtractParam<tls::server::Timeout, T> for TlsParams {
Expand Down
95 changes: 51 additions & 44 deletions linkerd/app/gateway/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Gateway;
use inbound::{GatewayAddr, GatewayDomainInvalid};
use inbound::{policy::Permitted, GatewayAddr, GatewayDomainInvalid};
use linkerd_app_core::{
metrics::ServerLabel,
profiles,
Expand Down Expand Up @@ -81,50 +81,57 @@ impl Gateway {
R: Resolve<ConcreteAddr, Endpoint = Metadata, Error = Error>,
R::Resolution: Unpin,
{
let http = self
.outbound
.clone()
.with_stack(inner)
.push_http_cached(resolve)
.into_stack()
// Discard `T` and its associated client-specific metadata.
.push_map_target(Target::discard_parent)
.push(svc::ArcNewService::layer())
// Add headers to prevent loops.
.push(NewHttpGateway::layer(
self.inbound.identity().local_id().clone(),
))
.push_on_service(svc::LoadShed::layer())
.lift_new()
.push(svc::ArcNewService::layer())
// After protocol-downgrade, we need to build an inner stack for
// each request-level HTTP version.
.push(svc::NewOneshotRoute::layer_via(|t: &Target<T>| {
ByRequestVersion(t.clone())
}))
// Only permit gateway traffic to endpoints for which we have
// discovery information.
.push_filter(|(_, parent): (_, T)| -> Result<_, GatewayDomainInvalid> {
let routes = {
let mut profile =
svc::Param::<Option<watch::Receiver<profiles::Profile>>>::param(&parent)
let http =
self.outbound
.clone()
.with_stack(inner)
.push_http_cached(resolve)
.into_stack()
// Discard `T` and its associated client-specific metadata.
.push_map_target(Target::discard_parent)
.push(svc::ArcNewService::layer())
// Add headers to prevent loops.
.push(NewHttpGateway::layer(
self.inbound.identity().local_id().clone(),
))
.push_on_service(svc::LoadShed::layer())
.lift_new()
.push(svc::ArcNewService::layer())
// After protocol-downgrade, we need to build an inner stack for
// each request-level HTTP version.
.push(svc::NewOneshotRoute::layer_via(|t: &Target<T>| {
ByRequestVersion(t.clone())
}))
// Only permit gateway traffic to endpoints for which we have
// discovery information.
.push_filter(
|Permitted {
permit: _,
target: parent,
}: Permitted<T>|
-> Result<_, GatewayDomainInvalid> {
let routes = {
let mut profile = svc::Param::<
Option<watch::Receiver<profiles::Profile>>,
>::param(&parent)
.ok_or(GatewayDomainInvalid)?;
let init =
mk_routes(&profile.borrow_and_update()).ok_or(GatewayDomainInvalid)?;
outbound::http::spawn_routes(profile, init, mk_routes)
};

Ok(Target {
routes,
addr: parent.param(),
version: parent.param(),
parent,
})
})
.push(svc::ArcNewService::layer())
// Authorize requests to the gateway.
.push(self.inbound.authorize_http())
.arc_new_clone_http();
let init = mk_routes(&profile.borrow_and_update())
.ok_or(GatewayDomainInvalid)?;
outbound::http::spawn_routes(profile, init, mk_routes)
};

Ok(Target {
routes,
addr: parent.param(),
version: parent.param(),
parent,
})
},
)
.push(svc::ArcNewService::layer())
// Authorize requests to the gateway.
.push(self.inbound.authorize_http())
.arc_new_clone_http();

self.inbound
.clone()
Expand Down
18 changes: 9 additions & 9 deletions linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,12 @@ impl<C> Inbound<C> {
.push_on_service(svc::LoadShed::layer())
.push(svc::NewMapErr::layer_from_target::<LogicalError, _>())
.lift_new()
.check_new_new::<(policy::HttpRoutePermit, T), Logical>()
.check_new_new::<policy::Permitted<T>, Logical>()
.push(svc::ArcNewService::layer())
.push(svc::NewOneshotRoute::layer_via(|(permit, t): &(policy::HttpRoutePermit, T)| {
LogicalPerRequest::from((permit.clone(), t.clone()))
.push(svc::NewOneshotRoute::layer_via(|t: &policy::Permitted<T>| {
LogicalPerRequest::from(t)
}))
.check_new_service::<(policy::HttpRoutePermit, T), http::Request<http::BoxBody>>()
.check_new_service::<policy::Permitted<T>, http::Request<http::BoxBody>>()
.push(svc::ArcNewService::layer())
.push(policy::NewHttpPolicy::layer(rt.metrics.http_authz.clone()))
// Used by tap.
Expand All @@ -254,12 +254,12 @@ impl<C> Inbound<C> {

// === impl LogicalPerRequest ===

impl<T> From<(policy::HttpRoutePermit, T)> for LogicalPerRequest
impl<'a, T> From<&'a policy::Permitted<T>> for LogicalPerRequest
where
T: Param<Remote<ServerAddr>>,
T: Param<tls::ConditionalServerTls>,
{
fn from((permit, t): (policy::HttpRoutePermit, T)) -> Self {
fn from(policy::Permitted { permit, target }: &'a policy::Permitted<T>) -> Self {
let labels = [
("srv", &permit.labels.route.server.0),
("route", &permit.labels.route.route),
Expand All @@ -276,9 +276,9 @@ where
.collect::<std::collections::BTreeMap<_, _>>();

Self {
server: t.param(),
tls: t.param(),
permit,
server: target.param(),
tls: target.param(),
permit: permit.clone(),
labels: labels.into(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use self::{
config::Config,
http::{
HttpInvalidPolicy, HttpRouteInvalidRedirect, HttpRouteNotFound, HttpRouteRedirect,
HttpRouteUnauthorized, NewHttpPolicy,
HttpRouteUnauthorized, NewHttpPolicy, Permitted,
},
tcp::NewTcpPolicy,
};
Expand Down
28 changes: 26 additions & 2 deletions linkerd/app/inbound/src/policy/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ struct ConnectionMeta {
tls: tls::ConditionalServerTls,
}

/// A `T`-typed target with policy enforced by a [`NewHttpPolicy<N>`] layer.
#[derive(Debug)]
pub struct Permitted<T> {
pub permit: HttpRoutePermit,
pub target: T,
}

#[derive(Debug, thiserror::Error)]
#[error("no route found for request")]
pub struct HttpRouteNotFound(());
Expand Down Expand Up @@ -138,7 +145,7 @@ macro_rules! try_fut {
impl<B, T, N, S> svc::Service<::http::Request<B>> for HttpPolicyService<T, N>
where
T: Clone,
N: svc::NewService<(HttpRoutePermit, T), Service = S>,
N: svc::NewService<Permitted<T>, Service = S>,
S: svc::Service<::http::Request<B>>,
S::Error: Into<Error>,
{
Expand Down Expand Up @@ -175,7 +182,10 @@ where

future::Either::Left(
self.inner
.new_service((permit, self.target.clone()))
.new_service(Permitted {
permit,
target: self.target.clone(),
})
.oneshot(req)
.err_into::<Error>(),
)
Expand Down Expand Up @@ -383,3 +393,17 @@ fn apply_grpc_filters<B>(route: &grpc::Policy, req: &mut ::http::Request<B>) ->

Ok(())
}

// === impl Permitted ===

/// An authorized `T`-typed target can produce `P`-typed parameters.
impl<T, P> svc::Param<P> for Permitted<T>
where
T: svc::Param<P>,
{
fn param(&self) -> P {
let Self { target, .. } = self;

target.param()
}
}
2 changes: 1 addition & 1 deletion linkerd/app/inbound/src/policy/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ macro_rules! new_svc {
policy,
connection: $conn,
metrics: HttpAuthzMetrics::default(),
inner: |(permit, _): (HttpRoutePermit, ())| {
inner: |Permitted { permit, target: () }: Permitted<()>| {
let f = $rsp;
svc::mk(move |req: ::http::Request<BoxBody>| {
futures::future::ready((f)(permit.clone(), req))
Expand Down
Loading