Skip to content

Commit d70c247

Browse files
committed
fixes
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 2fee350 commit d70c247

File tree

7 files changed

+71
-67
lines changed

7 files changed

+71
-67
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -676,12 +676,8 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
676676
requesterLeaseTracker,
677677
requesterOnAllClosedSink,
678678
Mono.whenDelayError(
679-
responderOnAllClosedSink
680-
.asMono()
681-
.log("client-responder"),
682-
requesterOnAllClosedSink
683-
.asMono()
684-
.log("client-requester")));
679+
responderOnAllClosedSink.asMono(),
680+
requesterOnAllClosedSink.asMono()));
685681

686682
RSocket wrappedRSocketRequester =
687683
interceptors.initRequester(rSocketRequester);

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -315,23 +315,29 @@ private void tryTerminateOnKeepAlive(KeepAliveSupport.KeepAlive keepAlive) {
315315
}
316316

317317
private void tryShutdown(Throwable e) {
318-
LOGGER.info("trying to close requester " + getDuplexConnection());
318+
if (LOGGER.isDebugEnabled()) {
319+
LOGGER.debug("trying to close requester " + getDuplexConnection());
320+
}
319321
if (terminationError == null) {
320322
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
321323
terminate(CLOSED_CHANNEL_EXCEPTION);
322324
} else {
325+
if (LOGGER.isDebugEnabled()) {
326+
LOGGER.debug(
327+
"trying to close requester failed because of "
328+
+ terminationError
329+
+ " "
330+
+ getDuplexConnection());
331+
}
332+
}
333+
} else {
334+
if (LOGGER.isDebugEnabled()) {
323335
LOGGER.info(
324336
"trying to close requester failed because of "
325337
+ terminationError
326338
+ " "
327339
+ getDuplexConnection());
328340
}
329-
} else {
330-
LOGGER.info(
331-
"trying to close requester failed because of "
332-
+ terminationError
333-
+ " "
334-
+ getDuplexConnection());
335341
}
336342
}
337343

@@ -340,50 +346,64 @@ private void tryTerminateOnZeroError(ByteBuf errorFrame) {
340346
}
341347

