Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.rsocket.resume;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.PayloadFrameCodec;
import io.rsocket.internal.UnboundedProcessor;
import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.LL_Result;
import reactor.core.Disposable;

public class InMemoryResumableFramesStoreStressTest {
boolean storeClosed;

InMemoryResumableFramesStore store =
new InMemoryResumableFramesStore("test", Unpooled.EMPTY_BUFFER, 128);
boolean processorClosed;
UnboundedProcessor processor = new UnboundedProcessor(() -> processorClosed = true);

void subscribe() {
store.saveFrames(processor).subscribe();
store.onClose().subscribe(null, t -> storeClosed = true, () -> storeClosed = true);
}

@JCStressTest
@Outcome(
id = {"true, true"},
expect = ACCEPTABLE)
@State
public static class TwoSubscribesRaceStressTest extends InMemoryResumableFramesStoreStressTest {

Disposable d1;

final ByteBuf b1 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
1,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello1"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello2"));
final ByteBuf b2 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
3,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello3"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello4"));
final ByteBuf b3 =
PayloadFrameCodec.encode(
ByteBufAllocator.DEFAULT,
5,
false,
true,
false,
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello5"),
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "hello6"));

final ByteBuf c1 =
ErrorFrameCodec.encode(ByteBufAllocator.DEFAULT, 0, new ConnectionErrorException("closed"));

{
subscribe();
d1 = store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
}

@Actor
public void producer1() {
processor.tryEmitNormal(b1);
processor.tryEmitNormal(b2);
processor.tryEmitNormal(b3);
}

@Actor
public void producer2() {
processor.tryEmitFinal(c1);
}

@Actor
public void producer3() {
d1.dispose();
store
.doOnDiscard(ByteBuf.class, ByteBuf::release)
.subscribe(ByteBuf::release, t -> {})
.dispose();
store
.doOnDiscard(ByteBuf.class, ByteBuf::release)
.subscribe(ByteBuf::release, t -> {})
.dispose();
store.doOnDiscard(ByteBuf.class, ByteBuf::release).subscribe(ByteBuf::release, t -> {});
}

@Actor
public void producer4() {
store.releaseFrames(0);
store.releaseFrames(0);
store.releaseFrames(0);
}

@Arbiter
public void arbiter(LL_Result r) {
r.r1 = storeClosed;
r.r2 = processorClosed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public ClientServerInputMultiplexer(
this.source = source;
this.isClient = isClient;

this.serverReceiver = new InternalDuplexConnection(this, source);
this.clientReceiver = new InternalDuplexConnection(this, source);
this.serverReceiver = new InternalDuplexConnection(Type.SERVER, this, source);
this.clientReceiver = new InternalDuplexConnection(Type.CLIENT, this, source);
this.serverConnection = registry.initConnection(Type.SERVER, serverReceiver);
this.clientConnection = registry.initConnection(Type.CLIENT, clientReceiver);
}
Expand Down Expand Up @@ -195,8 +195,33 @@ int incrementAndGetCheckingState() {
}
}

@Override
public String toString() {
return "ClientServerInputMultiplexer{"
+ "serverReceiver="
+ serverReceiver
+ ", clientReceiver="
+ clientReceiver
+ ", serverConnection="
+ serverConnection
+ ", clientConnection="
+ clientConnection
+ ", source="
+ source
+ ", isClient="
+ isClient
+ ", s="
+ s
+ ", t="
+ t
+ ", state="
+ state
+ '}';
}

private static class InternalDuplexConnection extends Flux<ByteBuf>
implements Subscription, DuplexConnection {
private final Type type;
private final ClientServerInputMultiplexer clientServerInputMultiplexer;
private final DuplexConnection source;

Expand All @@ -207,7 +232,10 @@ private static class InternalDuplexConnection extends Flux<ByteBuf>
CoreSubscriber<? super ByteBuf> actual;

public InternalDuplexConnection(
ClientServerInputMultiplexer clientServerInputMultiplexer, DuplexConnection source) {
Type type,
ClientServerInputMultiplexer clientServerInputMultiplexer,
DuplexConnection source) {
this.type = type;
this.clientServerInputMultiplexer = clientServerInputMultiplexer;
this.source = source;
}
Expand Down Expand Up @@ -304,5 +332,17 @@ public Mono<Void> onClose() {
public double availability() {
return source.availability();
}

@Override
public String toString() {
return "InternalDuplexConnection{"
+ "type="
+ type
+ ", source="
+ source
+ ", state="
+ state
+ '}';
}
}
}
18 changes: 14 additions & 4 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.function.Supplier;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuples;
import reactor.util.retry.Retry;
Expand Down Expand Up @@ -633,8 +634,7 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
wrappedConnection = resumableDuplexConnection;
} else {
keepAliveHandler =
new KeepAliveHandler.DefaultKeepAliveHandler(
clientServerConnection);
new KeepAliveHandler.DefaultKeepAliveHandler();
wrappedConnection = clientServerConnection;
}

Expand All @@ -655,6 +655,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
requesterLeaseTracker = null;
}

final Sinks.Empty<Void> requesterOnAllClosedSink =
Sinks.unsafe().empty();
final Sinks.Empty<Void> responderOnAllClosedSink =
Sinks.unsafe().empty();

RSocket rSocketRequester =
new RSocketRequester(
multiplexer.asClientConnection(),
Expand All @@ -667,7 +672,11 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(int) keepAliveMaxLifeTime.toMillis(),
keepAliveHandler,
interceptors::initRequesterRequestInterceptor,
requesterLeaseTracker);
requesterLeaseTracker,
requesterOnAllClosedSink,
Mono.whenDelayError(
responderOnAllClosedSink.asMono(),
requesterOnAllClosedSink.asMono()));

RSocket wrappedRSocketRequester =
interceptors.initRequester(rSocketRequester);
Expand Down Expand Up @@ -715,7 +724,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
(RequestInterceptor)
leases.sender)
: interceptors
::initResponderRequestInterceptor);
::initResponderRequestInterceptor,
responderOnAllClosedSink);

return wrappedRSocketRequester;
})
Expand Down
Loading