Skip to content

Commit f6e0118

Browse files
authored
Merge pull request apache#1043 from lhaiesp/master
SAMZA-2202: Create log compact topic with larger message size
2 parents 933ce10 + c78dbf8 commit f6e0118

File tree

4 files changed

+109
-10
lines changed

4 files changed

+109
-10
lines changed

docs/learn/documentation/versioned/jobs/configuration-table.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,15 @@ <h1>Samza Configuration Reference</h1>
227227
</td>
228228
</tr>
229229

230+
<tr>
231+
<td class="property" id="job-coordinator-max-message-bytes">job.coordinator.<br>max.message.bytes</td>
232+
<td class="default">1000012</td>
233+
<td class="description">
234+
If you are using Kafka for the coordinator stream, this sets the largest record size for the coordinator
235+
topic.
236+
</td>
237+
</tr>
238+
230239
<tr>
231240
<td class="property" id="job-coordinator-monitor-partition-change">job.coordinator.<br />monitor-partition-change</td>
232241
<td class="default">false</td>
@@ -1541,6 +1550,15 @@ <h1>Samza Configuration Reference</h1>
15411550
</td>
15421551
</tr>
15431552

1553+
<tr>
1554+
<td class="property" id="task-checkpoint-max-message-bytes">task.checkpoint.<br>max.message.bytes</td>
1555+
<td class="default">1000012</td>
1556+
<td class="description">
1557+
If you are using Kafka for checkpoints, this sets the largest record size for the checkpoint
1558+
topic.
1559+
</td>
1560+
</tr>
1561+
15441562
<tr>
15451563
<td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
15461564
<td class="default">stores.default.changelog.replication.factor</td>

samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,24 +39,30 @@ object KafkaConfig {
3939
val TOPIC_REPLICATION_FACTOR = "replication.factor"
4040
val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
4141

42-
4342
val SEGMENT_BYTES = "segment.bytes"
43+
val MAX_MESSAGE_BYTES = "max.message.bytes"
44+
45+
// The default max message bytes for log compact topic
46+
val DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES = "1000012"
4447

4548
val CHECKPOINT_SYSTEM = "task.checkpoint.system"
4649
val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint." + TOPIC_REPLICATION_FACTOR
4750
val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint." + SEGMENT_BYTES
51+
val CHECKPOINT_MAX_MESSAGE_BYTES = "task.checkpoint." + MAX_MESSAGE_BYTES
4852

4953
val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR
5054
val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default"
5155
val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
5256
// The default segment size to use for changelog topics
5357
val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
58+
val CHANGELOG_MAX_MESSAGE_BYTES = "stores.%s.changelog." + MAX_MESSAGE_BYTES
5459

5560
// Helper regular expression definitions to extract/match configurations
5661
val CHANGELOG_STREAM_NAMES_REGEX = "stores\\.(.*)\\.changelog$"
5762

5863
val JOB_COORDINATOR_REPLICATION_FACTOR = "job.coordinator." + TOPIC_REPLICATION_FACTOR
5964
val JOB_COORDINATOR_SEGMENT_BYTES = "job.coordinator." + SEGMENT_BYTES
65+
val JOB_COORDINATOR_MAX_MESSAGE_BYTES = "job.coordinator." + MAX_MESSAGE_BYTES
6066

6167
val CONSUMER_CONFIGS_CONFIG_KEY = "systems.%s.consumer.%s"
6268
val PRODUCER_BOOTSTRAP_SERVERS_CONFIG_KEY = "systems.%s.producer.bootstrap.servers"
@@ -116,6 +122,20 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
116122
defaultReplicationFactor
117123
}
118124

