
Commit e93eb72

[SPARK-4964] refactor to add preferredLocations. depends on SPARK-4014

1 parent 356c7cc

File tree: 4 files changed, +114 -38 lines changed

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaCluster.scala

Lines changed: 11 additions & 7 deletions
@@ -32,7 +32,7 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer}
  *   NOT zookeeper servers, specified in host1:port1,host2:port2 form
  */
 class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
-  import KafkaCluster.Err
+  import KafkaCluster.{Err, LeaderOffset}
 
   val seedBrokers: Array[(String, Int)] =
     kafkaParams.get("metadata.broker.list")
@@ -131,18 +131,18 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
 
   def getLatestLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, Long]] =
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
     getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)
 
   def getEarliestLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition]
-    ): Either[Err, Map[TopicAndPartition, Long]] =
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
     getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)
 
   def getLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition],
       before: Long
-    ): Either[Err, Map[TopicAndPartition, Long]] =
+    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
     getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
       r.map { kv =>
         // mapValues isnt serializable, see SI-7005
@@ -159,11 +159,11 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
       topicAndPartitions: Set[TopicAndPartition],
       before: Long,
       maxNumOffsets: Int
-    ): Either[Err, Map[TopicAndPartition, Seq[Long]]] = {
+    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
     findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
       val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
       val leaders = leaderToTp.keys
-      var result = Map[TopicAndPartition, Seq[Long]]()
+      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
       val errs = new Err
       withBrokers(leaders, errs) { consumer =>
         val needed: Seq[TopicAndPartition] = leaderToTp((consumer.host, consumer.port))
@@ -178,7 +178,9 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
           respMap.get(tp).foreach { errAndOffsets =>
             if (errAndOffsets.error == ErrorMapping.NoError) {
               if (errAndOffsets.offsets.nonEmpty) {
-                result += tp -> errAndOffsets.offsets
+                result += tp -> errAndOffsets.offsets.map { off =>
+                  LeaderOffset(consumer.host, consumer.port, off)
+                }
               } else {
                 errs.append(new Exception(
                   s"Empty offsets for ${tp}, is ${before} before log beginning?"))
@@ -297,6 +299,8 @@ class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
 object KafkaCluster {
   type Err = ArrayBuffer[Throwable]
 
+  case class LeaderOffset(host: String, port: Int, offset: Long)
+
   /** Make a consumer config without requiring group.id or zookeeper.connect,
     * since communicating with brokers also needs common settings such as timeout
     */
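
For orientation, here is a minimal sketch (not part of this commit) of how a caller might consume the changed return type; the broker address and topic name are placeholders:

import kafka.common.TopicAndPartition
import org.apache.spark.rdd.kafka.KafkaCluster

// Placeholder broker list and topic/partition, purely for illustration.
val kc = new KafkaCluster(Map("metadata.broker.list" -> "localhost:9092"))
val tp = TopicAndPartition("some-topic", 0)

kc.getLatestLeaderOffsets(Set(tp)).fold(
  errs => sys.error(s"offset lookup failed: ${errs.mkString("\n")}"),
  offsets => offsets.foreach { case (tap, lo) =>
    // Each value is now a LeaderOffset carrying the leader's host/port alongside the offset.
    println(s"$tap -> leader ${lo.host}:${lo.port}, latest offset ${lo.offset}")
  }
)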

external/kafka/src/main/scala/org/apache/spark/rdd/kafka/KafkaRDD.scala

Lines changed: 90 additions & 23 deletions
@@ -31,16 +31,24 @@ import kafka.message.{MessageAndMetadata, MessageAndOffset}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
+
 case class KafkaRDDPartition(
   override val index: Int,
+  /** kafka topic name */
   topic: String,
+  /** kafka partition id */
   partition: Int,
+  /** inclusive starting offset */
   fromOffset: Long,
-  untilOffset: Long
+  /** exclusive ending offset */
+  untilOffset: Long,
+  /** preferred kafka host, i.e. the leader at the time the rdd was created */
+  host: String,
+  /** preferred kafka host's port */
+  port: Int
 ) extends Partition
 
 /** A batch-oriented interface for consuming from Kafka.
-  * Each given Kafka topic/partition corresponds to an RDD partition.
   * Starting and ending offsets are specified in advance,
   * so that you can control exactly-once semantics.
   * For an easy interface to Kafka-managed offsets,
@@ -49,10 +57,8 @@ case class KafkaRDDPartition(
   * configuration parameters</a>.
   * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
   * NOT zookeeper servers, specified in host1:port1,host2:port2 form.
-  * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
-  *   starting point of the batch
-  * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
-  *   ending point of the batch
+  * @param rddPartitions Each RDD partition corresponds to a
+  *   range of offsets for a given Kafka topic/partition
   * @param messageHandler function for translating each message into the desired type
   */
 class KafkaRDD[
@@ -63,20 +69,31 @@ class KafkaRDD[
   R: ClassTag](
     sc: SparkContext,
     val kafkaParams: Map[String, String],
-    val fromOffsets: Map[TopicAndPartition, Long],
-    val untilOffsets: Map[TopicAndPartition, Long],
+    val rddPartitions: Traversable[KafkaRDDPartition],
     messageHandler: MessageAndMetadata[K, V] => R
   ) extends RDD[R](sc, Nil) with Logging {
 
-  assert(fromOffsets.keys == untilOffsets.keys,
-    "Must provide both from and until offsets for each topic/partition")
+  /** per-topic/partition Kafka offsets defining the (inclusive) starting point of the batch */
+  def fromOffsets: Map[TopicAndPartition, Long] =
+    rddPartitions.map { kr =>
+      TopicAndPartition(kr.topic, kr.partition) -> kr.fromOffset
+    }.toMap
+
+  /** per-topic/partition Kafka offsets defining the (exclusive) ending point of the batch */
+  def untilOffsets: Map[TopicAndPartition, Long] =
+    rddPartitions.map { kr =>
+      TopicAndPartition(kr.topic, kr.partition) -> kr.untilOffset
+    }.toMap
+
+  override def getPartitions: Array[Partition] = rddPartitions.toArray
 
-  override def getPartitions: Array[Partition] = fromOffsets.zipWithIndex.map { kvi =>
-    val ((tp, from), index) = kvi
-    new KafkaRDDPartition(index, tp.topic, tp.partition, from, untilOffsets(tp))
-  }.toArray
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+    val part = thePart.asInstanceOf[KafkaRDDPartition]
+    // TODO is additional hostname resolution necessary here
+    Seq(part.host)
+  }
 
-  override def compute(thePart: Partition, context: TaskContext) = {
+  override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
     val part = thePart.asInstanceOf[KafkaRDDPartition]
     if (part.fromOffset >= part.untilOffset) {
       log.warn("Beginning offset is same or after ending offset " +
@@ -86,25 +103,37 @@ class KafkaRDD[
     new NextIterator[R] {
       context.addTaskCompletionListener{ context => closeIfNeeded() }
 
-      val kc = new KafkaCluster(kafkaParams)
       log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
         s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+
+      val kc = new KafkaCluster(kafkaParams)
       val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
         .newInstance(kc.config.props)
         .asInstanceOf[Decoder[K]]
       val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
        .newInstance(kc.config.props)
        .asInstanceOf[Decoder[V]]
-      val consumer: SimpleConsumer = kc.connectLeader(part.topic, part.partition).fold(
-        errs => throw new Exception(
-          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
-            errs.mkString("\n")),
-        consumer => consumer
-      )
+      val consumer = connectLeader
      var requestOffset = part.fromOffset
      var iter: Iterator[MessageAndOffset] = null
 
-      def handleErr(resp: FetchResponse) {
+      // TODO broken until SPARK-4014 is resolved and attemptId / attemptNumber is meaningful.
+      // The idea is to use the provided preferred host, except on task retry attempts,
+      // to minimize number of kafka metadata requests
+      private def connectLeader: SimpleConsumer = {
+        if (context.attemptId > 0) {
+          kc.connectLeader(part.topic, part.partition).fold(
+            errs => throw new Exception(
+              s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
+                errs.mkString("\n")),
+            consumer => consumer
+          )
+        } else {
+          kc.connect(part.host, part.port)
+        }
+      }
+
+      private def handleErr(resp: FetchResponse) {
        if (resp.hasError) {
          val err = resp.errorCode(part.topic, part.partition)
          if (err == ErrorMapping.LeaderNotAvailableCode ||
@@ -160,3 +189,41 @@ class KafkaRDD[
   }
 
 }
+
+object KafkaRDD {
+  import KafkaCluster.LeaderOffset
+
+  /**
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   * configuration parameters</a>.
+   *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
+   *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+   * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
+   *   starting point of the batch
+   * @param untilOffsets per-topic/partition Kafka offsets defining the (exclusive)
+   *   ending point of the batch
+   * @param messageHandler function for translating each message into the desired type
+   */
+  def apply[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag,
+    R: ClassTag](
+      sc: SparkContext,
+      kafkaParams: Map[String, String],
+      fromOffsets: Map[TopicAndPartition, Long],
+      untilOffsets: Map[TopicAndPartition, LeaderOffset],
+      messageHandler: MessageAndMetadata[K, V] => R
+    ): KafkaRDD[K, V, U, T, R] = {
+    assert(fromOffsets.keys == untilOffsets.keys,
+      "Must provide both from and until offsets for each topic/partition")
+
+    val partitions = fromOffsets.zipWithIndex.map { case ((tp, from), index) =>
+      val lo = untilOffsets(tp)
+      new KafkaRDDPartition(index, tp.topic, tp.partition, from, lo.offset, lo.host, lo.port)
+    }
+
+    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, partitions, messageHandler)
+  }
+}
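
A short, hedged sketch (assuming an existing SparkContext sc and a reachable broker; the topic name and offsets are made-up values) of how the new companion constructor might be called:

import kafka.common.TopicAndPartition
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.kafka.KafkaRDD
import org.apache.spark.rdd.kafka.KafkaCluster.LeaderOffset

// Hypothetical inputs: one partition of "some-topic", offsets [0, 100), leader at localhost:9092.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val tp = TopicAndPartition("some-topic", 0)
val from = Map(tp -> 0L)
val until = Map(tp -> LeaderOffset("localhost", 9092, 100L))

// apply() builds one KafkaRDDPartition per topic/partition, carrying the leader's host/port,
// which getPreferredLocations later reports back to the scheduler.
val rdd = KafkaRDD[String, String, StringDecoder, StringDecoder, String](
  sc, kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")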

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DeterministicKafkaInputDStream.scala

Lines changed: 8 additions & 6 deletions
@@ -27,6 +27,7 @@ import kafka.serializer.Decoder
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.kafka.{KafkaCluster, KafkaRDD}
+import org.apache.spark.rdd.kafka.KafkaCluster.LeaderOffset
 import org.apache.spark.streaming.{StreamingContext, Time}
 import org.apache.spark.streaming.dstream._
 
@@ -76,7 +77,7 @@ class DeterministicKafkaInputDStream[
   private var currentOffsets = fromOffsets
 
   @tailrec
-  private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, Long] = {
+  private def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
     val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
     // Either.fold would confuse @tailrec, do it manually
     if (o.isLeft) {
@@ -93,20 +94,21 @@ class DeterministicKafkaInputDStream[
     }
   }
 
-  private def clamp(leaderOffsets: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] = {
+  private def clamp(
+    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
     maxMessagesPerPartition.map { mmp =>
-      leaderOffsets.map { kv =>
-        kv._1 -> Math.min(currentOffsets(kv._1) + mmp, kv._2)
+      leaderOffsets.map { case (tp, lo) =>
+        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
       }
     }.getOrElse(leaderOffsets)
   }
 
   override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
-    val rdd = new KafkaRDD[K, V, U, T, R](
+    val rdd = KafkaRDD[K, V, U, T, R](
       context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
 
-    currentOffsets = untilOffsets
+    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
     Some(rdd)
   }
 
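
To illustrate the clamp change above (illustration only, not from the commit; the offsets and per-partition cap are made up): the cap is applied to the offset while the leader's host and port are carried through via copy:

import kafka.common.TopicAndPartition
import org.apache.spark.rdd.kafka.KafkaCluster.LeaderOffset

// Hypothetical state: current offset 40, at most 10 messages per batch, leader's latest offset 100.
val tp = TopicAndPartition("some-topic", 0)
val currentOffsets = Map(tp -> 40L)
val mmp = 10L
val latest = Map(tp -> LeaderOffset("localhost", 9092, 100L))

val clamped = latest.map { case (t, lo) =>
  t -> lo.copy(offset = Math.min(currentOffsets(t) + mmp, lo.offset))
}
// clamped(tp) == LeaderOffset("localhost", 9092, 50L): offset capped, host/port preserved.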

external/kafka/src/test/scala/org/apache/spark/rdd/kafka/KafkaRDDSuite.scala

Lines changed: 5 additions & 2 deletions
@@ -77,10 +77,13 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     for {
       topicPartitions <- kc.getPartitions(topics).right.toOption
       from <- kc.getConsumerOffsets(groupId, topicPartitions).right.toOption.orElse(
-        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption)
+        kc.getEarliestLeaderOffsets(topicPartitions).right.toOption.map { offs =>
+          offs.map(kv => kv._1 -> kv._2.offset)
+        }
+      )
       until <- kc.getLatestLeaderOffsets(topicPartitions).right.toOption
     } yield {
-      new KafkaRDD[String, String, StringDecoder, StringDecoder, String](
+      KafkaRDD[String, String, StringDecoder, StringDecoder, String](
        sc, kc.kafkaParams, from, until, mmd => s"${mmd.offset} ${mmd.message}")
     }
   }
