|
1 | 1 | /*
|
2 |
| - * Copyright 2019-2023 the original author or authors. |
| 2 | + * Copyright 2019-2024 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
50 | 50 | import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
51 | 51 | import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
|
52 | 52 | import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
|
| 53 | +import software.amazon.kinesis.metrics.MetricsConfig; |
| 54 | +import software.amazon.kinesis.metrics.MetricsLevel; |
53 | 55 | import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
|
54 | 56 | import software.amazon.kinesis.processor.MultiStreamTracker;
|
55 | 57 | import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
|
|
92 | 94 | * @author Artem Bilan
|
93 | 95 | * @author Dirk Bonhomme
|
94 | 96 | * @author Siddharth Jain
|
| 97 | + * @author Minkyu Moon |
95 | 98 | *
|
96 | 99 | * @since 2.2.0
|
97 | 100 | */
|
@@ -145,6 +148,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
|
145 | 148 |
|
146 | 149 | private volatile Scheduler scheduler;
|
147 | 150 |
|
| 151 | + private MetricsLevel metricsLevel = MetricsLevel.DETAILED; |
| 152 | + |
148 | 153 | public KclMessageDrivenChannelAdapter(String... streams) {
|
149 | 154 | this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
|
150 | 155 | }
|
@@ -267,6 +272,16 @@ public void setFanOut(boolean fanOut) {
|
267 | 272 | this.fanOut = fanOut;
|
268 | 273 | }
|
269 | 274 |
|
| 275 | + /** |
| 276 | + * Specify a metrics level to emit. |
| 277 | + * Defaults to {@link MetricsLevel#DETAILED}. |
| 278 | + * @param metricsLevel the {@link MetricsLevel} for emitting (or not) metrics into Cloud Watch. |
| 279 | + */ |
| 280 | + public void setMetricsLevel(MetricsLevel metricsLevel) { |
| 281 | + Assert.notNull(metricsLevel, "'metricsLevel' must not be null"); |
| 282 | + this.metricsLevel = metricsLevel; |
| 283 | + } |
| 284 | + |
270 | 285 | @Override
|
271 | 286 | protected void onInit() {
|
272 | 287 | super.onInit();
|
@@ -321,13 +336,16 @@ protected void doStart() {
|
321 | 336 | .glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer)
|
322 | 337 | .retrievalSpecificConfig(retrievalSpecificConfig);
|
323 | 338 |
|
| 339 | + MetricsConfig metricsConfig = this.config.metricsConfig(); |
| 340 | + metricsConfig.metricsLevel(this.metricsLevel); |
| 341 | + |
324 | 342 | this.scheduler =
|
325 | 343 | new Scheduler(
|
326 | 344 | this.config.checkpointConfig(),
|
327 | 345 | this.config.coordinatorConfig(),
|
328 | 346 | this.config.leaseManagementConfig(),
|
329 | 347 | lifecycleConfig,
|
330 |
| - this.config.metricsConfig(), |
| 348 | + metricsConfig, |
331 | 349 | this.config.processorConfig(),
|
332 | 350 | retrievalConfig);
|
333 | 351 |
|
|
0 commit comments