diff --git a/build.gradle b/build.gradle index dcc133833..7cfc3a5bd 100644 --- a/build.gradle +++ b/build.gradle @@ -153,8 +153,9 @@ subprojects { test { useJUnitPlatform() testLogging { - events "FAILED" + events "PASSED", "FAILED" showExceptions true + showCauses true exceptionFormat "FULL" stackTraceFilters "ENTRY_POINT" maxGranularity 3 @@ -169,6 +170,8 @@ subprojects { } } + forkEvery = 1 + if (isCiServer) { def stdout = new LinkedList() beforeTest { TestDescriptor td -> diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index fe91cdb6f..edd13b48c 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -612,6 +612,7 @@ public Mono connect(Supplier transportSupplier) { final ResumableDuplexConnection resumableDuplexConnection = new ResumableDuplexConnection( CLIENT_TAG, + resumeToken, clientServerConnection, resumableFramesStore); final ResumableClientSetup resumableClientSetup = diff --git a/rsocket-core/src/main/java/io/rsocket/core/Resume.java b/rsocket-core/src/main/java/io/rsocket/core/Resume.java index 48133af98..fa0eedbfa 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/Resume.java +++ b/rsocket-core/src/main/java/io/rsocket/core/Resume.java @@ -160,7 +160,7 @@ boolean isCleanupStoreOnKeepAlive() { Function getStoreFactory(String tag) { return storeFactory != null ? storeFactory - : token -> new InMemoryResumableFramesStore(tag, 100_000); + : token -> new InMemoryResumableFramesStore(tag, token, 100_000); } Duration getStreamTimeout() { diff --git a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java index 318c54816..0b23bcde5 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java @@ -109,7 +109,8 @@ public Mono acceptRSocketSetup( final ResumableFramesStore resumableFramesStore = resumeStoreFactory.apply(resumeToken); final ResumableDuplexConnection resumableDuplexConnection = - new ResumableDuplexConnection("server", duplexConnection, resumableFramesStore); + new ResumableDuplexConnection( + "server", resumeToken, duplexConnection, resumableFramesStore); final ServerRSocketSession serverRSocketSession = new ServerRSocketSession( resumeToken, diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index 9e7500465..c3278a09c 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -45,6 +45,7 @@ public final class UnboundedProcessor extends FluxProcessor final Queue queue; final Queue priorityQueue; + final Runnable onFinalizedHook; boolean cancelled; boolean done; @@ -88,6 +89,11 @@ public final class UnboundedProcessor extends FluxProcessor boolean outputFused; public UnboundedProcessor() { + this(() -> {}); + } + + public UnboundedProcessor(Runnable onFinalizedHook) { + this.onFinalizedHook = onFinalizedHook; this.queue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); this.priorityQueue = new MpscUnboundedArrayQueue<>(Queues.SMALL_BUFFER_SIZE); } @@ -793,6 +799,9 @@ static long markTerminatedOrFinalized(UnboundedProcessor instance) { } if (STATE.compareAndSet(instance, state, nextState | FLAG_TERMINATED)) { + if (isFinalized(nextState)) { + instance.onFinalizedHook.run(); + } return state; } } @@ -906,6 +915,7 @@ static void clearAndFinalize(UnboundedProcessor instance) { if (STATE.compareAndSet( instance, state, (state & ~MAX_WIP_VALUE & ~FLAG_HAS_VALUE) | FLAG_FINALIZED)) { + instance.onFinalizedHook.run(); break; } } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java index 9fd95ad17..c58cc4954 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java @@ -18,6 +18,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.CharsetUtil; import io.rsocket.DuplexConnection; import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.exceptions.Exceptions; @@ -54,6 +55,7 @@ public class ClientRSocketSession final Retry retry; final boolean cleanupStoreOnKeepAlive; final ByteBuf resumeToken; + final String session; volatile Subscription s; static final AtomicReferenceFieldUpdater S = @@ -71,20 +73,30 @@ public ClientRSocketSession( Retry retry, boolean cleanupStoreOnKeepAlive) { this.resumeToken = resumeToken; + this.session = resumeToken.toString(CharsetUtil.UTF_8); this.connectionFactory = connectionFactory.flatMap( dc -> { + final long impliedPosition = resumableFramesStore.frameImpliedPosition(); + final long position = resumableFramesStore.framePosition(); dc.sendFrame( 0, ResumeFrameCodec.encode( dc.alloc(), resumeToken.retain(), // server uses this to release its cache - resumableFramesStore.frameImpliedPosition(), // observed on the client side + impliedPosition, // observed on the client side // server uses this to check whether there is no mismatch - resumableFramesStore.framePosition() // sent from the client sent + position // sent from the client sent )); - logger.debug("Resume Frame has been sent"); + + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.", + session, + impliedPosition, + position); + } return connectionTransformer.apply(dc); }); @@ -105,7 +117,12 @@ void reconnect(int index) { if (this.s == Operators.cancelledSubscription() && S.compareAndSet(this, Operators.cancelledSubscription(), null)) { keepAliveSupport.stop(); - logger.debug("Connection[" + index + "] is lost. Reconnecting to resume..."); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...", + session, + index); + } connectionFactory.retryWhen(retry).timeout(resumeSessionDuration).subscribe(this); } } @@ -155,21 +172,30 @@ public void onNext(Tuple2 tuple2) { DuplexConnection nextDuplexConnection = tuple2.getT2(); if (!Operators.terminate(S, this)) { - logger.debug("Session has already been expired. Terminating received connection"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Session has already been expired. Terminating received connection", + session); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server=[Session Expired]"); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); return; } final int streamId = FrameHeaderCodec.streamId(shouldBeResumeOKFrame); if (streamId != 0) { - logger.debug( - "Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection"); - resumableConnection.dispose(); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection", + session); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others"); + resumableConnection.dispose(connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); return; } @@ -183,7 +209,8 @@ public void onNext(Tuple2 tuple2) { final long position = resumableFramesStore.framePosition(); final long impliedPosition = resumableFramesStore.frameImpliedPosition(); logger.debug( - "ResumeOK FRAME received. ServerResumeState{observedFramesPosition[{}]}. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", + "Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]", + session, remoteImpliedPos, impliedPosition, position); @@ -194,42 +221,54 @@ public void onNext(Tuple2 tuple2) { } } catch (IllegalStateException e) { logger.debug("Exception occurred while releasing frames in the frameStore", e); - resumableConnection.dispose(); + resumableConnection.dispose(e); final ConnectionErrorException t = new ConnectionErrorException(e.getMessage(), e); nextDuplexConnection.sendErrorAndClose(t); + nextDuplexConnection.receive().subscribe().dispose(); return; } if (resumableConnection.connect(nextDuplexConnection)) { keepAliveSupport.start(); - logger.debug("Session has been resumed successfully"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Session has been resumed successfully", session); + } } else { - logger.debug("Session has already been expired. Terminating received connection"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[client]|Session[{}]. Session has already been expired. Terminating received connection", + session); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[Session Expired]"); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } else { logger.debug( - "Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", + "Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", + session, remoteImpliedPos, position); - resumableConnection.dispose(); final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPos + "]"); + resumableConnection.dispose(connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } else if (frameType == FrameType.ERROR) { final RuntimeException exception = Exceptions.from(0, shouldBeResumeOKFrame); logger.debug("Received error frame. Terminating received connection", exception); - resumableConnection.dispose(); + resumableConnection.dispose(exception); } else { logger.debug( "Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection"); - resumableConnection.dispose(); final ConnectionErrorException connectionErrorException = new ConnectionErrorException("RESUME_OK frame must be received before any others"); + resumableConnection.dispose(connectionErrorException); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } @@ -239,7 +278,7 @@ public void onError(Throwable t) { Operators.onErrorDropped(t, currentContext()); } - resumableConnection.dispose(); + resumableConnection.dispose(t); } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java index 03516af92..87d82048d 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java @@ -19,118 +19,286 @@ import static io.rsocket.resume.ResumableDuplexConnection.isResumableFrame; import io.netty.buffer.ByteBuf; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import io.netty.util.CharsetUtil; +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CancellationException; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; +import reactor.util.annotation.Nullable; /** * writes - n (where n is frequent, primary operation) reads - m (where m == KeepAliveFrequency) * skip - k -> 0 (where k is the rare operation which happens after disconnection */ public class InMemoryResumableFramesStore extends Flux - implements CoreSubscriber, ResumableFramesStore, Subscription { + implements ResumableFramesStore, Subscription { + private FramesSubscriber framesSubscriber; private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class); final Sinks.Empty disposed = Sinks.empty(); - final ArrayList cachedFrames; - final String tag; + final Queue cachedFrames; + final String side; + final String session; final int cacheLimit; volatile long impliedPosition; static final AtomicLongFieldUpdater IMPLIED_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "impliedPosition"); - volatile long position; - static final AtomicLongFieldUpdater POSITION = - AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "position"); + volatile long firstAvailableFramePosition; + static final AtomicLongFieldUpdater FIRST_AVAILABLE_FRAME_POSITION = + AtomicLongFieldUpdater.newUpdater( + InMemoryResumableFramesStore.class, "firstAvailableFramePosition"); - volatile int cacheSize; - static final AtomicIntegerFieldUpdater CACHE_SIZE = - AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "cacheSize"); + long remoteImpliedPosition; - CoreSubscriber saveFramesSubscriber; + int cacheSize; + + Throwable terminal; CoreSubscriber actual; + CoreSubscriber pendingActual; + + volatile long state; + static final AtomicLongFieldUpdater STATE = + AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state"); /** - * Indicates whether there is an active connection or not. - * - *
    - *
  • 0 - no active connection - *
  • 1 - active connection - *
  • 2 - disposed - *
- * - *
-   * 0 <-----> 1
-   * |         |
-   * +--> 2 <--+
-   * 
+ * Flag which indicates that {@link InMemoryResumableFramesStore} is finalized and all related + * stores are cleaned */ - volatile int state; - - static final AtomicIntegerFieldUpdater STATE = - AtomicIntegerFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state"); + static final long FINALIZED_FLAG = + 0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore} is terminated via the {@link + * InMemoryResumableFramesStore#dispose()} method + */ + static final long DISPOSED_FLAG = + 0b0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore} is terminated via the {@link + * FramesSubscriber#onComplete()} or {@link FramesSubscriber#onError(Throwable)} ()} methods + */ + static final long TERMINATED_FLAG = + 0b0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** Flag which indicates that {@link InMemoryResumableFramesStore} has active frames consumer */ + static final long CONNECTED_FLAG = + 0b0001_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore} has no active frames consumer + * but there is a one pending + */ + static final long PENDING_CONNECTION_FLAG = + 0b0000_1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that there are some received implied position changes from the remote + * party + */ + static final long REMOTE_IMPLIED_POSITION_CHANGED_FLAG = + 0b0000_0100_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that there are some frames stored in the {@link + * io.rsocket.internal.UnboundedProcessor} which has to be cached and sent to the remote party + */ + static final long HAS_FRAME_FLAG = + 0b0000_0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L; + /** + * Flag which indicates that {@link InMemoryResumableFramesStore#drain(long)} has an actor which + * is currently progressing on the work. This flag should work as a guard to enter|exist into|from + * the {@link InMemoryResumableFramesStore#drain(long)} method. + */ + static final long MAX_WORK_IN_PROGRESS = + 0b0000_0000_0000_0000_0000_0000_0000_0000_1111_1111_1111_1111_1111_1111_1111_1111L; - public InMemoryResumableFramesStore(String tag, int cacheSizeBytes) { - this.tag = tag; + public InMemoryResumableFramesStore(String side, ByteBuf session, int cacheSizeBytes) { + this.side = side; + this.session = session.toString(CharsetUtil.UTF_8); this.cacheLimit = cacheSizeBytes; - this.cachedFrames = new ArrayList<>(); + this.cachedFrames = new ArrayDeque<>(); } public Mono saveFrames(Flux frames) { return frames .transform( - Operators.lift( - (__, actual) -> { - this.saveFramesSubscriber = actual; - return this; - })) + Operators.lift( + (__, actual) -> this.framesSubscriber = new FramesSubscriber(actual, this))) .then(); } @Override public void releaseFrames(long remoteImpliedPos) { - long pos = position; - logger.debug( - "{} Removing frames for local: {}, remote implied: {}", tag, pos, remoteImpliedPos); - long toRemoveBytes = Math.max(0, remoteImpliedPos - pos); - int removedBytes = 0; - final ArrayList frames = cachedFrames; - synchronized (this) { - while (toRemoveBytes > removedBytes && frames.size() > 0) { - ByteBuf cachedFrame = frames.remove(0); - int frameSize = cachedFrame.readableBytes(); - cachedFrame.release(); - removedBytes += frameSize; + long lastReceivedRemoteImpliedPosition = this.remoteImpliedPosition; + if (lastReceivedRemoteImpliedPosition > remoteImpliedPos) { + throw new IllegalStateException( + "Given Remote Implied Position is behind the last received Remote Implied Position"); + } + + this.remoteImpliedPosition = remoteImpliedPos; + + final long previousState = markRemoteImpliedPositionChanged(this); + if (isFinalized(previousState) || isWorkInProgress(previousState)) { + return; + } + + drain((previousState + 1) | REMOTE_IMPLIED_POSITION_CHANGED_FLAG); + } + + void drain(long expectedState) { + final Fuseable.QueueSubscription qs = this.framesSubscriber.qs; + final Queue cachedFrames = this.cachedFrames; + + for (; ; ) { + if (hasRemoteImpliedPositionChanged(expectedState)) { + expectedState = handlePendingRemoteImpliedPositionChanges(expectedState, cachedFrames); + } + + if (hasPendingConnection(expectedState)) { + expectedState = handlePendingConnection(expectedState, cachedFrames); + } + + if (isConnected(expectedState)) { + if (isTerminated(expectedState)) { + handleTerminal(this.terminal); + } else if (isDisposed()) { + handleTerminal(new CancellationException("Disposed")); + } else if (hasFrames(expectedState)) { + handlePendingFrames(qs); + } + } + + if (isDisposed(expectedState) || isTerminated(expectedState)) { + clearAndFinalize(this); + return; + } + + expectedState = markWorkDone(this, expectedState); + if (isFinalized(expectedState)) { + return; + } + + if (!isWorkInProgress(expectedState)) { + return; } } + } - if (toRemoveBytes > removedBytes) { - throw new IllegalStateException( - String.format( - "Local and remote state disagreement: " - + "need to remove additional %d bytes, but cache is empty", - toRemoveBytes)); - } else if (toRemoveBytes < removedBytes) { - throw new IllegalStateException( - "Local and remote state disagreement: local and remote frame sizes are not equal"); - } else { - POSITION.addAndGet(this, removedBytes); - if (cacheLimit != Integer.MAX_VALUE) { - CACHE_SIZE.addAndGet(this, -removedBytes); - logger.debug("{} Removed frames. Current cache size: {}", tag, cacheSize); + long handlePendingRemoteImpliedPositionChanges(long expectedState, Queue cachedFrames) { + final long remoteImpliedPosition = this.remoteImpliedPosition; + final long firstAvailableFramePosition = this.firstAvailableFramePosition; + final long toDropFromCache = Math.max(0, remoteImpliedPosition - firstAvailableFramePosition); + + if (toDropFromCache > 0) { + final int droppedFromCache = dropFramesFromCache(toDropFromCache, cachedFrames); + + if (toDropFromCache > droppedFromCache) { + this.terminal = + new IllegalStateException( + String.format( + "Local and remote state disagreement: " + + "need to remove additional %d bytes, but cache is empty", + toDropFromCache)); + expectedState = markTerminated(this) | TERMINATED_FLAG; + } + + if (toDropFromCache < droppedFromCache) { + this.terminal = + new IllegalStateException( + "Local and remote state disagreement: local and remote frame sizes are not equal"); + expectedState = markTerminated(this) | TERMINATED_FLAG; + } + + FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, firstAvailableFramePosition + droppedFromCache); + if (this.cacheLimit != Integer.MAX_VALUE) { + this.cacheSize -= droppedFromCache; + + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Removed frames from cache to position[{}]. CacheSize[{}]", + this.side, + this.session, + this.remoteImpliedPosition, + this.cacheSize); + } } } + + return expectedState; + } + + void handlePendingFrames(Fuseable.QueueSubscription qs) { + for (; ; ) { + final ByteBuf frame = qs.poll(); + final boolean empty = frame == null; + + if (empty) { + break; + } + + handleFrame(frame); + + if (!isConnected(this.state)) { + break; + } + } + } + + long handlePendingConnection(long expectedState, Queue cachedFrames) { + CoreSubscriber lastActual = null; + for (; ; ) { + final CoreSubscriber nextActual = this.pendingActual; + + if (nextActual != lastActual) { + for (final ByteBuf frame : cachedFrames) { + nextActual.onNext(frame.retainedSlice()); + } + } + + expectedState = markConnected(this, expectedState); + if (isConnected(expectedState)) { + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Connected at Position[{}] and ImpliedPosition[{}]", + side, + session, + firstAvailableFramePosition, + impliedPosition); + } + + this.actual = nextActual; + break; + } + + if (!hasPendingConnection(expectedState)) { + break; + } + + lastActual = nextActual; + } + return expectedState; + } + + static int dropFramesFromCache(long toRemoveBytes, Queue cache) { + int removedBytes = 0; + while (toRemoveBytes > removedBytes && cache.size() > 0) { + final ByteBuf cachedFrame = cache.poll(); + final int frameSize = cachedFrame.readableBytes(); + + cachedFrame.release(); + + removedBytes += frameSize; + } + + return removedBytes; } @Override @@ -140,12 +308,12 @@ public Flux resumeStream() { @Override public long framePosition() { - return position; + return this.firstAvailableFramePosition; } @Override public long frameImpliedPosition() { - return impliedPosition & Long.MAX_VALUE; + return this.impliedPosition & Long.MAX_VALUE; } @Override @@ -169,7 +337,8 @@ void pauseImplied() { final long impliedPosition = this.impliedPosition; if (IMPLIED_POSITION.compareAndSet(this, impliedPosition, impliedPosition | Long.MIN_VALUE)) { - logger.debug("Tag {}. Paused at position[{}]", tag, impliedPosition); + logger.debug( + "Side[{}]|Session[{}]. Paused at position[{}]", side, session, impliedPosition); return; } } @@ -181,7 +350,11 @@ void resumeImplied() { final long restoredImpliedPosition = impliedPosition & Long.MAX_VALUE; if (IMPLIED_POSITION.compareAndSet(this, impliedPosition, restoredImpliedPosition)) { - logger.debug("Tag {}. Resumed at position[{}]", tag, restoredImpliedPosition); + logger.debug( + "Side[{}]|Session[{}]. Resumed at position[{}]", + side, + session, + restoredImpliedPosition); return; } } @@ -194,102 +367,94 @@ public Mono onClose() { @Override public void dispose() { - if (STATE.getAndSet(this, 2) != 2) { - cacheSize = 0; - synchronized (this) { - logger.debug("Tag {}.Disposing InMemoryFrameStore", tag); - for (ByteBuf frame : cachedFrames) { - if (frame != null) { - frame.release(); - } - } - cachedFrames.clear(); - } - disposed.tryEmitEmpty(); + final long previousState = markDisposed(this); + if (isFinalized(previousState) + || isDisposed(previousState) + || isWorkInProgress(previousState)) { + return; + } + + drain(previousState | DISPOSED_FLAG); + } + + void clearCache() { + final Queue frames = this.cachedFrames; + this.cacheSize = 0; + + ByteBuf frame; + while ((frame = frames.poll()) != null) { + frame.release(); } } @Override public boolean isDisposed() { - return state == 2; + return isDisposed(this.state); } - @Override - public void onSubscribe(Subscription s) { - saveFramesSubscriber.onSubscribe(Operators.emptySubscription()); - s.request(Long.MAX_VALUE); + void handleFrame(ByteBuf frame) { + final boolean isResumable = isResumableFrame(frame); + if (isResumable) { + handleResumableFrame(frame); + return; + } + + handleConnectionFrame(frame); } - @Override - public void onError(Throwable t) { - saveFramesSubscriber.onError(t); + void handleTerminal(@Nullable Throwable t) { + if (t != null) { + this.actual.onError(t); + } else { + this.actual.onComplete(); + } } - @Override - public void onComplete() { - saveFramesSubscriber.onComplete(); + void handleConnectionFrame(ByteBuf frame) { + this.actual.onNext(frame); } - @Override - public void onNext(ByteBuf frame) { - final int state; - final boolean isResumable = isResumableFrame(frame); - boolean canBeStore = isResumable; - if (isResumable) { - final ArrayList frames = cachedFrames; - final int incomingFrameSize = frame.readableBytes(); - final int cacheLimit = this.cacheLimit; + void handleResumableFrame(ByteBuf frame) { + final Queue frames = this.cachedFrames; + final int incomingFrameSize = frame.readableBytes(); + final int cacheLimit = this.cacheLimit; - if (cacheLimit != Integer.MAX_VALUE) { - long availableSize = cacheLimit - cacheSize; - if (availableSize < incomingFrameSize) { - int removedBytes = 0; - synchronized (this) { - while (availableSize < incomingFrameSize) { - if (frames.size() == 0) { - break; - } - ByteBuf cachedFrame; - cachedFrame = frames.remove(0); - final int frameSize = cachedFrame.readableBytes(); - availableSize += frameSize; - removedBytes += frameSize; - cachedFrame.release(); - } - } - CACHE_SIZE.addAndGet(this, -removedBytes); - - canBeStore = availableSize >= incomingFrameSize; - POSITION.addAndGet(this, removedBytes + (canBeStore ? 0 : incomingFrameSize)); + final boolean canBeStore; + int cacheSize = this.cacheSize; + if (cacheLimit != Integer.MAX_VALUE) { + final long availableSize = cacheLimit - cacheSize; + + if (availableSize < incomingFrameSize) { + final long firstAvailableFramePosition = this.firstAvailableFramePosition; + final long toRemoveBytes = incomingFrameSize - availableSize; + final int removedBytes = dropFramesFromCache(toRemoveBytes, frames); + + cacheSize = cacheSize - removedBytes; + canBeStore = removedBytes >= toRemoveBytes; + + if (canBeStore) { + FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, firstAvailableFramePosition + removedBytes); } else { - canBeStore = true; + this.cacheSize = cacheSize; + FIRST_AVAILABLE_FRAME_POSITION.lazySet( + this, firstAvailableFramePosition + removedBytes + incomingFrameSize); } } else { canBeStore = true; } + } else { + canBeStore = true; + } - state = this.state; - if (canBeStore) { - synchronized (this) { - if (state != 2) { - frames.add(frame); - } - } + if (canBeStore) { + frames.offer(frame); - if (cacheLimit != Integer.MAX_VALUE) { - CACHE_SIZE.addAndGet(this, incomingFrameSize); - } + if (cacheLimit != Integer.MAX_VALUE) { + this.cacheSize = cacheSize + incomingFrameSize; } - } else { - state = this.state; } - final CoreSubscriber actual = this.actual; - if (state == 1) { - actual.onNext(isResumable && canBeStore ? frame.retainedSlice() : frame); - } else if (!isResumable || !canBeStore || state == 2) { - frame.release(); - } + this.actual.onNext(canBeStore ? frame.retainedSlice() : frame); } @Override @@ -298,30 +463,377 @@ public void request(long n) {} @Override public void cancel() { pauseImplied(); - state = 0; + markDisconnected(this); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Disconnected at Position[{}] and ImpliedPosition[{}]", + side, + session, + firstAvailableFramePosition, + frameImpliedPosition()); + } } @Override public void subscribe(CoreSubscriber actual) { - final int state = this.state; - if (state != 2) { - resumeImplied(); - logger.debug( - "Tag: {}. Subscribed at Position[{}] and ImpliedPosition[{}]", - tag, - position, - impliedPosition); - actual.onSubscribe(this); - synchronized (this) { - for (final ByteBuf frame : cachedFrames) { - actual.onNext(frame.retainedSlice()); + resumeImplied(); + actual.onSubscribe(this); + this.pendingActual = actual; + + final long previousState = markPendingConnection(this); + if (isDisposed(previousState)) { + actual.onError(new CancellationException("Disposed")); + return; + } + + if (isTerminated(previousState)) { + actual.onError(new CancellationException("Disposed")); + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + drain((previousState + 1) | PENDING_CONNECTION_FLAG); + } + + static class FramesSubscriber + implements CoreSubscriber, Fuseable.QueueSubscription { + + final CoreSubscriber actual; + final InMemoryResumableFramesStore parent; + + Fuseable.QueueSubscription qs; + + boolean done; + + FramesSubscriber(CoreSubscriber actual, InMemoryResumableFramesStore parent) { + this.actual = actual; + this.parent = parent; + } + + @Override + @SuppressWarnings("unchecked") + public void onSubscribe(Subscription s) { + if (Operators.validate(this.qs, s)) { + final Fuseable.QueueSubscription qs = (Fuseable.QueueSubscription) s; + this.qs = qs; + + final int m = qs.requestFusion(Fuseable.ANY); + + if (m != Fuseable.ASYNC) { + s.cancel(); + this.actual.onSubscribe(this); + this.actual.onError(new IllegalStateException("Source has to be ASYNC fuseable")); + return; } + + this.actual.onSubscribe(this); } + } - this.actual = actual; - STATE.compareAndSet(this, 0, 1); - } else { - Operators.complete(actual); + @Override + public void onNext(ByteBuf byteBuf) { + final InMemoryResumableFramesStore parent = this.parent; + long previousState = InMemoryResumableFramesStore.markFrameAdded(parent); + + if (isFinalized(previousState)) { + this.qs.clear(); + return; + } + + if (isWorkInProgress(previousState) + || (!isConnected(previousState) && !hasPendingConnection(previousState))) { + return; + } + + parent.drain(previousState + 1); } + + @Override + public void onError(Throwable t) { + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + + final InMemoryResumableFramesStore parent = this.parent; + + parent.terminal = t; + this.done = true; + + final long previousState = InMemoryResumableFramesStore.markTerminated(parent); + if (isFinalized(previousState)) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + parent.drain(previousState | TERMINATED_FLAG); + } + + @Override + public void onComplete() { + if (this.done) { + return; + } + + final InMemoryResumableFramesStore parent = this.parent; + + this.done = true; + + final long previousState = InMemoryResumableFramesStore.markTerminated(parent); + if (isFinalized(previousState)) { + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + parent.drain(previousState | TERMINATED_FLAG); + } + + @Override + public void cancel() { + if (this.done) { + return; + } + + this.done = true; + + final long previousState = InMemoryResumableFramesStore.markTerminated(parent); + if (isFinalized(previousState)) { + return; + } + + if (isWorkInProgress(previousState)) { + return; + } + + parent.drain(previousState | TERMINATED_FLAG); + } + + @Override + public void request(long n) {} + + @Override + public int requestFusion(int requestedMode) { + return Fuseable.NONE; + } + + @Override + public Void poll() { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return false; + } + + @Override + public void clear() {} + } + + static long markFrameAdded(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + long nextState = state; + if (isConnected(state) || hasPendingConnection(state) || isWorkInProgress(state)) { + nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? nextState : nextState + 1; + } + + if (STATE.compareAndSet(store, state, nextState | HAS_FRAME_FLAG)) { + return state; + } + } + } + + static long markPendingConnection(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state) || isDisposed(state) || isTerminated(state)) { + return state; + } + + if (isConnected(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : state + 1; + if (STATE.compareAndSet(store, state, nextState | PENDING_CONNECTION_FLAG)) { + return state; + } + } + } + + static long markRemoteImpliedPositionChanged(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : (state + 1); + if (STATE.compareAndSet(store, state, nextState | REMOTE_IMPLIED_POSITION_CHANGED_FLAG)) { + return state; + } + } + } + + static long markDisconnected(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + if (STATE.compareAndSet(store, state, state & ~CONNECTED_FLAG & ~PENDING_CONNECTION_FLAG)) { + return state; + } + } + } + + static long markWorkDone(InMemoryResumableFramesStore store, long expectedState) { + for (; ; ) { + final long state = store.state; + + if (expectedState != state) { + return state; + } + + if (isFinalized(state)) { + return state; + } + + final long nextState = state & ~MAX_WORK_IN_PROGRESS & ~REMOTE_IMPLIED_POSITION_CHANGED_FLAG; + if (STATE.compareAndSet(store, state, nextState)) { + return nextState; + } + } + } + + static long markConnected(InMemoryResumableFramesStore store, long expectedState) { + for (; ; ) { + final long state = store.state; + + if (state != expectedState) { + return state; + } + + if (isFinalized(state)) { + return state; + } + + final long nextState = state ^ PENDING_CONNECTION_FLAG | CONNECTED_FLAG; + if (STATE.compareAndSet(store, state, nextState)) { + return nextState; + } + } + } + + static long markTerminated(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : (state + 1); + if (STATE.compareAndSet(store, state, nextState | TERMINATED_FLAG)) { + return state; + } + } + } + + static long markDisposed(InMemoryResumableFramesStore store) { + for (; ; ) { + final long state = store.state; + + if (isFinalized(state)) { + return state; + } + + final long nextState = + (state & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS ? state : (state + 1); + if (STATE.compareAndSet(store, state, nextState | DISPOSED_FLAG)) { + return state; + } + } + } + + static void clearAndFinalize(InMemoryResumableFramesStore store) { + final Fuseable.QueueSubscription qs = store.framesSubscriber.qs; + for (; ; ) { + final long state = store.state; + + qs.clear(); + store.clearCache(); + + if (isFinalized(state)) { + return; + } + + if (STATE.compareAndSet(store, state, state | FINALIZED_FLAG & ~MAX_WORK_IN_PROGRESS)) { + store.disposed.tryEmitEmpty(); + store.framesSubscriber.onComplete(); + return; + } + } + } + + static boolean isConnected(long state) { + return (state & CONNECTED_FLAG) == CONNECTED_FLAG; + } + + static boolean hasRemoteImpliedPositionChanged(long state) { + return (state & REMOTE_IMPLIED_POSITION_CHANGED_FLAG) == REMOTE_IMPLIED_POSITION_CHANGED_FLAG; + } + + static boolean hasPendingConnection(long state) { + return (state & PENDING_CONNECTION_FLAG) == PENDING_CONNECTION_FLAG; + } + + static boolean hasFrames(long state) { + return (state & HAS_FRAME_FLAG) == HAS_FRAME_FLAG; + } + + static boolean isTerminated(long state) { + return (state & TERMINATED_FLAG) == TERMINATED_FLAG; + } + + static boolean isDisposed(long state) { + return (state & DISPOSED_FLAG) == DISPOSED_FLAG; + } + + static boolean isFinalized(long state) { + return (state & FINALIZED_FLAG) == FINALIZED_FLAG; + } + + static boolean isWorkInProgress(long state) { + return (state & MAX_WORK_IN_PROGRESS) > 0; } } diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java index 6e90e6d63..18cd7167a 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java @@ -18,8 +18,11 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.util.CharsetUtil; import io.rsocket.DuplexConnection; import io.rsocket.RSocketErrorException; +import io.rsocket.exceptions.ConnectionCloseException; +import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.frame.FrameHeaderCodec; import io.rsocket.internal.UnboundedProcessor; import java.net.SocketAddress; @@ -35,13 +38,15 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; +import reactor.util.annotation.Nullable; public class ResumableDuplexConnection extends Flux implements DuplexConnection, Subscription { static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class); - final String tag; + final String side; + final String session; final ResumableFramesStore resumableFramesStore; final UnboundedProcessor savableFramesSender; @@ -66,8 +71,12 @@ public class ResumableDuplexConnection extends Flux int connectionIndex = 0; public ResumableDuplexConnection( - String tag, DuplexConnection initialConnection, ResumableFramesStore resumableFramesStore) { - this.tag = tag; + String side, + ByteBuf session, + DuplexConnection initialConnection, + ResumableFramesStore resumableFramesStore) { + this.side = side; + this.session = session.toString(CharsetUtil.UTF_8); this.onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer(); this.resumableFramesStore = resumableFramesStore; this.savableFramesSender = new UnboundedProcessor(); @@ -94,29 +103,51 @@ public boolean connect(DuplexConnection nextConnection) { } void initConnection(DuplexConnection nextConnection) { - logger.debug("Tag {}. Initializing connection {}", tag, nextConnection); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Connecting to DuplexConnection[{}]", + side, + session, + nextConnection); + } final int currentConnectionIndex = connectionIndex; final FrameReceivingSubscriber frameReceivingSubscriber = - new FrameReceivingSubscriber(tag, resumableFramesStore, receiveSubscriber); + new FrameReceivingSubscriber(side, resumableFramesStore, receiveSubscriber); this.connectionIndex = currentConnectionIndex + 1; this.activeReceivingSubscriber = frameReceivingSubscriber; - final Disposable disposable = + final Disposable resumeStreamSubscription = resumableFramesStore .resumeStream() - .subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f)); + .subscribe( + f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), f), + t -> sendErrorAndClose(new ConnectionErrorException(t.getMessage())), + () -> + sendErrorAndClose( + new ConnectionCloseException("Connection Closed Unexpectedly"))); nextConnection.receive().subscribe(frameReceivingSubscriber); nextConnection .onClose() .doFinally( __ -> { frameReceivingSubscriber.dispose(); - disposable.dispose(); + resumeStreamSubscription.dispose(); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[{}]|Session[{}]. Disconnected from DuplexConnection[{}]", + side, + session, + nextConnection); + } Sinks.EmitResult result = onConnectionClosedSink.tryEmitNext(currentConnectionIndex); if (!result.equals(Sinks.EmitResult.OK)) { - logger.error("Failed to notify session of closed connection: {}", result); + logger.error( + "Side[{}]|Session[{}]. Failed to notify session of closed connection: {}", + side, + session, + result); } }) .subscribe(); @@ -196,6 +227,10 @@ public Mono onClose() { @Override public void dispose() { + dispose(null); + } + + void dispose(@Nullable Throwable e) { final DuplexConnection activeConnection = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE); if (activeConnection == DisposedConnection.INSTANCE) { @@ -206,11 +241,20 @@ public void dispose() { activeConnection.dispose(); } + if (logger.isDebugEnabled()) { + logger.debug("Side[{}]|Session[{}]. Disposing...", side, session); + } + framesSaverDisposable.dispose(); activeReceivingSubscriber.dispose(); savableFramesSender.dispose(); onConnectionClosedSink.tryEmitComplete(); - onClose.tryEmitEmpty(); + + if (e != null) { + onClose.tryEmitError(e); + } else { + onClose.tryEmitEmpty(); + } } @Override diff --git a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java index b62c615f3..a57899cac 100644 --- a/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java +++ b/rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java @@ -123,12 +123,15 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex long impliedPosition = resumableFramesStore.frameImpliedPosition(); long position = resumableFramesStore.framePosition(); - logger.debug( - "Resume FRAME received. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}, ServerResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", - remoteImpliedPos, - remotePos, - impliedPosition, - position); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Resume FRAME received. ClientResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}, ServerResumeState{observedFramesPosition[{}], sentFramesPosition[{}]}", + resumeToken, + remoteImpliedPos, + remotePos, + impliedPosition, + position); + } for (; ; ) { final Subscription subscription = this.s; @@ -138,6 +141,7 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex final RejectedResumeException rejectedResumeException = new RejectedResumeException("resume_internal_error: Session Expired"); nextDuplexConnection.sendErrorAndClose(rejectedResumeException); + nextDuplexConnection.receive().subscribe().dispose(); return; } @@ -152,31 +156,47 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex if (position != remoteImpliedPos) { resumableFramesStore.releaseFrames(remoteImpliedPos); } - nextDuplexConnection.sendFrame( - 0, ResumeOkFrameCodec.encode(allocator, resumableFramesStore.frameImpliedPosition())); - logger.debug("ResumeOK Frame has been sent"); + nextDuplexConnection.sendFrame(0, ResumeOkFrameCodec.encode(allocator, impliedPosition)); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. ResumeOKFrame[impliedPosition[{}]] has been sent", + resumeToken, + impliedPosition); + } } catch (Throwable t) { logger.debug("Exception occurred while releasing frames in the frameStore", t); tryTimeoutSession(); nextDuplexConnection.sendErrorAndClose(new RejectedResumeException(t.getMessage(), t)); + nextDuplexConnection.receive().subscribe().dispose(); return; } if (resumableConnection.connect(nextDuplexConnection)) { keepAliveSupport.start(); - logger.debug("Session[{}] has been resumed successfully", resumeToken); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Session has been resumed successfully", resumeToken); + } } else { - logger.debug("Session has already been expired. Terminating received connection"); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Session has already been expired. Terminating received connection", + resumeToken); + } final ConnectionErrorException connectionErrorException = new ConnectionErrorException("resume_internal_error: Session Expired"); nextDuplexConnection.sendErrorAndClose(connectionErrorException); + nextDuplexConnection.receive().subscribe().dispose(); } } else { - logger.debug( - "Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection", - remoteImpliedPos, - position, - remotePos, - impliedPosition); + if (logger.isDebugEnabled()) { + logger.debug( + "Side[server]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}] and RemotePosition[{}] to be less or equal to LocalImpliedPosition[{}]. Terminating received connection", + resumeToken, + remoteImpliedPos, + position, + remotePos, + impliedPosition); + } tryTimeoutSession(); final RejectedResumeException rejectedResumeException = new RejectedResumeException( @@ -184,6 +204,7 @@ void doResume(long remotePos, long remoteImpliedPos, DuplexConnection nextDuplex "resumption_pos=[ remote: { pos: %d, impliedPos: %d }, local: { pos: %d, impliedPos: %d }]", remotePos, remoteImpliedPos, position, impliedPosition)); nextDuplexConnection.sendErrorAndClose(rejectedResumeException); + nextDuplexConnection.receive().subscribe().dispose(); } } diff --git a/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java b/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java index a595faa86..bba40d674 100644 --- a/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java +++ b/rsocket-core/src/test/java/io/rsocket/resume/InMemoryResumeStoreTest.java @@ -4,59 +4,123 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import io.rsocket.RaceTestConstants; +import io.rsocket.internal.UnboundedProcessor; +import io.rsocket.internal.subscriber.AssertSubscriber; import java.util.Arrays; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import reactor.core.Disposable; +import reactor.core.publisher.Hooks; +import reactor.test.util.RaceTestUtils; public class InMemoryResumeStoreTest { @Test void saveNonResumableFrame() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeConnectionFrame(10); - ByteBuf frame2 = fakeConnectionFrame(35); - store.saveFrames(Flux.just(frame1, frame2)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame1 = fakeConnectionFrame(10); + final ByteBuf frame2 = fakeConnectionFrame(35); + + sender.onNext(frame1); + sender.onNext(frame2); + assertThat(store.cachedFrames.size()).isZero(); assertThat(store.cacheSize).isZero(); - assertThat(store.position).isZero(); + assertThat(store.firstAvailableFramePosition).isZero(); + + assertSubscriber.assertValueCount(2).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); } @Test void saveWithoutTailRemoval() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame = fakeResumableFrame(10); - store.saveFrames(Flux.just(frame)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame = fakeResumableFrame(10); + + sender.onNext(frame); + assertThat(store.cachedFrames.size()).isEqualTo(1); assertThat(store.cacheSize).isEqualTo(frame.readableBytes()); - assertThat(store.position).isZero(); + assertThat(store.firstAvailableFramePosition).isZero(); + + assertSubscriber.assertValueCount(1).values().forEach(ByteBuf::release); + assertThat(frame.refCnt()).isOne(); } @Test void saveRemoveOneFromTail() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeResumableFrame(20); - ByteBuf frame2 = fakeResumableFrame(10); - store.saveFrames(Flux.just(frame1, frame2)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + final ByteBuf frame1 = fakeResumableFrame(20); + final ByteBuf frame2 = fakeResumableFrame(10); + + sender.onNext(frame1); + sender.onNext(frame2); + assertThat(store.cachedFrames.size()).isOne(); assertThat(store.cacheSize).isEqualTo(frame2.readableBytes()); - assertThat(store.position).isEqualTo(frame1.readableBytes()); + assertThat(store.firstAvailableFramePosition).isEqualTo(frame1.readableBytes()); + + assertSubscriber.assertValueCount(2).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isOne(); } @Test void saveRemoveTwoFromTail() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeResumableFrame(10); - ByteBuf frame2 = fakeResumableFrame(10); - ByteBuf frame3 = fakeResumableFrame(20); - store.saveFrames(Flux.just(frame1, frame2, frame3)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(20); + + sender.onNext(frame1); + sender.onNext(frame2); + sender.onNext(frame3); + assertThat(store.cachedFrames.size()).isOne(); assertThat(store.cacheSize).isEqualTo(frame3.readableBytes()); - assertThat(store.position).isEqualTo(size(frame1, frame2)); + assertThat(store.firstAvailableFramePosition).isEqualTo(size(frame1, frame2)); + + assertSubscriber.assertValueCount(3).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); assertThat(frame3.refCnt()).isOne(); @@ -64,14 +128,27 @@ void saveRemoveTwoFromTail() { @Test void saveBiggerThanStore() { - InMemoryResumableFramesStore store = inMemoryStore(25); - ByteBuf frame1 = fakeResumableFrame(10); - ByteBuf frame2 = fakeResumableFrame(10); - ByteBuf frame3 = fakeResumableFrame(30); - store.saveFrames(Flux.just(frame1, frame2, frame3)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(25); + final UnboundedProcessor sender = new UnboundedProcessor(); + + store.saveFrames(sender).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + sender.onNext(frame1); + sender.onNext(frame2); + sender.onNext(frame3); + assertThat(store.cachedFrames.size()).isZero(); assertThat(store.cacheSize).isZero(); - assertThat(store.position).isEqualTo(size(frame1, frame2, frame3)); + assertThat(store.firstAvailableFramePosition).isEqualTo(size(frame1, frame2, frame3)); + + assertSubscriber.assertValueCount(3).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); assertThat(frame3.refCnt()).isZero(); @@ -79,15 +156,30 @@ void saveBiggerThanStore() { @Test void releaseFrames() { - InMemoryResumableFramesStore store = inMemoryStore(100); - ByteBuf frame1 = fakeResumableFrame(10); - ByteBuf frame2 = fakeResumableFrame(10); - ByteBuf frame3 = fakeResumableFrame(30); - store.saveFrames(Flux.just(frame1, frame2, frame3)).block(); + final InMemoryResumableFramesStore store = inMemoryStore(100); + + final UnboundedProcessor producer = new UnboundedProcessor(); + store.saveFrames(producer).subscribe(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + producer.onNext(frame1); + producer.onNext(frame2); + producer.onNext(frame3); + store.releaseFrames(20); + assertThat(store.cachedFrames.size()).isOne(); assertThat(store.cacheSize).isEqualTo(frame3.readableBytes()); - assertThat(store.position).isEqualTo(size(frame1, frame2)); + assertThat(store.firstAvailableFramePosition).isEqualTo(size(frame1, frame2)); + + assertSubscriber.assertValueCount(3).values().forEach(ByteBuf::release); + assertThat(frame1.refCnt()).isZero(); assertThat(frame2.refCnt()).isZero(); assertThat(frame3.refCnt()).isOne(); @@ -95,20 +187,350 @@ void releaseFrames() { @Test void receiveImpliedPosition() { - InMemoryResumableFramesStore store = inMemoryStore(100); + final InMemoryResumableFramesStore store = inMemoryStore(100); + ByteBuf frame1 = fakeResumableFrame(10); ByteBuf frame2 = fakeResumableFrame(30); + store.resumableFrameReceived(frame1); store.resumableFrameReceived(frame2); + assertThat(store.frameImpliedPosition()).isEqualTo(size(frame1, frame2)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void ensuresCleansOnTerminal(boolean hasSubscriber) { + final InMemoryResumableFramesStore store = inMemoryStore(100); + + final UnboundedProcessor producer = new UnboundedProcessor(); + store.saveFrames(producer).subscribe(); + + final AssertSubscriber assertSubscriber = + hasSubscriber ? store.resumeStream().subscribeWith(AssertSubscriber.create()) : null; + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + producer.onNext(frame1); + producer.onNext(frame2); + producer.onNext(frame3); + producer.onComplete(); + + assertThat(store.cachedFrames.size()).isZero(); + assertThat(store.cacheSize).isZero(); + + assertThat(producer.isDisposed()).isTrue(); + + if (hasSubscriber) { + assertSubscriber.assertValueCount(3).assertTerminated().values().forEach(ByteBuf::release); + } + + assertThat(frame1.refCnt()).isZero(); + assertThat(frame2.refCnt()).isZero(); + assertThat(frame3.refCnt()).isZero(); + } + + @Test + void ensuresCleansOnTerminalLateSubscriber() { + final InMemoryResumableFramesStore store = inMemoryStore(100); + + final UnboundedProcessor producer = new UnboundedProcessor(); + store.saveFrames(producer).subscribe(); + + final ByteBuf frame1 = fakeResumableFrame(10); + final ByteBuf frame2 = fakeResumableFrame(10); + final ByteBuf frame3 = fakeResumableFrame(30); + + producer.onNext(frame1); + producer.onNext(frame2); + producer.onNext(frame3); + producer.onComplete(); + + assertThat(store.cachedFrames.size()).isZero(); + assertThat(store.cacheSize).isZero(); + + assertThat(producer.isDisposed()).isTrue(); + + final AssertSubscriber assertSubscriber = + store.resumeStream().subscribeWith(AssertSubscriber.create()); + assertSubscriber.assertTerminated(); + + assertThat(frame1.refCnt()).isZero(); + assertThat(frame2.refCnt()).isZero(); + assertThat(frame3.refCnt()).isZero(); + } + + @ParameterizedTest(name = "Sending vs Reconnect Race Test. WithLateSubscriber[{0}]") + @ValueSource(booleans = {true, false}) + void sendingVsReconnectRaceTest(boolean withLateSubscriber) { + for (int i = 0; i < RaceTestConstants.REPEATS; i++) { + final InMemoryResumableFramesStore store = inMemoryStore(Integer.MAX_VALUE); + final UnboundedProcessor frames = new UnboundedProcessor(); + final BlockingQueue receivedFrames = new ArrayBlockingQueue<>(10); + final AtomicInteger receivedPosition = new AtomicInteger(); + + store.saveFrames(frames).subscribe(); + + final Consumer consumer = + f -> { + if (ResumableDuplexConnection.isResumableFrame(f)) { + receivedPosition.addAndGet(f.readableBytes()); + receivedFrames.offer(f); + return; + } + f.release(); + }; + final AtomicReference disposableReference = + new AtomicReference<>( + withLateSubscriber ? null : store.resumeStream().subscribe(consumer)); + + final ByteBuf byteBuf1 = fakeResumableFrame(5); + final ByteBuf byteBuf11 = fakeConnectionFrame(5); + final ByteBuf byteBuf2 = fakeResumableFrame(6); + final ByteBuf byteBuf21 = fakeConnectionFrame(5); + final ByteBuf byteBuf3 = fakeResumableFrame(7); + final ByteBuf byteBuf31 = fakeConnectionFrame(5); + final ByteBuf byteBuf4 = fakeResumableFrame(8); + final ByteBuf byteBuf41 = fakeConnectionFrame(5); + final ByteBuf byteBuf5 = fakeResumableFrame(25); + final ByteBuf byteBuf51 = fakeConnectionFrame(35); + + RaceTestUtils.race( + () -> { + if (withLateSubscriber) { + disposableReference.set(store.resumeStream().subscribe(consumer)); + } + + // disconnect + disposableReference.get().dispose(); + + while (InMemoryResumableFramesStore.isWorkInProgress(store.state)) { + // ignore + } + + // mimic RESUME_OK frame received + store.releaseFrames(receivedPosition.get()); + disposableReference.set(store.resumeStream().subscribe(consumer)); + + // disconnect + disposableReference.get().dispose(); + + while (InMemoryResumableFramesStore.isWorkInProgress(store.state)) { + // ignore + } + + // mimic RESUME_OK frame received + store.releaseFrames(receivedPosition.get()); + disposableReference.set(store.resumeStream().subscribe(consumer)); + }, + () -> { + frames.onNext(byteBuf1); + frames.onNextPrioritized(byteBuf11); + frames.onNext(byteBuf2); + frames.onNext(byteBuf3); + frames.onNextPrioritized(byteBuf31); + frames.onNext(byteBuf4); + frames.onNext(byteBuf5); + }, + () -> { + frames.onNextPrioritized(byteBuf21); + frames.onNextPrioritized(byteBuf41); + frames.onNextPrioritized(byteBuf51); + }); + + store.releaseFrames(receivedFrames.stream().mapToInt(ByteBuf::readableBytes).sum()); + + assertThat(store.cacheSize).isZero(); + assertThat(store.cachedFrames).isEmpty(); + + assertThat(receivedFrames) + .hasSize(5) + .containsSequence(byteBuf1, byteBuf2, byteBuf3, byteBuf4, byteBuf5); + receivedFrames.forEach(ReferenceCounted::release); + + assertThat(byteBuf1.refCnt()).isZero(); + assertThat(byteBuf11.refCnt()).isZero(); + assertThat(byteBuf2.refCnt()).isZero(); + assertThat(byteBuf21.refCnt()).isZero(); + assertThat(byteBuf3.refCnt()).isZero(); + assertThat(byteBuf31.refCnt()).isZero(); + assertThat(byteBuf4.refCnt()).isZero(); + assertThat(byteBuf41.refCnt()).isZero(); + assertThat(byteBuf5.refCnt()).isZero(); + assertThat(byteBuf51.refCnt()).isZero(); + } + } + + @ParameterizedTest( + name = "Sending vs Reconnect with incorrect position Race Test. WithLateSubscriber[{0}]") + @ValueSource(booleans = {true, false}) + void incorrectReleaseFramesWithOnNextRaceTest(boolean withLateSubscriber) { + Hooks.onErrorDropped(t -> {}); + try { + for (int i = 0; i < RaceTestConstants.REPEATS; i++) { + final InMemoryResumableFramesStore store = inMemoryStore(Integer.MAX_VALUE); + final UnboundedProcessor frames = new UnboundedProcessor(); + + store.saveFrames(frames).subscribe(); + + final AtomicInteger terminationCnt = new AtomicInteger(); + final Consumer consumer = ReferenceCounted::release; + final Consumer errorConsumer = __ -> terminationCnt.incrementAndGet(); + final AtomicReference disposableReference = + new AtomicReference<>( + withLateSubscriber + ? null + : store.resumeStream().subscribe(consumer, errorConsumer)); + + final ByteBuf byteBuf1 = fakeResumableFrame(5); + final ByteBuf byteBuf11 = fakeConnectionFrame(5); + final ByteBuf byteBuf2 = fakeResumableFrame(6); + final ByteBuf byteBuf21 = fakeConnectionFrame(5); + final ByteBuf byteBuf3 = fakeResumableFrame(7); + final ByteBuf byteBuf31 = fakeConnectionFrame(5); + final ByteBuf byteBuf4 = fakeResumableFrame(8); + final ByteBuf byteBuf41 = fakeConnectionFrame(5); + final ByteBuf byteBuf5 = fakeResumableFrame(25); + final ByteBuf byteBuf51 = fakeConnectionFrame(35); + + RaceTestUtils.race( + () -> { + if (withLateSubscriber) { + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + } + // disconnect + disposableReference.get().dispose(); + + // mimic RESUME_OK frame received but with incorrect position + store.releaseFrames(25); + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + }, + () -> { + frames.onNext(byteBuf1); + frames.onNextPrioritized(byteBuf11); + frames.onNext(byteBuf2); + frames.onNext(byteBuf3); + frames.onNextPrioritized(byteBuf31); + frames.onNext(byteBuf4); + frames.onNext(byteBuf5); + }, + () -> { + frames.onNextPrioritized(byteBuf21); + frames.onNextPrioritized(byteBuf41); + frames.onNextPrioritized(byteBuf51); + }); + + assertThat(store.cacheSize).isZero(); + assertThat(store.cachedFrames).isEmpty(); + assertThat(disposableReference.get().isDisposed()).isTrue(); + assertThat(terminationCnt).hasValue(1); + + assertThat(byteBuf1.refCnt()).isZero(); + assertThat(byteBuf11.refCnt()).isZero(); + assertThat(byteBuf2.refCnt()).isZero(); + assertThat(byteBuf21.refCnt()).isZero(); + assertThat(byteBuf3.refCnt()).isZero(); + assertThat(byteBuf31.refCnt()).isZero(); + assertThat(byteBuf4.refCnt()).isZero(); + assertThat(byteBuf41.refCnt()).isZero(); + assertThat(byteBuf5.refCnt()).isZero(); + assertThat(byteBuf51.refCnt()).isZero(); + } + } finally { + Hooks.resetOnErrorDropped(); + } + } + + @ParameterizedTest( + name = + "Dispose vs Sending vs Reconnect with incorrect position Race Test. WithLateSubscriber[{0}]") + @ValueSource(booleans = {true, false}) + void incorrectReleaseFramesWithOnNextWithDisposeRaceTest(boolean withLateSubscriber) { + Hooks.onErrorDropped(t -> {}); + try { + for (int i = 0; i < RaceTestConstants.REPEATS; i++) { + final InMemoryResumableFramesStore store = inMemoryStore(Integer.MAX_VALUE); + final UnboundedProcessor frames = new UnboundedProcessor(); + + store.saveFrames(frames).subscribe(); + + final AtomicInteger terminationCnt = new AtomicInteger(); + final Consumer consumer = ReferenceCounted::release; + final Consumer errorConsumer = __ -> terminationCnt.incrementAndGet(); + final AtomicReference disposableReference = + new AtomicReference<>( + withLateSubscriber + ? null + : store.resumeStream().subscribe(consumer, errorConsumer)); + + final ByteBuf byteBuf1 = fakeResumableFrame(5); + final ByteBuf byteBuf11 = fakeConnectionFrame(5); + final ByteBuf byteBuf2 = fakeResumableFrame(6); + final ByteBuf byteBuf21 = fakeConnectionFrame(5); + final ByteBuf byteBuf3 = fakeResumableFrame(7); + final ByteBuf byteBuf31 = fakeConnectionFrame(5); + final ByteBuf byteBuf4 = fakeResumableFrame(8); + final ByteBuf byteBuf41 = fakeConnectionFrame(5); + final ByteBuf byteBuf5 = fakeResumableFrame(25); + final ByteBuf byteBuf51 = fakeConnectionFrame(35); + + RaceTestUtils.race( + () -> { + if (withLateSubscriber) { + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + } + // disconnect + disposableReference.get().dispose(); + + // mimic RESUME_OK frame received but with incorrect position + store.releaseFrames(25); + disposableReference.set(store.resumeStream().subscribe(consumer, errorConsumer)); + }, + () -> { + frames.onNext(byteBuf1); + frames.onNextPrioritized(byteBuf11); + frames.onNext(byteBuf2); + frames.onNext(byteBuf3); + frames.onNextPrioritized(byteBuf31); + frames.onNext(byteBuf4); + frames.onNext(byteBuf5); + }, + () -> { + frames.onNextPrioritized(byteBuf21); + frames.onNextPrioritized(byteBuf41); + frames.onNextPrioritized(byteBuf51); + }, + store::dispose); + + assertThat(store.cacheSize).isZero(); + assertThat(store.cachedFrames).isEmpty(); + assertThat(disposableReference.get().isDisposed()).isTrue(); + assertThat(terminationCnt).hasValueGreaterThanOrEqualTo(1).hasValueLessThanOrEqualTo(2); + + assertThat(byteBuf1.refCnt()).isZero(); + assertThat(byteBuf11.refCnt()).isZero(); + assertThat(byteBuf2.refCnt()).isZero(); + assertThat(byteBuf21.refCnt()).isZero(); + assertThat(byteBuf3.refCnt()).isZero(); + assertThat(byteBuf31.refCnt()).isZero(); + assertThat(byteBuf4.refCnt()).isZero(); + assertThat(byteBuf41.refCnt()).isZero(); + assertThat(byteBuf5.refCnt()).isZero(); + assertThat(byteBuf51.refCnt()).isZero(); + } + } finally { + Hooks.resetOnErrorDropped(); + } + } + private int size(ByteBuf... byteBufs) { return Arrays.stream(byteBufs).mapToInt(ByteBuf::readableBytes).sum(); } private static InMemoryResumableFramesStore inMemoryStore(int size) { - return new InMemoryResumableFramesStore("test", size); + return new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, size); } private static ByteBuf fakeResumableFrame(int size) { diff --git a/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java b/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java index b2dad0022..5eb78fabe 100644 --- a/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java +++ b/rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java @@ -182,7 +182,7 @@ private static Mono newClientRSocket( .resume( new Resume() .sessionDuration(Duration.ofSeconds(sessionDurationSeconds)) - .storeFactory(t -> new InMemoryResumableFramesStore("client", 500_000)) + .storeFactory(t -> new InMemoryResumableFramesStore("client", t, 500_000)) .cleanupStoreOnKeepAlive() .retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(1)))) .keepAlive(Duration.ofSeconds(5), Duration.ofMinutes(5)) @@ -199,7 +199,7 @@ private static Mono newServerRSocket(int sessionDurationSecond new Resume() .sessionDuration(Duration.ofSeconds(sessionDurationSeconds)) .cleanupStoreOnKeepAlive() - .storeFactory(t -> new InMemoryResumableFramesStore("server", 500_000))) + .storeFactory(t -> new InMemoryResumableFramesStore("server", t, 500_000))) .bind(serverTransport(SERVER_HOST, SERVER_PORT)); } @@ -212,7 +212,7 @@ public Flux requestChannel(Publisher payloads) { return duplicate( Flux.interval(Duration.ofMillis(1)) .onBackpressureLatest() - .publishOn(Schedulers.elastic()), + .publishOn(Schedulers.boundedElastic()), 20) .map(v -> DefaultPayload.create(String.valueOf(counter.getAndIncrement()))) .takeUntilOther(Flux.from(payloads).then()); diff --git a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java index 0bae8cd69..5384c7e8d 100644 --- a/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java +++ b/rsocket-test/src/main/java/io/rsocket/test/TransportTest.java @@ -51,7 +51,6 @@ import org.assertj.core.api.Assertions; import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; @@ -60,7 +59,6 @@ import reactor.core.Exceptions; import reactor.core.Fuseable; import reactor.core.publisher.Flux; -import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.Operators; @@ -93,14 +91,8 @@ static String read(String resourceName) { } } - @BeforeEach - default void setUp() { - Hooks.onOperatorDebug(); - } - @AfterEach default void close() { - Hooks.resetOnOperatorDebug(); getTransportPair().responder.awaitAllInteractionTermination(getTimeout()); getTransportPair().dispose(); getTransportPair().awaitClosed(); @@ -547,7 +539,7 @@ public TransportPair( "Server", duplexConnection, Duration.ofMillis( - ThreadLocalRandom.current().nextInt(10, 1500))) + ThreadLocalRandom.current().nextInt(100, 1000))) : duplexConnection); } }); @@ -555,7 +547,8 @@ public TransportPair( if (withResumability) { rSocketServer.resume( new Resume() - .storeFactory(__ -> new InMemoryResumableFramesStore("server", Integer.MAX_VALUE))); + .storeFactory( + token -> new InMemoryResumableFramesStore("server", token, Integer.MAX_VALUE))); } if (withRandomFragmentation) { @@ -568,7 +561,7 @@ public TransportPair( final RSocketConnector rSocketConnector = RSocketConnector.create() .payloadDecoder(PayloadDecoder.ZERO_COPY) - .keepAlive(Duration.ofMillis(Integer.MAX_VALUE), Duration.ofMillis(Integer.MAX_VALUE)) + .keepAlive(Duration.ofMillis(10), Duration.ofHours(1)) .interceptors( registry -> { if (runClientWithAsyncInterceptors && !withResumability) { @@ -594,7 +587,7 @@ public TransportPair( "Client", duplexConnection, Duration.ofMillis( - ThreadLocalRandom.current().nextInt(1, 2000))) + ThreadLocalRandom.current().nextInt(10, 1500))) : duplexConnection); } }); @@ -602,7 +595,8 @@ public TransportPair( if (withResumability) { rSocketConnector.resume( new Resume() - .storeFactory(__ -> new InMemoryResumableFramesStore("client", Integer.MAX_VALUE))); + .storeFactory( + token -> new InMemoryResumableFramesStore("client", token, Integer.MAX_VALUE))); } if (withRandomFragmentation) { diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java index 588f772d3..113b7a2f8 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalClientTransport.java @@ -77,14 +77,17 @@ public Mono connect() { return Mono.error(new IllegalArgumentException("Could not find server: " + name)); } - UnboundedProcessor in = new UnboundedProcessor(); - UnboundedProcessor out = new UnboundedProcessor(); - Sinks.Empty closeSink = Sinks.empty(); + Sinks.One inSink = Sinks.one(); + Sinks.One outSink = Sinks.one(); + UnboundedProcessor in = new UnboundedProcessor(() -> inSink.tryEmitValue(inSink)); + UnboundedProcessor out = new UnboundedProcessor(() -> outSink.tryEmitValue(outSink)); - server.apply(new LocalDuplexConnection(name, allocator, out, in, closeSink)).subscribe(); + Mono onClose = inSink.asMono().zipWith(outSink.asMono()).then(); - return Mono.just( - (DuplexConnection) new LocalDuplexConnection(name, allocator, in, out, closeSink)); + server.apply(new LocalDuplexConnection(name, allocator, out, in, onClose)).subscribe(); + + return Mono.just( + new LocalDuplexConnection(name, allocator, in, out, onClose)); }); } } diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index 5e18aa4cc..5c395156c 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -27,11 +27,9 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Fuseable; -import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import reactor.core.publisher.Sinks; /** An implementation of {@link DuplexConnection} that connects inside the same JVM. */ final class LocalDuplexConnection implements DuplexConnection { @@ -40,7 +38,7 @@ final class LocalDuplexConnection implements DuplexConnection { private final ByteBufAllocator allocator; private final Flux in; - private final Sinks.Empty onClose; + private final Mono onClose; private final UnboundedProcessor out; @@ -58,7 +56,7 @@ final class LocalDuplexConnection implements DuplexConnection { ByteBufAllocator allocator, Flux in, UnboundedProcessor out, - Sinks.Empty onClose) { + Mono onClose) { this.address = new LocalSocketAddress(name); this.allocator = Objects.requireNonNull(allocator, "allocator must not be null"); this.in = Objects.requireNonNull(in, "in must not be null"); @@ -69,24 +67,23 @@ final class LocalDuplexConnection implements DuplexConnection { @Override public void dispose() { out.onComplete(); - onClose.tryEmitEmpty(); } @Override - @SuppressWarnings("ConstantConditions") public boolean isDisposed() { - return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED); + return out.isDisposed(); } @Override public Mono onClose() { - return onClose.asMono(); + return onClose; } @Override public Flux receive() { return in.transform( - Operators.lift((__, actual) -> new ByteBufReleaserOperator(actual))); + Operators.lift( + (__, actual) -> new ByteBufReleaserOperator(actual, this))); } @Override @@ -119,11 +116,14 @@ static class ByteBufReleaserOperator implements CoreSubscriber, Subscription, Fuseable.QueueSubscription { final CoreSubscriber actual; + final LocalDuplexConnection parent; Subscription s; - public ByteBufReleaserOperator(CoreSubscriber actual) { + public ByteBufReleaserOperator( + CoreSubscriber actual, LocalDuplexConnection parent) { this.actual = actual; + this.parent = parent; } @Override @@ -136,17 +136,22 @@ public void onSubscribe(Subscription s) { @Override public void onNext(ByteBuf buf) { - actual.onNext(buf); - buf.release(); + try { + actual.onNext(buf); + } finally { + buf.release(); + } } @Override public void onError(Throwable t) { + parent.out.onError(t); actual.onError(t); } @Override public void onComplete() { + parent.out.onComplete(); actual.onComplete(); } @@ -158,6 +163,7 @@ public void request(long n) { @Override public void cancel() { s.cancel(); + parent.out.onComplete(); } @Override diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java index 8bea7c682..51c812cc3 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableTransportTest.java @@ -19,9 +19,7 @@ import io.rsocket.test.TransportTest; import java.time.Duration; import java.util.UUID; -import org.junit.jupiter.api.Disabled; -@Disabled("leaking somewhere for no clear reason") final class LocalResumableTransportTest implements TransportTest { private final TransportPair transportPair = diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java new file mode 100644 index 000000000..124cecec9 --- /dev/null +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalResumableWithFragmentationTransportTest.java @@ -0,0 +1,42 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.local; + +import io.rsocket.test.TransportTest; +import java.time.Duration; +import java.util.UUID; + +final class LocalResumableWithFragmentationTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server, allocator) -> LocalClientTransport.create(address, allocator), + (address, allocator) -> LocalServerTransport.create(address), + true, + true); + + @Override + public Duration getTimeout() { + return Duration.ofSeconds(10); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java new file mode 100644 index 000000000..7d9d80542 --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpResumableWithFragmentationTransportTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.netty; + +import io.netty.channel.ChannelOption; +import io.rsocket.test.TransportTest; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import java.net.InetSocketAddress; +import java.time.Duration; +import reactor.netty.tcp.TcpClient; +import reactor.netty.tcp.TcpServer; + +final class TcpResumableWithFragmentationTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + TcpClientTransport.create( + TcpClient.create() + .remoteAddress(server::address) + .option(ChannelOption.ALLOCATOR, allocator)), + (address, allocator) -> + TcpServerTransport.create( + TcpServer.create() + .bindAddress(() -> address) + .option(ChannelOption.ALLOCATOR, allocator)), + true, + true); + + @Override + public Duration getTimeout() { + return Duration.ofMinutes(3); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java new file mode 100644 index 000000000..34dc99ae0 --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableTransportTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.netty; + +import io.netty.channel.ChannelOption; +import io.rsocket.test.TransportTest; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.WebsocketServerTransport; +import java.net.InetSocketAddress; +import java.time.Duration; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServer; + +final class WebsocketResumableTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .host(server.address().getHostName()) + .port(server.address().getPort()) + .option(ChannelOption.ALLOCATOR, allocator), + ""), + (address, allocator) -> + WebsocketServerTransport.create( + HttpServer.create() + .host(address.getHostName()) + .port(address.getPort()) + .option(ChannelOption.ALLOCATOR, allocator)), + false, + true); + + @Override + public Duration getTimeout() { + return Duration.ofMinutes(3); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} diff --git a/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java new file mode 100644 index 000000000..21c027e88 --- /dev/null +++ b/rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketResumableWithFragmentationTransportTest.java @@ -0,0 +1,58 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.netty; + +import io.netty.channel.ChannelOption; +import io.rsocket.test.TransportTest; +import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.transport.netty.server.WebsocketServerTransport; +import java.net.InetSocketAddress; +import java.time.Duration; +import reactor.netty.http.client.HttpClient; +import reactor.netty.http.server.HttpServer; + +final class WebsocketResumableWithFragmentationTransportTest implements TransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> InetSocketAddress.createUnresolved("localhost", 0), + (address, server, allocator) -> + WebsocketClientTransport.create( + HttpClient.create() + .host(server.address().getHostName()) + .port(server.address().getPort()) + .option(ChannelOption.ALLOCATOR, allocator), + ""), + (address, allocator) -> + WebsocketServerTransport.create( + HttpServer.create() + .host(address.getHostName()) + .port(address.getPort()) + .option(ChannelOption.ALLOCATOR, allocator)), + true, + true); + + @Override + public Duration getTimeout() { + return Duration.ofMinutes(3); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +}