1818 */ 
1919package  org .apache .samza .zk ;
2020
21+ import  java .util .ArrayList ;
22+ import  java .util .Collections ;
23+ import  java .util .List ;
2124import  org .apache .samza .config .ApplicationConfig ;
2225import  org .apache .samza .config .Config ;
2326import  org .apache .samza .config .ConfigException ;
27+ import  org .apache .samza .config .JobConfig ;
2428import  org .apache .samza .config .ZkConfig ;
2529import  org .apache .samza .coordinator .JobCoordinator ;
2630import  org .apache .samza .coordinator .JobCoordinatorListener ;
3438import  org .slf4j .Logger ;
3539import  org .slf4j .LoggerFactory ;
3640
37- import  java .util .ArrayList ;
38- import  java .util .Collections ;
39- import  java .util .List ;
40- 
4141/** 
4242 * JobCoordinator for stand alone processor managed via Zookeeper. 
4343 */ 
@@ -47,6 +47,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
4747  // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197 
4848  private  static  final  int  METADATA_CACHE_TTL_MS  = 5000 ;
4949
50+ 
5051  private  final  ZkUtils  zkUtils ;
5152  private  final  String  processorId ;
5253  private  final  ZkController  zkController ;
@@ -59,6 +60,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
5960  private  JobCoordinatorListener  coordinatorListener  = null ;
6061  private  JobModel  newJobModel ;
6162
63+   private  int  debounceTimeMs ;
64+ 
6265  public  ZkJobCoordinator (Config  config ) {
6366    this .config  = config ;
6467    ZkConfig  zkConfig  = new  ZkConfig (config );
@@ -79,11 +82,14 @@ public ZkJobCoordinator(Config config) {
7982        keyBuilder .getJobModelVersionBarrierPrefix (),
8083        zkUtils ,
8184        new  ZkBarrierListenerImpl ());
85+     this .debounceTimeMs  = new  JobConfig (config ).getDebounceTimeMs ();
86+ 
8287  }
8388
8489  @ Override 
8590  public  void  start () {
8691    streamMetadataCache  = StreamMetadataCache .apply (METADATA_CACHE_TTL_MS , config );
92+ 
8793    debounceTimer  = new  ScheduleAfterDebounceTime (throwable  -> {
8894        LOG .error ("Received exception from in JobCoordinator Processing!" , throwable );
8995        stop ();
@@ -126,7 +132,7 @@ public String getProcessorId() {
126132  public  void  onProcessorChange (List <String > processors ) {
127133    LOG .info ("ZkJobCoordinator::onProcessorChange - list of processors changed! List size="  + processors .size ());
128134    debounceTimer .scheduleAfterDebounceTime (ScheduleAfterDebounceTime .ON_PROCESSOR_CHANGE ,
129-         ScheduleAfterDebounceTime . DEBOUNCE_TIME_MS , () -> doOnProcessorChange (processors ));
135+         debounceTimeMs , () -> doOnProcessorChange (processors ));
130136  }
131137
132138  void  doOnProcessorChange (List <String > processors ) {
@@ -232,8 +238,8 @@ public void onBecomingLeader() {
232238      zkController .subscribeToProcessorChange ();
233239      debounceTimer .scheduleAfterDebounceTime (
234240        ScheduleAfterDebounceTime .ON_PROCESSOR_CHANGE ,
235-         ScheduleAfterDebounceTime . DEBOUNCE_TIME_MS , () -> {
236-           // actual actions to do are the same as onProcessorChange()  
241+         debounceTimeMs , () -> {
242+           // actual actions to do are the same as onProcessorChange 
237243          doOnProcessorChange (new  ArrayList <>());
238244        });
239245    }
0 commit comments