Merged — changes from all commits.
@@ -64,6 +64,9 @@ public class RocketMQOptions {
public static final ConfigOption<Long> OPTIONAL_PARTITION_DISCOVERY_INTERVAL_MS =
ConfigOptions.key("partitionDiscoveryIntervalMs").longType().defaultValue(30000L);

public static final ConfigOption<Long> OPTIONAL_CONSUMER_POLL_MS =
ConfigOptions.key("consumer.timeout").longType().defaultValue(3000L);

public static final ConfigOption<Boolean> OPTIONAL_USE_NEW_API =
ConfigOptions.key("useNewApi").booleanType().defaultValue(true);

@@ -20,7 +20,7 @@
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

@@ -59,8 +59,12 @@ public class RocketMQConfig {
public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;

public static final String PRODUCER_TIMEOUT = "producer.timeout";

public static final String CONSUMER_TIMEOUT = "consumer.timeout";
public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds

public static final int DEFAULT_CONSUMER_TIMEOUT = 3000; // 3 seconds

// Consumer related config
public static final String CONSUMER_GROUP = "consumer.group"; // Required
public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
@@ -142,9 +146,9 @@ public static void buildProducerConfigs(Properties props, DefaultMQProducer prod
* Build Consumer Configs.
*
* @param props Properties
* @param consumer DefaultMQPullConsumer
* @param consumer DefaultLitePullConsumer
*/
public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
public static void buildConsumerConfigs(Properties props, DefaultLitePullConsumer consumer) {
buildCommonConfigs(props, consumer);
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setPersistConsumerOffsetInterval(
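A hedged usage sketch of the updated helper with the lite pull consumer. The group and topic values are placeholders, the RocketMQConfig import path is assumed (it is not shown in this diff), and where the new consumer.timeout property is ultimately read is also not shown here.

import java.util.Properties;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.flink.common.RocketMQConfig; // package path assumed

public final class ConsumerConfigSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(RocketMQConfig.CONSUMER_GROUP, "demo_group"); // placeholder group
        props.setProperty(RocketMQConfig.CONSUMER_TOPIC, "demo_topic"); // placeholder topic
        props.setProperty(RocketMQConfig.CONSUMER_TIMEOUT, "5000");     // key added by this change

        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("demo_group");
        // Applies the common client settings plus the clustering message model
        // and the offset-persist interval to the lite pull consumer.
        RocketMQConfig.buildConsumerConfigs(props, consumer);
    }
}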

[Large diffs are not rendered by default.]

@@ -60,6 +60,7 @@ public class RocketMQSource<OUT>

private final String consumerOffsetMode;
private final long consumerOffsetTimestamp;
private final long pollTime;
private final String topic;
private final String consumerGroup;
private final String nameServerAddress;
@@ -79,6 +80,7 @@ public class RocketMQSource<OUT>
private final RocketMQDeserializationSchema<OUT> deserializationSchema;

public RocketMQSource(
long pollTime,
String topic,
String consumerGroup,
String nameServerAddress,
@@ -97,6 +99,7 @@ public RocketMQSource(
Validate.isTrue(
!(StringUtils.isNotEmpty(tag) && StringUtils.isNotEmpty(sql)),
"Consumer tag and sql can not set value at the same time");
this.pollTime = pollTime;
this.topic = topic;
this.consumerGroup = consumerGroup;
this.nameServerAddress = nameServerAddress;
@@ -140,6 +143,7 @@ public UserCodeClassLoader getUserCodeClassLoader() {
Supplier<SplitReader<Tuple3<OUT, Long, Long>, RocketMQPartitionSplit>> splitReaderSupplier =
() ->
new RocketMQPartitionSplitReader<>(
pollTime,
topic,
consumerGroup,
nameServerAddress,
@@ -20,7 +20,7 @@

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
@@ -100,7 +100,8 @@ public class RocketMQSourceEnumerator
private final Map<Integer, Set<RocketMQPartitionSplit>> pendingPartitionSplitAssignment;

// Lazily instantiated or mutable fields.
private DefaultMQPullConsumer consumer;
private DefaultLitePullConsumer consumer;

private boolean noMoreNewPartitionSplits = false;

public RocketMQSourceEnumerator(
@@ -233,7 +234,8 @@ private Set<RocketMQPartitionSplit> discoverAndInitializePartitionSplit()
Set<Tuple3<String, String, Integer>> newPartitions = new HashSet<>();
Set<Tuple3<String, String, Integer>> removedPartitions =
new HashSet<>(Collections.unmodifiableSet(discoveredPartitions));
Set<MessageQueue> messageQueues = consumer.fetchSubscribeMessageQueues(topic);

Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues(topic);
Set<RocketMQPartitionSplit> result = new HashSet<>();
for (MessageQueue messageQueue : messageQueues) {
Tuple3<String, String, Integer> topicPartition =
@@ -337,16 +339,16 @@ private long getOffsetByMessageQueue(MessageQueue mq) throws MQClientException {
} else {
switch (consumerOffsetMode) {
case CONSUMER_OFFSET_EARLIEST:
offset = consumer.minOffset(mq);
break;
consumer.seekToBegin(mq);
return -1;
case CONSUMER_OFFSET_LATEST:
offset = consumer.maxOffset(mq);
break;
consumer.seekToEnd(mq);
return -1;
case CONSUMER_OFFSET_TIMESTAMP:
offset = consumer.searchOffset(mq, consumerOffsetTimestamp);
offset = consumer.offsetForTimestamp(mq, consumerOffsetTimestamp);
break;
default:
offset = consumer.fetchConsumeOffset(mq, false);
offset = consumer.committed(mq);
if (offset < 0) {
throw new IllegalArgumentException(
"Unknown value for CONSUMER_OFFSET_RESET_TO.");
@@ -364,11 +366,10 @@ private void initialRocketMQConsumer() {
&& !StringUtils.isNullOrWhitespaceOnly(secretKey)) {
AclClientRPCHook aclClientRPCHook =
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
consumer = new DefaultMQPullConsumer(consumerGroup, aclClientRPCHook);
consumer = new DefaultLitePullConsumer(consumerGroup, aclClientRPCHook);
} else {
consumer = new DefaultMQPullConsumer(consumerGroup);
consumer = new DefaultLitePullConsumer(consumerGroup);
}

consumer.setNamesrvAddr(nameServerAddress);
consumer.setInstanceName(
String.join(
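For reference, a standalone sketch (not from this change) of the DefaultLitePullConsumer calls the enumerator now relies on: seekToBegin/seekToEnd replace minOffset/maxOffset, offsetForTimestamp replaces searchOffset, and committed replaces fetchConsumeOffset. Assign mode plus the placeholder group, topic, and address are assumptions made to keep the example self-contained.

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public final class LitePullSeekSketch {

    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("demo_group"); // placeholder group
        consumer.setNamesrvAddr("localhost:9876"); // placeholder name server
        consumer.setAutoCommit(false);
        consumer.start();

        // Discover the topic's queues, then take manual control of them.
        List<MessageQueue> queues = new ArrayList<>(consumer.fetchMessageQueues("demo_topic"));
        consumer.assign(queues);

        MessageQueue mq = queues.get(0);
        consumer.seekToBegin(mq);                                            // CONSUMER_OFFSET_EARLIEST
        // consumer.seekToEnd(mq);                                           // CONSUMER_OFFSET_LATEST
        // consumer.seek(mq, consumer.offsetForTimestamp(mq, timestampMs));  // CONSUMER_OFFSET_TIMESTAMP
        long committedOffset = consumer.committed(mq); // default branch; may be negative if nothing committed

        List<MessageExt> batch = consumer.poll(3000); // poll timeout in milliseconds
        System.out.printf("committed=%d, polled %d messages%n", committedOffset, batch.size());
        consumer.shutdown();
    }
}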
@@ -20,16 +20,13 @@

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.flink.source.reader.deserializer.RocketMQDeserializationSchema;
import org.apache.rocketmq.flink.source.split.RocketMQPartitionSplit;
import org.apache.rocketmq.remoting.exception.RemotingException;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -57,8 +54,6 @@
import java.util.Map;
import java.util.Set;

import static org.apache.rocketmq.client.consumer.PullStatus.FOUND;

/**
* A {@link SplitReader} implementation that reads records from RocketMQ partitions.
*
@@ -75,6 +70,8 @@ public class RocketMQPartitionSplitReader<T>
private final long startTime;
private final long startOffset;

private final long pollTime;

private final String accessKey;
private final String secretKey;

@@ -83,13 +80,14 @@ public class RocketMQPartitionSplitReader<T>
private final Map<Tuple3<String, String, Integer>, Long> stoppingTimestamps;
private final SimpleCollector<T> collector;

private DefaultMQPullConsumer consumer;
private DefaultLitePullConsumer consumer;

private volatile boolean wakeup = false;

private static final int MAX_MESSAGE_NUMBER_PER_BLOCK = 64;

public RocketMQPartitionSplitReader(
long pollTime,
String topic,
String consumerGroup,
String nameServerAddress,
@@ -101,6 +99,7 @@ public RocketMQPartitionSplitReader(
long startTime,
long startOffset,
RocketMQDeserializationSchema<T> deserializationSchema) {
this.pollTime = pollTime;
this.topic = topic;
this.tag = tag;
this.sql = sql;
@@ -120,9 +119,9 @@
public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
RocketMQPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits =
new RocketMQPartitionSplitRecords<>();
Set<MessageQueue> messageQueues;
Collection<MessageQueue> messageQueues;
try {
messageQueues = consumer.fetchSubscribeMessageQueues(topic);
messageQueues = consumer.fetchMessageQueues(topic);
} catch (MQClientException e) {
LOG.error(
String.format(
@@ -144,7 +143,7 @@ public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
try {
messageOffset =
startTime > 0
? consumer.searchOffset(messageQueue, startTime)
? consumer.offsetForTimestamp(messageQueue, startTime)
: startOffset;
} catch (MQClientException e) {
LOG.warn(
@@ -157,7 +156,7 @@ public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
}
messageOffset = messageOffset > -1 ? messageOffset : 0;
}
PullResult pullResult = null;
List<MessageExt> messageExts = null;
try {
if (wakeup) {
LOG.info(
@@ -173,25 +172,11 @@ public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
recordsBySplits.prepareForRead();
return recordsBySplits;
}
if (StringUtils.isNotEmpty(sql)) {
pullResult =
consumer.pull(
messageQueue,
MessageSelector.bySql(sql),
messageOffset,
MAX_MESSAGE_NUMBER_PER_BLOCK);
} else {
pullResult =
consumer.pull(
messageQueue,
tag,
messageOffset,
MAX_MESSAGE_NUMBER_PER_BLOCK);
}
} catch (MQClientException
| RemotingException
| MQBrokerException
| InterruptedException e) {

consumer.setPullBatchSize(MAX_MESSAGE_NUMBER_PER_BLOCK);
consumer.seek(messageQueue, messageOffset);
messageExts = consumer.poll(pollTime);
} catch (MQClientException e) {
LOG.warn(
String.format(
"Pull RocketMQ messages of topic[%s] broker[%s] queue[%d] tag[%s] sql[%s] from offset[%d] exception.",
@@ -203,18 +188,31 @@ public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
messageOffset),
e);
}
startingOffsets.put(
topicPartition,
pullResult == null ? messageOffset : pullResult.getNextBeginOffset());
if (pullResult != null && pullResult.getPullStatus() == FOUND) {
try {
startingOffsets.put(
topicPartition,
messageExts == null ? messageOffset : consumer.committed(messageQueue));
} catch (MQClientException e) {
LOG.warn(
String.format(
"Pull RocketMQ messages of topic[%s] broker[%s] queue[%d] tag[%s] sql[%s] from offset[%d] exception.",
messageQueue.getTopic(),
messageQueue.getBrokerName(),
messageQueue.getQueueId(),
tag,
sql,
messageOffset),
e);
}
if (messageExts != null) {
Collection<Tuple3<T, Long, Long>> recordsForSplit =
recordsBySplits.recordsForSplit(
messageQueue.getTopic()
+ "-"
+ messageQueue.getBrokerName()
+ "-"
+ messageQueue.getQueueId());
for (MessageExt messageExt : pullResult.getMsgFoundList()) {
for (MessageExt messageExt : messageExts) {
long stoppingTimestamp = getStoppingTimestamp(topicPartition);
long storeTimestamp = messageExt.getStoreTimestamp();
if (storeTimestamp > stoppingTimestamp) {
@@ -320,9 +318,9 @@ private void initialRocketMQConsumer(
if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) {
AclClientRPCHook aclClientRPCHook =
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
consumer = new DefaultMQPullConsumer(consumerGroup, aclClientRPCHook);
consumer = new DefaultLitePullConsumer(consumerGroup, aclClientRPCHook);
} else {
consumer = new DefaultMQPullConsumer(consumerGroup);
consumer = new DefaultLitePullConsumer(consumerGroup);
}
consumer.setNamesrvAddr(nameServerAddress);
consumer.setInstanceName(
@@ -333,6 +331,11 @@
consumerGroup,
"" + System.nanoTime()));
consumer.start();
if (StringUtils.isNotEmpty(sql)) {
consumer.subscribe(topic, MessageSelector.bySql(sql));
} else {
consumer.subscribe(topic, tag);
}
} catch (MQClientException e) {
LOG.error("Failed to initial RocketMQ consumer.", e);
consumer.shutdown();
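Putting the reader changes together, a hedged standalone sketch (not from this change) of the subscribe-then-poll flow the split reader now follows: tag or SQL92 selection, a pull batch size, and a timed poll. The group, topic, tag, filter, and address are placeholders, and the reader additionally seeks each queue to its starting offset before polling.

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

public final class LitePullSubscribeSketch {

    public static void main(String[] args) throws MQClientException {
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("demo_group"); // placeholder group
        consumer.setNamesrvAddr("localhost:9876"); // placeholder name server
        consumer.setPullBatchSize(64);             // mirrors MAX_MESSAGE_NUMBER_PER_BLOCK above

        String sql = "";  // placeholder SQL92 filter, e.g. "a > 5"; leave empty to filter by tag
        String tag = "*"; // placeholder tag expression
        if (sql != null && !sql.isEmpty()) {
            consumer.subscribe("demo_topic", MessageSelector.bySql(sql)); // SQL filtering
        } else {
            consumer.subscribe("demo_topic", tag);                        // tag filtering
        }
        consumer.start();

        // One timed poll; the reader drives this from fetch() with the configured pollTime.
        List<MessageExt> batch = consumer.poll(3000);
        for (MessageExt message : batch) {
            System.out.printf("queue=%d offset=%d storeTimestamp=%d%n",
                    message.getQueueId(), message.getQueueOffset(), message.getStoreTimestamp());
        }
        consumer.shutdown();
    }
}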
@@ -44,6 +44,7 @@
import static org.apache.rocketmq.flink.common.RocketMQOptions.NAME_SERVER_ADDRESS;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ACCESS_KEY;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_COLUMN_ERROR_DEBUG;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_CONSUMER_POLL_MS;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_ENCODING;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_END_TIME;
import static org.apache.rocketmq.flink.common.RocketMQOptions.OPTIONAL_FIELD_DELIMITER;
@@ -104,6 +105,7 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(OPTIONAL_ACCESS_KEY);
optionalOptions.add(OPTIONAL_SECRET_KEY);
optionalOptions.add(OPTIONAL_SCAN_STARTUP_MODE);
optionalOptions.add(OPTIONAL_CONSUMER_POLL_MS);
return optionalOptions;
}

@@ -182,6 +184,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
configuration.getLong(
RocketMQOptions.OPTIONAL_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis());
return new RocketMQScanTableSource(
configuration.getLong(OPTIONAL_CONSUMER_POLL_MS),
descriptorProperties,
physicalSchema,
topic,
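Finally, a hedged sketch of setting the new knob from SQL. Only consumer.timeout is introduced by this change; the connector identifier and the remaining option keys and schema below are assumptions for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class RocketMQTimeoutDdlSketch {

    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tableEnv.executeSql(
                "CREATE TABLE rocketmq_source (\n"
                        + "  message STRING\n"                           // assumed schema
                        + ") WITH (\n"
                        + "  'connector' = 'rocketmq',\n"                // assumed connector identifier
                        + "  'topic' = 'demo_topic',\n"                  // assumed key, placeholder value
                        + "  'consumerGroup' = 'demo_group',\n"          // assumed key, placeholder value
                        + "  'nameServerAddress' = 'localhost:9876',\n"  // assumed key, placeholder value
                        + "  'consumer.timeout' = '5000'\n"              // overrides the 3000 ms default poll timeout
                        + ")");
    }
}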
@@ -73,10 +73,12 @@ public class RocketMQScanTableSource implements ScanTableSource, SupportsReading
private final long startMessageOffset;
private final long startTime;
private final boolean useNewApi;
private final long pollTime;

private List<String> metadataKeys;

public RocketMQScanTableSource(
long pollTime,
DescriptorProperties properties,
TableSchema schema,
String topic,
@@ -93,6 +95,7 @@ public RocketMQScanTableSource(
String consumerOffsetMode,
long consumerOffsetTimestamp,
boolean useNewApi) {
this.pollTime = pollTime;
this.properties = properties;
this.schema = schema;
this.topic = topic;
@@ -122,6 +125,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
if (useNewApi) {
return SourceProvider.of(
new RocketMQSource<>(
pollTime,
topic,
consumerGroup,
nameServerAddress,
Expand Down Expand Up @@ -162,6 +166,7 @@ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDa
public DynamicTableSource copy() {
RocketMQScanTableSource tableSource =
new RocketMQScanTableSource(
pollTime,
properties,
schema,
topic,