Skip to content

Commit cf12660

Browse files
authored
GH-241: Add metrics-level option to KclMessageDrivenChannelAdapter
Fixes: #241 * Add test that `metricsLevel` is set correctly
1 parent a82a0b6 commit cf12660

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

src/main/java/org/springframework/integration/aws/inbound/kinesis/KclMessageDrivenChannelAdapter.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,8 @@
5050
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
5151
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
5252
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
53+
import software.amazon.kinesis.metrics.MetricsConfig;
54+
import software.amazon.kinesis.metrics.MetricsLevel;
5355
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
5456
import software.amazon.kinesis.processor.MultiStreamTracker;
5557
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
@@ -92,6 +94,7 @@
9294
* @author Artem Bilan
9395
* @author Dirk Bonhomme
9496
* @author Siddharth Jain
97+
* @author Minkyu Moon
9598
*
9699
* @since 2.2.0
97100
*/
@@ -145,6 +148,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
145148

146149
private volatile Scheduler scheduler;
147150

151+
private MetricsLevel metricsLevel = MetricsLevel.DETAILED;
152+
148153
public KclMessageDrivenChannelAdapter(String... streams) {
149154
this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
150155
}
@@ -267,6 +272,16 @@ public void setFanOut(boolean fanOut) {
267272
this.fanOut = fanOut;
268273
}
269274

275+
/**
276+
* Specify a metrics level to emit.
277+
* Defaults to {@link MetricsLevel#DETAILED}.
278+
* @param metricsLevel the {@link MetricsLevel} for emitting (or not) metrics into Cloud Watch.
279+
*/
280+
public void setMetricsLevel(MetricsLevel metricsLevel) {
281+
Assert.notNull(metricsLevel, "'metricsLevel' must not be null");
282+
this.metricsLevel = metricsLevel;
283+
}
284+
270285
@Override
271286
protected void onInit() {
272287
super.onInit();
@@ -321,13 +336,16 @@ protected void doStart() {
321336
.glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer)
322337
.retrievalSpecificConfig(retrievalSpecificConfig);
323338

339+
MetricsConfig metricsConfig = this.config.metricsConfig();
340+
metricsConfig.metricsLevel(this.metricsLevel);
341+
324342
this.scheduler =
325343
new Scheduler(
326344
this.config.checkpointConfig(),
327345
this.config.coordinatorConfig(),
328346
this.config.leaseManagementConfig(),
329347
lifecycleConfig,
330-
this.config.metricsConfig(),
348+
metricsConfig,
331349
this.config.processorConfig(),
332350
retrievalConfig);
333351

src/test/java/org/springframework/integration/aws/kinesis/KclMessageDrivenChannelAdapterTests.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import software.amazon.awssdk.services.kinesis.model.Consumer;
2929
import software.amazon.kinesis.common.InitialPositionInStream;
3030
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
31+
import software.amazon.kinesis.metrics.MetricsLevel;
3132

3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.springframework.context.annotation.Bean;
@@ -38,6 +39,7 @@
3839
import org.springframework.integration.aws.support.AwsHeaders;
3940
import org.springframework.integration.channel.QueueChannel;
4041
import org.springframework.integration.config.EnableIntegration;
42+
import org.springframework.integration.test.util.TestUtils;
4143
import org.springframework.messaging.Message;
4244
import org.springframework.messaging.PollableChannel;
4345
import org.springframework.test.annotation.DirtiesContext;
@@ -48,6 +50,7 @@
4850
/**
4951
* @author Artem Bilan
5052
* @author Siddharth Jain
53+
* @author Minkyu Moon
5154
*
5255
* @since 3.0
5356
*/
@@ -66,6 +69,9 @@ public class KclMessageDrivenChannelAdapterTests implements LocalstackContainerT
6669
@Autowired
6770
private PollableChannel kinesisReceiveChannel;
6871

72+
@Autowired
73+
private KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter;
74+
6975
@BeforeAll
7076
static void setup() {
7177
AMAZON_KINESIS = LocalstackContainerTest.kinesisClient();
@@ -116,6 +122,16 @@ void kclChannelAdapterReceivesRecords() {
116122
assertThat(streamConsumers).hasSize(0);
117123
}
118124

125+
@Test
126+
public void metricsLevelOfMetricsFactoryShouldBeSetToMetricsLevelOfAdapter() {
127+
MetricsLevel metricsLevel = TestUtils.getPropertyValue(
128+
this.kclMessageDrivenChannelAdapter,
129+
"scheduler.metricsFactory.metricsLevel",
130+
MetricsLevel.class
131+
);
132+
assertThat(metricsLevel).isEqualTo(MetricsLevel.NONE);
133+
}
134+
119135
@Configuration
120136
@EnableIntegration
121137
public static class TestConfiguration {
@@ -130,6 +146,7 @@ public KclMessageDrivenChannelAdapter kclMessageDrivenChannelAdapter() {
130146
adapter.setConverter(String::new);
131147
adapter.setConsumerGroup("single_stream_group");
132148
adapter.setFanOut(false);
149+
adapter.setMetricsLevel(MetricsLevel.NONE);
133150
adapter.setBindSourceRecord(true);
134151
return adapter;
135152
}

0 commit comments

Comments
 (0)