Skip to content

Adding support for list rules API filter parameters #5417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Querier/StoreGateway: Allow the tenant shard sizes to be a percent of total instances. #5393
* [FEATURE] Added the flag `-alertmanager.api-concurrency` to configure alert manager api concurrency limit. #5412
* [FEATURE] Store Gateway: Add `-store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown` to skip unregistering instance from the ring in shutdown. #5421
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
Expand Down
2 changes: 1 addition & 1 deletion docs/contributing/how-integration-tests-work.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ Integration tests have `requires_docker` tag (`// +build requires_docker` line f

## Isolation

Each integration test runs in isolation. For each integration test, we do create a Docker network, start Cortex and its dependencies containers, push/query series to/from Cortex and run assertions on it. Once the test has done, both the Docker network and containers are terminated and deleted.
Each integration test runs in isolation. For each integration test, we do create a Docker network, start Cortex and its dependencies containers, push/query series to/from Cortex and run assertions on it. Once the test has done, both the Docker network and containers are terminated and deleted.
63 changes: 63 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,69 @@ type ServerStatus struct {
} `json:"data"`
}

// RuleFilter describes the filter query parameters supported by the
// ruler's Prometheus-compatible list-rules endpoint (/api/v1/rules).
// Zero-value fields are forwarded as-is by GetPrometheusRulesWithFilter:
// empty slices add no parameters, and an empty RuleType is sent as an
// empty "type" value (which the server treats as "no type filter").
type RuleFilter struct {
	Namespaces     []string // forwarded as repeated "file[]" params (namespace names)
	RuleGroupNames []string // forwarded as repeated "rule_group[]" params
	RuleNames      []string // forwarded as repeated "rule_name[]" params
	RuleType       string   // forwarded as the "type" param ("alert" or "record")
}

func addQueryParams(urlValues url.Values, paramName string, params ...string) {
for _, paramValue := range params {
urlValues.Add(paramName, paramValue)
}
}

// GetPrometheusRulesWithFilter fetches the rules from the Prometheus endpoint /api/v1/rules,
// forwarding the given filter as query parameters (file[], rule_name[], rule_group[], type).
// It returns the decoded rule groups, or an error if the request fails or the
// response status is not "success".
func (c *Client) GetPrometheusRulesWithFilter(filter RuleFilter) ([]*ruler.RuleGroup, error) {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	// Build the HTTP request, scoped to the tenant and carrying the filter
	// parameters in the query string.
	req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Scope-OrgID", c.orgID)

	query := req.URL.Query()
	addQueryParams(query, "file[]", filter.Namespaces...)
	addQueryParams(query, "rule_name[]", filter.RuleNames...)
	addQueryParams(query, "rule_group[]", filter.RuleGroupNames...)
	addQueryParams(query, "type", filter.RuleType)
	req.URL.RawQuery = query.Encode()

	// Execute the request and read the whole body.
	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	// Decode the Prometheus-style envelope: {"status": ..., "data": ...}.
	var decoded struct {
		Status string              `json:"status"`
		Data   ruler.RuleDiscovery `json:"data"`
	}
	if err := json.Unmarshal(body, &decoded); err != nil {
		return nil, err
	}

	if decoded.Status != "success" {
		return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
	}

	return decoded.Data.RuleGroups, nil
}

// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
// Create HTTP request
Expand Down
160 changes: 160 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/x509/pkix"
"fmt"
"math"
"math/rand"
"net/http"
"os"
"path/filepath"
Expand All @@ -17,6 +18,8 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ruler"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -389,6 +392,163 @@ func TestRulerSharding(t *testing.T) {
assert.ElementsMatch(t, expectedNames, actualNames)
}

