
Commit c4d2d02

kvserver,changefeeds,crosscluster: set per-consumer catchup scan limit
Currently, a single slow rangefeed consumer can easily acquire the entire catchup scan quota for a given store, preventing any other consumers from advancing. Here, we introduce the concept of a ConsumerID in the rangefeed request. A ConsumerID represents a logical rangefeed consumer such as a changefeed or LDR stream. Such consumers may make multiple MuxRangeFeed requests to a given node despite sharing the same downstream consumer. When per-consumer catchup scan limiting is enabled, no single consumer is allowed to consume more than 75% of a given store's capacity. If no ConsumerID is specified, a random consumer ID is assigned to all rangefeeds originating from a given MuxRangeFeed call.

In the long run, we need a more sophisticated approach. This change aims to be a small improvement that solves the most egregious case: a single slow consumer consuming the entire quota. The goal is an easily backportable feature; however, it comes at the cost of adding yet another mechanism to the existing systems attempting to limit catchup scans:

1. Client-side rate limiter,
2. Store-level CatchupIter semaphore,
3. Elastic CPU rate limiting in the main CatchUpScan loop, and
4. Any limiting imposed now or in the future by virtue of including an admission header in the request.

Informs #132438

Epic: none

Release note: None
1 parent 75cda80 commit c4d2d02
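The 75% per-consumer cap described above is enforced on the kvserver side, in files that are not part of the hunks shown below. As a rough, hypothetical illustration of the accounting only (the type, names, and refuse-rather-than-queue behavior are invented for this sketch, not taken from the commit):

// Hypothetical sketch of per-consumer catchup scan limiting; the real
// enforcement lives in kvserver and is not part of the hunks shown below.
package main

import (
	"fmt"
	"sync"
)

// catchupLimiter tracks in-flight catchup scans for one store. Each consumer
// (identified by the ConsumerID on the RangeFeedRequest) may hold at most 75%
// of the store's capacity, so one slow consumer cannot take every slot.
type catchupLimiter struct {
	mu          sync.Mutex
	storeLimit  int
	inFlight    int
	perConsumer map[int64]int
}

func newCatchupLimiter(storeLimit int) *catchupLimiter {
	return &catchupLimiter{storeLimit: storeLimit, perConsumer: make(map[int64]int)}
}

// tryAcquire reports whether a catchup scan for consumerID may start now.
// (The real code would queue or retry rather than simply refuse.)
func (l *catchupLimiter) tryAcquire(consumerID int64) bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	consumerCap := l.storeLimit * 3 / 4 // 75% of the store-wide limit
	if l.inFlight >= l.storeLimit || l.perConsumer[consumerID] >= consumerCap {
		return false
	}
	l.inFlight++
	l.perConsumer[consumerID]++
	return true
}

// release returns a slot when a catchup scan finishes.
func (l *catchupLimiter) release(consumerID int64) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.inFlight--
	l.perConsumer[consumerID]--
}

func main() {
	l := newCatchupLimiter(8)
	for i := 0; i < 8; i++ {
		fmt.Println("consumer 1 acquire:", l.tryAcquire(1)) // true for the first 6, then false
	}
	fmt.Println("consumer 2 acquire:", l.tryAcquire(2)) // still true: slots remain for other consumers
}

The point of keying the budget by ConsumerID rather than by individual rangefeed is that all MuxRangeFeed streams belonging to one changefeed or LDR job count against the same 75% slice, leaving headroom for other consumers.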

File tree: 18 files changed (+266 −16 lines)


pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 1 addition & 0 deletions
@@ -514,6 +514,7 @@ func (ca *changeAggregator) makeKVFeedCfg(
 		Knobs:         ca.knobs.FeedKnobs,
 		ScopedTimers:  ca.sliMetrics.Timers,
 		MonitoringCfg: monitoringCfg,
+		ConsumerID:    int64(ca.spec.JobID),
 	}, nil
 }

pkg/ccl/changefeedccl/kvfeed/kv_feed.go

Lines changed: 7 additions & 0 deletions
@@ -93,6 +93,8 @@ type Config struct {
 	Knobs TestingKnobs

 	ScopedTimers *timers.ScopedTimers
+
+	ConsumerID int64
 }

 // Run will run the kvfeed. The feed runs synchronously and returns an

@@ -123,6 +125,7 @@ func Run(ctx context.Context, cfg Config) error {
 		cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
 		cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
 		cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
+		cfg.ConsumerID,
 		cfg.InitialHighWater, cfg.EndTime,
 		cfg.Codec,
 		cfg.SchemaFeed,

@@ -248,6 +251,7 @@ type kvFeed struct {
 	withDiff            bool
 	withFiltering       bool
 	withInitialBackfill bool
+	consumerID          int64
 	initialHighWater    hlc.Timestamp
 	endTime             hlc.Timestamp
 	writer              kvevent.Writer

@@ -278,6 +282,7 @@ func newKVFeed(
 	schemaChangeEvents changefeedbase.SchemaChangeEventClass,
 	schemaChangePolicy changefeedbase.SchemaChangePolicy,
 	withInitialBackfill, withDiff, withFiltering bool,
+	consumerID int64,
 	initialHighWater hlc.Timestamp,
 	endTime hlc.Timestamp,
 	codec keys.SQLCodec,

@@ -297,6 +302,7 @@
 		withInitialBackfill: withInitialBackfill,
 		withDiff:            withDiff,
 		withFiltering:       withFiltering,
+		consumerID:          consumerID,
 		initialHighWater:    initialHighWater,
 		endTime:             endTime,
 		schemaChangeEvents:  schemaChangeEvents,

@@ -585,6 +591,7 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro
 		Frontier:      resumeFrontier.Frontier(),
 		WithDiff:      f.withDiff,
 		WithFiltering: f.withFiltering,
+		ConsumerID:    f.consumerID,
 		Knobs:         f.knobs,
 		Timers:        f.timers,
 		RangeObserver: f.rangeObserver,

pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go

Lines changed: 1 addition & 0 deletions
@@ -144,6 +144,7 @@ func TestKVFeed(t *testing.T) {
 			f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{},
 				tc.schemaChangeEvents, tc.schemaChangePolicy,
 				tc.needsInitialScan, tc.withDiff, true, /* withFiltering */
+				0, /* consumerID */
 				tc.initialHighWater, tc.endTime,
 				codec,
 				tf, sf, rangefeedFactory(ref.run), bufferFactory,

pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ type rangeFeedConfig struct {
 	Spans         []kvcoord.SpanTimePair
 	WithDiff      bool
 	WithFiltering bool
+	ConsumerID    int64
 	RangeObserver kvcoord.RangeObserver
 	Knobs         TestingKnobs
 	Timers        *timers.ScopedTimers

pkg/ccl/crosscluster/producer/event_stream.go

Lines changed: 1 addition & 0 deletions
@@ -150,6 +150,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) (retErr error) {
 		rangefeed.WithFrontierQuantized(quantize.Get(&s.execCfg.Settings.SV)),
 		rangefeed.WithOnValues(s.onValues),
 		rangefeed.WithDiff(s.spec.WithDiff),
+		rangefeed.WithConsumerID(int64(s.streamID)),
 		rangefeed.WithInvoker(func(fn func() error) error { return fn() }),
 		rangefeed.WithFiltering(s.spec.WithFiltering),
 	}

pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go

Lines changed: 1 addition & 1 deletion
@@ -283,7 +283,7 @@ func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error

 	for !s.transport.IsExhausted() {
 		args := makeRangeFeedRequest(
-			s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs)
+			s.Span, s.token.Desc().RangeID, m.cfg.overSystemTable, s.startAfter, m.cfg.withDiff, m.cfg.withFiltering, m.cfg.withMatchingOriginIDs, m.cfg.consumerID)
 		args.Replica = s.transport.NextReplica()
 		args.StreamID = streamID
 		s.ReplicaDescriptor = args.Replica
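Note that the default-ID behavior from the commit message (rangefeeds that set no ConsumerID share one random ID per MuxRangeFeed call) is not visible in the hunks shown here. A tiny, hypothetical sketch of that fallback, with an invented helper name:

package main

import (
	"fmt"
	"math/rand"
)

// effectiveConsumerID is a hypothetical helper illustrating the fallback
// described in the commit message: an explicit ConsumerID wins, otherwise
// every rangefeed from the same MuxRangeFeed call shares one random ID.
func effectiveConsumerID(requested, perCallDefault int64) int64 {
	if requested != 0 {
		return requested
	}
	return perCallDefault
}

func main() {
	perCallDefault := rand.Int63() // chosen once when the MuxRangeFeed call starts
	fmt.Println(effectiveConsumerID(0, perCallDefault))  // unset: falls back to the shared random ID
	fmt.Println(effectiveConsumerID(42, perCallDefault)) // explicit: the caller's ID is kept
}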

pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go

Lines changed: 9 additions & 0 deletions
@@ -68,6 +68,7 @@ type rangeFeedConfig struct {
 	withMetadata          bool
 	withMatchingOriginIDs []uint32
 	rangeObserver         RangeObserver
+	consumerID            int64

 	knobs struct {
 		// onRangefeedEvent invoked on each rangefeed event.

@@ -139,6 +140,12 @@ func WithMetadata() RangeFeedOption {
 	})
 }

+func WithConsumerID(cid int64) RangeFeedOption {
+	return optionFunc(func(c *rangeFeedConfig) {
+		c.consumerID = cid
+	})
+}
+
 // RangeFeed divides a RangeFeed request on range boundaries and establishes a
 // RangeFeed to each of the individual ranges. It streams back results on the
 // provided channel.

@@ -664,6 +671,7 @@
 	withDiff bool,
 	withFiltering bool,
 	withMatchingOriginIDs []uint32,
+	consumerID int64,
 ) kvpb.RangeFeedRequest {
 	admissionPri := admissionpb.BulkNormalPri
 	if isSystemRange {

@@ -675,6 +683,7 @@
 			Timestamp: startAfter,
 			RangeID:   rangeID,
 		},
+		ConsumerID:            consumerID,
 		WithDiff:              withDiff,
 		WithFiltering:         withFiltering,
 		WithMatchingOriginIDs: withMatchingOriginIDs,

pkg/kv/kvclient/rangefeed/config.go

Lines changed: 7 additions & 0 deletions
@@ -39,6 +39,7 @@ type config struct {
 	withDiff              bool
 	withFiltering         bool
 	withMatchingOriginIDs []uint32
+	consumerID            int64
 	onUnrecoverableError  OnUnrecoverableError
 	onCheckpoint          OnCheckpoint
 	frontierQuantize      time.Duration

@@ -159,6 +160,12 @@ func WithOriginIDsMatching(originIDs ...uint32) Option {
 	})
 }

+func WithConsumerID(cid int64) Option {
+	return optionFunc(func(c *config) {
+		c.consumerID = cid
+	})
+}
+
 // WithInvoker makes an option to invoke the rangefeed tasks such as running the
 // the client and processing events emitted by the client with a caller-supplied
 // function, which can make it easier to introspect into work done by a given

pkg/kv/kvclient/rangefeed/rangefeed.go

Lines changed: 1 addition & 0 deletions
@@ -349,6 +349,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier span.Frontier, resumeWithF
 	if f.onMetadata != nil {
 		rangefeedOpts = append(rangefeedOpts, kvcoord.WithMetadata())
 	}
+	rangefeedOpts = append(rangefeedOpts, kvcoord.WithConsumerID(f.consumerID))

 	for i := 0; r.Next(); i++ {
 		ts := frontier.Frontier()

pkg/kv/kvclient/rangefeed/rangefeed_external_test.go

Lines changed: 96 additions & 1 deletion
@@ -1451,7 +1451,7 @@ func TestRangeFeedIntentResolutionRace(t *testing.T) {
 	}
 	eventC := make(chan *kvpb.RangeFeedEvent)
 	sink := newChannelSink(ctx, eventC)
-	require.NoError(t, s3.RangeFeed(sink.ctx, &req, sink)) // check if we've errored yet
+	require.NoError(t, s3.RangeFeed(sink.ctx, &req, sink, nil)) // check if we've errored yet
 	require.NoError(t, sink.Error())
 	t.Logf("started rangefeed on %s", repl3)

@@ -1876,3 +1876,98 @@ func TestRangeFeedMetadataAutoSplit(t *testing.T) {
 	}
 	})
 }
+
+// TestRangefeedCatchupStarvation tests that a single MuxRangefeed
+// call cannot starve other users. Note that starvation is still
+// possible if there are more than 2 consumers of a given range.
+func TestRangefeedCatchupStarvation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testutils.RunValues(t, "feed_type", feedTypes, func(t *testing.T, rt rangefeedTestType) {
+		ctx := context.Background()
+		settings := cluster.MakeTestingClusterSettings()
+		kvserver.RangefeedUseBufferedSender.Override(ctx, &settings.SV, rt.useBufferedSender)
+		kvserver.RangefeedEnabled.Override(ctx, &settings.SV, true)
+		// Lower the limit to make it more likely to get starved.
+		kvserver.ConcurrentRangefeedItersLimit.Override(ctx, &settings.SV, 8)
+		srv, _, db := serverutils.StartServer(t, base.TestServerArgs{
+			Settings: settings,
+		})
+		defer srv.Stopper().Stop(ctx)
+		s := srv.ApplicationLayer()
+		ts := s.Clock().Now()
+		scratchKey := append(s.Codec().TenantPrefix(), keys.ScratchRangeMin...)
+		scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]
+		mkKey := func(k string) roachpb.Key {
+			return encoding.EncodeStringAscending(scratchKey, k)
+		}
+		ranges := 32
+		keysPerRange := 128
+		totalKeys := ranges * keysPerRange
+		for i := range ranges {
+			for j := range keysPerRange {
+				k := mkKey(fmt.Sprintf("%d-%d", i, j))
+				require.NoError(t, db.Put(ctx, k, 1))
+			}
+			_, _, err := srv.SplitRange(mkKey(fmt.Sprintf("%d", i)))
+			require.NoError(t, err)
+		}
+
+		span := roachpb.Span{Key: scratchKey, EndKey: scratchKey.PrefixEnd()}
+		f, err := rangefeed.NewFactory(s.AppStopper(), db, s.ClusterSettings(), nil)
+		require.NoError(t, err)
+
+		blocked := make(chan struct{})
+		r1, err := f.RangeFeed(ctx, "consumer-1-rf-1", []roachpb.Span{span}, ts,
+			func(ctx context.Context, value *kvpb.RangeFeedValue) {
+				blocked <- struct{}{}
+				<-ctx.Done()
+			},
+			rangefeed.WithConsumerID(1),
+		)
+		require.NoError(t, err)
+		defer r1.Close()
+		<-blocked
+
+		// Multiple rangefeeds from the same ConsumerID should
+		// be treated as the same consumer and thus they
+		// shouldn't be able to overwhelm the overall store
+		// quota.
+		for i := range 8 {
+			r1, err := f.RangeFeed(ctx, fmt.Sprintf("consumer-1-rf-%d", i+2), []roachpb.Span{span}, ts,
+				func(ctx context.Context, value *kvpb.RangeFeedValue) { <-ctx.Done() },
+				rangefeed.WithConsumerID(1),
+			)
+			require.NoError(t, err)
+			defer r1.Close()
+		}
+
+		// Despite the 9 rangefeeds above each needing 32 catchup
+		// scans, the following rangefeed should always make
+		// progress because it has a different consumer ID.
+		r2ConsumedRow := make(chan roachpb.Key)
+		r2, err := f.RangeFeed(ctx, "rf2", []roachpb.Span{span}, ts,
+			func(ctx context.Context, value *kvpb.RangeFeedValue) {
+				r2ConsumedRow <- value.Key
+			},
+			rangefeed.WithConsumerID(2),
+		)
+		require.NoError(t, err)
+		defer r2.Close()
+
+		// Wait until we see every key we've written on rf2.
+		seen := make(map[string]struct{}, 0)
+		for {
+			select {
+			case r := <-r2ConsumedRow:
+				seen[r.String()] = struct{}{}
+				if len(seen) >= totalKeys {
+					return
+				}
+			case <-time.After(testutils.DefaultSucceedsSoonDuration):
+				t.Fatal("test timed out")
+			}
+		}
+	})
+}
