Skip to content

Add max tenant config to tenant federation #6493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455
* [FEATURE] Ingester/StoreGateway: Add support for cache regex query matchers via `-ingester.matchers-cache-max-items` and `-blocks-storage.bucket-store.matchers-cache-max-items`. #6477 #6491
* [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470
* [ENHANCEMENT] Query Frontend: Add a `-tenant-federation.max-tenant` flag to limit the number of tenants in a federated query. #6493
* [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flag to configure the number of workers processing a federated query, and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449
* [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423
* [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ tenant_federation:
# CLI flag: -tenant-federation.max-concurrent
[max_concurrent: <int> | default = 16]

# A maximum number of tenants to query at once. 0 means no limit.
# CLI flag: -tenant-federation.max-tenant
[max_tenant: <int> | default = 0]

# The ruler_config configures the Cortex ruler.
[ruler: <ruler_config>]

Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
// Wrap roundtripper into Tripperware.
roundTripper = t.QueryFrontendTripperware(roundTripper)

handler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
handler := transport.NewHandler(t.Cfg.Frontend.Handler, t.Cfg.TenantFederation, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
t.API.RegisterQueryFrontendHandler(handler)

if frontendV1 != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cortexproject/cortex/pkg/frontend/transport"
frontendv1 "github.com/cortexproject/cortex/pkg/frontend/v1"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/util/concurrency"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -279,7 +280,7 @@ func testFrontend(t *testing.T, config CombinedFrontendConfig, handler http.Hand
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(config.Handler, rt, logger, nil)))
).Wrap(transport.NewHandler(config.Handler, tenantfederation.Config{}, rt, logger, nil)))

httpServer := http.Server{
Handler: r,
Expand Down
28 changes: 21 additions & 7 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc/status"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
Expand All @@ -33,6 +34,8 @@ const (
// StatusClientClosedRequest is the status code for when a client request cancellation of a http request
StatusClientClosedRequest = 499
ServiceTimingHeaderName = "Server-Timing"

errTooManyTenants = "too many tenants, max: %d, actual: %d"
)

var (
Expand Down Expand Up @@ -84,9 +87,10 @@ func (cfg *HandlerConfig) RegisterFlags(f *flag.FlagSet) {
// Handler accepts queries and forwards them to RoundTripper. It can log slow queries,
// but all other logic is inside the RoundTripper.
type Handler struct {
cfg HandlerConfig
log log.Logger
roundTripper http.RoundTripper
cfg HandlerConfig
tenantFederationCfg tenantfederation.Config
log log.Logger
roundTripper http.RoundTripper

// Metrics.
querySeconds *prometheus.CounterVec
Expand All @@ -101,11 +105,12 @@ type Handler struct {
}

// NewHandler creates a new frontend handler.
func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
func NewHandler(cfg HandlerConfig, tenantFederationCfg tenantfederation.Config, roundTripper http.RoundTripper, log log.Logger, reg prometheus.Registerer) *Handler {
h := &Handler{
cfg: cfg,
log: log,
roundTripper: roundTripper,
cfg: cfg,
tenantFederationCfg: tenantFederationCfg,
log: log,
roundTripper: roundTripper,
}

if cfg.QueryStatsEnabled {
Expand Down Expand Up @@ -185,6 +190,15 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if err != nil {
return
}

if f.tenantFederationCfg.Enabled {
maxTenant := f.tenantFederationCfg.MaxTenant
if maxTenant > 0 && len(tenantIDs) > maxTenant {
http.Error(w, fmt.Errorf(errTooManyTenants, maxTenant, len(tenantIDs)).Error(), http.StatusBadRequest)
return
}
}

userID := tenant.JoinTenantIDs(tenantIDs)

// Initialise the stats in the context and make sure it's propagated
Expand Down
106 changes: 104 additions & 2 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/codes"

querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
util_api "github.com/cortexproject/cortex/pkg/util/api"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)
Expand Down Expand Up @@ -178,6 +181,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
}, nil
})
userID := "12345"
tenantFederationCfg := tenantfederation.Config{}
for _, tt := range []struct {
name string
cfg HandlerConfig
Expand Down Expand Up @@ -379,7 +383,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
handler := NewHandler(tt.cfg, tt.roundTripperFunc, log.NewNopLogger(), reg)
handler := NewHandler(tt.cfg, tenantFederationCfg, tt.roundTripperFunc, log.NewNopLogger(), reg)

ctx := user.InjectOrgID(context.Background(), userID)
req := httptest.NewRequest("GET", "/", nil)
Expand Down Expand Up @@ -413,7 +417,7 @@ func TestHandler_ServeHTTP(t *testing.T) {
func TestReportQueryStatsFormat(t *testing.T) {
outputBuf := bytes.NewBuffer(nil)
logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil)
handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)
userID := "fake"
req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil)
resp := &http.Response{ContentLength: 1000}
Expand Down Expand Up @@ -506,3 +510,101 @@ func TestReportQueryStatsFormat(t *testing.T) {
})
}
}

