diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
index 7e2b41ebbdb..73d347d533e 100644
--- a/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
+++ b/core/src/main/java/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.java
@@ -23,10 +23,10 @@
/**
* Limits the number of concurrent requests executed by the driver.
*
- *
Usage in non-blocking applications: beware that all built-in implementations of this interface
- * use locks for internal coordination, and do not qualify as lock-free, with the obvious exception
- * of {@code PassThroughRequestThrottler}. If your application enforces strict lock-freedom, then
- * request throttling should not be enabled.
+ *
Usage in non-blocking applications: beware that some implementations of this interface use
+ * locks for internal coordination, and do not qualify as lock-free. If your application enforces
+ * strict lock-freedom, then you should use the {@code PassThroughRequestThrottler} or the {@code
+ * ConcurrencyLimitingRequestThrottler}.
*/
public interface RequestThrottler extends Closeable {
diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
index ffe0ffe9650..8146c5b113a 100644
--- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
+++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/throttling/ConcurrencyLimitingRequestThrottler.java
@@ -26,10 +26,9 @@
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
-import java.util.ArrayDeque;
import java.util.Deque;
-import java.util.concurrent.locks.ReentrantLock;
-import net.jcip.annotations.GuardedBy;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,17 +60,12 @@ public class ConcurrencyLimitingRequestThrottler implements RequestThrottler {
private final String logPrefix;
private final int maxConcurrentRequests;
private final int maxQueueSize;
-
- private final ReentrantLock lock = new ReentrantLock();
-
- @GuardedBy("lock")
- private int concurrentRequests;
-
- @GuardedBy("lock")
- private final Deque queue = new ArrayDeque<>();
-
- @GuardedBy("lock")
- private boolean closed;
+ private final AtomicInteger concurrentRequests = new AtomicInteger(0);
+ // CLQ is not O(1) for size(), as it forces a full iteration of the queue. So, we track
+ // the size of the queue explicitly.
+ private final Deque queue = new ConcurrentLinkedDeque<>();
+ private final AtomicInteger queueSize = new AtomicInteger(0);
+ private volatile boolean closed = false;
public ConcurrencyLimitingRequestThrottler(DriverContext context) {
this.logPrefix = context.getSessionName();
@@ -88,50 +82,62 @@ public ConcurrencyLimitingRequestThrottler(DriverContext context) {
@Override
public void register(@NonNull Throttled request) {
- boolean notifyReadyRequired = false;
+ if (closed) {
+ LOG.trace("[{}] Rejecting request after shutdown", logPrefix);
+ fail(request, "The session is shutting down");
+ return;
+ }
- lock.lock();
- try {
- if (closed) {
- LOG.trace("[{}] Rejecting request after shutdown", logPrefix);
- fail(request, "The session is shutting down");
- } else if (queue.isEmpty() && concurrentRequests < maxConcurrentRequests) {
- // We have capacity for one more concurrent request
+ // Implementation note: Technically the "concurrent requests" or "queue size"
+ // could read transiently over the limit, but the queue itself will never grow
+ // beyond the limit since we always check for that condition and revert if
+ // over-limit. We do this instead of a CAS-loop to avoid the potential loop.
+
+ // If no backlog exists AND we get capacity, we can execute immediately
+ if (queueSize.get() == 0) {
+ // Take a claim first, and then check if we are OK to proceed
+ int newConcurrent = concurrentRequests.incrementAndGet();
+ if (newConcurrent <= maxConcurrentRequests) {
LOG.trace("[{}] Starting newly registered request", logPrefix);
- concurrentRequests += 1;
- notifyReadyRequired = true;
- } else if (queue.size() < maxQueueSize) {
- LOG.trace("[{}] Enqueuing request", logPrefix);
- queue.add(request);
+ request.onThrottleReady(false);
+ return;
} else {
- LOG.trace("[{}] Rejecting request because of full queue", logPrefix);
- fail(
- request,
- String.format(
- "The session has reached its maximum capacity "
- + "(concurrent requests: %d, queue size: %d)",
- maxConcurrentRequests, maxQueueSize));
+ // We exceeded the limit, decrement the count and fall through to the queuing logic
+ concurrentRequests.decrementAndGet();
}
- } finally {
- lock.unlock();
}
- // no need to hold the lock while allowing the task to progress
- if (notifyReadyRequired) {
- request.onThrottleReady(false);
+ // If we have a backlog, or we failed to claim capacity, try to enqueue
+ int newQueueSize = queueSize.incrementAndGet();
+ if (newQueueSize <= maxQueueSize) {
+ LOG.trace("[{}] Enqueuing request", logPrefix);
+ queue.offer(request);
+
+ // Double-check that we were still supposed to be enqueued; it is possible
+ // that the session was closed while we were enqueuing, it's also possible
+ // that it is right now removing the request, so we need to check both
+ if (closed) {
+ if (queue.remove(request)) {
+ queueSize.decrementAndGet();
+ LOG.trace("[{}] Rejecting late request after shutdown", logPrefix);
+ fail(request, "The session is shutting down");
+ }
+ }
+ } else {
+ LOG.trace("[{}] Rejecting request because of full queue", logPrefix);
+ queueSize.decrementAndGet();
+ fail(
+ request,
+ String.format(
+ "The session has reached its maximum capacity "
+ + "(concurrent requests: %d, queue size: %d)",
+ maxConcurrentRequests, maxQueueSize));
}
}
@Override
public void signalSuccess(@NonNull Throttled request) {
- Throttled nextRequest = null;
- lock.lock();
- try {
- nextRequest = onRequestDoneAndDequeNext();
- } finally {
- lock.unlock();
- }
-
+ Throttled nextRequest = onRequestDoneAndDequeNext();
if (nextRequest != null) {
nextRequest.onThrottleReady(true);
}
@@ -145,17 +151,13 @@ public void signalError(@NonNull Throttled request, @NonNull Throwable error) {
@Override
public void signalTimeout(@NonNull Throttled request) {
Throttled nextRequest = null;
- lock.lock();
- try {
- if (!closed) {
- if (queue.remove(request)) { // The request timed out before it was active
- LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
- } else {
- nextRequest = onRequestDoneAndDequeNext();
- }
+ if (!closed) {
+ if (queue.remove(request)) { // The request timed out before it was active
+ queueSize.decrementAndGet();
+ LOG.trace("[{}] Removing timed out request from the queue", logPrefix);
+ } else {
+ nextRequest = onRequestDoneAndDequeNext();
}
- } finally {
- lock.unlock();
}
if (nextRequest != null) {
@@ -166,17 +168,13 @@ public void signalTimeout(@NonNull Throttled request) {
@Override
public void signalCancel(@NonNull Throttled request) {
Throttled nextRequest = null;
- lock.lock();
- try {
- if (!closed) {
- if (queue.remove(request)) { // The request has been cancelled before it was active
- LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
- } else {
- nextRequest = onRequestDoneAndDequeNext();
- }
+ if (!closed) {
+ if (queue.remove(request)) { // The request has been cancelled before it was active
+ queueSize.decrementAndGet();
+ LOG.trace("[{}] Removing cancelled request from the queue", logPrefix);
+ } else {
+ nextRequest = onRequestDoneAndDequeNext();
}
- } finally {
- lock.unlock();
}
if (nextRequest != null) {
@@ -184,17 +182,16 @@ public void signalCancel(@NonNull Throttled request) {
}
}
- @SuppressWarnings("GuardedBy") // this method is only called with the lock held
@Nullable
private Throttled onRequestDoneAndDequeNext() {
- assert lock.isHeldByCurrentThread();
if (!closed) {
- if (queue.isEmpty()) {
- concurrentRequests -= 1;
+ Throttled nextRequest = queue.poll();
+ if (nextRequest == null) {
+ concurrentRequests.decrementAndGet();
} else {
+ queueSize.decrementAndGet();
LOG.trace("[{}] Starting dequeued request", logPrefix);
- // don't touch concurrentRequests since we finished one but started another
- return queue.poll();
+ return nextRequest;
}
}
@@ -204,45 +201,28 @@ private Throttled onRequestDoneAndDequeNext() {
@Override
public void close() {
- lock.lock();
- try {
- closed = true;
- LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queue.size());
- for (Throttled request : queue) {
- fail(request, "The session is shutting down");
- }
- } finally {
- lock.unlock();
+ closed = true;
+
+ LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queueSize.get());
+ Throttled request;
+ while ((request = queue.poll()) != null) {
+ queueSize.decrementAndGet();
+ fail(request, "The session is shutting down");
}
}
public int getQueueSize() {
- lock.lock();
- try {
- return queue.size();
- } finally {
- lock.unlock();
- }
+ return queueSize.get();
}
@VisibleForTesting
int getConcurrentRequests() {
- lock.lock();
- try {
- return concurrentRequests;
- } finally {
- lock.unlock();
- }
+ return concurrentRequests.get();
}
@VisibleForTesting
Deque getQueue() {
- lock.lock();
- try {
- return queue;
- } finally {
- lock.unlock();
- }
+ return queue;
}
private static void fail(Throttled request, String message) {
diff --git a/manual/core/non_blocking/README.md b/manual/core/non_blocking/README.md
index 7abe9d856a3..f320ffd13d2 100644
--- a/manual/core/non_blocking/README.md
+++ b/manual/core/non_blocking/README.md
@@ -152,15 +152,13 @@ should not be used if strict lock-freedom is enforced.
[`SafeInitNodeStateListener`]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/metadata/SafeInitNodeStateListener.html
-The same is valid for both built-in [request throttlers]:
+The `RateLimitingRequestThrottler` is currently blocking. The `ConcurrencyLimitingRequestThrottler`
+is lock-free.
-* `ConcurrencyLimitingRequestThrottler`
-* `RateLimitingRequestThrottler`
-
-See the section about [throttling](../throttling) for details about these components. Again, they
-use locks internally, and depending on how many requests are being executed in parallel, the thread
-contention on these locks can be high: in short, if your application enforces strict lock-freedom,
-then these components should not be used.
+See the section about [throttling](../throttling) for details about these components. Depending on
+how many requests are being executed in parallel, the thread contention on these locks can be high:
+in short, if your application enforces strict lock-freedom, then you should not use the
+`RateLimitingRequestThrottler`.
[request throttlers]: https://docs.datastax.com/en/drivers/java/4.17/com/datastax/oss/driver/api/core/session/throttling/RequestThrottler.html