Skip to content

Commit db7034d

Browse files
Minimize missed rule group evaluations
Signed-off-by: Anand Rajagopal <[email protected]>
1 parent 1e8af09 commit db7034d

File tree

13 files changed

+1253
-132
lines changed

13 files changed

+1253
-132
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [CHANGE] Querier: Deprecate and enable by default `querier.ingester-metadata-streaming` flag. #6147
1010
* [CHANGE] QueryFrontend/QueryScheduler: Deprecate `-querier.max-outstanding-requests-per-tenant` and `-query-scheduler.max-outstanding-requests-per-tenant` flags. Use frontend.max-outstanding-requests-per-tenant instead. #6146
1111
* [CHANGE] Ingesters: Enable 'snappy-block' compression on ingester clients by default. #6148
12+
* [FEATURE] Ruler: Minimize rule group missed evaluations via `-ruler.enable-ha-evaluation` flag. #6129
1213
* [FEATURE] Ingester/Distributor: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 #6010 #6020
1314
* [FEATURE] Querier: Enable querying native histogram chunks. #5944 #6031
1415
* [FEATURE] Query Frontend: Support native histogram in query frontend response. #5996 #6043

docs/configuration/config-file-reference.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4335,6 +4335,10 @@ ring:
43354335
# CLI flag: -ruler.ring.final-sleep
43364336
[final_sleep: <duration> | default = 0s]
43374337
4338+
# Keep instance in the ring on shut down.
4339+
# CLI flag: -ruler.ring.keep-instance-in-the-ring-on-shutdown
4340+
[keep_instance_in_the_ring_on_shutdown: <boolean> | default = false]
4341+
43384342
# Period with which to attempt to flush rule groups.
43394343
# CLI flag: -ruler.flush-period
43404344
[flush_period: <duration> | default = 1m]
@@ -4369,6 +4373,14 @@ ring:
43694373
# Disable the rule_group label on exported metrics
43704374
# CLI flag: -ruler.disable-rule-group-label
43714375
[disable_rule_group_label: <boolean> | default = false]
4376+
4377+
# Enable high availability
4378+
# CLI flag: -ruler.enable-ha-evaluation
4379+
[enable_ha_evaluation: <boolean> | default = false]
4380+
4381+
# Timeout for fanout calls to other rulers
4382+
# CLI flag: -ruler.list-rules-fanout-timeout
4383+
[list_rules_fanout_timeout: <duration> | default = 2m]
43724384
```
43734385

43744386
### `ruler_storage_config`

integration/ruler_test.go

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,281 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
10931093
})
10941094
}
10951095

1096+
func TestRulerHA(t *testing.T) {
1097+
const numRulesGroups = 20
1098+
1099+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
1100+
s, err := e2e.NewScenario(networkName)
1101+
require.NoError(t, err)
1102+
defer s.Close()
1103+
1104+
// Generate multiple rule groups, with 1 rule each.
1105+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
1106+
expectedNames := make([]string, numRulesGroups)
1107+
alertCount := 0
1108+
evalInterval, _ := model.ParseDuration("5s")
1109+
for i := 0; i < numRulesGroups; i++ {
1110+
num := random.Intn(10)
1111+
var ruleNode yaml.Node
1112+
var exprNode yaml.Node
1113+
1114+
ruleNode.SetString(fmt.Sprintf("rule_%d", i))
1115+
exprNode.SetString(strconv.Itoa(i))
1116+
ruleName := fmt.Sprintf("test_%d", i)
1117+
1118+
expectedNames[i] = ruleName
1119+
1120+
if num%2 == 0 {
1121+
alertCount++
1122+
ruleGroups[i] = rulefmt.RuleGroup{
1123+
Name: ruleName,
1124+
Interval: evalInterval,
1125+
Rules: []rulefmt.RuleNode{{
1126+
Alert: ruleNode,
1127+
Expr: exprNode,
1128+
}},
1129+
}
1130+
} else {
1131+
ruleGroups[i] = rulefmt.RuleGroup{
1132+
Name: ruleName,
1133+
Interval: evalInterval,
1134+
Rules: []rulefmt.RuleNode{{
1135+
Record: ruleNode,
1136+
Expr: exprNode,
1137+
}},
1138+
}
1139+
}
1140+
}
1141+
1142+
// Start dependencies.
1143+
consul := e2edb.NewConsul()
1144+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
1145+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1146+
1147+
// Configure the ruler.
1148+
overrides := map[string]string{
1149+
// Since we're not going to run any rule, we don't need the
1150+
// store-gateway to be configured to a valid address.
1151+
"-querier.store-gateway-addresses": "localhost:12345",
1152+
// Enable the bucket index so we can skip the initial bucket scan.
1153+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
1154+
"-ruler.ring.replication-factor": "2",
1155+
"-ruler.enable-ha-evaluation": "true",
1156+
"-ruler.poll-interval": "5s",
1157+
}
1158+
1159+
rulerFlags := mergeFlags(
1160+
BlocksStorageFlags(),
1161+
RulerFlags(),
1162+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
1163+
overrides,
1164+
)
1165+
1166+
// Start rulers.
1167+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1168+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1169+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1170+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
1171+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
1172+
1173+
// Upload rule groups to one of the rulers.
1174+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
1175+
require.NoError(t, err)
1176+
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
1177+
namespaceNameCount := make([]int, 5)
1178+
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
1179+
for _, ruleGroup := range ruleGroups {
1180+
index := nsRand.Intn(len(namespaceNames))
1181+
namespaceNameCount[index] = namespaceNameCount[index] + 1
1182+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
1183+
}
1184+
1185+
// Wait until rulers have loaded all rules.
1186+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
1187+
1188+
ruler1SyncTotal, err := ruler1.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1189+
require.NoError(t, err)
1190+
ruler3SyncTotal, err := ruler3.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1191+
require.NoError(t, err)
1192+
1193+
err = consul.Kill() // kill consul so the rulers will operate with the tokens/instances they already have
1194+
require.NoError(t, err)
1195+
1196+
err = ruler2.Kill()
1197+
require.NoError(t, err)
1198+
1199+
// wait for another sync
1200+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Greater(ruler1SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1201+
require.NoError(t, ruler3.WaitSumMetrics(e2e.Greater(ruler3SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1202+
1203+
rulers = e2ecortex.NewCompositeCortexService(ruler1, ruler3)
1204+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
1205+
1206+
t.Log(ruler1.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1207+
t.Log(ruler3.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1208+
1209+
c3, err := e2ecortex.NewClient("", "", "", ruler3.HTTPEndpoint(), "user-1")
1210+
require.NoError(t, err)
1211+
1212+
ruler1Rules, err := c.GetRuleGroups()
1213+
require.NoError(t, err)
1214+
1215+
ruler3Rules, err := c3.GetRuleGroups()
1216+
require.NoError(t, err)
1217+
1218+
ruleCount := 0
1219+
countFunc := func(ruleGroups map[string][]rulefmt.RuleGroup) {
1220+
for _, v := range ruleGroups {
1221+
ruleCount += len(v)
1222+
}
1223+
}
1224+
1225+
countFunc(ruler1Rules)
1226+
require.Equal(t, numRulesGroups, ruleCount)
1227+
ruleCount = 0
1228+
countFunc(ruler3Rules)
1229+
require.Equal(t, numRulesGroups, ruleCount)
1230+
1231+
results, err := c.GetPrometheusRules(e2ecortex.RuleFilter{})
1232+
require.NoError(t, err)
1233+
require.Equal(t, numRulesGroups, len(results))
1234+
}
1235+
1236+
func TestRulerAPIRulerDownFails(t *testing.T) {
1237+
const numRulesGroups = 20
1238+
1239+
random := rand.New(rand.NewSource(time.Now().UnixNano()))
1240+
s, err := e2e.NewScenario(networkName)
1241+
require.NoError(t, err)
1242+
defer s.Close()
1243+
1244+
// Generate multiple rule groups, with 1 rule each.
1245+
ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
1246+
expectedNames := make([]string, numRulesGroups)
1247+
alertCount := 0
1248+
evalInterval, _ := model.ParseDuration("5s")
1249+
for i := 0; i < numRulesGroups; i++ {
1250+
num := random.Intn(10)
1251+
var ruleNode yaml.Node
1252+
var exprNode yaml.Node
1253+
1254+
ruleNode.SetString(fmt.Sprintf("rule_%d", i))
1255+
exprNode.SetString(strconv.Itoa(i))
1256+
ruleName := fmt.Sprintf("test_%d", i)
1257+
1258+
expectedNames[i] = ruleName
1259+
1260+
if num%2 == 0 {
1261+
alertCount++
1262+
ruleGroups[i] = rulefmt.RuleGroup{
1263+
Name: ruleName,
1264+
Interval: evalInterval,
1265+
Rules: []rulefmt.RuleNode{{
1266+
Alert: ruleNode,
1267+
Expr: exprNode,
1268+
}},
1269+
}
1270+
} else {
1271+
ruleGroups[i] = rulefmt.RuleGroup{
1272+
Name: ruleName,
1273+
Interval: evalInterval,
1274+
Rules: []rulefmt.RuleNode{{
1275+
Record: ruleNode,
1276+
Expr: exprNode,
1277+
}},
1278+
}
1279+
}
1280+
}
1281+
1282+
// Start dependencies.
1283+
consul := e2edb.NewConsul()
1284+
minio := e2edb.NewMinio(9000, rulestoreBucketName)
1285+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1286+
1287+
// Configure the ruler.
1288+
overrides := map[string]string{
1289+
// Since we're not going to run any rule, we don't need the
1290+
// store-gateway to be configured to a valid address.
1291+
"-querier.store-gateway-addresses": "localhost:12345",
1292+
// Enable the bucket index so we can skip the initial bucket scan.
1293+
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
1294+
"-ruler.ring.replication-factor": "1",
1295+
"-ruler.poll-interval": "5s",
1296+
}
1297+
1298+
rulerFlags := mergeFlags(
1299+
BlocksStorageFlags(),
1300+
RulerFlags(),
1301+
RulerShardingFlags(consul.NetworkHTTPEndpoint()),
1302+
overrides,
1303+
)
1304+
1305+
// Start rulers.
1306+
ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1307+
ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1308+
ruler3 := e2ecortex.NewRuler("ruler-3", consul.NetworkHTTPEndpoint(), rulerFlags, "")
1309+
rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2, ruler3)
1310+
require.NoError(t, s.StartAndWaitReady(ruler1, ruler2, ruler3))
1311+
1312+
// Upload rule groups to one of the rulers.
1313+
c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
1314+
require.NoError(t, err)
1315+
namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
1316+
namespaceNameCount := make([]int, 5)
1317+
nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
1318+
for _, ruleGroup := range ruleGroups {
1319+
index := nsRand.Intn(len(namespaceNames))
1320+
namespaceNameCount[index] = namespaceNameCount[index] + 1
1321+
require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
1322+
}
1323+
1324+
// Wait until rulers have loaded all rules.
1325+
require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))
1326+
1327+
ruler1SyncTotal, err := ruler1.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1328+
require.NoError(t, err)
1329+
ruler3SyncTotal, err := ruler3.SumMetrics([]string{"cortex_ruler_sync_rules_total"})
1330+
require.NoError(t, err)
1331+
1332+
err = consul.Kill() // kill consul so the rulers will operate with the tokens/instances they already have
1333+
require.NoError(t, err)
1334+
1335+
err = ruler2.Kill()
1336+
require.NoError(t, err)
1337+
1338+
// wait for another sync
1339+
require.NoError(t, ruler1.WaitSumMetrics(e2e.Greater(ruler1SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1340+
require.NoError(t, ruler3.WaitSumMetrics(e2e.Greater(ruler3SyncTotal[0]), "cortex_ruler_sync_rules_total"))
1341+
1342+
t.Log(ruler1.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1343+
t.Log(ruler3.SumMetrics([]string{"cortex_prometheus_rule_group_rules"}))
1344+
1345+
c3, err := e2ecortex.NewClient("", "", "", ruler3.HTTPEndpoint(), "user-1")
1346+
require.NoError(t, err)
1347+
1348+
ruler1Rules, err := c.GetRuleGroups()
1349+
require.NoError(t, err)
1350+
1351+
ruler3Rules, err := c3.GetRuleGroups()
1352+
require.NoError(t, err)
1353+
1354+
ruleCount := 0
1355+
countFunc := func(ruleGroups map[string][]rulefmt.RuleGroup) {
1356+
for _, v := range ruleGroups {
1357+
ruleCount += len(v)
1358+
}
1359+
}
1360+
1361+
countFunc(ruler1Rules)
1362+
require.Equal(t, numRulesGroups, ruleCount)
1363+
ruleCount = 0
1364+
countFunc(ruler3Rules)
1365+
require.Equal(t, numRulesGroups, ruleCount)
1366+
1367+
_, err = c.GetPrometheusRules(e2ecortex.RuleFilter{})
1368+
require.Error(t, err)
1369+
}
1370+
10961371
func TestRulerKeepFiring(t *testing.T) {
10971372
s, err := e2e.NewScenario(networkName)
10981373
require.NoError(t, err)

pkg/ring/replication_set.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ring
22

33
import (
44
"context"
5+
"fmt"
56
"sort"
67
"time"
78
)
@@ -24,6 +25,7 @@ type ReplicationSet struct {
2425
// MaxErrors and returning early otherwise. zoneResultsQuorum allows only include
2526
// results from zones that already reach quorum to improve performance.
2627
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
28+
fmt.Println("max unavailable zones, max errors, length of instances -------->", r.MaxUnavailableZones, r.MaxErrors, len(r.Instances))
2729
type instanceResult struct {
2830
res interface{}
2931
err error

pkg/ruler/client_pool_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/cortexproject/cortex/pkg/util/flagext"
1616
"github.com/cortexproject/cortex/pkg/util/grpcclient"
17+
"github.com/cortexproject/cortex/pkg/util/services"
1718
)
1819

1920
func Test_newRulerClientFactory(t *testing.T) {
@@ -63,6 +64,12 @@ func Test_newRulerClientFactory(t *testing.T) {
6364

6465
type mockRulerServer struct{}
6566

67+
func (m *mockRulerServer) LivenessCheck(ctx context.Context, request *LivenessCheckRequest) (*LivenessCheckResponse, error) {
68+
return &LivenessCheckResponse{
69+
State: int32(services.Running),
70+
}, nil
71+
}
72+
6673
func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
6774
return &RulesResponse{}, nil
6875
}

pkg/ruler/merger.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
// mergeGroupStateDesc removes duplicates from the provided []*GroupStateDesc by keeping the GroupStateDesc with the
1010
// latest information. It uses the EvaluationTimestamp of the GroupStateDesc and the EvaluationTimestamp of the
1111
// ActiveRules in a GroupStateDesc to determine which GroupStateDesc has the latest information.
12-
func mergeGroupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
12+
func dedupStateDesc(in []*GroupStateDesc) []*GroupStateDesc {
1313
states := make(map[string]*GroupStateDesc)
1414
rgTime := make(map[string]time.Time)
1515
for _, state := range in {

pkg/ruler/merger_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func TestMergeGroupStateDesc(t *testing.T) {
9898

9999
for name, tc := range testCases {
100100
t.Run(name, func(t *testing.T) {
101-
out := mergeGroupStateDesc(tc.input)
101+
out := dedupStateDesc(tc.input)
102102
slices.SortFunc(out, func(a, b *GroupStateDesc) int {
103103
fileCompare := strings.Compare(a.Group.Namespace, b.Group.Namespace)
104104
if fileCompare != 0 {

0 commit comments

Comments
 (0)