From e623abc309c402ab016f6319f6ca8d36604bb05c Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 24 May 2021 23:36:56 +0300 Subject: [PATCH 1/3] reworks and improves Resumability impl this includes: * rework of InMemoryResumableFramesStore and improvement in its tests coverage * improvements in Client/Server resume Session and ensuring that if connection is rejected for any reasons - it is fully closed on both outbound and inbound ends (This fix is needed for LocalDuplexConnection scenario which may be in unterminated state if it will not be subscribed on the inbound) * enabling resumability tests for LocalTransport * improvements in logging * general cleanups and polishing Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketConnector.java | 1 + .../src/main/java/io/rsocket/core/Resume.java | 2 +- .../java/io/rsocket/core/ServerSetup.java | 3 +- .../rsocket/resume/ClientRSocketSession.java | 73 +- .../resume/InMemoryResumableFramesStore.java | 824 ++++++++++++++---- .../resume/ResumableDuplexConnection.java | 64 +- .../rsocket/resume/ServerRSocketSession.java | 55 +- .../resume/InMemoryResumeStoreTest.java | 492 ++++++++++- .../rsocket/resume/ResumeIntegrationTest.java | 6 +- .../java/io/rsocket/test/TransportTest.java | 20 +- .../local/LocalResumableTransportTest.java | 2 - ...sumableWithFragmentationTransportTest.java | 42 + ...sumableWithFragmentationTransportTest.java | 55 ++ .../WebsocketResumableTransportTest.java | 58 ++ ...sumableWithFragmentationTransportTest.java | 58 ++ 15 files changed, 1500 insertions(+), 255 deletions(-) create mode 100644 rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java create mode 100644 rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java create mode 100644 rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java create mode 100644 rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index fe91cdb6f..edd13b48c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -612,6 +612,7 @@ public Mono connect(Supplier transportSupplier) { final ResumableDuplexConnection resumableDuplexConnection = new ResumableDuplexConnection( CLIENT_TAG, + resumeToken, clientServerConnection, resumableFramesStore); final ResumableClientSetup resumableClientSetup = diff --git a/rsocket-core/src/main/java/io/rsocket/core/Resume.java b/rsocket-core/src/main/java/io/rsocket/core/Resume.java index 48133af98..fa0eedbfa 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/Resume.java +++ b/rsocket-core/src/main/java/io/rsocket/core/Resume.java @@ -160,7 +160,7 @@ boolean isCleanupStoreOnKeepAlive() { Function getStoreFactory(String tag) { return storeFactory != null ? storeFactory - : token -> new InMemoryResumableFramesStore(tag, 100_000); + : token -> new InMemoryResumableFramesStore(tag, token, 100_000); } Duration getStreamTimeout() { diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index 318c54816..0b23bcde5 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -109,7 +109,8 @@ public Mono acceptRSocketSetup( final ResumableFramesStore resumableFramesStore = resumeStoreFactory.apply(resumeToken); final ResumableDuplexConnection resumableDuplexConnection = - new ResumableDuplexConnection("server", duplexConnection, resumableFramesStore); + new ResumableDuplexConnection( + "server", resumeToken, duplexConnection, resumableFramesStore); final ServerRSocketSession serverRSocketSession = new ServerRSocketSession( resumeToken, diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index 9fd95ad17..c58cc4954 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -18,6 +18,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.CharsetUtil; import io.rsocket.DuplexConnection; import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.exceptions.Exceptions; @@ -54,6 +55,7 @@ public class ClientRSocketSession final Retry retry; final boolean cleanupStoreOnKeepAlive; final ByteBuf resumeToken; + final String session; volatile Subscription s; static final AtomicReferenceFieldUpdater S = @@ -71,20 +73,30 @@ public ClientRSocketSession( Retry retry, boolean cleanupStoreOnKeepAlive) { this.resumeToken = resumeToken; + this.session = resumeToken.toString(CharsetUtil.UTF_8); this.connectionFactory = connectionFactory.flatMap( dc -> { + final long impliedPosition = resumableFramesStore.frameImpliedPosition(); + final long position = resumableFramesStore.framePosition(); dc.sendFrame( 0, ResumeFrameCodec.encode( dc.alloc(), resumeToken.retain(), // server uses this to release its cache - resumableFramesStore.frameImpliedPosition(), // observed on the client side + impliedPosition, // observed on the client side // server uses this to check whether there is no mismatch - resumableFramesStore.framePosition() // sent from the client sent + position // sent from the client sent )); - logger.debug("Resume Frame has been sent"); + + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.", + session, + impliedPosition, + position); + } return connectionTransformer.apply(dc); }); @@ -105,7 +117,12 @@ void reconnect(int index) { if (this.s == Operators.cancelledSubscription() && S.compareAndSet(this, Operators.cancelledSubscription(), null)) { keepAliveSupport.stop(); - logger.debug("Connection[" + index + "] is lost. Reconnecting to resume..."); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...", + session, + index); + } connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this); } } @@ -155,21 +172,30 @@ public void onNext(Tuple2 tuple2) { DuplexConnection nextDuplexConnection = tuple2.getT2(); if (!Operators.terminate(S, this)) { - logger.debug("Session has already been expired. Terminating received connection"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Session has already been expired. Terminating received connection", + session); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server=[Session Expired]"); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); return; } final int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame); if (streamId != 0) { - logger.debug( - "Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection"); - resumableConnection.dispose(); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection", + session); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others"); + resumableConnection.dispose(connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); return; } @@ -183,7 +209,8 @@ public void onNext(Tuple2 tuple2) { final long position = resumableFramesStore.framePosition(); final long impliedPosition = resumableFramesStore.frameImpliedPosition(); logger.debug( - "ResumeOK FRAME received. ServerResumeState{observedFramesPosition[{}]}. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", + "Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]", + session, remoteImpliedPos, impliedPosition, position); @@ -194,42 +221,54 @@ public void onNext(Tuple2 tuple2) { } } catch (IllegalStateException e) { logger.debug("Exception occurred while releasing frames in the frameStore", e); - resumableConnection.dispose(); + resumableConnection.dispose(e); final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e); nextDuplexConnection.sendErrorAndClose(t); + nextDuplexConnection.receive().subscribe().dispose(); return; } if (resumableConnection.connect(nextDuplexConnection)) { keepAliveSupport.start(); - logger.debug("Session has been resumed successfully"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Session has been resumed successfully", session); + } } else { - logger.debug("Session has already been expired. Terminating received connection"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Session has already been expired. Terminating received connection", + session); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[Session Expired]"); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } else { logger.debug( - "Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", + "Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", + session, remoteImpliedPos, position); - resumableConnection.dispose(); final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]"); + resumableConnection.dispose(connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } else if (frameType == FrameType.ERROR) { final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame); logger.debug("Received error frame. Terminating received connection", exception); - resumableConnection.dispose(); + resumableConnection.dispose(exception); } else { logger.debug( "Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection"); - resumableConnection.dispose(); final ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others"); + resumableConnection.dispose(connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } @@ -239,7 +278,7 @@ public void onError(Throwable t) { Operators.onErrorDropped(t, currentContext()); } - resumableConnection.dispose(); + resumableConnection.dispose(t); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java index 03516af92..87d82048d 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java @@ -19,118 +19,286 @@ import static io.rsocket.resume.ResumableDuplexConnection.isResumableFrame; import io.netty.buffer.ByteBuf; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import io.netty.util.CharsetUtil; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; +import reactor.util.annotation.Nullable; /** * writes - n (where n is frequent, primary operation) reads - m (where m == KeepAliveFrequency) * skip - k -> 0 (where k is the rare operation which happens after disconnection */ public class InMemoryResumableFramesStore extends Flux - implements CoreSubscriber, ResumableFramesStore, Subscription { + implements ResumableFramesStore, Subscription { + private FramesSubscriber framesSubscriber; private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class); final Sinks.Empty disposed = Sinks.empty(); - final ArrayList cachedFrames; - final String tag; + final Queue cachedFrames; + final String side; + final String session; final int cacheLimit; volatile long impliedPosition; static final AtomicLongFieldUpdater IMPLIED_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "impliedPosition"); - volatile long position; - static final AtomicLongFieldUpdater POSITION = - AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "position"); + volatile long firstAvailableFramePosition; + static final AtomicLongFieldUpdater FIRST_AVAILABLE_FRAME_POSITION = + AtomicLongFieldUpdater.newUpdater( + InMemoryResumableFramesStore.class, "firstAvailableFramePosition"); - volatile int cacheSize; - static final AtomicIntegerFieldUpdater CACHE_SIZE = - AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "cacheSize"); + long remoteImpliedPosition; - CoreSubscriber saveFramesSubscriber; + int cacheSize; + + Throwable terminal; CoreSubscriber actual; + CoreSubscriber pendingActual; + + volatile long state; + static final AtomicLongFieldUpdater STATE = + AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state"); /** - * Indicates whether there is an active connection or not. - * - *
    - *
  • 0 - no active connection - *
  • 1 - active connection - *
  • 2 - disposed - *
- * - *
-   * 0 <-----> 1
-   * |         |
-   * +--> 2 <--+
-   * 
+ * Flag which indicates that {@link InMemoryResumableFramesStore} is finalized and all related + * stores are cleaned */ - volatile int state; - - static final AtomicIntegerFieldUpdater STATE = - AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state"); + static final long FINALIZED_FLAG = + 0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore} is terminated via the {@link + * InMemoryResumableFramesStore#dispose()} method + */ + static final long DISPOSED_FLAG = + 0b0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore} is terminated via the {@link + * FramesSubscriber#onComplete()} or {@link FramesSubscriber#onError(Throwable)} ()} methods + */ + static final long TERMINATED_FLAG = + 0b0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** Flag which indicates that {@link InMemoryResumableFramesStore} has active frames consumer */ + static final long CONNECTED_FLAG = + 0b0001_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore} has no active frames consumer + * but there is a one pending + */ + static final long PENDING_CONNECTION_FLAG = + 0b0000_1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that there are some received implied position changes from the remote + * party + */ + static final long REMOTE_IMPLIED_POSITION_CHANGED_FLAG = + 0b0000_0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that there are some frames stored in the {@link + * io.rsocket.internal.UnboundedProcessor} which has to be cached and sent to the remote party + */ + static final long HAS_FRAME_FLAG = + 0b0000_0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore#drain(long)} has an actor which + * is currently progressing on the work. This flag should work as a guard to enter|exist into|from + * the {@link InMemoryResumableFramesStore#drain(long)} method. + */ + static final long MAX_WORK_IN_PROGRESS = + 0b0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_1111_1111_1111L; - public InMemoryResumableFramesStore(String tag, int cacheSizeBytes) { - this.tag = tag; + public InMemoryResumableFramesStore(String side, ByteBuf session, int cacheSizeBytes) { + this.side = side; + this.session = session.toString(CharsetUtil.UTF_8); this.cacheLimit = cacheSizeBytes; - this.cachedFrames = new ArrayList<>(); + this.cachedFrames = new ArrayDeque<>(); } public Mono saveFrames(Flux frames) { return frames .transform( - Operators.lift( - (__, actual) -> { - this.saveFramesSubscriber = actual; - return this; - })) + Operators.lift( + (__, actual) -> this.framesSubscriber = new FramesSubscriber(actual, this))) .then(); } @Override public void releaseFrames(long remoteImpliedPos) { - long pos = position; - logger.debug( - "{} Removing frames for local: {}, remote implied: {}", tag, pos, remoteImpliedPos); - long toRemoveBytes = Math.max(0, remoteImpliedPos - pos); - int removedBytes = 0; - final ArrayList frames = cachedFrames; - synchronized (this) { - while (toRemoveBytes > removedBytes && frames.size() > 0) { - ByteBuf cachedFrame = frames.remove(0); - int frameSize = cachedFrame.readableBytes(); - cachedFrame.release(); - removedBytes += frameSize; + long lastReceivedRemoteImpliedPosition = this.remoteImpliedPosition; + if (lastReceivedRemoteImpliedPosition > remoteImpliedPos) { + throw new IllegalStateException( + "Given Remote Implied Position is behind the last received Remote Implied Position"); + } + + this.remoteImpliedPosition = remoteImpliedPos; + + final long previousState = markRemoteImpliedPositionChanged(this); + if (isFinalized(previousState) || isWorkInProgress(previousState)) { + return; + } + + drain((previousState + 1) | REMOTE_IMPLIED_POSITION_CHANGED_FLAG); + } + + void drain(long expectedState) { + final Fuseable.QueueSubscription qs = this.framesSubscriber.qs; + final Queue cachedFrames = this.cachedFrames; + + for (; ; ) { + if (hasRemoteImpliedPositionChanged(expectedState)) { + expectedState = handlePendingRemoteImpliedPositionChanges(expectedState, cachedFrames); + } + + if (hasPendingConnection(expectedState)) { + expectedState = handlePendingConnection(expectedState, cachedFrames); + } + + if (isConnected(expectedState)) { + if (isTerminated(expectedState)) { + handleTerminal(this.terminal); + } else if (isDisposed()) { + handleTerminal(new CancellationException("Disposed")); + } else if (hasFrames(expectedState)) { + handlePendingFrames(qs); + } + } + + if (isDisposed(expectedState) || isTerminated(expectedState)) { + clearAndFinalize(this); + return; + } + + expectedState = markWorkDone(this, expectedState); + if (isFinalized(expectedState)) { + return; + } + + if (!isWorkInProgress(expectedState)) { + return; } } + } - if (toRemoveBytes > removedBytes) { - throw new IllegalStateException( - String.format( - "Local and remote state disagreement: " - + "need to remove additional %d bytes, but cache is empty", - toRemoveBytes)); - } else if (toRemoveBytes < removedBytes) { - throw new IllegalStateException( - "Local and remote state disagreement: local and remote frame sizes are not equal"); - } else { - POSITION.addAndGet(this, removedBytes); - if (cacheLimit != Integer.MAX_VALUE) { - CACHE_SIZE.addAndGet(this, -removedBytes); - logger.debug("{} Removed frames. Current cache size: {}", tag, cacheSize); + long handlePendingRemoteImpliedPositionChanges(long expectedState, Queue cachedFrames) { + final long remoteImpliedPosition = this.remoteImpliedPosition; + final long firstAvailableFramePosition = this.firstAvailableFramePosition; + final long toDropFromCache = Math.max(0, remoteImpliedPosition - firstAvailableFramePosition); + + if (toDropFromCache > 0) { + final int droppedFromCache = dropFramesFromCache(toDropFromCache, cachedFrames); + + if (toDropFromCache > droppedFromCache) { + this.terminal = + new IllegalStateException( + String.format( + "Local and remote state disagreement: " + + "need to remove additional %d bytes, but cache is empty", + toDropFromCache)); + expectedState = markTerminated(this) | TERMINATED_FLAG; + } + + if (toDropFromCache < droppedFromCache) { + this.terminal = + new IllegalStateException( + "Local and remote state disagreement: local and remote frame sizes are not equal"); + expectedState = markTerminated(this) | TERMINATED_FLAG; + } + + FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, firstAvailableFramePosition + droppedFromCache); + if (this.cacheLimit != Integer.MAX_VALUE) { + this.cacheSize -= droppedFromCache; + + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Removed frames from cache to position[{}]. CacheSize[{}]", + this.side, + this.session, + this.remoteImpliedPosition, + this.cacheSize); + } } } + + return expectedState; + } + + void handlePendingFrames(Fuseable.QueueSubscription qs) { + for (; ; ) { + final ByteBuf frame = qs.poll(); + final boolean empty = frame == null; + + if (empty) { + break; + } + + handleFrame(frame); + + if (!isConnected(this.state)) { + break; + } + } + } + + long handlePendingConnection(long expectedState, Queue cachedFrames) { + CoreSubscriber lastActual = null; + for (; ; ) { + final CoreSubscriber nextActual = this.pendingActual; + + if (nextActual != lastActual) { + for (final ByteBuf frame : cachedFrames) { + nextActual.onNext(frame.retainedSlice()); + } + } + + expectedState = markConnected(this, expectedState); + if (isConnected(expectedState)) { + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Connected at Position[{}] and ImpliedPosition[{}]", + side, + session, + firstAvailableFramePosition, + impliedPosition); + } + + this.actual = nextActual; + break; + } + + if (!hasPendingConnection(expectedState)) { + break; + } + + lastActual = nextActual; + } + return expectedState; + } + + static int dropFramesFromCache(long toRemoveBytes, Queue cache) { + int removedBytes = 0; + while (toRemoveBytes > removedBytes && cache.size() > 0) { + final ByteBuf cachedFrame = cache.poll(); + final int frameSize = cachedFrame.readableBytes(); + + cachedFrame.release(); + + removedBytes += frameSize; + } + + return removedBytes; } @Override @@ -140,12 +308,12 @@ public Flux resumeStream() { @Override public long framePosition() { - return position; + return this.firstAvailableFramePosition; } @Override public long frameImpliedPosition() { - return impliedPosition & Long.MAX_VALUE; + return this.impliedPosition & Long.MAX_VALUE; } @Override @@ -169,7 +337,8 @@ void pauseImplied() { final long impliedPosition = this.impliedPosition; if (IMPLIED_POSITION.compareAndSet(this, impliedPosition, impliedPosition | Long.MIN_VALUE)) { - logger.debug("Tag {}. Paused at position[{}]", tag, impliedPosition); + logger.debug( + "Side[{}]|Session[{}]. Paused at position[{}]", side, session, impliedPosition); return; } } @@ -181,7 +350,11 @@ void resumeImplied() { final long restoredImpliedPosition = impliedPosition & Long.MAX_VALUE; if (IMPLIED_POSITION.compareAndSet(this, impliedPosition, restoredImpliedPosition)) { - logger.debug("Tag {}. Resumed at position[{}]", tag, restoredImpliedPosition); + logger.debug( + "Side[{}]|Session[{}]. Resumed at position[{}]", + side, + session, + restoredImpliedPosition); return; } } @@ -194,102 +367,94 @@ public Mono onClose() { @Override public void dispose() { - if (STATE.getAndSet(this, 2) != 2) { - cacheSize = 0; - synchronized (this) { - logger.debug("Tag {}.Disposing InMemoryFrameStore", tag); - for (ByteBuf frame : cachedFrames) { - if (frame != null) { - frame.release(); - } - } - cachedFrames.clear(); - } - disposed.tryEmitEmpty(); + final long previousState = markDisposed(this); + if (isFinalized(previousState) + || isDisposed(previousState) + || isWorkInProgress(previousState)) { + return; + } + + drain(previousState | DISPOSED_FLAG); + } + + void clearCache() { + final Queue frames = this.cachedFrames; + this.cacheSize = 0; + + ByteBuf frame; + while ((frame = frames.poll()) != null) { + frame.release(); } } @Override public boolean isDisposed() { - return state == 2; + return isDisposed(this.state); } - @Override - public void onSubscribe(Subscription s) { - saveFramesSubscriber.onSubscribe(Operators.emptySubscription()); - s.request(Long.MAX_VALUE); + void handleFrame(ByteBuf frame) { + final boolean isResumable = isResumableFrame(frame); + if (isResumable) { + handleResumableFrame(frame); + return; + } + + handleConnectionFrame(frame); } - @Override - public void onError(Throwable t) { - saveFramesSubscriber.onError(t); + void handleTerminal(@Nullable Throwable t) { + if (t != null) { + this.actual.onError(t); + } else { + this.actual.onComplete(); + } } - @Override - public void onComplete() { - saveFramesSubscriber.onComplete(); + void handleConnectionFrame(ByteBuf frame) { + this.actual.onNext(frame); } - @Override - public void onNext(ByteBuf frame) { - final int state; - final boolean isResumable = isResumableFrame(frame); - boolean canBeStore = isResumable; - if (isResumable) { - final ArrayList frames = cachedFrames; - final int incomingFrameSize = frame.readableBytes(); - final int cacheLimit = this.cacheLimit; + void handleResumableFrame(ByteBuf frame) { + final Queue frames = this.cachedFrames; + final int incomingFrameSize = frame.readableBytes(); + final int cacheLimit = this.cacheLimit; - if (cacheLimit != Integer.MAX_VALUE) { - long availableSize = cacheLimit - cacheSize; - if (availableSize < incomingFrameSize) { - int removedBytes = 0; - synchronized (this) { - while (availableSize < incomingFrameSize) { - if (frames.size() == 0) { - break; - } - ByteBuf cachedFrame; - cachedFrame = frames.remove(0); - final int frameSize = cachedFrame.readableBytes(); - availableSize += frameSize; - removedBytes += frameSize; - cachedFrame.release(); - } - } - CACHE_SIZE.addAndGet(this, -removedBytes); - - canBeStore = availableSize >= incomingFrameSize; - POSITION.addAndGet(this, removedBytes + (canBeStore ? 0 : incomingFrameSize)); + final boolean canBeStore; + int cacheSize = this.cacheSize; + if (cacheLimit != Integer.MAX_VALUE) { + final long availableSize = cacheLimit - cacheSize; + + if (availableSize < incomingFrameSize) { + final long firstAvailableFramePosition = this.firstAvailableFramePosition; + final long toRemoveBytes = incomingFrameSize - availableSize; + final int removedBytes = dropFramesFromCache(toRemoveBytes, frames); + + cacheSize = cacheSize - removedBytes; + canBeStore = removedBytes >= toRemoveBytes; + + if (canBeStore) { + FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, firstAvailableFramePosition + removedBytes); } else { - canBeStore = true; + this.cacheSize = cacheSize; + FIRST_AVAILABLE_FRAME_POSITION.lazySet( + this, firstAvailableFramePosition + removedBytes + incomingFrameSize); } } else { canBeStore = true; } + } else { + canBeStore = true; + } - state = this.state; - if (canBeStore) { - synchronized (this) { - if (state != 2) { - frames.add(frame); - } - } + if (canBeStore) { + frames.offer(frame); - if (cacheLimit != Integer.MAX_VALUE) { - CACHE_SIZE.addAndGet(this, incomingFrameSize); - } + if (cacheLimit != Integer.MAX_VALUE) { + this.cacheSize = cacheSize + incomingFrameSize; } - } else { - state = this.state; } - final CoreSubscriber actual = this.actual; - if (state == 1) { - actual.onNext(isResumable && canBeStore ? frame.retainedSlice() : frame); - } else if (!isResumable || !canBeStore || state == 2) { - frame.release(); - } + this.actual.onNext(canBeStore ? frame.retainedSlice() : frame); } @Override @@ -298,30 +463,377 @@ public void request(long n) {} @Override public void cancel() { pauseImplied(); - state = 0; + markDisconnected(this); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Disconnected at Position[{}] and ImpliedPosition[{}]", + side, + session, + firstAvailableFramePosition, + frameImpliedPosition()); + } } @Override public void subscribe(CoreSubscriber actual) { - final int state = this.state; - if (state != 2) { - resumeImplied(); - logger.debug( - "Tag: {}. Subscribed at Position[{}] and ImpliedPosition[{}]", - tag, - position, - impliedPosition); - actual.onSubscribe(this); - synchronized (this) { - for (final ByteBuf frame : cachedFrames) { - actual.onNext(frame.retainedSlice()); + resumeImplied(); + actual.onSubscribe(this); + this.pendingActual = actual; + + final long previousState = markPendingConnection(this); + if (isDisposed(previousState)) { + actual.onError(new CancellationException("Disposed")); + return; + } + + if (isTerminated(previousState)) { + actual.onError(new CancellationException("Disposed")); + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + drain((previousState + 1) | PENDING_CONNECTION_FLAG); + } + + static class FramesSubscriber + implements CoreSubscriber, Fuseable.QueueSubscription { + + final CoreSubscriber actual; + final InMemoryResumableFramesStore parent; + + Fuseable.QueueSubscription qs; + + boolean done; + + FramesSubscriber(CoreSubscriber actual, InMemoryResumableFramesStore parent) { + this.actual = actual; + this.parent = parent; + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(Subscription s) { + if (Operators.validate(this.qs, s)) { + final Fuseable.QueueSubscription qs = (Fuseable.QueueSubscription) s; + this.qs = qs; + + final int m = qs.requestFusion(Fuseable.ANY); + + if (m != Fuseable.ASYNC) { + s.cancel(); + this.actual.onSubscribe(this); + this.actual.onError(new IllegalStateException("Source has to be ASYNC fuseable")); + return; } + + this.actual.onSubscribe(this); } + } - this.actual = actual; - STATE.compareAndSet(this, 0, 1); - } else { - Operators.complete(actual); + @Override + public void onNext(ByteBuf byteBuf) { + final InMemoryResumableFramesStore parent = this.parent; + long previousState = InMemoryResumableFramesStore.markFrameAdded(parent); + + if (isFinalized(previousState)) { + this.qs.clear(); + return; + } + + if (isWorkInProgress(previousState) + || (!isConnected(previousState) && !hasPendingConnection(previousState))) { + return; + } + + parent.drain(previousState + 1); } + + @Override + public void onError(Throwable t) { + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + + final InMemoryResumableFramesStore parent = this.parent; + + parent.terminal = t; + this.done = true; + + final long previousState = InMemoryResumableFramesStore.markTerminated(parent); + if (isFinalized(previousState)) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + parent.drain(previousState | TERMINATED_FLAG); + } + + @Override + public void onComplete() { + if (this.done) { + return; + } + + final InMemoryResumableFramesStore parent = this.parent; + + this.done = true; + + final long previousState = InMemoryResumableFramesStore.markTerminated(parent); + if (isFinalized(previousState)) { + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + parent.drain(previousState | TERMINATED_FLAG); + } + + @Override + public void cancel() { + if (this.done) { + return; + } + + this.done = true; + + final long previousState = InMemoryResumableFramesStore.markTerminated(parent); + if (isFinalized(previousState)) { + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + parent.drain(previousState | TERMINATED_FLAG); + } + + @Override + public void request(long n) {} + + @Override + public int requestFusion(int requestedMode) { + return Fuseable.NONE; + } + + @Override + public Void poll() { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void clear() {} + } + + static long markFrameAdded(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + long nextState = state; + if (isConnected(state) || hasPendingConnection(state) || isWorkInProgress(state)) { + nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? nextState : nextState + 1; + } + + if (STATE.compareAndSet(store, state, nextState | HAS_FRAME_FLAG)) { + return state; + } + } + } + + static long markPendingConnection(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state) || isDisposed(state) || isTerminated(state)) { + return state; + } + + if (isConnected(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : state + 1; + if (STATE.compareAndSet(store, state, nextState | PENDING_CONNECTION_FLAG)) { + return state; + } + } + } + + static long markRemoteImpliedPositionChanged(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : (state + 1); + if (STATE.compareAndSet(store, state, nextState | REMOTE_IMPLIED_POSITION_CHANGED_FLAG)) { + return state; + } + } + } + + static long markDisconnected(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + if (STATE.compareAndSet(store, state, state & ~CONNECTED_FLAG & ~PENDING_CONNECTION_FLAG)) { + return state; + } + } + } + + static long markWorkDone(InMemoryResumableFramesStore store, long expectedState) { + for (; ; ) { + final long state = store.state; + + if (expectedState != state) { + return state; + } + + if (isFinalized(state)) { + return state; + } + + final long nextState = state & ~MAX_WORK_IN_PROGRESS & ~REMOTE_IMPLIED_POSITION_CHANGED_FLAG; + if (STATE.compareAndSet(store, state, nextState)) { + return nextState; + } + } + } + + static long markConnected(InMemoryResumableFramesStore store, long expectedState) { + for (; ; ) { + final long state = store.state; + + if (state != expectedState) { + return state; + } + + if (isFinalized(state)) { + return state; + } + + final long nextState = state ^ PENDING_CONNECTION_FLAG | CONNECTED_FLAG; + if (STATE.compareAndSet(store, state, nextState)) { + return nextState; + } + } + } + + static long markTerminated(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : (state + 1); + if (STATE.compareAndSet(store, state, nextState | TERMINATED_FLAG)) { + return state; + } + } + } + + static long markDisposed(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : (state + 1); + if (STATE.compareAndSet(store, state, nextState | DISPOSED_FLAG)) { + return state; + } + } + } + + static void clearAndFinalize(InMemoryResumableFramesStore store) { + final Fuseable.QueueSubscription qs = store.framesSubscriber.qs; + for (; ; ) { + final long state = store.state; + + qs.clear(); + store.clearCache(); + + if (isFinalized(state)) { + return; + } + + if (STATE.compareAndSet(store, state, state | FINALIZED_FLAG & ~MAX_WORK_IN_PROGRESS)) { + store.disposed.tryEmitEmpty(); + store.framesSubscriber.onComplete(); + return; + } + } + } + + static boolean isConnected(long state) { + return (state & CONNECTED_FLAG) == CONNECTED_FLAG; + } + + static boolean hasRemoteImpliedPositionChanged(long state) { + return (state & REMOTE_IMPLIED_POSITION_CHANGED_FLAG) == REMOTE_IMPLIED_POSITION_CHANGED_FLAG; + } + + static boolean hasPendingConnection(long state) { + return (state & PENDING_CONNECTION_FLAG) == PENDING_CONNECTION_FLAG; + } + + static boolean hasFrames(long state) { + return (state & HAS_FRAME_FLAG) == HAS_FRAME_FLAG; + } + + static boolean isTerminated(long state) { + return (state & TERMINATED_FLAG) == TERMINATED_FLAG; + } + + static boolean isDisposed(long state) { + return (state & DISPOSED_FLAG) == DISPOSED_FLAG; + } + + static boolean isFinalized(long state) { + return (state & FINALIZED_FLAG) == FINALIZED_FLAG; + } + + static boolean isWorkInProgress(long state) { + return (state & MAX_WORK_IN_PROGRESS) > 0; } } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 6e90e6d63..18cd7167a 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -18,8 +18,11 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.CharsetUtil; import io.rsocket.DuplexConnection; import io.rsocket.RSocketErrorException; +import io.rsocket.exceptions.ConnectionCloseException; +import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.frame.FrameHeaderCodec; import io.rsocket.internal.UnboundedProcessor; import java.net.SocketAddress; @@ -35,13 +38,15 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; +import reactor.util.annotation.Nullable; public class ResumableDuplexConnection extends Flux implements DuplexConnection, Subscription { static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class); - final String tag; + final String side; + final String session; final ResumableFramesStore resumableFramesStore; final UnboundedProcessor savableFramesSender; @@ -66,8 +71,12 @@ public class ResumableDuplexConnection extends Flux int connectionIndex = 0; public ResumableDuplexConnection( - String tag, DuplexConnection initialConnection, ResumableFramesStore resumableFramesStore) { - this.tag = tag; + String side, + ByteBuf session, + DuplexConnection initialConnection, + ResumableFramesStore resumableFramesStore) { + this.side = side; + this.session = session.toString(CharsetUtil.UTF_8); this.onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer(); this.resumableFramesStore = resumableFramesStore; this.savableFramesSender = new UnboundedProcessor(); @@ -94,29 +103,51 @@ public boolean connect(DuplexConnection nextConnection) { } void initConnection(DuplexConnection nextConnection) { - logger.debug("Tag {}. Initializing connection {}", tag, nextConnection); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Connecting to DuplexConnection[{}]", + side, + session, + nextConnection); + } final int currentConnectionIndex = connectionIndex; final FrameReceivingSubscriber frameReceivingSubscriber = - new FrameReceivingSubscriber(tag, resumableFramesStore, receiveSubscriber); + new FrameReceivingSubscriber(side, resumableFramesStore, receiveSubscriber); this.connectionIndex = currentConnectionIndex + 1; this.activeReceivingSubscriber = frameReceivingSubscriber; - final Disposable disposable = + final Disposable resumeStreamSubscription = resumableFramesStore .resumeStream() - .subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f)); + .subscribe( + f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f), + t -> sendErrorAndClose(new ConnectionErrorException(t.getMessage())), + () -> + sendErrorAndClose( + new ConnectionCloseException("Connection Closed Unexpectedly"))); nextConnection.receive().subscribe(frameReceivingSubscriber); nextConnection .onClose() .doFinally( __ -> { frameReceivingSubscriber.dispose(); - disposable.dispose(); + resumeStreamSubscription.dispose(); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Disconnected from DuplexConnection[{}]", + side, + session, + nextConnection); + } Sinks.EmitResult result = onConnectionClosedSink.tryEmitNext(currentConnectionIndex); if (!result.equals(Sinks.EmitResult.OK)) { - logger.error("Failed to notify session of closed connection: {}", result); + logger.error( + "Side[{}]|Session[{}]. Failed to notify session of closed connection: {}", + side, + session, + result); } }) .subscribe(); @@ -196,6 +227,10 @@ public Mono onClose() { @Override public void dispose() { + dispose(null); + } + + void dispose(@Nullable Throwable e) { final DuplexConnection activeConnection = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE); if (activeConnection == DisposedConnection.INSTANCE) { @@ -206,11 +241,20 @@ public void dispose() { activeConnection.dispose(); } + if (logger.isDebugEnabled()) { + logger.debug("Side[{}]|Session[{}]. Disposing...", side, session); + } + framesSaverDisposable.dispose(); activeReceivingSubscriber.dispose(); savableFramesSender.dispose(); onConnectionClosedSink.tryEmitComplete(); - onClose.tryEmitEmpty(); + + if (e != null) { + onClose.tryEmitError(e); + } else { + onClose.tryEmitEmpty(); + } } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java index b62c615f3..a57899cac 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java @@ -123,12 +123,15 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex long impliedPosition = resumableFramesStore.frameImpliedPosition(); long position = resumableFramesStore.framePosition(); - logger.debug( - "Resume FRAME received. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}, ServerResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", - remoteImpliedPos, - remotePos, - impliedPosition, - position); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Resume FRAME received. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}, ServerResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", + resumeToken, + remoteImpliedPos, + remotePos, + impliedPosition, + position); + } for (; ; ) { final Subscription subscription = this.s; @@ -138,6 +141,7 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex final RejectedResumeException rejectedResumeException = new RejectedResumeException("resume_internal_error: Session Expired"); nextDuplexConnection.sendErrorAndClose(rejectedResumeException); + nextDuplexConnection.receive().subscribe().dispose(); return; } @@ -152,31 +156,47 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex if (position != remoteImpliedPos) { resumableFramesStore.releaseFrames(remoteImpliedPos); } - nextDuplexConnection.sendFrame( - 0, ResumeOkFrameCodec.encode(allocator, resumableFramesStore.frameImpliedPosition())); - logger.debug("ResumeOK Frame has been sent"); + nextDuplexConnection.sendFrame(0, ResumeOkFrameCodec.encode(allocator, impliedPosition)); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. ResumeOKFrame[impliedPosition[{}]] has been sent", + resumeToken, + impliedPosition); + } } catch (Throwable t) { logger.debug("Exception occurred while releasing frames in the frameStore", t); tryTimeoutSession(); nextDuplexConnection.sendErrorAndClose(new RejectedResumeException(t.getMessage(), t)); + nextDuplexConnection.receive().subscribe().dispose(); return; } if (resumableConnection.connect(nextDuplexConnection)) { keepAliveSupport.start(); - logger.debug("Session[{}] has been resumed successfully", resumeToken); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Session has been resumed successfully", resumeToken); + } } else { - logger.debug("Session has already been expired. Terminating received connection"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Session has already been expired. Terminating received connection", + resumeToken); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resume_internal_error: Session Expired"); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } else { - logger.debug( - "Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection", - remoteImpliedPos, - position, - remotePos, - impliedPosition); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection", + resumeToken, + remoteImpliedPos, + position, + remotePos, + impliedPosition); + } tryTimeoutSession(); final RejectedResumeException rejectedResumeException = new RejectedResumeException( @@ -184,6 +204,7 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex "resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]", remotePos, remoteImpliedPos, position, impliedPosition)); nextDuplexConnection.sendErrorAndClose(rejectedResumeException); + nextDuplexConnection.receive().subscribe().dispose(); } } diff --git a/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java b/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java index a595faa86..bba40d674 100644 --- a/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java +++ b/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java @@ -4,59 +4,123 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import io.rsocket.RaceTestConstants; +import io.rsocket.internal.UnboundedProcessor; +import io.rsocket.internal.subscriber.AssertSubscriber; import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import reactor.core.Disposable; +import reactor.core.publisher.Hooks; +import reactor.test.util.RaceTestUtils; public class InMemoryResumeStoreTest { @Test void saveNonResumableFrame() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeConnectionFrame(10); - ByteBuf frame2 = fakeConnectionFrame(35); - store.saveFrames(Flux.just(frame1, frame2)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame1 = fakeConnectionFrame(10); + final ByteBuf frame2 = fakeConnectionFrame(35); + + sender.onNext(frame1); + sender.onNext(frame2); + assertThat(store.cachedFrames.size()).isZero(); assertThat(store.cacheSize).isZero(); - assertThat(store.position).isZero(); + assertThat(store.firstAvailableFramePosition).isZero(); + + assertSubscriber.assertValueCount(2).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); } @Test void saveWithoutTailRemoval() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame = fakeResumableFrame(10); - store.saveFrames(Flux.just(frame)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame = fakeResumableFrame(10); + + sender.onNext(frame); + assertThat(store.cachedFrames.size()).isEqualTo(1); assertThat(store.cacheSize).isEqualTo(frame.readableBytes()); - assertThat(store.position).isZero(); + assertThat(store.firstAvailableFramePosition).isZero(); + + assertSubscriber.assertValueCount(1).values().forEach(ByteBuf::release); + assertThat(frame.refCnt()).isOne(); } @Test void saveRemoveOneFromTail() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeResumableFrame(20); - ByteBuf frame2 = fakeResumableFrame(10); - store.saveFrames(Flux.just(frame1, frame2)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + final ByteBuf frame1 = fakeResumableFrame(20); + final ByteBuf frame2 = fakeResumableFrame(10); + + sender.onNext(frame1); + sender.onNext(frame2); + assertThat(store.cachedFrames.size()).isOne(); assertThat(store.cacheSize).isEqualTo(frame2.readableBytes()); - assertThat(store.position).isEqualTo(frame1.readableBytes()); + assertThat(store.firstAvailableFramePosition).isEqualTo(frame1.readableBytes()); + + assertSubscriber.assertValueCount(2).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isOne(); } @Test void saveRemoveTwoFromTail() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeResumableFrame(10); - ByteBuf frame2 = fakeResumableFrame(10); - ByteBuf frame3 = fakeResumableFrame(20); - store.saveFrames(Flux.just(frame1, frame2, frame3)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(20); + + sender.onNext(frame1); + sender.onNext(frame2); + sender.onNext(frame3); + assertThat(store.cachedFrames.size()).isOne(); assertThat(store.cacheSize).isEqualTo(frame3.readableBytes()); - assertThat(store.position).isEqualTo(size(frame1, frame2)); + assertThat(store.firstAvailableFramePosition).isEqualTo(size(frame1, frame2)); + + assertSubscriber.assertValueCount(3).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); assertThat(frame3.refCnt()).isOne(); @@ -64,14 +128,27 @@ void saveRemoveTwoFromTail() { @Test void saveBiggerThanStore() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeResumableFrame(10); - ByteBuf frame2 = fakeResumableFrame(10); - ByteBuf frame3 = fakeResumableFrame(30); - store.saveFrames(Flux.just(frame1, frame2, frame3)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + sender.onNext(frame1); + sender.onNext(frame2); + sender.onNext(frame3); + assertThat(store.cachedFrames.size()).isZero(); assertThat(store.cacheSize).isZero(); - assertThat(store.position).isEqualTo(size(frame1, frame2, frame3)); + assertThat(store.firstAvailableFramePosition).isEqualTo(size(frame1, frame2, frame3)); + + assertSubscriber.assertValueCount(3).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); assertThat(frame3.refCnt()).isZero(); @@ -79,15 +156,30 @@ void saveBiggerThanStore() { @Test void releaseFrames() { - InMemoryResumableFramesStore store = inMemoryStore(100); - ByteBuf frame1 = fakeResumableFrame(10); - ByteBuf frame2 = fakeResumableFrame(10); - ByteBuf frame3 = fakeResumableFrame(30); - store.saveFrames(Flux.just(frame1, frame2, frame3)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(100); + + final UnboundedProcessor producer = new UnboundedProcessor(); + store.saveFrames(producer).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + producer.onNext(frame1); + producer.onNext(frame2); + producer.onNext(frame3); + store.releaseFrames(20); + assertThat(store.cachedFrames.size()).isOne(); assertThat(store.cacheSize).isEqualTo(frame3.readableBytes()); - assertThat(store.position).isEqualTo(size(frame1, frame2)); + assertThat(store.firstAvailableFramePosition).isEqualTo(size(frame1, frame2)); + + assertSubscriber.assertValueCount(3).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); assertThat(frame3.refCnt()).isOne(); @@ -95,20 +187,350 @@ void releaseFrames() { @Test void receiveImpliedPosition() { - InMemoryResumableFramesStore store = inMemoryStore(100); + final InMemoryResumableFramesStore store = inMemoryStore(100); + ByteBuf frame1 = fakeResumableFrame(10); ByteBuf frame2 = fakeResumableFrame(30); + store.resumableFrameReceived(frame1); store.resumableFrameReceived(frame2); + assertThat(store.frameImpliedPosition()).isEqualTo(size(frame1, frame2)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void ensuresCleansOnTerminal(boolean hasSubscriber) { + final InMemoryResumableFramesStore store = inMemoryStore(100); + + final UnboundedProcessor producer = new UnboundedProcessor(); + store.saveFrames(producer).subscribe(); + + final AssertSubscriber assertSubscriber = + hasSubscriber ? store.resumeStream().subscribeWith(AssertSubscriber.create()) : null; + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + producer.onNext(frame1); + producer.onNext(frame2); + producer.onNext(frame3); + producer.onComplete(); + + assertThat(store.cachedFrames.size()).isZero(); + assertThat(store.cacheSize).isZero(); + + assertThat(producer.isDisposed()).isTrue(); + + if (hasSubscriber) { + assertSubscriber.assertValueCount(3).assertTerminated().values().forEach(ByteBuf::release); + } + + assertThat(frame1.refCnt()).isZero(); + assertThat(frame2.refCnt()).isZero(); + assertThat(frame3.refCnt()).isZero(); + } + + @Test + void ensuresCleansOnTerminalLateSubscriber() { + final InMemoryResumableFramesStore store = inMemoryStore(100); + + final UnboundedProcessor producer = new UnboundedProcessor(); + store.saveFrames(producer).subscribe(); + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + producer.onNext(frame1); + producer.onNext(frame2); + producer.onNext(frame3); + producer.onComplete(); + + assertThat(store.cachedFrames.size()).isZero(); + assertThat(store.cacheSize).isZero(); + + assertThat(producer.isDisposed()).isTrue(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + assertSubscriber.assertTerminated(); + + assertThat(frame1.refCnt()).isZero(); + assertThat(frame2.refCnt()).isZero(); + assertThat(frame3.refCnt()).isZero(); + } + + @ParameterizedTest(name = "Sending vs Reconnect Race Test. WithLateSubscriber[{0}]") + @ValueSource(booleans = {true, false}) + void sendingVsReconnectRaceTest(boolean withLateSubscriber) { + for (int i = 0; i < RaceTestConstants.REPEATS; i++) { + final InMemoryResumableFramesStore store = inMemoryStore(Integer.MAX_VALUE); + final UnboundedProcessor frames = new UnboundedProcessor(); + final BlockingQueue receivedFrames = new ArrayBlockingQueue<>(10); + final AtomicInteger receivedPosition = new AtomicInteger(); + + store.saveFrames(frames).subscribe(); + + final Consumer consumer = + f -> { + if (ResumableDuplexConnection.isResumableFrame(f)) { + receivedPosition.addAndGet(f.readableBytes()); + receivedFrames.offer(f); + return; + } + f.release(); + }; + final AtomicReference disposableReference = + new AtomicReference<>( + withLateSubscriber ? null : store.resumeStream().subscribe(consumer)); + + final ByteBuf byteBuf1 = fakeResumableFrame(5); + final ByteBuf byteBuf11 = fakeConnectionFrame(5); + final ByteBuf byteBuf2 = fakeResumableFrame(6); + final ByteBuf byteBuf21 = fakeConnectionFrame(5); + final ByteBuf byteBuf3 = fakeResumableFrame(7); + final ByteBuf byteBuf31 = fakeConnectionFrame(5); + final ByteBuf byteBuf4 = fakeResumableFrame(8); + final ByteBuf byteBuf41 = fakeConnectionFrame(5); + final ByteBuf byteBuf5 = fakeResumableFrame(25); + final ByteBuf byteBuf51 = fakeConnectionFrame(35); + + RaceTestUtils.race( + () -> { + if (withLateSubscriber) { + disposableReference.set(store.resumeStream().subscribe(consumer)); + } + + // disconnect + disposableReference.get().dispose(); + + while (InMemoryResumableFramesStore.isWorkInProgress(store.state)) { + // ignore + } + + // mimic RESUME_OK frame received + store.releaseFrames(receivedPosition.get()); + disposableReference.set(store.resumeStream().subscribe(consumer)); + + // disconnect + disposableReference.get().dispose(); + + while (InMemoryResumableFramesStore.isWorkInProgress(store.state)) { + // ignore + } + + // mimic RESUME_OK frame received + store.releaseFrames(receivedPosition.get()); + disposableReference.set(store.resumeStream().subscribe(consumer)); + }, + () -> { + frames.onNext(byteBuf1); + frames.onNextPrioritized(byteBuf11); + frames.onNext(byteBuf2); + frames.onNext(byteBuf3); + frames.onNextPrioritized(byteBuf31); + frames.onNext(byteBuf4); + frames.onNext(byteBuf5); + }, + () -> { + frames.onNextPrioritized(byteBuf21); + frames.onNextPrioritized(byteBuf41); + frames.onNextPrioritized(byteBuf51); + }); + + store.releaseFrames(receivedFrames.stream().mapToInt(ByteBuf::readableBytes).sum()); + + assertThat(store.cacheSize).isZero(); + assertThat(store.cachedFrames).isEmpty(); + + assertThat(receivedFrames) + .hasSize(5) + .containsSequence(byteBuf1, byteBuf2, byteBuf3, byteBuf4, byteBuf5); + receivedFrames.forEach(ReferenceCounted::release); + + assertThat(byteBuf1.refCnt()).isZero(); + assertThat(byteBuf11.refCnt()).isZero(); + assertThat(byteBuf2.refCnt()).isZero(); + assertThat(byteBuf21.refCnt()).isZero(); + assertThat(byteBuf3.refCnt()).isZero(); + assertThat(byteBuf31.refCnt()).isZero(); + assertThat(byteBuf4.refCnt()).isZero(); + assertThat(byteBuf41.refCnt()).isZero(); + assertThat(byteBuf5.refCnt()).isZero(); + assertThat(byteBuf51.refCnt()).isZero(); + } + } + + @ParameterizedTest( + name = "Sending vs Reconnect with incorrect position Race Test. WithLateSubscriber[{0}]") + @ValueSource(booleans = {true, false}) + void incorrectReleaseFramesWithOnNextRaceTest(boolean withLateSubscriber) { + Hooks.onErrorDropped(t -> {}); + try { + for (int i = 0; i < RaceTestConstants.REPEATS; i++) { + final InMemoryResumableFramesStore store = inMemoryStore(Integer.MAX_VALUE); + final UnboundedProcessor frames = new UnboundedProcessor(); + + store.saveFrames(frames).subscribe(); + + final AtomicInteger terminationCnt = new AtomicInteger(); + final Consumer consumer = ReferenceCounted::release; + final Consumer errorConsumer = __ -> terminationCnt.incrementAndGet(); + final AtomicReference disposableReference = + new AtomicReference<>( + withLateSubscriber + ? null + : store.resumeStream().subscribe(consumer, errorConsumer)); + + final ByteBuf byteBuf1 = fakeResumableFrame(5); + final ByteBuf byteBuf11 = fakeConnectionFrame(5); + final ByteBuf byteBuf2 = fakeResumableFrame(6); + final ByteBuf byteBuf21 = fakeConnectionFrame(5); + final ByteBuf byteBuf3 = fakeResumableFrame(7); + final ByteBuf byteBuf31 = fakeConnectionFrame(5); + final ByteBuf byteBuf4 = fakeResumableFrame(8); + final ByteBuf byteBuf41 = fakeConnectionFrame(5); + final ByteBuf byteBuf5 = fakeResumableFrame(25); + final ByteBuf byteBuf51 = fakeConnectionFrame(35); + + RaceTestUtils.race( + () -> { + if (withLateSubscriber) { + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + } + // disconnect + disposableReference.get().dispose(); + + // mimic RESUME_OK frame received but with incorrect position + store.releaseFrames(25); + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + }, + () -> { + frames.onNext(byteBuf1); + frames.onNextPrioritized(byteBuf11); + frames.onNext(byteBuf2); + frames.onNext(byteBuf3); + frames.onNextPrioritized(byteBuf31); + frames.onNext(byteBuf4); + frames.onNext(byteBuf5); + }, + () -> { + frames.onNextPrioritized(byteBuf21); + frames.onNextPrioritized(byteBuf41); + frames.onNextPrioritized(byteBuf51); + }); + + assertThat(store.cacheSize).isZero(); + assertThat(store.cachedFrames).isEmpty(); + assertThat(disposableReference.get().isDisposed()).isTrue(); + assertThat(terminationCnt).hasValue(1); + + assertThat(byteBuf1.refCnt()).isZero(); + assertThat(byteBuf11.refCnt()).isZero(); + assertThat(byteBuf2.refCnt()).isZero(); + assertThat(byteBuf21.refCnt()).isZero(); + assertThat(byteBuf3.refCnt()).isZero(); + assertThat(byteBuf31.refCnt()).isZero(); + assertThat(byteBuf4.refCnt()).isZero(); + assertThat(byteBuf41.refCnt()).isZero(); + assertThat(byteBuf5.refCnt()).isZero(); + assertThat(byteBuf51.refCnt()).isZero(); + } + } finally { + Hooks.resetOnErrorDropped(); + } + } + + @ParameterizedTest( + name = + "Dispose vs Sending vs Reconnect with incorrect position Race Test. WithLateSubscriber[{0}]") + @ValueSource(booleans = {true, false}) + void incorrectReleaseFramesWithOnNextWithDisposeRaceTest(boolean withLateSubscriber) { + Hooks.onErrorDropped(t -> {}); + try { + for (int i = 0; i < RaceTestConstants.REPEATS; i++) { + final InMemoryResumableFramesStore store = inMemoryStore(Integer.MAX_VALUE); + final UnboundedProcessor frames = new UnboundedProcessor(); + + store.saveFrames(frames).subscribe(); + + final AtomicInteger terminationCnt = new AtomicInteger(); + final Consumer consumer = ReferenceCounted::release; + final Consumer errorConsumer = __ -> terminationCnt.incrementAndGet(); + final AtomicReference disposableReference = + new AtomicReference<>( + withLateSubscriber + ? null + : store.resumeStream().subscribe(consumer, errorConsumer)); + + final ByteBuf byteBuf1 = fakeResumableFrame(5); + final ByteBuf byteBuf11 = fakeConnectionFrame(5); + final ByteBuf byteBuf2 = fakeResumableFrame(6); + final ByteBuf byteBuf21 = fakeConnectionFrame(5); + final ByteBuf byteBuf3 = fakeResumableFrame(7); + final ByteBuf byteBuf31 = fakeConnectionFrame(5); + final ByteBuf byteBuf4 = fakeResumableFrame(8); + final ByteBuf byteBuf41 = fakeConnectionFrame(5); + final ByteBuf byteBuf5 = fakeResumableFrame(25); + final ByteBuf byteBuf51 = fakeConnectionFrame(35); + + RaceTestUtils.race( + () -> { + if (withLateSubscriber) { + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + } + // disconnect + disposableReference.get().dispose(); + + // mimic RESUME_OK frame received but with incorrect position + store.releaseFrames(25); + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + }, + () -> { + frames.onNext(byteBuf1); + frames.onNextPrioritized(byteBuf11); + frames.onNext(byteBuf2); + frames.onNext(byteBuf3); + frames.onNextPrioritized(byteBuf31); + frames.onNext(byteBuf4); + frames.onNext(byteBuf5); + }, + () -> { + frames.onNextPrioritized(byteBuf21); + frames.onNextPrioritized(byteBuf41); + frames.onNextPrioritized(byteBuf51); + }, + store::dispose); + + assertThat(store.cacheSize).isZero(); + assertThat(store.cachedFrames).isEmpty(); + assertThat(disposableReference.get().isDisposed()).isTrue(); + assertThat(terminationCnt).hasValueGreaterThanOrEqualTo(1).hasValueLessThanOrEqualTo(2); + + assertThat(byteBuf1.refCnt()).isZero(); + assertThat(byteBuf11.refCnt()).isZero(); + assertThat(byteBuf2.refCnt()).isZero(); + assertThat(byteBuf21.refCnt()).isZero(); + assertThat(byteBuf3.refCnt()).isZero(); + assertThat(byteBuf31.refCnt()).isZero(); + assertThat(byteBuf4.refCnt()).isZero(); + assertThat(byteBuf41.refCnt()).isZero(); + assertThat(byteBuf5.refCnt()).isZero(); + assertThat(byteBuf51.refCnt()).isZero(); + } + } finally { + Hooks.resetOnErrorDropped(); + } + } + private int size(ByteBuf... byteBufs) { return Arrays.stream(byteBufs).mapToInt(ByteBuf::readableBytes).sum(); } private static InMemoryResumableFramesStore inMemoryStore(int size) { - return new InMemoryResumableFramesStore("test", size); + return new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, size); } private static ByteBuf fakeResumableFrame(int size) { diff --git a/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java index b2dad0022..5eb78fabe 100644 --- a/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java @@ -182,7 +182,7 @@ private static Mono newClientRSocket( .resume( new Resume() .sessionDuration(Duration.ofSeconds(sessionDurationSeconds)) - .storeFactory(t -> new InMemoryResumableFramesStore("client", 500_000)) + .storeFactory(t -> new InMemoryResumableFramesStore("client", t, 500_000)) .cleanupStoreOnKeepAlive() .retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1)))) .keepAlive(Duration.ofSeconds(5), Duration.ofMinutes(5)) @@ -199,7 +199,7 @@ private static Mono newServerRSocket(int sessionDurationSecond new Resume() .sessionDuration(Duration.ofSeconds(sessionDurationSeconds)) .cleanupStoreOnKeepAlive() - .storeFactory(t -> new InMemoryResumableFramesStore("server", 500_000))) + .storeFactory(t -> new InMemoryResumableFramesStore("server", t, 500_000))) .bind(serverTransport(SERVER_HOST, SERVER_PORT)); } @@ -212,7 +212,7 @@ public Flux requestChannel(Publisher payloads) { return duplicate( Flux.interval(Duration.ofMillis(1)) .onBackpressureLatest() - .publishOn(Schedulers.elastic()), + .publishOn(Schedulers.boundedElastic()), 20) .map(v -> DefaultPayload.create(String.valueOf(counter.getAndIncrement()))) .takeUntilOther(Flux.from(payloads).then()); diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 0bae8cd69..5384c7e8d 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -51,7 +51,6 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; @@ -60,7 +59,6 @@ import reactor.core.Exceptions; import reactor.core.Fuseable; import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Operators; @@ -93,14 +91,8 @@ static String read(String resourceName) { } } - @BeforeEach - default void setUp() { - Hooks.onOperatorDebug(); - } - @AfterEach default void close() { - Hooks.resetOnOperatorDebug(); getTransportPair().responder.awaitAllInteractionTermination(getTimeout()); getTransportPair().dispose(); getTransportPair().awaitClosed(); @@ -547,7 +539,7 @@ public TransportPair( "Server", duplexConnection, Duration.ofMillis( - ThreadLocalRandom.current().nextInt(10, 1500))) + ThreadLocalRandom.current().nextInt(100, 1000))) : duplexConnection); } }); @@ -555,7 +547,8 @@ public TransportPair( if (withResumability) { rSocketServer.resume( new Resume() - .storeFactory(__ -> new InMemoryResumableFramesStore("server", Integer.MAX_VALUE))); + .storeFactory( + token -> new InMemoryResumableFramesStore("server", token, Integer.MAX_VALUE))); } if (withRandomFragmentation) { @@ -568,7 +561,7 @@ public TransportPair( final RSocketConnector rSocketConnector = RSocketConnector.create() .payloadDecoder(PayloadDecoder.ZERO_COPY) - .keepAlive(Duration.ofMillis(Integer.MAX_VALUE), Duration.ofMillis(Integer.MAX_VALUE)) + .keepAlive(Duration.ofMillis(10), Duration.ofHours(1)) .interceptors( registry -> { if (runClientWithAsyncInterceptors && !withResumability) { @@ -594,7 +587,7 @@ public TransportPair( "Client", duplexConnection, Duration.ofMillis( - ThreadLocalRandom.current().nextInt(1, 2000))) + ThreadLocalRandom.current().nextInt(10, 1500))) : duplexConnection); } }); @@ -602,7 +595,8 @@ public TransportPair( if (withResumability) { rSocketConnector.resume( new Resume() - .storeFactory(__ -> new InMemoryResumableFramesStore("client", Integer.MAX_VALUE))); + .storeFactory( + token -> new InMemoryResumableFramesStore("client", token, Integer.MAX_VALUE))); } if (withRandomFragmentation) { diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java index 8bea7c682..51c812cc3 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java @@ -19,9 +19,7 @@ import io.rsocket.test.TransportTest; import java.time.Duration; import java.util.UUID; -import org.junit.jupiter.api.Disabled; -@Disabled("leaking somewhere for no clear reason") final class LocalResumableTransportTest implements TransportTest { private final TransportPair transportPair = diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java new file mode 100644 index 000000000..124cecec9 --- /dev/null +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.local; + +import io.rsocket.test.TransportTest; +import java.time.Duration; +import java.util.UUID; + +final class LocalResumableWithFragmentationTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server, allocator) -> LocalClientTransport.create(address, allocator), + (address, allocator) -> LocalServerTransport.create(address), + true, + true); + + @Override + public Duration getTimeout() { + return Duration.ofSeconds(10); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java new file mode 100644 index 000000000..7d9d80542 --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.netty; + +import io.netty.channel.ChannelOption; +import io.rsocket.test.TransportTest; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import java.net.InetSocketAddress; +import java.time.Duration; +import reactor.netty.tcp.TcpClient; +import reactor.netty.tcp.TcpServer; + +final class TcpResumableWithFragmentationTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + TcpClientTransport.create( + TcpClient.create() + .remoteAddress(server::address) + .option(ChannelOption.ALLOCATOR, allocator)), + (address, allocator) -> + TcpServerTransport.create( + TcpServer.create() + .bindAddress(() -> address) + .option(ChannelOption.ALLOCATOR, allocator)), + true, + true); + + @Override + public Duration getTimeout() { + return Duration.ofMinutes(3); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java new file mode 100644 index 000000000..34dc99ae0 --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.netty; + +import io.netty.channel.ChannelOption; +import io.rsocket.test.TransportTest; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.WebsocketServerTransport; +import java.net.InetSocketAddress; +import java.time.Duration; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServer; + +final class WebsocketResumableTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .host(server.address().getHostName()) + .port(server.address().getPort()) + .option(ChannelOption.ALLOCATOR, allocator), + ""), + (address, allocator) -> + WebsocketServerTransport.create( + HttpServer.create() + .host(address.getHostName()) + .port(address.getPort()) + .option(ChannelOption.ALLOCATOR, allocator)), + false, + true); + + @Override + public Duration getTimeout() { + return Duration.ofMinutes(3); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java new file mode 100644 index 000000000..21c027e88 --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.netty; + +import io.netty.channel.ChannelOption; +import io.rsocket.test.TransportTest; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.WebsocketServerTransport; +import java.net.InetSocketAddress; +import java.time.Duration; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServer; + +final class WebsocketResumableWithFragmentationTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .host(server.address().getHostName()) + .port(server.address().getPort()) + .option(ChannelOption.ALLOCATOR, allocator), + ""), + (address, allocator) -> + WebsocketServerTransport.create( + HttpServer.create() + .host(address.getHostName()) + .port(address.getPort()) + .option(ChannelOption.ALLOCATOR, allocator)), + true, + true); + + @Override + public Duration getTimeout() { + return Duration.ofMinutes(3); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} From e721aa0b37f4d077473228c786327e5b04a76989 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Wed, 2 Jun 2021 23:08:22 +0300 Subject: [PATCH 2/3] improves LocalDuplexConnection (#onClose notification + ByteBufs releases) At the moment, the onClose hook has no "wait until cleaned" logic, which leads to unpredicted behaviors when used with resumability or others scenarios where we need to wait until all the queues are cleaned and there are no other resources in use (e.g. ByteBufs). For that porpuse, this commit adds onFinalizeHook to the UnboundedProcessor so we can now listen when the UnboundedProcessor is finalized and only after that send the onClose signal Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .../rsocket/internal/UnboundedProcessor.java | 10 +++++++ .../transport/local/LocalClientTransport.java | 15 ++++++---- .../local/LocalDuplexConnection.java | 30 +++++++++++-------- 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 9e7500465..c3278a09c 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -45,6 +45,7 @@ public final class UnboundedProcessor extends FluxProcessor final Queue queue; final Queue priorityQueue; + final Runnable onFinalizedHook; boolean cancelled; boolean done; @@ -88,6 +89,11 @@ public final class UnboundedProcessor extends FluxProcessor boolean outputFused; public UnboundedProcessor() { + this(() -> {}); + } + + public UnboundedProcessor(Runnable onFinalizedHook) { + this.onFinalizedHook = onFinalizedHook; this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); } @@ -793,6 +799,9 @@ static long markTerminatedOrFinalized(UnboundedProcessor instance) { } if (STATE.compareAndSet(instance, state, nextState | FLAG_TERMINATED)) { + if (isFinalized(nextState)) { + instance.onFinalizedHook.run(); + } return state; } } @@ -906,6 +915,7 @@ static void clearAndFinalize(UnboundedProcessor instance) { if (STATE.compareAndSet( instance, state, (state & ~MAX_WIP_VALUE & ~FLAG_HAS_VALUE) | FLAG_FINALIZED)) { + instance.onFinalizedHook.run(); break; } } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java index 588f772d3..113b7a2f8 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java @@ -77,14 +77,17 @@ public Mono connect() { return Mono.error(new IllegalArgumentException("Could not find server: " + name)); } - UnboundedProcessor in = new UnboundedProcessor(); - UnboundedProcessor out = new UnboundedProcessor(); - Sinks.Empty closeSink = Sinks.empty(); + Sinks.One inSink = Sinks.one(); + Sinks.One outSink = Sinks.one(); + UnboundedProcessor in = new UnboundedProcessor(() -> inSink.tryEmitValue(inSink)); + UnboundedProcessor out = new UnboundedProcessor(() -> outSink.tryEmitValue(outSink)); - server.apply(new LocalDuplexConnection(name, allocator, out, in, closeSink)).subscribe(); + Mono onClose = inSink.asMono().zipWith(outSink.asMono()).then(); - return Mono.just( - (DuplexConnection) new LocalDuplexConnection(name, allocator, in, out, closeSink)); + server.apply(new LocalDuplexConnection(name, allocator, out, in, onClose)).subscribe(); + + return Mono.just( + new LocalDuplexConnection(name, allocator, in, out, onClose)); }); } } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index 5e18aa4cc..5c395156c 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -27,11 +27,9 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; -import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import reactor.core.publisher.Sinks; /** An implementation of {@link DuplexConnection} that connects inside the same JVM. */ final class LocalDuplexConnection implements DuplexConnection { @@ -40,7 +38,7 @@ final class LocalDuplexConnection implements DuplexConnection { private final ByteBufAllocator allocator; private final Flux in; - private final Sinks.Empty onClose; + private final Mono onClose; private final UnboundedProcessor out; @@ -58,7 +56,7 @@ final class LocalDuplexConnection implements DuplexConnection { ByteBufAllocator allocator, Flux in, UnboundedProcessor out, - Sinks.Empty onClose) { + Mono onClose) { this.address = new LocalSocketAddress(name); this.allocator = Objects.requireNonNull(allocator, "allocator must not be null"); this.in = Objects.requireNonNull(in, "in must not be null"); @@ -69,24 +67,23 @@ final class LocalDuplexConnection implements DuplexConnection { @Override public void dispose() { out.onComplete(); - onClose.tryEmitEmpty(); } @Override - @SuppressWarnings("ConstantConditions") public boolean isDisposed() { - return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED); + return out.isDisposed(); } @Override public Mono onClose() { - return onClose.asMono(); + return onClose; } @Override public Flux receive() { return in.transform( - Operators.lift((__, actual) -> new ByteBufReleaserOperator(actual))); + Operators.lift( + (__, actual) -> new ByteBufReleaserOperator(actual, this))); } @Override @@ -119,11 +116,14 @@ static class ByteBufReleaserOperator implements CoreSubscriber, Subscription, Fuseable.QueueSubscription { final CoreSubscriber actual; + final LocalDuplexConnection parent; Subscription s; - public ByteBufReleaserOperator(CoreSubscriber actual) { + public ByteBufReleaserOperator( + CoreSubscriber actual, LocalDuplexConnection parent) { this.actual = actual; + this.parent = parent; } @Override @@ -136,17 +136,22 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuf buf) { - actual.onNext(buf); - buf.release(); + try { + actual.onNext(buf); + } finally { + buf.release(); + } } @Override public void onError(Throwable t) { + parent.out.onError(t); actual.onError(t); } @Override public void onComplete() { + parent.out.onComplete(); actual.onComplete(); } @@ -158,6 +163,7 @@ public void request(long n) { @Override public void cancel() { s.cancel(); + parent.out.onComplete(); } @Override From f70cd3556acc0064586e82f1363f0877fb1f4b76 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 1 Jun 2021 19:12:31 +0300 Subject: [PATCH 3/3] increase tests logging verbosity and fork every testclass on a new JVM Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- build.gradle | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index dcc133833..7cfc3a5bd 100644 --- a/build.gradle +++ b/build.gradle @@ -153,8 +153,9 @@ subprojects { test { useJUnitPlatform() testLogging { - events "FAILED" + events "PASSED", "FAILED" showExceptions true + showCauses true exceptionFormat "FULL" stackTraceFilters "ENTRY_POINT" maxGranularity 3 @@ -169,6 +170,8 @@ subprojects { } } + forkEvery = 1 + if (isCiServer) { def stdout = new LinkedList() beforeTest { TestDescriptor td ->