Skip to content

Commit 8b21209

Browse files
lhaiespxiliu
authored andcommitted
SAMZA-1471: SystemConsumers should not poll ssp that hit end of stream
When SystemConsumers poll from SSPs that have hit end of stream, obviously there will be no data return and the poll will exhaust the timeout. This would cause performance issue. Author: Hai Lu <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes apache#342 from lhaiesp/master
1 parent 20698f9 commit 8b21209

File tree

2 files changed

+70
-4
lines changed

2 files changed

+70
-4
lines changed

samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.samza.util.{Logging, TimerUtils}
2828
import org.apache.samza.system.chooser.MessageChooser
2929
import org.apache.samza.SamzaException
3030
import java.util.ArrayDeque
31+
import java.util.Collections
3132
import java.util.HashSet
3233
import java.util.HashMap
3334
import java.util.Queue
@@ -186,7 +187,7 @@ class SystemConsumers (
186187
if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(offset)) {
187188
info("Stream : %s is already at end of stream" format (systemStreamPartition))
188189
endOfStreamSSPs.add(systemStreamPartition)
189-
return;
190+
return
190191
}
191192

192193
metrics.registerSystemStreamPartition(systemStreamPartition)
@@ -258,7 +259,14 @@ class SystemConsumers (
258259

259260
trace("Getting fetch map for system: %s" format systemName)
260261

261-
val systemFetchSet = emptySystemStreamPartitionsBySystem.get(systemName)
262+
val systemFetchSet : util.Set[SystemStreamPartition] =
263+
if (emptySystemStreamPartitionsBySystem.containsKey(systemName)) {
264+
val sspToFetch = new util.HashSet(emptySystemStreamPartitionsBySystem.get(systemName))
265+
sspToFetch.removeAll(endOfStreamSSPs)
266+
sspToFetch
267+
} else {
268+
Collections.emptySet()
269+
}
262270

263271
// Poll when at least one SSP in this system needs more messages.
264272

@@ -293,7 +301,7 @@ class SystemConsumers (
293301
}
294302
}
295303
} else {
296-
trace("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format (systemName))
304+
trace("Skipping polling for %s. Already have messages available for all registered SystemStreamPartitions." format systemName)
297305
}
298306
}
299307

samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
package org.apache.samza.system
2121

22+
import java.util
23+
import java.util.Collections
24+
2225
import org.junit.Assert._
2326
import org.junit.Test
2427
import org.apache.samza.Partition
@@ -285,10 +288,62 @@ class TestSystemConsumers {
285288

286289
}
287290

291+
@Test
292+
def testSystemConsumersShouldNotPollEndOfStreamSSPs {
293+
val system = "test-system"
294+
val stream = "some-stream"
295+
val systemStreamPartition1 = new SystemStreamPartition(system, stream, new Partition(1))
296+
val systemStreamPartition2 = new SystemStreamPartition(system, stream, new Partition(2))
297+
val normalEnvelope = new IncomingMessageEnvelope(systemStreamPartition1, "1", "k", "v")
298+
val endOfStreamEnvelope = IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition2)
299+
val consumer = new CustomPollResponseSystemConsumer(normalEnvelope)
300+
val consumers = new SystemConsumers(new MockMessageChooser, Map(system -> consumer),
301+
new SerdeManager, new SystemConsumersMetrics,
302+
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT,
303+
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR,
304+
SystemConsumers.DEFAULT_POLL_INTERVAL_MS, clock = () => 0)
305+
306+
consumers.register(systemStreamPartition1, "0")
307+
consumers.register(systemStreamPartition2, "0")
308+
consumers.start
309+
310+
// Start should trigger a poll to the consumer.
311+
assertEquals(1, consumer.polls)
312+
assertEquals(2, consumer.lastPoll.size())
313+
314+
// Tell the consumer to start returning messages when polled.
315+
val nextResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]](
316+
systemStreamPartition1 -> Collections.singletonList(normalEnvelope),
317+
systemStreamPartition2 -> Collections.singletonList(endOfStreamEnvelope)
318+
)
319+
consumer.setNextResponse(nextResponse)
320+
321+
// Choose to trigger a refresh with data.
322+
assertNull(consumers.choose())
323+
324+
// Choose should have triggered a second poll, since no messages are available.
325+
assertEquals(2, consumer.polls)
326+
assertEquals(2, consumer.lastPoll.size())
327+
328+
// Choose a few times and let chooser handle the end of stream message
329+
assertNotNull(consumers.choose())
330+
assertNotNull(consumers.choose())
331+
consumers.tryUpdate(systemStreamPartition1)
332+
// Now assuming that chooser has processed end of stream message,
333+
// tryUpdate shouldn't add ssp back to emptySystemStreamPartitionsBySystem
334+
consumers.tryUpdate(systemStreamPartition2)
335+
assertNull(consumers.choose())
336+
assertEquals(3, consumer.polls)
337+
// SystemConsumers should poll only one partition: ssp1
338+
assertEquals(1, consumer.lastPoll.size())
339+
assertTrue(consumer.lastPoll.contains(systemStreamPartition1))
340+
}
341+
288342
/**
289343
* A simple MockSystemConsumer that keeps track of what was polled, and lets
290344
* you define how many envelopes to return in the poll response. You can
291345
* supply the envelope to use for poll responses through the constructor.
346+
* You can also directly set the next response by calling setNextResponse
292347
*/
293348
private class CustomPollResponseSystemConsumer(envelope: IncomingMessageEnvelope) extends SystemConsumer {
294349
var polls = 0
@@ -299,7 +354,7 @@ class TestSystemConsumers {
299354
def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
300355
def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = {
301356
polls += 1
302-
lastPoll = systemStreamPartitions
357+
lastPoll = new util.HashSet[SystemStreamPartition](systemStreamPartitions)
303358
pollResponse.asJava
304359
}
305360
def setResponseSizes(numEnvelopes: Int) {
@@ -308,6 +363,9 @@ class TestSystemConsumers {
308363
pollResponse = Map(envelope.getSystemStreamPartition -> q)
309364
pollResponse = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
310365
}
366+
def setNextResponse(nextResponse: Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]) {
367+
pollResponse = nextResponse
368+
}
311369
}
312370

313371
/**

0 commit comments

Comments
 (0)