Conversation

@fanpipi commented Jul 10, 2024

Version:
1.1.5

Deployment:
A three-node cluster in Gossip broadcast mode.

Defect description:
After all three broker nodes are stopped for a while and then restarted, some clients fail to automatically re-register with all three broker nodes.

Exception class:
com.alibaba.rsocket.loadbalance.LoadBalancedRSocket

Root cause:
After all nodes are shut down, the last active connection held by the client triggers onRSocketClosed, which passes the original three broker URIs to refreshRsockets. All three connections then throw connection-related exceptions during the connect or health-check stage, triggering the downstream error-handling flow.
However, the first exception to propagate cancels the in-flight subscriptions of the connections that have not yet failed, so their error-handling flows never complete and their URIs are never added to unHealthyUriSet. As a result, checkUnhealthyUris cannot reconnect those broken connections.
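For illustration, here is a minimal, self-contained Reactor sketch of the cancellation behavior described above (the broker URIs and delays are made up; this is not the project's code): when the per-URI connection Monos are merged, the first error cancels the in-flight siblings before their own error handlers can run.

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;

public class MergeCancelDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux.merge(
                failingConnect("tcp://broker1", Duration.ofMillis(50)),
                failingConnect("tcp://broker2", Duration.ofMillis(200)),
                failingConnect("tcp://broker3", Duration.ofMillis(200)))
            .subscribe(
                ok -> { },
                // only broker1's error arrives; broker2/broker3 are cancelled,
                // so code that would add them to unHealthyUriSet never runs
                err -> System.out.println("merge failed: " + err.getMessage()));
        Thread.sleep(500);
    }

    // stands in for the connect/health-check stage of one broker URI
    static Mono<String> failingConnect(String uri, Duration delay) {
        return Mono.delay(delay)
                .<String>flatMap(tick -> Mono.error(new RuntimeException(uri + " connection refused")))
                .doOnCancel(() -> System.out.println(uri + " cancelled before its error handler ran"));
    }
}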

Proposed fix:
In the refreshRsockets method, catch exceptions from the connect and health-check stages instead of rethrowing them, and return an empty stream, so that one failing connection can no longer cancel the others.
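A minimal sketch of that fix, assuming placeholder names (connect, healthCheck, and the uris list are illustrative, not the actual LoadBalancedRSocket API):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RefreshSketch {
    final Set<String> unHealthyUriSet = ConcurrentHashMap.newKeySet();

    // connect(uri) and healthCheck(conn) are placeholders for the real stages
    Flux<String> refreshRsockets(List<String> uris) {
        return Flux.fromIterable(uris)
                .flatMap(uri -> connect(uri)
                        .flatMap(this::healthCheck)
                        // catch the error here instead of letting it propagate:
                        // the failing URI is recorded for checkUnhealthyUris to
                        // retry, and the empty Mono cannot cancel its siblings
                        .onErrorResume(e -> {
                            unHealthyUriSet.add(uri);
                            return Mono.empty();
                        }));
    }

    Mono<String> connect(String uri) { return Mono.just(uri); }
    Mono<String> healthCheck(String conn) { return Mono.just(conn); }
}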

@minh-tn-hust

Hi @fanpipi, I have a similar issue with reconnection in a Kubernetes (K8S) environment. Currently, the client connects to a broker pod. When scaling down, the pod is forcefully terminated without a proper shutdown, causing the health check to return a TimeoutException. This exception is not handled anywhere, so the client remains disconnected even though another broker pod is available. Does your solution also address this issue?

@fanpipi (Author) commented Jun 10, 2025

Hi @minh-tn-hust, it looks like your problem involves a failure to reconnect after the broker recovers from an abnormal shutdown; this PR could potentially address that issue.
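For the health-check TimeoutException specifically, the same catch-and-return-empty pattern should absorb it as well. A minimal sketch, assuming hypothetical names and a 15-second budget (not the actual client code):

import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

class TimeoutGuardSketch {
    final Set<String> unHealthyUriSet = ConcurrentHashMap.newKeySet();

    // wraps an assumed per-broker health-check Mono; the timeout value is an assumption
    Mono<String> guardedHealthCheck(Mono<String> healthCheck, String uri) {
        return healthCheck
                .timeout(Duration.ofSeconds(15))
                .onErrorResume(TimeoutException.class, e -> {
                    unHealthyUriSet.add(uri);  // retried later by checkUnhealthyUris
                    return Mono.empty();       // other brokers stay connected
                });
    }
}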

@minh-tn-hust commented Jun 10, 2025

Hi @fanpipi, my issue occurs when a client tries to connect to a broker after a scale-up operation in a Kubernetes environment. Existing brokers emit consistent CloudEvents to the client, but a newly created broker emits two events:

  • The first is triggered when ClusterDiscovery starts and queries Kubernetes to discover all available brokers.

  • The second is sent once the new broker finishes its initialization and joins the cluster.

This causes the client to receive inconsistent CloudEvents. The problem is worsened because "RefuseConnection" events are not handled correctly, and the disconnection process is asynchronous. As a result, by the time the new CloudEvent arrives, the client may already have missed its chance to connect properly to the new broker. The following update in RSocketBrokerManagerDiscoveryImpl.java could fix the issue (it worked in my tests):

+    private boolean isFinishStartUp = false;

     public RSocketBrokerManagerDiscoveryImpl(ReactiveDiscoveryClient discoveryClient, RSocketBrokerProperties properties) {
         this.SERVICE_NAME = properties.getDiscovery().getServiceName();
         this.REFRESH_INTERVAL_SECONDS = properties.getDiscovery().getRefreshInterval();
         this.discoveryClient = discoveryClient;
         this.brokersFresher = Flux.interval(Duration.ofSeconds(REFRESH_INTERVAL_SECONDS))
                 .flatMap(aLong -> this.discoveryClient.getInstances(SERVICE_NAME).collectList())
                 .subscribe(serviceInstances -> {
                     boolean changed = serviceInstances.size() != currentBrokers.size();
                     for (ServiceInstance serviceInstance : serviceInstances) {
                         if (!currentBrokers.containsKey(serviceInstance.getHost())) {
                             changed = true;
                         }
+
+                        // mark startup as finished once this pod sees its own IP in the discovery results
+                        if (serviceInstance.getHost().equals(NetworkUtil.LOCAL_IP)) {
+                            isFinishStartUp = true;
+                        }
                     }
-                    if (changed) {
+                    // only emit the broker-changed event after this broker has fully started up
+                    if (changed && isFinishStartUp) {
                         currentBrokers = serviceInstances.stream().map(serviceInstance -> {
                             RSocketBroker broker = new RSocketBroker();
                             broker.setIp(serviceInstance.getHost());
                             broker.setSchema(properties.getSchema());
                             broker.setPort(properties.getPortHealthCheck());
                             return broker;
                         }).collect(Collectors.toMap(RSocketBroker::getIp, Function.identity()));
                         log.info(RsocketErrorCode.message("RST-300206", String.join(",", currentBrokers.keySet())));
                         brokersEmitterProcessor.tryEmitNext(currentBrokers.values());
                     }
                 }, error -> log.error("Broker refresher encounter an error: ", error));
     }