125+
/**
126+
* Gets the max message bytes for the checkpoint topic. Uses the following precedence.
127+
*
128+
* 1. If task.checkpoint.max.message.bytes is configured, that value is used.
129+
* 2. If systems.checkpoint-system.default.stream.max.message.bytes is configured, that value is used.
130+
* 3. 1000012
131+
*
132+
* Note that the checkpoint-system has a similar precedence. See [[getCheckpointSystem]]
133+
*/
134+
def getCheckpointMaxMessageBytes() = {
135+
val defaultmessageBytes = new SystemConfig(config).getDefaultStreamProperties(getCheckpointSystem.orNull).getInt(KafkaConfig.MAX_MESSAGE_BYTES, KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES.toInt)
136+
getInt(KafkaConfig.CHECKPOINT_MAX_MESSAGE_BYTES, defaultmessageBytes)
137+
}
138+
119139
/**
120140
* Gets the segment bytes for the checkpoint topic. Uses the following precedence.
121141
*
@@ -130,6 +150,23 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
130150
getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, defaultsegBytes)
131151
}
132152

153+
/**
154+
* Gets the max message bytes for the coordinator topic. Uses the following precedence.
155+
*
156+
* 1. If job.coordinator.max.message.bytes is configured, that value is used.
157+
* 2. If systems.coordinator-system.default.stream.max.message.bytes is configured, that value is used.
158+
* 3. 1000012
159+
*
160+
* Note that the coordinator-system has a similar precedence. See [[JobConfig.getCoordinatorSystemName]]
161+
*/
162+
def getCoordinatorMaxMessageByte = getOption(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES) match {
163+
case Some(maxMessageBytes) => maxMessageBytes
164+
case _ =>
165+
val coordinatorSystem = new JobConfig(config).getCoordinatorSystemNameOrNull
166+
val systemMaxMessageBytes = new SystemConfig(config).getDefaultStreamProperties(coordinatorSystem).getOrDefault(KafkaConfig.MAX_MESSAGE_BYTES, KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
167+
systemMaxMessageBytes
168+
}
169+
133170
/**
134171
* Gets the replication factor for the coordinator topic. Uses the following precedence.
135172
*
@@ -224,6 +261,23 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
224261
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse(getSystemDefaultReplicationFactor(changelogSystem, "2"))
225262
}
226263

264+
/**
265+
* Gets the max message bytes for the changelog topics. Uses the following precedence.
266+
*
267+
* 1. If stores.myStore.changelog.max.message.bytes is configured, that value is used.
268+
* 2. If systems.changelog-system.default.stream.max.message.bytes is configured, that value is used.
269+
* 3. 1000012
270+
*
271+
* Note that the changelog-system has a similar precedence. See [[StorageConfig]]
272+
*/
273+
def getChangelogStreamMaxMessageByte(name: String) = getOption(KafkaConfig.CHANGELOG_MAX_MESSAGE_BYTES format name) match {
274+
case Some(maxMessageBytes) => maxMessageBytes
275+
case _ =>
276+
val changelogSystem = new StorageConfig(config).getChangelogSystem.orElse(null)
277+
val systemMaxMessageBytes = new SystemConfig(config).getDefaultStreamProperties(changelogSystem).getOrDefault(KafkaConfig.MAX_MESSAGE_BYTES, KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
278+
systemMaxMessageBytes
279+
}
280+
227281
// The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream
228282
def getKafkaChangelogEnabledStores() = {
229283
val changelogConfigs = config.regexSubset(KafkaConfig.CHANGELOG_STREAM_NAMES_REGEX).asScala
@@ -267,6 +321,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
267321
}
268322
case _ =>
269323
kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
324+
kafkaChangeLogProperties.setProperty("max.message.bytes", getChangelogStreamMaxMessageByte(name))
270325
}
271326

272327
kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
@@ -281,19 +336,22 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
281336
// topic.
282337
def getCheckpointTopicProperties() = {
283338
val segmentBytes: Int = getCheckpointSegmentBytes()
339+
val maxMessageBytes: Int = getCheckpointMaxMessageBytes()
284340
val appConfig = new ApplicationConfig(config)
285341
val isStreamMode = appConfig.getAppMode == ApplicationMode.STREAM
286342
val properties = new Properties()
287343

288344
if (isStreamMode) {
289345
properties.putAll(ImmutableMap.of(
290346
"cleanup.policy", "compact",
291-
"segment.bytes", String.valueOf(segmentBytes)))
347+
"segment.bytes", String.valueOf(segmentBytes),
348+
"max.message.bytes", String.valueOf(maxMessageBytes)))
292349
} else {
293350
properties.putAll(ImmutableMap.of(
294351
"cleanup.policy", "compact,delete",
295352
"retention.ms", String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH),
296-
"segment.bytes", String.valueOf(segmentBytes)))
353+
"segment.bytes", String.valueOf(segmentBytes),
354+
"max.message.bytes", String.valueOf(maxMessageBytes)))
297355
}
298356
properties
299357
}

samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,11 @@ class KafkaSystemFactory extends SystemFactory with Logging {
9898

9999
def getCoordinatorTopicProperties(config: Config) = {
100100
val segmentBytes = config.getCoordinatorSegmentBytes
101+
val maxMessageBytes = config.getCoordinatorMaxMessageByte
101102
(new Properties /: Map(
102103
"cleanup.policy" -> "compact",
103-
"segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
104+
"segment.bytes" -> segmentBytes,
105+
"max.message.bytes" -> maxMessageBytes)) { case (props, (k, v)) => props.put(k, v); props }
104106
}
105107

106108
def getIntermediateStreamProperties(config: Config): Map[String, Properties] = {

samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ class TestKafkaConfig {
8585
props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
8686
props.setProperty("stores.test1.changelog", "kafka.mychangelog1")
8787
props.setProperty("stores.test2.changelog", "kafka.mychangelog2")
88+
props.setProperty("stores.test2.changelog.max.message.bytes", "1024000")
8889
props.setProperty("job.changelog.system", "kafka")
8990
props.setProperty("stores.test3.changelog", "otherstream")
9091
props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
@@ -98,6 +99,7 @@ class TestKafkaConfig {
9899
val kafkaConfig = new KafkaConfig(mapConfig)
99100
assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
100101
assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact")
102+
assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("max.message.bytes"), "1024000")
101103
assertEquals(kafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact")
102104
val storeToChangelog = kafkaConfig.getKafkaChangelogEnabledStores()
103105
assertEquals("mychangelog1", storeToChangelog.get("test1").getOrElse(""))
@@ -114,6 +116,7 @@ class TestKafkaConfig {
114116

115117
assertEquals(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("cleanup.policy"), "delete")
116118
assertEquals(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("retention.ms"), "3600")
119+
assertEquals(kafkaConfig.getChangelogKafkaProperties("test4").getProperty("max.message.bytes"), null)
117120

118121
assertEquals(kafkaConfig.getChangelogKafkaProperties("test5").getProperty("cleanup.policy"), "delete")
119122
assertEquals(kafkaConfig.getChangelogKafkaProperties("test5").getProperty("retention.ms"), "1000")
@@ -133,6 +136,8 @@ class TestKafkaConfig {
133136
assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("cleanup.policy"), "compact")
134137
assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("delete.retention.ms"),
135138
String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH))
139+
assertEquals(batchKafkaConfig.getChangelogKafkaProperties("test3").getProperty("max.message.bytes"),
140+
KafkaConfig.DEFAULT_LOG_COMPACT_TOPIC_MAX_MESSAGE_BYTES)
136141
}
137142

138143
@Test
@@ -247,97 +252,113 @@ class TestKafkaConfig {
247252
}
248253

249254
@Test
250-
def testCheckpointReplicationFactor() {
255+
def testCheckpointConfigs() {
251256
val emptyConfig = new KafkaConfig(new MapConfig())
252257
assertEquals("3", emptyConfig.getCheckpointReplicationFactor.orNull)
258+
assertEquals(1000012, emptyConfig.getCheckpointMaxMessageBytes())
253259
assertNull(emptyConfig.getCheckpointSystem.orNull)
254260

255261
props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
256262
props.setProperty("task.checkpoint.replication.factor", "4")
263+
props.setProperty("task.checkpoint.max.message.bytes", "2048000")
257264

258265
val mapConfig = new MapConfig(props.asScala.asJava)
259266
val kafkaConfig = new KafkaConfig(mapConfig)
260267
assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
261268
assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
269+
assertEquals(2048000, kafkaConfig.getCheckpointMaxMessageBytes())
262270
}
263271

264272
@Test
265-
def testCheckpointReplicationFactorWithSystemDefault() {
273+
def testCheckpointConfigsWithSystemDefault() {
266274
props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
267275
props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
268276
props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
277+
props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "4096000")
269278

270279
val mapConfig = new MapConfig(props.asScala.asJava)
271280
val kafkaConfig = new KafkaConfig(mapConfig)
272281
assertEquals("other-kafka-system", kafkaConfig.getCheckpointSystem.orNull)
273282
assertEquals("8", kafkaConfig.getCheckpointReplicationFactor.orNull)
274283
assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
284+
assertEquals(4096000, kafkaConfig.getCheckpointMaxMessageBytes())
275285
}
276286

277287
@Test
278-
def testCheckpointReplicationFactorWithSystemOverriddenDefault() {
288+
def testCheckpointConfigsWithSystemOverriddenDefault() {
279289
// Defaults
280290
props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
281291
props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
292+
props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "4096000")
282293
props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
283294

284295
// Overrides
285296
props.setProperty(KafkaConfig.CHECKPOINT_SYSTEM, "kafka-system")
286297
props.setProperty(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR, "4")
298+
props.setProperty(KafkaConfig.CHECKPOINT_MAX_MESSAGE_BYTES, "8192000")
287299

288300
val mapConfig = new MapConfig(props.asScala.asJava)
289301
val kafkaConfig = new KafkaConfig(mapConfig)
290302
assertEquals("kafka-system", kafkaConfig.getCheckpointSystem.orNull)
291303
assertEquals("4", kafkaConfig.getCheckpointReplicationFactor.orNull)
292304
assertEquals(8675309, kafkaConfig.getCheckpointSegmentBytes)
305+
assertEquals(8192000, kafkaConfig.getCheckpointMaxMessageBytes())
293306
}
294307

295308
@Test
296-
def testCoordinatorReplicationFactor() {
309+
def testCoordinatorConfigs() {
297310
val emptyConfig = new KafkaConfig(new MapConfig())
298311
assertEquals("3", emptyConfig.getCoordinatorReplicationFactor)
312+
assertEquals("1000012", emptyConfig.getCoordinatorMaxMessageByte)
299313
assertNull(new JobConfig(new MapConfig()).getCoordinatorSystemNameOrNull)
300314

301315
props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
302316
props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
317+
props.setProperty(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES, "1024000")
303318

304319
val mapConfig = new MapConfig(props.asScala.asJava)
305320
val kafkaConfig = new KafkaConfig(mapConfig)
306321
val jobConfig = new JobConfig(mapConfig)
307322
assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
308323
assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
324+
assertEquals("1024000", kafkaConfig.getCoordinatorMaxMessageByte)
309325
}
310326

311327
@Test
312-
def testCoordinatorReplicationFactorWithSystemDefault() {
328+
def testCoordinatorConfigsWithSystemDefault() {
313329
props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
314330
props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
315331
props.setProperty("systems.other-kafka-system.default.stream.segment.bytes", "8675309")
332+
props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "2048000")
316333

317334
val mapConfig = new MapConfig(props.asScala.asJava)
318335
val kafkaConfig = new KafkaConfig(mapConfig)
319336
val jobConfig = new JobConfig(mapConfig)
320337
assertEquals("other-kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
321338
assertEquals("8", kafkaConfig.getCoordinatorReplicationFactor)
322339
assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
340+
assertEquals("2048000", kafkaConfig.getCoordinatorMaxMessageByte)
323341
}
324342

325343
@Test
326-
def testCoordinatorReplicationFactorWithSystemOverriddenDefault() {
344+
def testCoordinatorConfigsWithSystemOverriddenDefault() {
327345
// Defaults
328346
props.setProperty(JobConfig.JOB_DEFAULT_SYSTEM, "other-kafka-system")
329347
props.setProperty("systems.other-kafka-system.default.stream.replication.factor", "8")
348+
props.setProperty("systems.other-kafka-system.default.stream.max.message.bytes", "2048000")
330349
props.setProperty("systems.kafka-system.default.stream.segment.bytes", "8675309")
331350

332351
// Overrides
333352
props.setProperty(JobConfig.JOB_COORDINATOR_SYSTEM, "kafka-system")
334353
props.setProperty(KafkaConfig.JOB_COORDINATOR_REPLICATION_FACTOR, "4")
354+
props.setProperty(KafkaConfig.JOB_COORDINATOR_MAX_MESSAGE_BYTES, "4096000")
335355

336356
val mapConfig = new MapConfig(props.asScala.asJava)
337357
val kafkaConfig = new KafkaConfig(mapConfig)
338358
val jobConfig = new JobConfig(mapConfig)
339359
assertEquals("kafka-system", jobConfig.getCoordinatorSystemNameOrNull)
340360
assertEquals("4", kafkaConfig.getCoordinatorReplicationFactor)
361+
assertEquals("4096000", kafkaConfig.getCoordinatorMaxMessageByte)
341362
assertEquals("8675309", kafkaConfig.getCoordinatorSegmentBytes)
342363
}
343364
}

0 commit comments

Comments
 (0)