-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Pause indexing completely in serverless when throttling #127173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pause indexing completely in serverless when throttling #127173
Conversation
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing) |
…ingES11516 refresh branch
…ingES11516 Refresh branch
…ingES11516 refresh
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a suggestion.
private volatile ReleasableLock lock = NOOP_LOCK; | ||
|
||
public Releasable acquireThrottle() { | ||
return lock.acquire(); | ||
if (lock == pauseLockReference) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems counter to how the current structure is, special casing this pause lock. Can we do something like following instead? We could also go all in on the Semaphore and remove the Lock
interface from the equation.
Subject: [PATCH] phase2 variable name
---
Index: server/src/main/java/org/elasticsearch/index/engine/Engine.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java
--- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java (revision 10a60a55f949e04fb0ea513a877e8bd0ea4f8baf)
+++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java (date 1746355091402)
@@ -110,6 +110,7 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
@@ -454,67 +455,31 @@
private final CounterMetric throttleTimeMillisMetric = new CounterMetric();
private volatile long startOfThrottleNS;
private static final ReleasableLock NOOP_LOCK = new ReleasableLock(new NoOpLock());
- private final ReleasableLock lockReference = new ReleasableLock(new ReentrantLock());
- private final Lock pauseIndexingLock = new ReentrantLock();
- private final Condition pauseCondition = pauseIndexingLock.newCondition();
- private final ReleasableLock pauseLockReference = new ReleasableLock(pauseIndexingLock);
- private volatile AtomicBoolean pauseIndexing = new AtomicBoolean();
+ private final PauseLock throttlingLock;
+ private final ReleasableLock lockReference;
private volatile ReleasableLock lock = NOOP_LOCK;
+ public IndexThrottle(boolean pause) {
+ throttlingLock = new PauseLock(pause ? 0 : 1);
+ lockReference = new ReleasableLock(throttlingLock);
+ }
+
public Releasable acquireThrottle() {
- if (lock == pauseLockReference) {
- pauseLockReference.acquire();
- try {
- while (pauseIndexing.getAcquire()) {
- // System.out.println("Waiting on pause indexing lock");
- logger.trace("Waiting on pause indexing lock");
- pauseCondition.await();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } finally {
- // System.out.println("Acquired pause indexing lock");
- logger.trace("Acquired pause indexing lock");
- }
- return pauseLockReference;
- } else {
- return lock.acquire();
- }
+ return lock.acquire();
}
/** Activate throttling, which switches the lock to be a real lock */
public void activate() {
assert lock == NOOP_LOCK : "throttling activated while already active";
startOfThrottleNS = System.nanoTime();
+ throttlingLock.throttle();
lock = lockReference;
}
- public void activatePause() {
- assert lock == NOOP_LOCK : "throttling activated while already active";
- startOfThrottleNS = System.nanoTime();
- // System.out.println("Activate pause");
- pauseIndexing.setRelease(true);
- lock = pauseLockReference;
- }
-
/** Deactivate throttling, which switches the lock to be an always-acquirable NoOpLock */
public void deactivate() {
assert lock != NOOP_LOCK : "throttling deactivated but not active";
-
- if (lock == pauseLockReference) {
- logger.trace("Deactivate index throttling pause");
-
- // Signal the threads that are waiting on pauseCondition
- pauseLockReference.acquire();
- try {
- // System.out.println("Deactivate pause");
- pauseIndexing.setRelease(false);
- pauseCondition.signalAll();
- } finally {
- pauseLockReference.close();
- }
- }
+ throttlingLock.unthrottle();
lock = NOOP_LOCK;
assert startOfThrottleNS > 0 : "Bad state of startOfThrottleNS";
@@ -604,6 +569,53 @@
}
}
+ protected static final class PauseLock implements Lock {
+ private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
+ private final int allowThreads;
+
+ public PauseLock(int allowThreads) {
+ this.allowThreads = allowThreads;
+ }
+
+ public void lock() {} {
+ semaphore.acquireUninterruptibly();
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ semaphore.acquire();
+ }
+
+ @Override
+ public void unlock() {
+ semaphore.release();
+ }
+
+ @Override
+ public boolean tryLock() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Condition newCondition() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void throttle() {
+ assert semaphore.availablePermits() == Integer.MAX_VALUE;
+ semaphore.acquireUninterruptibly(Integer.MAX_VALUE - allowThreads);
+ }
+
+ public void unthrottle() {
+ assert semaphore.availablePermits() <= allowThreads;
+ semaphore.release(Integer.MAX_VALUE - allowThreads);
+ }
+ }
/**
* Perform document index operation on the engine
* @param index operation to perform
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I like this and it works so I can switch to this. I guess the only thing we loose with this is Re-entrancy, but I don't think we need that for the way we use this lock right now.
…lasticsearch into 04212025/PauseIndexingES11516 pull
…ingES11516 refresh es branch
…ingES11516 refresh
…ingES11516 refresh
…ingES11516 refresh branch
…ingES11516 refresh
…ingES11516 refresh branch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
It would be good to follow-up with a unittest - and also an integration test in this repo to ensure we have it tested here. But given the testing elsewhere we can merge this now to start experimenting.
This PR seeks to address stability issues seen in some CSPs. See ES-11516 for details. Serverless PR#3801 associated with this change. When throttling is enabled for indexing, we limit indexing to 1 thread per shard. However, this might not be sufficient throttling in serverless where we might have a large number of shards. With this change we pause indexing completely when throttling is enabled.
This PR seeks to address stability issues seen in some CSPs. See ES-11516 for details. Serverless PR#3801 associated with this change. When throttling is enabled for indexing, we limit indexing to 1 thread per shard. However, this might not be sufficient throttling in serverless where we might have a large number of shards. With this change we pause indexing completely when throttling is enabled.
`Engine.PauseLock#throttle` can be called when the lock is being throttled, so we can't guarantee that all permits are available before throttling. Resolve elastic#126359 See elastic#127173
`Engine.PauseLock#throttle` can be called when the lock is being throttled, so we can't guarantee that all permits are available before throttling. Resolve elastic#126359 See elastic#127173
`Engine.PauseLock#throttle` can be called when the lock is being throttled, so we can't guarantee that all permits are available before throttling. Resolve elastic#126359 See elastic#127173
This PR seeks to address stability issues seen in some CSPs. See ES-11516 for details. Serverless PR#3801 associated with this change.
When throttling is enabled for indexing, we limit indexing to 1 thread per shard. However, this might not be sufficient throttling in serverless where we might have a large number of shards. With this change we pause indexing completely when throttling is enabled.