From 8e62933aea77aea99f7e24b94a85c86e529ae8fa Mon Sep 17 00:00:00 2001 From: "kecheng.gkc" Date: Mon, 25 Aug 2025 11:40:20 +0800 Subject: [PATCH 1/3] [ISSUE-9632] Fix: Pop Long-polling Not Awakened for V1 Retry Messages --- .../longpolling/PopLongPollingService.java | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index e87a8e803fd..b7bec566af6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; @@ -163,11 +164,47 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu if (KeyBuilder.isPopRetryTopicV2(topic)) { notifyTopic = KeyBuilder.parseNormalTopic(topic); } else { - notifyTopic = topic; + if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) + notifyTopic = findTopicForV1RetryTopic(topic); + else notifyTopic = topic; } notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } + /** + * Find the correct topic name for V1 retry topic by checking topicCidMap + * @param retryTopic V1 retry topic name + * @return the original topic name, retryTopic otherwise + */ + private String findTopicForV1RetryTopic(String retryTopic) { + // Check if the potential group exists in topicCidMap + boolean hasDuplicatedTopic = false; + String originalTopic = null; + for (String topic : topicCidMap.keySet()) { + ConcurrentHashMap cids = topicCidMap.get(topic); + if (cids != null) { + for (String cid : cids.keySet()) { + // Check if this cid could be the correct consumer group + String expectedRetryTopic = KeyBuilder.buildPopRetryTopicV1(topic, cid); + if (expectedRetryTopic.equals(retryTopic)) { + if(originalTopic == null){ + originalTopic = topic; + } + else { + hasDuplicatedTopic = true; + break; + } + } + } + } + } + if (hasDuplicatedTopic){ + return retryTopic; + } else { + return originalTopic; + } + } + public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { ConcurrentHashMap cids = topicCidMap.get(topic); From e4912cffc583f6dd56c0f306565f6b4e947e131b Mon Sep 17 00:00:00 2001 From: "kecheng.gkc" Date: Fri, 29 Aug 2025 11:37:26 +0800 Subject: [PATCH 2/3] [ISSUE-9632] Refactor retry message logic based after review. Use properties inside the msg to parser the originTopic. --- .../longpolling/PopLongPollingService.java | 60 ++++++------------- .../broker/processor/PopReviveService.java | 1 + .../rocketmq/common/message/MessageConst.java | 2 + 3 files changed, 22 insertions(+), 41 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index b7bec566af6..827c062608c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -31,6 +31,7 @@ import org.apache.rocketmq.common.PopAckConstants; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.CommandCallback; @@ -160,51 +161,28 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu public void notifyMessageArrivingWithRetryTopic(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { - String notifyTopic; - if (KeyBuilder.isPopRetryTopicV2(topic)) { - notifyTopic = KeyBuilder.parseNormalTopic(topic); - } else { - if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) - notifyTopic = findTopicForV1RetryTopic(topic); - else notifyTopic = topic; - } - notifyMessageArriving(notifyTopic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); - } - - /** - * Find the correct topic name for V1 retry topic by checking topicCidMap - * @param retryTopic V1 retry topic name - * @return the original topic name, retryTopic otherwise - */ - private String findTopicForV1RetryTopic(String retryTopic) { - // Check if the potential group exists in topicCidMap - boolean hasDuplicatedTopic = false; - String originalTopic = null; - for (String topic : topicCidMap.keySet()) { - ConcurrentHashMap cids = topicCidMap.get(topic); - if (cids != null) { - for (String cid : cids.keySet()) { - // Check if this cid could be the correct consumer group - String expectedRetryTopic = KeyBuilder.buildPopRetryTopicV1(topic, cid); - if (expectedRetryTopic.equals(retryTopic)) { - if(originalTopic == null){ - originalTopic = topic; - } - else { - hasDuplicatedTopic = true; - break; - } - } - } + String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX; + if (topic.startsWith(prefix)) { + // 从properties获取原始topic名称 + String originTopic = properties.get(MessageConst.PROPERTY_ORIGIN_TOPIC); + //根据原始topic和retryTopic,最后获得retryTopic对应的cid (可能还可以与topicCidMap验证一下) + String suffix = "_" + originTopic; //这里把下划线换成加号也是一样的 + String cid = topic.substring(prefix.length(), topic.length() - suffix.length()); + POP_LOGGER.info("Processing retry topic: {}, originTopic: {}, properties: {}", + topic, originTopic, properties); //grep "Processing retry topic" ~/logs/rocketmqlogs/pop.log可以看到日志 + POP_LOGGER.info("Extracted cid: {} from retry topic: {}", cid, topic); + //然后调用包含cid的notifyMessageArriving + long interval = brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval(); + boolean force = interval > 0L && offset % interval == 0L; + if (queueId >= 0) { + notifyMessageArriving(originTopic, -1, cid, force, tagsCode, msgStoreTime, filterBitMap, properties); } - } - if (hasDuplicatedTopic){ - return retryTopic; + notifyMessageArriving(originTopic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties); } else { - return originalTopic; + //普通消息(非重试消息)还是走之前的逻辑不变 + notifyMessageArriving(topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } } - public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { ConcurrentHashMap cids = topicCidMap.get(topic); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 2be41a69d63..3e77c16593d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -129,6 +129,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) { msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime())); } + msgInner.getProperties().computeIfAbsent(MessageConst.PROPERTY_ORIGIN_TOPIC, k -> popCheckPoint.getTopic()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId()); PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner); diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 24f7bdb99a5..06f2b35c2d6 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -24,6 +24,7 @@ public class MessageConst { public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT"; public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY"; public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC"; + public static final String PROPERTY_ORIGIN_TOPIC = "ORIGIN_TOPIC"; //Distinct for retry topic public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC"; public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID"; public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"; @@ -113,6 +114,7 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_WAIT_STORE_MSG_OK); STRING_HASH_SET.add(PROPERTY_DELAY_TIME_LEVEL); STRING_HASH_SET.add(PROPERTY_RETRY_TOPIC); + STRING_HASH_SET.add(PROPERTY_ORIGIN_TOPIC); STRING_HASH_SET.add(PROPERTY_REAL_TOPIC); STRING_HASH_SET.add(PROPERTY_REAL_QUEUE_ID); STRING_HASH_SET.add(PROPERTY_TRANSACTION_PREPARED); From 070e5a7571446c404979daf2ab200c6d4c20a451 Mon Sep 17 00:00:00 2001 From: Kecheng <60416677+KingCide@users.noreply.github.com> Date: Thu, 4 Sep 2025 17:03:18 +0800 Subject: [PATCH 3/3] Remove Chinese comments --- .../broker/longpolling/PopLongPollingService.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index 827c062608c..49d56d88191 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -163,15 +163,12 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { String prefix = MixAll.RETRY_GROUP_TOPIC_PREFIX; if (topic.startsWith(prefix)) { - // 从properties获取原始topic名称 String originTopic = properties.get(MessageConst.PROPERTY_ORIGIN_TOPIC); - //根据原始topic和retryTopic,最后获得retryTopic对应的cid (可能还可以与topicCidMap验证一下) - String suffix = "_" + originTopic; //这里把下划线换成加号也是一样的 + String suffix = "_" + originTopic; String cid = topic.substring(prefix.length(), topic.length() - suffix.length()); POP_LOGGER.info("Processing retry topic: {}, originTopic: {}, properties: {}", - topic, originTopic, properties); //grep "Processing retry topic" ~/logs/rocketmqlogs/pop.log可以看到日志 + topic, originTopic, properties); POP_LOGGER.info("Extracted cid: {} from retry topic: {}", cid, topic); - //然后调用包含cid的notifyMessageArriving long interval = brokerController.getBrokerConfig().getPopLongPollingForceNotifyInterval(); boolean force = interval > 0L && offset % interval == 0L; if (queueId >= 0) { @@ -179,10 +176,10 @@ public void notifyMessageArrivingWithRetryTopic(final String topic, final int qu } notifyMessageArriving(originTopic, queueId, cid, force, tagsCode, msgStoreTime, filterBitMap, properties); } else { - //普通消息(非重试消息)还是走之前的逻辑不变 notifyMessageArriving(topic, queueId, offset, tagsCode, msgStoreTime, filterBitMap, properties); } } + public void notifyMessageArriving(final String topic, final int queueId, long offset, Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map properties) { ConcurrentHashMap cids = topicCidMap.get(topic);