Skip to content

Commit a706add

Browse files
Daniel Nishimurajagadish-v0
authored andcommitted
SAMZA-1742: Add metrics reporter parameter to LocalApplicationRunner constructor.
vjagadish1989 this has already been reviewed and approved by you and cameronlee314 internally. Please approve here. Thanks! Author: Daniel Nishimura <[email protected]> Reviewers: Jagadish <[email protected]> Closes apache#550 from dnishimura/samza-1742-localapplicationrunner-custom-metrics
1 parent f249e71 commit a706add

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.time.Duration;
2424
import java.util.HashMap;
2525
import java.util.List;
26+
import java.util.Map;
2627
import java.util.Set;
2728
import java.util.UUID;
2829
import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +43,7 @@
4243
import org.apache.samza.coordinator.DistributedLockWithState;
4344
import org.apache.samza.execution.ExecutionPlan;
4445
import org.apache.samza.job.ApplicationStatus;
46+
import org.apache.samza.metrics.MetricsReporter;
4547
import org.apache.samza.operators.StreamGraphSpec;
4648
import org.apache.samza.processor.StreamProcessor;
4749
import org.apache.samza.processor.StreamProcessorLifecycleListener;
@@ -65,6 +67,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
6567
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
6668
private final AtomicInteger numProcessorsToStart = new AtomicInteger();
6769
private final AtomicReference<Throwable> failure = new AtomicReference<>();
70+
private final Map<String, MetricsReporter> customMetricsReporters;
6871

6972
private ApplicationStatus appStatus = ApplicationStatus.New;
7073

@@ -124,8 +127,13 @@ private void shutdownAndNotify() {
124127
}
125128

126129
public LocalApplicationRunner(Config config) {
130+
this(config, new HashMap<>());
131+
}
132+
133+
public LocalApplicationRunner(Config config, Map<String, MetricsReporter> customMetricsReporters) {
127134
super(config);
128-
uid = UUID.randomUUID().toString();
135+
this.uid = UUID.randomUUID().toString();
136+
this.customMetricsReporters = customMetricsReporters;
129137
}
130138

131139
@Override
@@ -313,10 +321,10 @@ StreamProcessor createStreamProcessor(
313321
private StreamProcessor getStreamProcessorInstance(Config config, Object taskFactory, StreamProcessorLifecycleListener listener) {
314322
if (taskFactory instanceof StreamTaskFactory) {
315323
return new StreamProcessor(
316-
config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
324+
config, customMetricsReporters, (StreamTaskFactory) taskFactory, listener);
317325
} else if (taskFactory instanceof AsyncStreamTaskFactory) {
318326
return new StreamProcessor(
319-
config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
327+
config, customMetricsReporters, (AsyncStreamTaskFactory) taskFactory, listener);
320328
} else {
321329
throw new SamzaException(String.format("%s is not a valid task factory",
322330
taskFactory.getClass().getCanonicalName()));

0 commit comments

Comments
 (0)