// TestRulerAPISharding verifies the list-rules API filter parameters
// (type, file[], rule_name[]) against two rulers that shard 100 randomly
// generated rule groups spread across five namespaces.
func TestRulerAPISharding(t *testing.T) {
	const numRulesGroups = 100

	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Generate multiple rule groups, with 1 rule each. Groups drawing an even
	// random number get an alerting rule, the others a recording rule;
	// alertCount tracks the former so the type-filter cases below can assert
	// exact result sizes.
	ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
	expectedNames := make([]string, numRulesGroups)
	alertCount := 0
	for i := 0; i < numRulesGroups; i++ {
		num := random.Intn(100)
		var ruleNode yaml.Node
		var exprNode yaml.Node

		// Rule names follow "rule_<i>", group names "test_<i>"; the
		// rule-name filter case below relies on this naming.
		ruleNode.SetString(fmt.Sprintf("rule_%d", i))
		exprNode.SetString(strconv.Itoa(i))
		ruleName := fmt.Sprintf("test_%d", i)

		expectedNames[i] = ruleName
		if num%2 == 0 {
			alertCount++
			ruleGroups[i] = rulefmt.RuleGroup{
				Name:     ruleName,
				Interval: 60,
				Rules: []rulefmt.RuleNode{{
					Alert: ruleNode,
					Expr:  exprNode,
				}},
			}
		} else {
			ruleGroups[i] = rulefmt.RuleGroup{
				Name:     ruleName,
				Interval: 60,
				Rules: []rulefmt.RuleNode{{
					Record: ruleNode,
					Expr:   exprNode,
				}},
			}
		}
	}

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, rulestoreBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Configure the ruler.
	rulerFlags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(),
		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
		map[string]string{
			// Since we're not going to run any rule, we don't need the
			// store-gateway to be configured to a valid address.
			"-querier.store-gateway-addresses": "localhost:12345",
			// Enable the bucket index so we can skip the initial bucket scan.
			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
		},
	)

	// Start rulers.
	ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
	ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
	rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
	require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))

	// Upload rule groups to one of the rulers.
	c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
	require.NoError(t, err)

	// Spread the groups across five namespaces at random, remembering how
	// many landed in each so the namespace-filter case can assert counts.
	namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
	namespaceNameCount := make([]int, 5)
	nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, ruleGroup := range ruleGroups {
		index := nsRand.Intn(len(namespaceNames))
		namespaceNameCount[index] = namespaceNameCount[index] + 1
		require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
	}

	// Wait until rulers have loaded all rules.
	require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

	// Since rulers have loaded all rules, we expect that rules have been sharded
	// between the two rulers.
	require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
	require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))

	// Table of filter combinations; each case fetches through the client and
	// runs its own assertions on the returned rule groups.
	testCases := map[string]struct {
		filter        e2ecortex.RuleFilter
		resultCheckFn func(assert.TestingT, []*ruler.RuleGroup)
	}{
		"Filter for Alert Rules": {
			filter: e2ecortex.RuleFilter{
				RuleType: "alert",
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				assert.Len(t, ruleGroups, alertCount, "Expected %d rules but got %d", alertCount, len(ruleGroups))
			},
		},
		"Filter for Recording Rules": {
			filter: e2ecortex.RuleFilter{
				RuleType: "record",
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				assert.Len(t, ruleGroups, numRulesGroups-alertCount, "Expected %d rules but got %d", numRulesGroups-alertCount, len(ruleGroups))
			},
		},
		"Filter by Namespace Name": {
			filter: e2ecortex.RuleFilter{
				Namespaces: []string{namespaceNames[2]},
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				assert.Len(t, ruleGroups, namespaceNameCount[2], "Expected %d rules but got %d", namespaceNameCount[2], len(ruleGroups))
			},
		},
		"Filter by Namespace Name and Alert Rules": {
			filter: e2ecortex.RuleFilter{
				RuleType:   "alert",
				Namespaces: []string{namespaceNames[2]},
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				// Each group holds exactly one rule (built above), so checking
				// the first rule's type covers the whole group.
				for _, ruleGroup := range ruleGroups {
					rule := ruleGroup.Rules[0].(map[string]interface{})
					ruleType := rule["type"]
					assert.Equal(t, "alerting", ruleType, "Expected 'alerting' rule type but got %s", ruleType)
				}
			},
		},
		"Filter by Rule Names": {
			filter: e2ecortex.RuleFilter{
				RuleNames: []string{"rule_3", "rule_64", "rule_99"},
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				ruleNames := []string{}
				for _, ruleGroup := range ruleGroups {
					rule := ruleGroup.Rules[0].(map[string]interface{})
					ruleName := rule["name"]
					ruleNames = append(ruleNames, ruleName.(string))

				}
				assert.Len(t, ruleNames, 3, "Expected %d rules but got %d", 3, len(ruleNames))
			},
		},
	}
	// For each test case, fetch the rules with configured filters, and ensure the results match.
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			actualGroups, err := c.GetPrometheusRulesWithFilter(tc.filter)
			require.NoError(t, err)
			tc.resultCheckFn(t, actualGroups)
		})
	}
}