// Test_TenantFederation_MaxTenant checks how the frontend handler applies the
// tenant federation MaxTenant setting to the tenant IDs supplied in the
// X-Scope-OrgId header. Requests at or below the limit (or with a limit of 0)
// are expected to succeed, requests above it are expected to be rejected with
// 400 and a "too many tenants" message, and a request without an org ID is
// expected to be rejected with 401 by the authentication middleware.
func Test_TenantFederation_MaxTenant(t *testing.T) {
	// Install a multi-tenant resolver as the package default.
	// NOTE(review): this mutates package-level state and is never restored
	// here — confirm that no later test relies on the previous resolver.
	tenant.WithDefaultResolver(tenant.NewMultiResolver())

	// Downstream stub: every request gets a 200 with an empty JSON object.
	okRoundTripper := roundTripperFunc(func(_ *http.Request) (*http.Response, error) {
		stub := &http.Response{
			StatusCode: http.StatusOK,
			Body:       io.NopCloser(strings.NewReader("{}")),
		}
		return stub, nil
	})

	// federationWithLimit returns an enabled federation config with the
	// given tenant limit (0 means no limit).
	federationWithLimit := func(limit int) tenantfederation.Config {
		return tenantfederation.Config{Enabled: true, MaxTenant: limit}
	}

	type testCase struct {
		name       string
		cfg        tenantfederation.Config
		orgID      string
		wantStatus int
		wantErrMsg string
	}

	cases := []testCase{
		{
			name:       "one tenant",
			cfg:        federationWithLimit(0),
			orgID:      "org1",
			wantStatus: http.StatusOK,
		},
		{
			name:       "less than max tenant",
			cfg:        federationWithLimit(3),
			orgID:      "org1|org2",
			wantStatus: http.StatusOK,
		},
		{
			name:       "equal to max tenant",
			cfg:        federationWithLimit(2),
			orgID:      "org1|org2",
			wantStatus: http.StatusOK,
		},
		{
			name:       "exceeds max tenant",
			cfg:        federationWithLimit(2),
			orgID:      "org1|org2|org3",
			wantStatus: http.StatusBadRequest,
			wantErrMsg: "too many tenants, max: 2, actual: 3",
		},
		{
			name:       "no org Id",
			cfg:        federationWithLimit(0),
			orgID:      "",
			wantStatus: http.StatusUnauthorized,
			wantErrMsg: "no org id",
		},
		{
			name:       "no limit",
			cfg:        federationWithLimit(0),
			orgID:      "org1|org2|org3",
			wantStatus: http.StatusOK,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Wrap the handler with the auth middleware so the org ID header
			// is extracted into the request context before ServeHTTP runs.
			authed := middleware.Merge(middleware.AuthenticateUser).Wrap(
				NewHandler(HandlerConfig{}, tc.cfg, okRoundTripper, log.NewNopLogger(), nil),
			)

			request := httptest.NewRequest("GET", "http://fake", nil)
			request.Header.Set("X-Scope-OrgId", tc.orgID)
			recorder := httptest.NewRecorder()

			authed.ServeHTTP(recorder, request)

			payload, err := io.ReadAll(recorder.Body)
			require.NoError(t, err)
			require.Equal(t, tc.wantStatus, recorder.Code)

			// Only failure cases carry an expected message.
			if tc.wantErrMsg == "" {
				return
			}
			require.Contains(t, string(payload), tc.wantErrMsg)
		})
	}
}
5 changes: 4 additions & 1 deletion pkg/frontend/v1/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/cortexproject/cortex/pkg/frontend/transport"
"github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb"
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
"github.com/cortexproject/cortex/pkg/scheduler/queue"
"github.com/cortexproject/cortex/pkg/util/flagext"
Expand Down Expand Up @@ -264,14 +265,16 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a

// Default HTTP handler config.
handlerCfg := transport.HandlerConfig{}
tenantFederationCfg := tenantfederation.Config{}

flagext.DefaultValues(&handlerCfg)

rt := transport.AdaptGrpcRoundTripperToHTTPRoundTripper(v1)
r := mux.NewRouter()
r.PathPrefix("/").Handler(middleware.Merge(
middleware.AuthenticateUser,
middleware.Tracer{},
).Wrap(transport.NewHandler(handlerCfg, rt, logger, nil)))
).Wrap(transport.NewHandler(handlerCfg, tenantFederationCfg, rt, logger, nil)))

httpServer := http.Server{
Handler: r,
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/tenantfederation/tenant_federation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ type Config struct {
Enabled bool `yaml:"enabled"`
// MaxConcurrent The number of workers used for processing federated query.
MaxConcurrent int `yaml:"max_concurrent"`
// MaxTenant A maximum number of tenants to query at once.
MaxTenant int `yaml:"max_tenant"`
}

// RegisterFlags registers the tenant federation command-line flags on f and
// binds them to the fields of cfg:
//   - tenant-federation.enabled (default false) -> cfg.Enabled
//   - tenant-federation.max-concurrent (default defaultMaxConcurrency) -> cfg.MaxConcurrent
//   - tenant-federation.max-tenant (default 0, meaning no limit) -> cfg.MaxTenant
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "tenant-federation.enabled", false, "If enabled on all Cortex services, queries can be federated across multiple tenants. The tenant IDs involved need to be specified separated by a `|` character in the `X-Scope-OrgID` header (experimental).")
f.IntVar(&cfg.MaxConcurrent, "tenant-federation.max-concurrent", defaultMaxConcurrency, "The number of workers used to process each federated query.")
f.IntVar(&cfg.MaxTenant, "tenant-federation.max-tenant", 0, "A maximum number of tenants to query at once. 0 means no limit.")
}
Loading