9
9
10
10
package org .elasticsearch .index .engine ;
11
11
12
+ import org .apache .logging .log4j .LogManager ;
12
13
import org .apache .logging .log4j .Logger ;
13
14
import org .apache .lucene .codecs .perfield .PerFieldKnnVectorsFormat ;
14
15
import org .apache .lucene .index .ByteVectorValues ;
87
88
import org .elasticsearch .index .store .Store ;
88
89
import org .elasticsearch .index .translog .Translog ;
89
90
import org .elasticsearch .index .translog .TranslogStats ;
91
+ import org .elasticsearch .indices .IndexingMemoryController ;
90
92
import org .elasticsearch .indices .recovery .RecoverySettings ;
91
93
import org .elasticsearch .search .suggest .completion .CompletionStats ;
92
94
import org .elasticsearch .threadpool .ThreadPool ;
108
110
import java .util .Set ;
109
111
import java .util .concurrent .CountDownLatch ;
110
112
import java .util .concurrent .ExecutionException ;
113
+ import java .util .concurrent .Semaphore ;
111
114
import java .util .concurrent .TimeUnit ;
112
115
import java .util .concurrent .atomic .AtomicBoolean ;
113
116
import java .util .concurrent .locks .Condition ;
@@ -145,6 +148,7 @@ public abstract class Engine implements Closeable {
145
148
protected final ReentrantLock failEngineLock = new ReentrantLock ();
146
149
protected final SetOnce <Exception > failedEngine = new SetOnce <>();
147
150
protected final boolean enableRecoverySource ;
151
+ protected final boolean pauseIndexingOnThrottle ;
148
152
149
153
private final AtomicBoolean isClosing = new AtomicBoolean ();
150
154
private final SubscribableListener <Void > drainOnCloseListener = new SubscribableListener <>();
@@ -176,6 +180,9 @@ protected Engine(EngineConfig engineConfig) {
176
180
this .enableRecoverySource = RecoverySettings .INDICES_RECOVERY_SOURCE_ENABLED_SETTING .get (
177
181
engineConfig .getIndexSettings ().getSettings ()
178
182
);
183
+ this .pauseIndexingOnThrottle = IndexingMemoryController .PAUSE_INDEXING_ON_THROTTLE .get (
184
+ engineConfig .getIndexSettings ().getSettings ()
185
+ );
179
186
}
180
187
181
188
/**
@@ -444,12 +451,19 @@ public interface IndexCommitListener {
444
451
* is enabled
445
452
*/
446
453
protected static final class IndexThrottle {
454
+ private static final Logger logger = LogManager .getLogger (IndexThrottle .class );
447
455
private final CounterMetric throttleTimeMillisMetric = new CounterMetric ();
448
456
private volatile long startOfThrottleNS ;
449
457
private static final ReleasableLock NOOP_LOCK = new ReleasableLock (new NoOpLock ());
450
- private final ReleasableLock lockReference = new ReleasableLock (new ReentrantLock ());
458
+ private final PauseLock throttlingLock ;
459
+ private final ReleasableLock lockReference ;
451
460
private volatile ReleasableLock lock = NOOP_LOCK ;
452
461
462
+ public IndexThrottle (boolean pause ) {
463
+ throttlingLock = new PauseLock (pause ? 0 : 1 );
464
+ lockReference = new ReleasableLock (throttlingLock );
465
+ }
466
+
453
467
public Releasable acquireThrottle () {
454
468
return lock .acquire ();
455
469
}
@@ -458,12 +472,15 @@ public Releasable acquireThrottle() {
458
472
public void activate () {
459
473
assert lock == NOOP_LOCK : "throttling activated while already active" ;
460
474
startOfThrottleNS = System .nanoTime ();
475
+ throttlingLock .throttle ();
461
476
lock = lockReference ;
462
477
}
463
478
464
479
/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
465
480
public void deactivate () {
466
481
assert lock != NOOP_LOCK : "throttling deactivated but not active" ;
482
+
483
+ throttlingLock .unthrottle ();
467
484
lock = NOOP_LOCK ;
468
485
469
486
assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS" ;
@@ -553,6 +570,58 @@ public Condition newCondition() {
553
570
}
554
571
}
555
572
573
+ /* A lock implementation that allows us to control how many threads can take the lock
574
+ * In particular, this is used to set the number of allowed threads to 1 or 0
575
+ * when index throttling is activated.
576
+ */
577
+ protected static final class PauseLock implements Lock {
578
+ private final Semaphore semaphore = new Semaphore (Integer .MAX_VALUE );
579
+ private final int allowThreads ;
580
+
581
+ public PauseLock (int allowThreads ) {
582
+ this .allowThreads = allowThreads ;
583
+ }
584
+
585
+ public void lock () {
586
+ semaphore .acquireUninterruptibly ();
587
+ }
588
+
589
+ @ Override
590
+ public void lockInterruptibly () throws InterruptedException {
591
+ semaphore .acquire ();
592
+ }
593
+
594
+ @ Override
595
+ public void unlock () {
596
+ semaphore .release ();
597
+ }
598
+
599
+ @ Override
600
+ public boolean tryLock () {
601
+ throw new UnsupportedOperationException ();
602
+ }
603
+
604
+ @ Override
605
+ public boolean tryLock (long time , TimeUnit unit ) throws InterruptedException {
606
+ throw new UnsupportedOperationException ();
607
+ }
608
+
609
+ @ Override
610
+ public Condition newCondition () {
611
+ throw new UnsupportedOperationException ();
612
+ }
613
+
614
+ public void throttle () {
615
+ assert semaphore .availablePermits () == Integer .MAX_VALUE ;
616
+ semaphore .acquireUninterruptibly (Integer .MAX_VALUE - allowThreads );
617
+ }
618
+
619
+ public void unthrottle () {
620
+ assert semaphore .availablePermits () <= allowThreads ;
621
+ semaphore .release (Integer .MAX_VALUE - allowThreads );
622
+ }
623
+ }
624
+
556
625
/**
557
626
* Perform document index operation on the engine
558
627
* @param index operation to perform
0 commit comments