Skip to content

Commit 0dc9dd2

Browse files
prateekmjagadish-v0
authored andcommitted
Minor fix to some config variable names and accessor methods.
Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish<[email protected]> Closes apache#840 from prateekm/fix-config-names
1 parent 8a22e05 commit 0dc9dd2

File tree

7 files changed

+13
-20
lines changed

7 files changed

+13
-20
lines changed

samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ object TaskConfig {
3636
val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
3737
val CHECKPOINT_MANAGER_FACTORY = TaskConfigJava.CHECKPOINT_MANAGER_FACTORY // class name to use when sending offset checkpoints
3838
val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
39-
val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
40-
val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
41-
val DROP_PRODUCER_ERROR = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send
39+
val DROP_DESERIALIZATION_ERRORS = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails
40+
val DROP_SERIALIZATION_ERRORS = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails
41+
val DROP_PRODUCER_ERRORS = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send
4242
val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window
4343
val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper
4444
val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask
@@ -115,11 +115,11 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
115115

116116
def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
117117

118-
def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR)
118+
def getDropDeserializationErrors = getBoolean(TaskConfig.DROP_DESERIALIZATION_ERRORS, false)
119119

120-
def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
120+
def getDropSerializationErrors = getBoolean(TaskConfig.DROP_SERIALIZATION_ERRORS, false)
121121

122-
def getDropProducerError = getBoolean(TaskConfig.DROP_PRODUCER_ERROR, false)
122+
def getDropProducerErrors = getBoolean(TaskConfig.DROP_PRODUCER_ERRORS, false)
123123

124124
def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
125125

samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -425,15 +425,8 @@ object SamzaContainer extends Logging {
425425
val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
426426
info("Got offset manager: %s" format offsetManager)
427427

428-
val dropDeserializationError = config.getDropDeserialization match {
429-
case Some(dropError) => dropError.toBoolean
430-
case _ => false
431-
}
432-
433-
val dropSerializationError = config.getDropSerialization match {
434-
case Some(dropError) => dropError.toBoolean
435-
case _ => false
436-
}
428+
val dropDeserializationError = config.getDropDeserializationErrors
429+
val dropSerializationError = config.getDropSerializationErrors
437430

438431
val pollIntervalMs = config
439432
.getPollIntervalMs

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
8383
new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
8484
getProducer,
8585
metrics,
86-
dropProducerExceptions = config.getDropProducerError)
86+
dropProducerExceptions = config.getDropProducerErrors)
8787
}
8888

8989
def getAdmin(systemName: String, config: Config): SystemAdmin = {

samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
101101
new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs),
102102
getProducer,
103103
metrics,
104-
dropProducerExceptions = config.getDropProducerError)
104+
dropProducerExceptions = config.getDropProducerErrors)
105105
}
106106

107107
def getAdmin(systemName: String, config: Config): SystemAdmin = {

samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ protected Config getConfig() {
279279
throw new SamzaException("can not read the config", e);
280280
}
281281
// Make system producer drop producer errors for StreamAppender
282-
config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true"));
282+
config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
283283

284284
return config;
285285
}

samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ protected Config getConfig() {
300300
throw new SamzaException("can not read the config", e);
301301
}
302302
// Make system producer drop producer errors for StreamAppender
303-
config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true"));
303+
config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
304304

305305
return config;
306306
}

samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ private Map<String, String> buildStreamApplicationConfigMap(String systemName, S
206206
.put(JobConfig.JOB_NAME(), appName)
207207
.put(JobConfig.JOB_ID(), appId)
208208
.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
209-
.put(TaskConfig.DROP_PRODUCER_ERROR(), "true")
209+
.put(TaskConfig.DROP_PRODUCER_ERRORS(), "true")
210210
.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
211211
.put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
212212
.build();

0 commit comments

Comments
 (0)