diff --git a/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/loadbalance/LoadBalancedRSocket.java b/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/loadbalance/LoadBalancedRSocket.java index 764a05110..3f5c256cd 100644 --- a/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/loadbalance/LoadBalancedRSocket.java +++ b/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/loadbalance/LoadBalancedRSocket.java @@ -138,10 +138,11 @@ private void refreshRsockets(Collection rsocketUris) { return connect(rsocketUri) //health check after connection .flatMap(rsocket -> healthCheck(rsocket, rsocketUri).map(payload -> Tuples.of(rsocketUri, rsocket))) - .doOnError(error -> { + .onErrorResume(error -> { log.error(RsocketErrorCode.message("RST-400500", rsocketUri), error); this.unHealthyUriSet.add(rsocketUri); tryToReconnect(rsocketUri, error); + return Mono.empty(); }); } }) diff --git a/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/upstream/UpstreamManagerImpl.java b/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/upstream/UpstreamManagerImpl.java index 571f9f88e..513808585 100644 --- a/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/upstream/UpstreamManagerImpl.java +++ b/alibaba-rsocket-core/src/main/java/com/alibaba/rsocket/upstream/UpstreamManagerImpl.java @@ -146,9 +146,9 @@ public void monitorClusters() { if (!brokerCluster.isLocal()) { //interval sync to broker to get last broker list in case of UpstreamClusterChangedEvent lost Flux.interval(Duration.ofSeconds(120)) - .flatMap(timestamp -> findBrokerDiscoveryService().getInstances("*")) - .map(serviceInstances -> serviceInstances.stream().map(RSocketServiceInstance::getUri).collect(Collectors.toList())) - .subscribe(uris -> brokerCluster.setUris(uris)); + .subscribe(timestamp -> findBrokerDiscoveryService().getInstances("*") + .map(serviceInstances -> serviceInstances.stream().map(RSocketServiceInstance::getUri).collect(Collectors.toList())) + .subscribe(uris -> brokerCluster.setUris(uris))); } if (p2pServices != null && !p2pServices.isEmpty()) { // interval sync to p2p service instances list