func TestRulerAlertmanager(t *testing.T) {
var namespaceOne = "test_/encoded_+namespace/?"
ruleGroup := createTestRuleGroup(t)
Expand Down
47 changes: 45 additions & 2 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ruler

import (
"encoding/json"
"fmt"
io "io"
"net/http"
"net/url"
Expand Down Expand Up @@ -119,6 +120,26 @@ func respondError(logger log.Logger, w http.ResponseWriter, msg string) {
}
}

// respondBadRequest writes a Prometheus-style JSON error envelope
// (status "error", errorType "bad_data") with HTTP status 400.
// If marshaling the envelope itself fails, it falls back to a plain
// 500 via http.Error.
func respondBadRequest(logger log.Logger, w http.ResponseWriter, msg string) {
	b, err := json.Marshal(&response{
		Status:    "error",
		ErrorType: v1.ErrBadData,
		Error:     msg,
		Data:      nil,
	})

	if err != nil {
		level.Error(logger).Log("msg", "error marshaling json response", "err", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Set the Content-Type before writing the status: callers (e.g.
	// PrometheusRules) only set "application/json" on the success path,
	// after this helper has already returned, so without this the JSON
	// error body would be served with no content type. Headers must be
	// set before WriteHeader to take effect.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusBadRequest)
	if n, err := w.Write(b); err != nil {
		level.Error(logger).Log("msg", "error writing response", "bytesWritten", n, "err", err)
	}
}

// API is used to handle HTTP requests for the ruler service
type API struct {
ruler *Ruler
Expand All @@ -145,8 +166,27 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
return
}

if err := req.ParseForm(); err != nil {
level.Error(logger).Log("msg", "error parsing form/query params", "err", err)
respondBadRequest(logger, w, "error parsing form/query params")
return
}

typ := strings.ToLower(req.URL.Query().Get("type"))
if typ != "" && typ != alertingRuleFilter && typ != recordingRuleFilter {
respondBadRequest(logger, w, fmt.Sprintf("unsupported rule type %q", typ))
return
}

rulesRequest := RulesRequest{
RuleNames: req.Form["rule_name[]"],
RuleGroupNames: req.Form["rule_group[]"],
Files: req.Form["file[]"],
Type: typ,
}

w.Header().Set("Content-Type", "application/json")
rgs, err := a.ruler.GetRules(req.Context())
rgs, err := a.ruler.GetRules(req.Context(), rulesRequest)

if err != nil {
respondError(logger, w, err.Error())
Expand Down Expand Up @@ -238,7 +278,10 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
}

w.Header().Set("Content-Type", "application/json")
rgs, err := a.ruler.GetRules(req.Context())
rulesRequest := RulesRequest{
Type: alertingRuleFilter,
}
rgs, err := a.ruler.GetRules(req.Context(), rulesRequest)

if err != nil {
respondError(logger, w, err.Error())
Expand Down
Loading