Skip to content

Commit 7b8f4fb

Browse files
Throttle indexing when disk IO throttling is disabled (#129245)
The threadpool-based merge scheduler triggers indexing throttling if merges are still getting enqueued faster than they're executed, while they are also disk IO unthrottled. This PR fixes the case where indexing throttling was incorrectly NOT triggered when disk IO throttling was disabled via the index settings.
1 parent 4c21fbf commit 7b8f4fb

File tree

3 files changed

+28
-9
lines changed

3 files changed

+28
-9
lines changed

docs/changelog/129245.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 129245
2+
summary: Throttle indexing when disk IO throttling is disabled
3+
area: Engine
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,8 @@ private void checkMergeTaskThrottling() {
244244
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
245245
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
246246
if (activeMerges > configuredMaxMergeCount
247-
// only throttle indexing if disk IO is un-throttled, and we still can't keep up with the merge load
248-
&& threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec()
247+
// only throttle indexing if disk IO is un-throttled (if enabled), and we still can't keep up with the merge load
248+
&& (config.isAutoThrottle() == false || threadPoolMergeExecutorService.usingMaxTargetIORateBytesPerSec())
249249
&& shouldThrottleIncomingMerges.get() == false) {
250250
// maybe enable merge task throttling
251251
synchronized (shouldThrottleIncomingMerges) {

server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,15 @@ public void testSimpleMergeTaskReEnqueueingBySize() {
196196
}
197197
}
198198

199-
public void testIndexingThrottlingWhenSubmittingMerges() {
199+
public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingEnabled() {
200+
testIndexingThrottlingWhenSubmittingMerges(true);
201+
}
202+
203+
public void testIndexingThrottlingWhenSubmittingMergesWithDiskIOThrottlingDisabled() {
204+
testIndexingThrottlingWhenSubmittingMerges(false);
205+
}
206+
207+
private void testIndexingThrottlingWhenSubmittingMerges(boolean withDiskIOThrottlingEnabled) {
200208
final int maxThreadCount = randomIntBetween(1, 5);
201209
// settings validation requires maxMergeCount >= maxThreadCount
202210
final int maxMergeCount = maxThreadCount + randomIntBetween(0, 5);
@@ -209,6 +217,7 @@ public void testIndexingThrottlingWhenSubmittingMerges() {
209217
Settings mergeSchedulerSettings = Settings.builder()
210218
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), maxThreadCount)
211219
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), maxMergeCount)
220+
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), withDiskIOThrottlingEnabled)
212221
.build();
213222
TestThreadPoolMergeScheduler threadPoolMergeScheduler = new TestThreadPoolMergeScheduler(
214223
new ShardId("index", "_na_", 1),
@@ -224,20 +233,20 @@ public void testIndexingThrottlingWhenSubmittingMerges() {
224233
while (submittedMerges < mergesToSubmit - 1) {
225234
isUsingMaxTargetIORate.set(randomBoolean());
226235
if (submittedMergeTasks.isEmpty() == false && randomBoolean()) {
227-
// maybe schedule one submitted merge
236+
// maybe schedule one of the submitted merges (but still it's not run)
228237
MergeTask mergeTask = randomFrom(submittedMergeTasks);
229238
submittedMergeTasks.remove(mergeTask);
230239
mergeTask.schedule();
231240
} else {
232-
// submit one merge
241+
// submit one new merge
233242
MergeSource mergeSource = mock(MergeSource.class);
234243
OneMerge oneMerge = mock(OneMerge.class);
235244
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
236245
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
237246
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
238247
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
239248
submittedMerges++;
240-
if (isUsingMaxTargetIORate.get() && submittedMerges > maxMergeCount) {
249+
if ((isUsingMaxTargetIORate.get() || withDiskIOThrottlingEnabled == false) && submittedMerges > maxMergeCount) {
241250
expectIndexThrottling = true;
242251
} else if (submittedMerges <= maxMergeCount) {
243252
expectIndexThrottling = false;
@@ -246,15 +255,20 @@ public void testIndexingThrottlingWhenSubmittingMerges() {
246255
// assert IO throttle state
247256
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(expectIndexThrottling));
248257
}
249-
// submit one last merge when IO throttling is at max value
250-
isUsingMaxTargetIORate.set(true);
258+
if (withDiskIOThrottlingEnabled) {
259+
// submit one last merge when IO throttling is at max value
260+
isUsingMaxTargetIORate.set(true);
261+
} else {
262+
// but if disk IO throttling is not enabled, indexing throttling should still be triggered
263+
isUsingMaxTargetIORate.set(randomBoolean());
264+
}
251265
MergeSource mergeSource = mock(MergeSource.class);
252266
OneMerge oneMerge = mock(OneMerge.class);
253267
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
254268
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
255269
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
256270
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
257-
// assert index throttling because IO throttling is at max value
271+
// assert indexing throttling state because IO throttling is at max value OR disk IO throttling is disabled
258272
assertThat(threadPoolMergeScheduler.isIndexingThrottlingEnabled(), is(true));
259273
}
260274

0 commit comments

Comments
 (0)