|
50 | 50 | import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
|
51 | 51 | import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
|
52 | 52 | import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
|
| 53 | +import software.amazon.kinesis.metrics.MetricsConfig; |
| 54 | +import software.amazon.kinesis.metrics.MetricsLevel; |
53 | 55 | import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
|
54 | 56 | import software.amazon.kinesis.processor.MultiStreamTracker;
|
55 | 57 | import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
|
@@ -145,6 +147,8 @@ public class KclMessageDrivenChannelAdapter extends MessageProducerSupport
|
145 | 147 |
|
146 | 148 | private volatile Scheduler scheduler;
|
147 | 149 |
|
| 150 | + private MetricsLevel metricsLevel = MetricsLevel.DETAILED; |
| 151 | + |
148 | 152 | public KclMessageDrivenChannelAdapter(String... streams) {
|
149 | 153 | this(KinesisAsyncClient.create(), CloudWatchAsyncClient.create(), DynamoDbAsyncClient.create(), streams);
|
150 | 154 | }
|
@@ -267,6 +271,15 @@ public void setFanOut(boolean fanOut) {
|
267 | 271 | this.fanOut = fanOut;
|
268 | 272 | }
|
269 | 273 |
|
| 274 | + /** |
| 275 | + * Specify a metrics level to emit (DETAILED; default). |
| 276 | + * @param metricsLevel {@link MetricsLevel} DETAILED for emitting all metrics. |
| 277 | + * @author Minkyu Moon |
| 278 | + */ |
| 279 | + public void setMetricsLevel(MetricsLevel metricsLevel) { |
| 280 | + this.metricsLevel = metricsLevel; |
| 281 | + } |
| 282 | + |
270 | 283 | @Override
|
271 | 284 | protected void onInit() {
|
272 | 285 | super.onInit();
|
@@ -321,13 +334,18 @@ protected void doStart() {
|
321 | 334 | .glueSchemaRegistryDeserializer(this.glueSchemaRegistryDeserializer)
|
322 | 335 | .retrievalSpecificConfig(retrievalSpecificConfig);
|
323 | 336 |
|
| 337 | + MetricsConfig metricsConfig = this.config.metricsConfig(); |
| 338 | + if(this.metricsLevel != null) { |
| 339 | + metricsConfig.metricsLevel(this.metricsLevel); |
| 340 | + } |
| 341 | + |
324 | 342 | this.scheduler =
|
325 | 343 | new Scheduler(
|
326 | 344 | this.config.checkpointConfig(),
|
327 | 345 | this.config.coordinatorConfig(),
|
328 | 346 | this.config.leaseManagementConfig(),
|
329 | 347 | lifecycleConfig,
|
330 |
| - this.config.metricsConfig(), |
| 348 | + metricsConfig, |
331 | 349 | this.config.processorConfig(),
|
332 | 350 | retrievalConfig);
|
333 | 351 |
|
|
0 commit comments