342348
private void tryTerminate(Supplier<Throwable> errorSupplier) {
343-
LOGGER.info("trying to close requester " + getDuplexConnection());
349+
if (LOGGER.isDebugEnabled()) {
350+
LOGGER.debug("trying to close requester " + getDuplexConnection());
351+
}
344352
if (terminationError == null) {
345353
Throwable e = errorSupplier.get();
346354
if (TERMINATION_ERROR.compareAndSet(this, null, e)) {
347355
terminate(e);
348356
} else {
349-
LOGGER.info(
357+
if (LOGGER.isDebugEnabled()) {
358+
LOGGER.debug(
359+
"trying to close requester failed because of "
360+
+ terminationError
361+
+ " "
362+
+ getDuplexConnection());
363+
}
364+
}
365+
} else {
366+
if (LOGGER.isDebugEnabled()) {
367+
LOGGER.debug(
350368
"trying to close requester failed because of "
351369
+ terminationError
352370
+ " "
353371
+ getDuplexConnection());
354372
}
355-
} else {
356-
LOGGER.info(
357-
"trying to close requester failed because of "
358-
+ terminationError
359-
+ " "
360-
+ getDuplexConnection());
361373
}
362374
}
363375

364376
private void tryShutdown() {
365-
LOGGER.info("trying to close requester " + getDuplexConnection());
377+
if (LOGGER.isDebugEnabled()) {
378+
LOGGER.debug("trying to close requester " + getDuplexConnection());
379+
}
366380
if (terminationError == null) {
367381
if (TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) {
368382
terminate(CLOSED_CHANNEL_EXCEPTION);
369383
} else {
370-
LOGGER.info(
384+
if (LOGGER.isDebugEnabled()) {
385+
LOGGER.debug(
386+
"trying to close requester failed because of "
387+
+ terminationError
388+
+ " "
389+
+ getDuplexConnection());
390+
}
391+
}
392+
} else {
393+
if (LOGGER.isDebugEnabled()) {
394+
LOGGER.debug(
371395
"trying to close requester failed because of "
372396
+ terminationError
373397
+ " "
374398
+ getDuplexConnection());
375399
}
376-
} else {
377-
LOGGER.info(
378-
"trying to close requester failed because of "
379-
+ terminationError
380-
+ " "
381-
+ getDuplexConnection());
382400
}
383401
}
384402

385403
private void terminate(Throwable e) {
386-
LOGGER.info("closing requester " + getDuplexConnection() + " due to " + e);
404+
if (LOGGER.isDebugEnabled()) {
405+
LOGGER.debug("closing requester " + getDuplexConnection() + " due to " + e);
406+
}
387407
if (keepAliveFramesAcceptor != null) {
388408
keepAliveFramesAcceptor.dispose();
389409
}
@@ -417,7 +437,8 @@ private void terminate(Throwable e) {
417437
} else {
418438
onThisSideClosedSink.tryEmitError(e);
419439
}
420-
421-
LOGGER.info("requester closed " + getDuplexConnection());
440+
if (LOGGER.isDebugEnabled()) {
441+
LOGGER.debug("requester closed " + getDuplexConnection());
442+
}
422443
}
423444
}

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,17 @@ class RSocketResponder extends RequesterResponderSupport implements RSocket {
9696
}
9797

9898
private void tryTerminateOnConnectionError(Throwable e) {
99-
LOGGER.info("Try terminate connection on responder side");
99+
if (LOGGER.isDebugEnabled()) {
100+
101+
LOGGER.debug("Try terminate connection on responder side");
102+
}
100103
tryTerminate(() -> e);
101104
}
102105

103106
private void tryTerminateOnConnectionClose() {
104-
LOGGER.info("Try terminate connection on responder side");
107+
if (LOGGER.isDebugEnabled()) {
108+
LOGGER.info("Try terminate connection on responder side");
109+
}
105110
tryTerminate(() -> CLOSED_CHANNEL_EXCEPTION);
106111
}
107112

@@ -175,7 +180,9 @@ public Mono<Void> onClose() {
175180
}
176181

177182
final void doOnDispose() {
178-
LOGGER.info("closing responder " + getDuplexConnection());
183+
if (LOGGER.isDebugEnabled()) {
184+
LOGGER.debug("closing responder " + getDuplexConnection());
185+
}
179186
cleanUpSendingSubscriptions();
180187

181188
getDuplexConnection().dispose();
@@ -191,7 +198,9 @@ final void doOnDispose() {
191198

192199
requestHandler.dispose();
193200
onThisSideClosedSink.tryEmitEmpty();
194-
LOGGER.info("responder closed " + getDuplexConnection());
201+
if (LOGGER.isDebugEnabled()) {
202+
LOGGER.debug("responder closed " + getDuplexConnection());
203+
}
195204
}
196205

197206
private void cleanUpSendingSubscriptions() {

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ public ClientRSocketSession(
112112

113113
return connectionTransformer.apply(dc);
114114
})
115-
.log("checking what happens there")
116115
.doOnDiscard(
117116
Tuple2.class,
118117
tuple2 -> {

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ public void dispose() {
374374
return;
375375
}
376376

377-
drain(previousState | DISPOSED_FLAG);
377+
drain((previousState + 1) | DISPOSED_FLAG);
378378
}
379379

380380
void clearCache() {
@@ -557,12 +557,13 @@ public void onNext(ByteBuf byteBuf) {
557557
return;
558558
}
559559

560-
if (isWorkInProgress(previousState)
561-
|| (!isConnected(previousState) && !hasPendingConnection(previousState))) {
560+
if (isWorkInProgress(previousState)) {
562561
return;
563562
}
564563

565-
parent.drain(previousState + 1);
564+
if (isConnected(previousState) || hasPendingConnection(previousState)) {
565+
parent.drain((previousState + 1) | HAS_FRAME_FLAG);
566+
}
566567
}
567568

568569
@Override
@@ -587,7 +588,7 @@ public void onError(Throwable t) {
587588
return;
588589
}
589590

590-
parent.drain(previousState | TERMINATED_FLAG);
591+
parent.drain((previousState + 1) | TERMINATED_FLAG);
591592
}
592593

593594
@Override
@@ -596,7 +597,6 @@ public void onComplete() {
596597
return;
597598
}
598599

599-
// logger.info("[side = {}] received onComplete", parent.side);
600600
final InMemoryResumableFramesStore parent = this.parent;
601601

602602
this.done = true;
@@ -610,7 +610,7 @@ public void onComplete() {
610610
return;
611611
}
612612

613-
parent.drain(previousState | TERMINATED_FLAG);
613+
parent.drain((previousState + 1) | TERMINATED_FLAG);
614614
}
615615

616616
@Override

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,10 @@
2525
import io.rsocket.internal.BaseDuplexConnection;
2626
import java.net.SocketAddress;
2727
import java.util.Objects;
28-
import java.util.logging.Level;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
3130
import reactor.core.publisher.Flux;
3231
import reactor.core.publisher.Mono;
33-
import reactor.core.publisher.SignalType;
3432
import reactor.netty.Connection;
3533

3634
/** An implementation of {@link DuplexConnection} that connects via TCP. */
@@ -95,16 +93,7 @@ public void sendErrorAndClose(RSocketErrorException e) {
9593

9694
@Override
9795
public Flux<ByteBuf> receive() {
98-
return connection
99-
.inbound()
100-
.receive()
101-
.map(FrameLengthCodec::frame)
102-
.log(
103-
this + " receive=",
104-
Level.INFO,
105-
SignalType.ON_COMPLETE,
106-
SignalType.ON_ERROR,
107-
SignalType.CANCEL);
96+
return connection.inbound().receive().map(FrameLengthCodec::frame);
10897
}
10998

11099
@Override

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,10 @@
2424
import io.rsocket.internal.BaseDuplexConnection;
2525
import java.net.SocketAddress;
2626
import java.util.Objects;
27-
import java.util.logging.Level;
2827
import org.slf4j.Logger;
2928
import org.slf4j.LoggerFactory;
3029
import reactor.core.publisher.Flux;
3130
import reactor.core.publisher.Mono;
32-
import reactor.core.publisher.SignalType;
3331
import reactor.netty.Connection;
3432

3533
/**
@@ -94,15 +92,7 @@ public Mono<Void> onClose() {
9492

9593
@Override
9694
public Flux<ByteBuf> receive() {
97-
return connection
98-
.inbound()
99-
.receive()
100-
.log(
101-
this + " receive=",
102-
Level.INFO,
103-
SignalType.ON_COMPLETE,
104-
SignalType.ON_ERROR,
105-
SignalType.CANCEL);
95+
return connection.inbound().receive();
10696
}
10797

10898
@Override

0 commit comments

Comments
 (0)