Commit a372179

craig[bot] and stevendanna committed
Merge #133789

133789: kvserver,changefeeds,crosscluster: set per-consumer catchup scan limit r=stevendanna a=stevendanna

Currently, it is easy for a single slow rangefeed consumer to acquire the entire catchup scan quota for a given store, preventing any other consumers from advancing. In the long run we need a more sophisticated approach; this change is a small improvement that addresses the most egregious case: a single slow consumer holding the entire quota.

It introduces the concept of a ConsumerID in the rangefeed request. A ConsumerID represents a logical rangefeed consumer, such as a changefeed or LDR stream. Such consumers may make multiple MuxRangeFeed requests to a given node despite sharing the same downstream consumer. When per-consumer catchup scan limiting is enabled, no single consumer is allowed to consume more than 75% of a given store's capacity. If no ConsumerID is specified, a random consumer ID is assigned to all rangefeeds originating from a given MuxRangeFeed call.

Epic: none
Release note: None

Co-authored-by: Steven Danna <[email protected]>

2 parents 982decd + 47e6b28, commit a372179
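To make the limiting concrete: the idea is a store-wide catchup scan quota plus a per-ConsumerID ceiling at 75% of it. The sketch below is not code from this commit; it is a minimal Go illustration of that accounting, and every name in it (catchupLimiter, tryAcquire, release) is hypothetical.

package quota

import (
	"errors"
	"sync"
)

// catchupLimiter sketches the idea: a store-wide cap on concurrent
// catchup scans (cf. kvserver.ConcurrentRangefeedItersLimit in the
// test below) plus a per-ConsumerID ceiling (cf.
// kvserver.PerConsumerCatchupLimit, 75% of the store cap per the
// commit message).
type catchupLimiter struct {
	mu               sync.Mutex
	storeLimit       int
	perConsumerLimit int
	inUse            int           // catchup scans currently running
	perConsumer      map[int64]int // running catchup scans by ConsumerID
}

func newCatchupLimiter(storeLimit int) *catchupLimiter {
	return &catchupLimiter{
		storeLimit:       storeLimit,
		perConsumerLimit: storeLimit * 3 / 4, // the 75% ceiling
		perConsumer:      make(map[int64]int),
	}
}

// tryAcquire reserves one catchup scan slot for consumerID, failing if
// either the store-wide or the per-consumer limit would be exceeded.
func (l *catchupLimiter) tryAcquire(consumerID int64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.inUse >= l.storeLimit {
		return errors.New("store catchup scan quota exhausted")
	}
	if l.perConsumer[consumerID] >= l.perConsumerLimit {
		return errors.New("per-consumer catchup scan quota exhausted")
	}
	l.inUse++
	l.perConsumer[consumerID]++
	return nil
}

// release returns a slot previously reserved by tryAcquire.
func (l *catchupLimiter) release(consumerID int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.inUse--
	l.perConsumer[consumerID]--
	if l.perConsumer[consumerID] <= 0 {
		delete(l.perConsumer, consumerID)
	}
}

Under the overrides in the test added below (ConcurrentRangefeedItersLimit = 8, PerConsumerCatchupLimit = 6, matching the 75% ratio), the nine rangefeeds sharing ConsumerID 1 can hold at most six slots at once, so the rangefeed with ConsumerID 2 always has headroom.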

File tree: 18 files changed (+272, -16 lines)

pkg/ccl/changefeedccl/changefeed_processors.go (1 addition, 0 deletions)

@@ -521,6 +521,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
 		Knobs:         ca.knobs.FeedKnobs,
 		ScopedTimers:  ca.sliMetrics.Timers,
 		MonitoringCfg: monitoringCfg,
+		ConsumerID:    int64(ca.spec.JobID),
 	}, nil
 }

pkg/ccl/changefeedccl/kvfeed/kv_feed.go (7 additions, 0 deletions)

@@ -93,6 +93,8 @@ type Config struct {
 	Knobs TestingKnobs

 	ScopedTimers *timers.ScopedTimers
+
+	ConsumerID int64
 }

 // Run will run the kvfeed. The feed runs synchronously and returns an

@@ -123,6 +125,7 @@ func Run(ctx context.Context, cfg Config) error {
 		cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
 		cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
 		cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
+		cfg.ConsumerID,
 		cfg.InitialHighWater, cfg.EndTime,
 		cfg.Codec,
 		cfg.SchemaFeed,

@@ -248,6 +251,7 @@ type kvFeed struct {
 	withDiff            bool
 	withFiltering       bool
 	withInitialBackfill bool
+	consumerID          int64
 	initialHighWater    hlc.Timestamp
 	endTime             hlc.Timestamp
 	writer              kvevent.Writer

@@ -278,6 +282,7 @@ func newKVFeed(
 	schemaChangeEvents changefeedbase.SchemaChangeEventClass,
 	schemaChangePolicy changefeedbase.SchemaChangePolicy,
 	withInitialBackfill, withDiff, withFiltering bool,
+	consumerID int64,
 	initialHighWater hlc.Timestamp,
 	endTime hlc.Timestamp,
 	codec keys.SQLCodec,

@@ -297,6 +302,7 @@ func newKVFeed(
 		withInitialBackfill: withInitialBackfill,
 		withDiff:            withDiff,
 		withFiltering:       withFiltering,
+		consumerID:          consumerID,
 		initialHighWater:    initialHighWater,
 		endTime:             endTime,
 		schemaChangeEvents:  schemaChangeEvents,

@@ -585,6 +591,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
 		Frontier:      resumeFrontier.Frontier(),
 		WithDiff:      f.withDiff,
 		WithFiltering: f.withFiltering,
+		ConsumerID:    f.consumerID,
 		Knobs:         f.knobs,
 		Timers:        f.timers,
 		RangeObserver: f.rangeObserver,

pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go (1 addition, 0 deletions)

@@ -144,6 +144,7 @@ func TestKVFeed(t *testing.T) {
 			f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{},
 				tc.schemaChangeEvents, tc.schemaChangePolicy,
 				tc.needsInitialScan, tc.withDiff, true, /* withFiltering */
+				0, /* consumerID */
 				tc.initialHighWater, tc.endTime,
 				codec,
 				tf, sf, rangefeedFactory(ref.run), bufferFactory,

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go (1 addition, 0 deletions)

@@ -31,6 +31,7 @@ type rangeFeedConfig struct {
 	Spans         []kvcoord.SpanTimePair
 	WithDiff      bool
 	WithFiltering bool
+	ConsumerID    int64
 	RangeObserver kvcoord.RangeObserver
 	Knobs         TestingKnobs
 	Timers        *timers.ScopedTimers

pkg/ccl/crosscluster/producer/event_stream.go (1 addition, 0 deletions)

@@ -153,6 +153,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
 		rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
 		rangefeed.WithOnValues(s.onValues),
 		rangefeed.WithDiff(s.spec.WithDiff),
+		rangefeed.WithConsumerID(int64(s.streamID)),
 		rangefeed.WithInvoker(func(fn func() error) error { return fn() }),
 		rangefeed.WithFiltering(s.spec.WithFiltering),
 	}

pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go (1 addition, 1 deletion)

@@ -283,7 +283,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error

 	for !s.transport.IsExhausted() {
 		args := makeRangeFeedRequest(
-			s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs)
+			s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs, m.cfg.consumerID)
 		args.Replica = s.transport.NextReplica()
 		args.StreamID = streamID
 		s.ReplicaDescriptor = args.Replica
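The commit message also notes that rangefeeds arriving without an explicit ConsumerID are grouped under a random ID per MuxRangeFeed call. That assignment is not part of the hunks shown here; the following is a hypothetical Go sketch of the fallback, with an invented helper name.

package sketch

import "math/rand"

// consumerIDForMux illustrates the described fallback: if the request
// carries no ConsumerID, every rangefeed spawned by the same
// MuxRangeFeed call shares one random ID, so the per-consumer limit
// still bounds the call as a whole. The helper name is hypothetical;
// the actual assignment site is not in this diff.
func consumerIDForMux(requested int64) int64 {
	if requested != 0 {
		return requested
	}
	return rand.Int63()
}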

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go (9 additions, 0 deletions)

@@ -67,6 +67,7 @@ type rangeFeedConfig struct {
 	withMetadata          bool
 	withMatchingOriginIDs []uint32
 	rangeObserver         RangeObserver
+	consumerID            int64

 	knobs struct {
 		// onRangefeedEvent invoked on each rangefeed event.

@@ -138,6 +139,12 @@ func WithMetadata() RangeFeedOption {
 	})
 }

+func WithConsumerID(cid int64) RangeFeedOption {
+	return optionFunc(func(c *rangeFeedConfig) {
+		c.consumerID = cid
+	})
+}
+
 // SpanTimePair is a pair of span along with its starting time. The starting
 // time is exclusive, i.e. the first possible emitted event (including catchup
 // scans) will be at startAfter.Next().

@@ -620,6 +627,7 @@ func makeRangeFeedRequest(
 	withDiff bool,
 	withFiltering bool,
 	withMatchingOriginIDs []uint32,
+	consumerID int64,
 ) kvpb.RangeFeedRequest {
 	admissionPri := admissionpb.BulkNormalPri
 	if isSystemRange {

@@ -631,6 +639,7 @@
 			Timestamp: startAfter,
 			RangeID:   rangeID,
 		},
+		ConsumerID:            consumerID,
 		WithDiff:              withDiff,
 		WithFiltering:         withFiltering,
 		WithMatchingOriginIDs: withMatchingOriginIDs,

pkg/kv/kvclient/rangefeed/config.go (7 additions, 0 deletions)

@@ -39,6 +39,7 @@ type config struct {
 	withDiff              bool
 	withFiltering         bool
 	withMatchingOriginIDs []uint32
+	consumerID            int64
 	onUnrecoverableError  OnUnrecoverableError
 	onCheckpoint          OnCheckpoint
 	frontierQuantize      time.Duration

@@ -159,6 +160,12 @@ func WithOriginIDsMatching(originIDs ...uint32) Option {
 	})
 }

+func WithConsumerID(cid int64) Option {
+	return optionFunc(func(c *config) {
+		c.consumerID = cid
+	})
+}
+
 // WithInvoker makes an option to invoke the rangefeed tasks such as running the
 // the client and processing events emitted by the client with a caller-supplied
 // function, which can make it easier to introspect into work done by a given
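For callers of the pkg/kv/kvclient/rangefeed package, the new option slots into the usual variadic options, exactly as the test added later in this commit uses it. A minimal usage sketch follows; identifiers such as stopper, db, settings, spans, startTime, and jobID are assumed to be in scope and are not part of this diff.

// Usage sketch only. One ID per logical consumer: every rangefeed
// tagged with it shares that consumer's catchup scan quota.
f, err := rangefeed.NewFactory(stopper, db, settings, nil /* knobs */)
if err != nil {
	return err
}
feed, err := f.RangeFeed(ctx, "my-feed", spans, startTime,
	func(ctx context.Context, v *kvpb.RangeFeedValue) {
		// Handle each emitted value.
	},
	rangefeed.WithConsumerID(int64(jobID)),
)
if err != nil {
	return err
}
defer feed.Close()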

pkg/kv/kvclient/rangefeed/rangefeed.go (1 addition, 0 deletions)

@@ -349,6 +349,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier span.Frontier, resumeWithF
 	if f.onMetadata != nil {
 		rangefeedOpts = append(rangefeedOpts, kvcoord.WithMetadata())
 	}
+	rangefeedOpts = append(rangefeedOpts, kvcoord.WithConsumerID(f.consumerID))

 	for i := 0; r.Next(); i++ {
 		ts := frontier.Frontier()

pkg/kv/kvclient/rangefeed/rangefeed_external_test.go (97 additions, 1 deletion)

@@ -1451,7 +1451,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
 	}
 	eventC := make(chan *kvpb.RangeFeedEvent)
 	sink := newChannelSink(ctx, eventC)
-	_, rErr := s3.RangeFeed(sink.ctx, &req, sink)
+	_, rErr := s3.RangeFeed(sink.ctx, &req, sink, nil)
 	require.NoError(t, rErr) // check if we've errored yet
 	require.NoError(t, sink.Error())
 	t.Logf("started rangefeed on %s", repl3)

@@ -1877,3 +1877,99 @@ func TestRangeFeedMetadataAutoSplit(t *testing.T) {
 		}
 	})
 }
+
+// TestRangefeedCatchupStarvation tests that a single MuxRangefeed
+// call cannot starve other users. Note that starvation is still
+// possible if there are more than 2 consumers of a given range.
+func TestRangefeedCatchupStarvation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testutils.RunValues(t, "feed_type", feedTypes, func(t *testing.T, rt rangefeedTestType) {
+		ctx := context.Background()
+		settings := cluster.MakeTestingClusterSettings()
+		kvserver.RangefeedUseBufferedSender.Override(ctx, &settings.SV, rt.useBufferedSender)
+		kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true)
+		// Lower the limit to make it more likely to get starved.
+		kvserver.ConcurrentRangefeedItersLimit.Override(ctx, &settings.SV, 8)
+		kvserver.PerConsumerCatchupLimit.Override(ctx, &settings.SV, 6)
+		srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
+			Settings: settings,
+		})
+		defer srv.Stopper().Stop(ctx)
+		s := srv.ApplicationLayer()
+		ts := s.Clock().Now()
+		scratchKey := append(s.Codec().TenantPrefix(), keys.ScratchRangeMin...)
+		scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]
+		mkKey := func(k string) roachpb.Key {
+			return encoding.EncodeStringAscending(scratchKey, k)
+		}
+		ranges := 32
+		keysPerRange := 128
+		totalKeys := ranges * keysPerRange
+		for i := range ranges {
+			for j := range keysPerRange {
+				k := mkKey(fmt.Sprintf("%d-%d", i, j))
+				require.NoError(t, db.Put(ctx, k, 1))
+			}
+			_, _, err := srv.SplitRange(mkKey(fmt.Sprintf("%d", i)))
+			require.NoError(t, err)
+		}
+
+		span := roachpb.Span{Key: scratchKey, EndKey: scratchKey.PrefixEnd()}
+		f, err := rangefeed.NewFactory(s.AppStopper(), db, s.ClusterSettings(), nil)
+		require.NoError(t, err)
+
+		blocked := make(chan struct{})
+		r1, err := f.RangeFeed(ctx, "consumer-1-rf-1", []roachpb.Span{span}, ts,
+			func(ctx context.Context, value *kvpb.RangeFeedValue) {
+				blocked <- struct{}{}
+				<-ctx.Done()
+			},
+			rangefeed.WithConsumerID(1),
+		)
+		require.NoError(t, err)
+		defer r1.Close()
+		<-blocked
+
+		// Multiple rangefeeds from the same ConsumerID should
+		// be treated as the same consumer and thus they
+		// shouldn't be able to overwhelm the overall store
+		// quota.
+		for i := range 8 {
+			r1, err := f.RangeFeed(ctx, fmt.Sprintf("consumer-1-rf-%d", i+2), []roachpb.Span{span}, ts,
+				func(ctx context.Context, value *kvpb.RangeFeedValue) { <-ctx.Done() },
+				rangefeed.WithConsumerID(1),
+			)
+			require.NoError(t, err)
+			defer r1.Close()
+		}

+		// Despite the 9 rangefeeds above each needing 32 catchup
+		// scans, the following rangefeed should always make
+		// progress because it has a different consumer ID.
+		r2ConsumedRow := make(chan roachpb.Key)
+		r2, err := f.RangeFeed(ctx, "rf2", []roachpb.Span{span}, ts,
+			func(ctx context.Context, value *kvpb.RangeFeedValue) {
+				r2ConsumedRow <- value.Key
+			},
+			rangefeed.WithConsumerID(2),
+		)
+		require.NoError(t, err)
+		defer r2.Close()
+
+		// Wait until we see every key we've written on rf2.
+		seen := make(map[string]struct{}, 0)
+		for {
+			select {
+			case r := <-r2ConsumedRow:
+				seen[r.String()] = struct{}{}
+				if len(seen) >= totalKeys {
+					return
+				}
+			case <-time.After(testutils.DefaultSucceedsSoonDuration):
+				t.Fatal("test timed out")
+			}
+		}
+	})
+}
