@@ -160,14 +160,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
}

object WithOffsetSync {
-def apply(topic: String)(func: () => Unit): StreamAction = {
+/**
+ * Run `func` to write some Kafka messages and wait until the latest offset of the given
+ * `TopicPartition` is not less than `expectedOffset`.
+ */
+def apply(
+    topicPartition: TopicPartition,
+    expectedOffset: Long)(func: () => Unit): StreamAction = {
Execute("Run Kafka Producer")(_ => {
func()
-// This is a hack for the race condition that the committed message may be not visible to
-// consumer for a short time.
-// Looks like after the following call returns, the consumer can always read the committed
-// messages.
-testUtils.getLatestOffsets(Set(topic))
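+// Poll until the partition's end offset reaches `expectedOffset`, so later reads cannot
+// race the writes performed by `func`.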
+testUtils.waitUntilOffsetAppears(topicPartition, expectedOffset)
})
}
}
@@ -652,13 +656,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
}

+val topicPartition = new TopicPartition(topic, 0)
// The message values are the same as their offsets to make the test easy to follow
testUtils.withTranscationalProducer { producer =>
testStream(mapped)(
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckAnswer(),
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
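+// expectedOffset = 5: the five records below land at offsets 0-4, so the end offset becomes 5.
+// (Commit/abort markers also occupy one offset each, which the later expected values account for.)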
// Send 5 messages. They should be visible only after being committed.
producer.beginTransaction()
(0 to 4).foreach { i =>
@@ -669,7 +674,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
waitUntilBatchProcessed,
// Should not see any uncommitted messages
CheckNewAnswer(),
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
producer.commitTransaction()
},
AdvanceManualClock(100),
@@ -678,7 +683,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
// Send 5 messages and abort the transaction. They should not be read.
producer.beginTransaction()
(6 to 10).foreach { i =>
@@ -692,7 +697,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(), // offset: 9*, 10*, 11*
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
// Send 5 messages again. The consumer should skip the above aborted messages and read
// them.
producer.beginTransaction()
@@ -707,7 +712,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(15, 16), // offset: 15, 16, 17*
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
producer.beginTransaction()
producer.send(new ProducerRecord[String, String](topic, "18")).get()
producer.commitTransaction()
@@ -774,13 +779,14 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
}
}

+val topicPartition = new TopicPartition(topic, 0)
// The message values are the same as their offsets to make the test easy to follow
testUtils.withTranscationalProducer { producer =>
testStream(mapped)(
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
CheckNewAnswer(),
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
// Send 5 messages. They should be visible only after being committed.
producer.beginTransaction()
(0 to 4).foreach { i =>
@@ -790,13 +796,13 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(0, 1, 2), // offset 0, 1, 2
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 6) { () =>
producer.commitTransaction()
},
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(3, 4), // offset: 3, 4, 5* [* means it's not a committed data message]
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 12) { () =>
// Send 5 messages and abort the transaction. They should not be read.
producer.beginTransaction()
(6 to 10).foreach { i =>
@@ -810,7 +816,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(9, 10), // offset: 9, 10, 11*
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 18) { () =>
// Send 5 messages again. The consumer should skip the above aborted messages and read
// them.
producer.beginTransaction()
@@ -825,7 +831,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
AdvanceManualClock(100),
waitUntilBatchProcessed,
CheckNewAnswer(15, 16), // offset: 15, 16, 17*
-WithOffsetSync(topic) { () =>
+WithOffsetSync(topicPartition, expectedOffset = 25) { () =>
producer.beginTransaction()
producer.send(new ProducerRecord[String, String](topic, "18")).get()
producer.commitTransaction()
@@ -260,6 +260,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
producer.commitTransaction()

// Should read all committed messages
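+// Five records at offsets 0-4 plus the transaction's commit marker at offset 5 => end offset 6.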
+testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
checkAnswer(df, (1 to 5).map(_.toString).toDF)

producer.beginTransaction()
@@ -269,6 +270,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
producer.abortTransaction()

// Should not read aborted messages
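+// Five aborted records (offsets 6-10) plus the abort marker (offset 11) => end offset 12.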
+testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
checkAnswer(df, (1 to 5).map(_.toString).toDF)

producer.beginTransaction()
@@ -278,6 +280,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
producer.commitTransaction()

// Should skip aborted messages and read new committed ones.
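+// Five new committed records (offsets 12-16) plus their commit marker (offset 17) => end offset 18.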
+testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
checkAnswer(df, ((1 to 5) ++ (11 to 15)).map(_.toString).toDF)
}
}
@@ -301,11 +304,13 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
}

// "read_uncommitted" should see all messages including uncommitted ones
+testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 5)
checkAnswer(df, (1 to 5).map(_.toString).toDF)

producer.commitTransaction()

// Should read all committed messages
+testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 6)
checkAnswer(df, (1 to 5).map(_.toString).toDF)

producer.beginTransaction()
@@ -315,6 +320,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
producer.abortTransaction()

// "read_uncommitted" should see all messages including uncommitted or aborted ones
+testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 12)
checkAnswer(df, (1 to 10).map(_.toString).toDF)

producer.beginTransaction()
@@ -324,6 +330,7 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
producer.commitTransaction()

// Should read all messages
+testUtils.waitUntilOffsetAppears(new TopicPartition(topic, 0), 18)
checkAnswer(df, (1 to 15).map(_.toString).toDF)
}
}
@@ -439,6 +439,16 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
}
}

+/**
+ * Wait until the latest offset of the given `TopicPartition` is not less than `offset`.
+ */
+def waitUntilOffsetAppears(topicPartition: TopicPartition, offset: Long): Unit = {
+  eventually(timeout(60.seconds)) {
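+    // Re-fetch the end offset on every retry; eventually() keeps polling until the assertion
+    // passes, failing the test after the 60-second timeout.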
+    val currentOffset = getLatestOffsets(Set(topicPartition.topic)).get(topicPartition)
+    assert(currentOffset.nonEmpty && currentOffset.get >= offset)
+  }
+}

private class EmbeddedZookeeper(val zkConnect: String) {
val snapshotDir = Utils.createTempDir()
val logDir = Utils.createTempDir()