Skip to content

Commit aeae7dc

Browse files
[FIXED] Consistent filtered consumer state reporting
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 55efd1d commit aeae7dc

File tree

3 files changed

+85
-14
lines changed

3 files changed

+85
-14
lines changed

server/consumer.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -2983,13 +2983,14 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
29832983
}
29842984
}
29852985

2986-
// If we are replicated, we need to pull certain data from our store.
2987-
if rg != nil && rg.node != nil && o.store != nil {
2986+
// We always need to pull certain data from our store.
2987+
if o.store != nil {
29882988
state, err := o.store.BorrowState()
29892989
if err != nil {
29902990
o.mu.Unlock()
29912991
return nil
29922992
}
2993+
29932994
// If we are the leader we could have o.sseq that is skipped ahead.
29942995
// To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence.
29952996
// Only use skipped ahead o.sseq if we're a new consumer and have not yet replicated this state yet.

server/jetstream_cluster_3_test.go

+3-8
Original file line numberDiff line numberDiff line change
@@ -5474,14 +5474,9 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
54745474
t.Helper()
54755475
require_Equal(t, a.Delivered.Consumer, 10)
54765476
require_Equal(t, a.Delivered.Stream, 10)
5477-
// If replicated, agreed upon state is used. Otherwise, o.asflr and o.adflr would be skipped ahead for R1.
5478-
if replicated {
5479-
require_Equal(t, a.AckFloor.Consumer, 0)
5480-
require_Equal(t, a.AckFloor.Stream, 0)
5481-
} else {
5482-
require_Equal(t, a.AckFloor.Consumer, 10)
5483-
require_Equal(t, a.AckFloor.Stream, 10)
5484-
}
5477+
// Agreed upon state is always used. Otherwise, o.asflr and o.adflr would be skipped ahead.
5478+
require_Equal(t, a.AckFloor.Consumer, 0)
5479+
require_Equal(t, a.AckFloor.Stream, 0)
54855480
require_Equal(t, a.NumPending, 40)
54865481
require_Equal(t, a.NumRedelivered, 10)
54875482
a.Cluster, b.Cluster = nil, nil

server/jetstream_consumer_test.go

+79-4
Original file line numberDiff line numberDiff line change
@@ -6467,7 +6467,7 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
64676467
})
64686468
require_NoError(t, err)
64696469

6470-
for i := 0; i < 100; i++ {
6470+
for i := 0; i < 10; i++ {
64716471
sendStreamMsg(t, nc, "foo", "msg")
64726472
}
64736473

@@ -6482,7 +6482,8 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
64826482

64836483
sub := natsSubSync(t, nc, inbox)
64846484
for i := 0; i < 10; i++ {
6485-
natsNexMsg(t, sub, time.Second)
6485+
msg := natsNexMsg(t, sub, time.Second)
6486+
require_NoError(t, msg.AckSync())
64866487
}
64876488

64886489
acc, err := s.lookupAccount(globalAccountName)
@@ -6508,6 +6509,13 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
65086509
require_True(t, si.State.FirstSeq == 1_000_000)
65096510
require_True(t, si.State.LastSeq == 999_999)
65106511

6512+
acc, err = s.lookupAccount(globalAccountName)
6513+
require_NoError(t, err)
6514+
mset, err = acc.lookupStream("TEST")
6515+
require_NoError(t, err)
6516+
o = mset.lookupConsumer("dur")
6517+
require_True(t, o != nil)
6518+
65116519
natsSubSync(t, nc, inbox)
65126520
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
65136521
ci, err := js.ConsumerInfo("TEST", "dur")
@@ -6517,8 +6525,18 @@ func TestJetStreamConsumerPendingLowerThanStreamFirstSeq(t *testing.T) {
65176525
if ci.NumAckPending != 0 {
65186526
return fmt.Errorf("NumAckPending should be 0, got %v", ci.NumAckPending)
65196527
}
6520-
if ci.Delivered.Stream != 999_999 {
6521-
return fmt.Errorf("Delivered.Stream should be 999,999, got %v", ci.Delivered.Stream)
6528+
// Delivered stays the same because it reflects what was actually delivered.
6529+
// And must not be influenced by purges/compacts.
6530+
if ci.Delivered.Stream != 10 {
6531+
return fmt.Errorf("Delivered.Stream should be 10, got %v", ci.Delivered.Stream)
6532+
}
6533+
6534+
// Starting sequence should be skipped ahead, respecting the compact.
6535+
o.mu.RLock()
6536+
sseq := o.sseq
6537+
o.mu.RUnlock()
6538+
if sseq != 1_000_000 {
6539+
return fmt.Errorf("o.sseq should be 1,000,000, got %v", sseq)
65226540
}
65236541
return nil
65246542
})
@@ -9531,3 +9549,60 @@ func TestJetStreamConsumerDeliverAllOverlappingFilterSubjects(t *testing.T) {
95319549
}
95329550
}
95339551
}
9552+
9553+
func TestJetStreamConsumerStateAlwaysFromStore(t *testing.T) {
9554+
s := RunBasicJetStreamServer(t)
9555+
defer s.Shutdown()
9556+
9557+
nc, js := jsClientConnect(t, s)
9558+
defer nc.Close()
9559+
9560+
_, err := js.AddStream(&nats.StreamConfig{
9561+
Name: "TEST",
9562+
Subjects: []string{"foo.>"},
9563+
})
9564+
require_NoError(t, err)
9565+
9566+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
9567+
Durable: "CONSUMER",
9568+
AckWait: 2 * time.Second,
9569+
AckPolicy: nats.AckExplicitPolicy,
9570+
FilterSubject: "foo.bar",
9571+
})
9572+
require_NoError(t, err)
9573+
9574+
// Publish two messages, one the consumer is interested in.
9575+
_, err = js.Publish("foo.bar", nil)
9576+
require_NoError(t, err)
9577+
_, err = js.Publish("foo.other", nil)
9578+
require_NoError(t, err)
9579+
9580+
sub, err := js.PullSubscribe("foo.bar", "CONSUMER")
9581+
require_NoError(t, err)
9582+
defer sub.Drain()
9583+
9584+
// Consumer info should start empty.
9585+
ci, err := js.ConsumerInfo("TEST", "CONSUMER")
9586+
require_NoError(t, err)
9587+
require_Equal(t, ci.Delivered.Stream, 0)
9588+
require_Equal(t, ci.AckFloor.Stream, 0)
9589+
9590+
// Fetch more messages than match our filter.
9591+
msgs, err := sub.Fetch(2, nats.MaxWait(time.Second))
9592+
require_NoError(t, err)
9593+
require_Len(t, len(msgs), 1)
9594+
9595+
// We have received, but not acknowledged, consumer info must reflect that.
9596+
ci, err = js.ConsumerInfo("TEST", "CONSUMER")
9597+
require_NoError(t, err)
9598+
require_Equal(t, ci.Delivered.Stream, 1)
9599+
require_Equal(t, ci.AckFloor.Stream, 0)
9600+
9601+
// Now we acknowledge the message and expect our delivered/ackfloor to be correct.
9602+
require_NoError(t, msgs[0].AckSync())
9603+
9604+
ci, err = js.ConsumerInfo("TEST", "CONSUMER")
9605+
require_NoError(t, err)
9606+
require_Equal(t, ci.Delivered.Stream, 1)
9607+
require_Equal(t, ci.AckFloor.Stream, 1)
9608+
}

0 commit comments

Comments
 (0)