Skip to content

Commit dab3c94

Browse files
committed
Added Events() and ProduceChannel() getters (issue confluentinc#5)
1 parent 9a023d4 commit dab3c94

File tree

9 files changed

+56
-41
lines changed

9 files changed

+56
-41
lines changed

examples/consumer_channel_example/consumer_channel_example.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func main() {
6565
fmt.Printf("Caught signal %v: terminating\n", sig)
6666
run = false
6767

68-
case ev := <-c.Events:
68+
case ev := <-c.Events():
6969
switch e := ev.(type) {
7070
case kafka.AssignedPartitions:
7171
fmt.Fprintf(os.Stderr, "%% %v\n", e)

examples/go-kafkacat/go-kafkacat.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func runProducer(config *kafka.ConfigMap, topic string, partition int32) {
6060
fmt.Fprintf(os.Stderr, "%% Delivered %v\n", m)
6161
}
6262
}
63-
}(p.Events)
63+
}(p.Events())
6464

6565
reader := bufio.NewReader(os.Stdin)
6666
stdinChan := make(chan string)
@@ -110,7 +110,7 @@ func runProducer(config *kafka.ConfigMap, topic string, partition int32) {
110110
msg.Value = ([]byte)(line)
111111
}
112112

113-
p.ProduceChannel <- &msg
113+
p.ProduceChannel() <- &msg
114114
}
115115
}
116116

@@ -140,7 +140,7 @@ func runConsumer(config *kafka.ConfigMap, topics []string) {
140140
fmt.Fprintf(os.Stderr, "%% Terminating on signal %v\n", sig)
141141
run = false
142142

143-
case ev := <-c.Events:
143+
case ev := <-c.Events():
144144
switch e := ev.(type) {
145145
case kafka.AssignedPartitions:
146146
fmt.Fprintf(os.Stderr, "%% %v\n", e)

examples/producer_channel_example/producer_channel_example.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func main() {
4747

4848
go func() {
4949
outer:
50-
for e := range p.Events {
50+
for e := range p.Events() {
5151
switch ev := e.(type) {
5252
case *kafka.Message:
5353
m := ev
@@ -68,7 +68,7 @@ func main() {
6868
}()
6969

7070
value := "Hello Go!"
71-
p.ProduceChannel <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}
71+
p.ProduceChannel() <- &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}
7272

7373
// wait for delivery report goroutine to finish
7474
_ = <-doneChan

kafka/consumer.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type RebalanceCb func(*Consumer, Event) error
3333

3434
// Consumer implements a High-level Apache Kafka Consumer instance
3535
type Consumer struct {
36-
Events chan Event
36+
events chan Event
3737
handle handle
3838
eventsChanEnable bool
3939
readerTermChan chan bool
@@ -202,6 +202,11 @@ func (c *Consumer) Poll(timeoutMs int) (event Event) {
202202
return ev
203203
}
204204

205+
// Events returns the Events channel (if enabled)
206+
func (c *Consumer) Events() chan Event {
207+
return c.events
208+
}
209+
205210
// Close Consumer instance.
206211
// The object is no longer usable after this call.
207212
func (c *Consumer) Close() (err error) {
@@ -231,12 +236,12 @@ func (c *Consumer) Close() (err error) {
231236
// NewConsumer creates a new high-level Consumer instance.
232237
//
233238
// Supported special configuration properties:
234-
// go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events channel.
239+
// go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
235240
// If set to true the app must handle the AssignedPartitions and
236241
// RevokedPartitions events and call Assign() and Unassign()
237242
// respectively.
238-
// go.events.channel.enable (bool, false) - Enable the Events channel. Messages and events will be pushed on the Events channel and the Poll() interface will be disabled. (Experimental)
239-
// go.events.channel.size (int, 1000) - Events channel size
243+
// go.events.channel.enable (bool, false) - Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled. (Experimental)
244+
// go.events.channel.size (int, 1000) - Events() channel size
240245
//
241246
// WARNING: Due to the buffering nature of channels (and queues in general) the
242247
// use of the events channel risks receiving outdated events and
@@ -299,10 +304,10 @@ func NewConsumer(conf *ConfigMap) (*Consumer, error) {
299304
}
300305

301306
if c.eventsChanEnable {
302-
c.Events = make(chan Event, eventsChanSize)
307+
c.events = make(chan Event, eventsChanSize)
303308
c.readerTermChan = make(chan bool)
304309

305-
/* Start rdkafka consumer queue reader -> Events writer goroutine */
310+
/* Start rdkafka consumer queue reader -> events writer goroutine */
306311
go consumerReader(c, c.readerTermChan)
307312
}
308313

@@ -332,7 +337,7 @@ out:
332337
case _ = <-termChan:
333338
break out
334339
default:
335-
_, term := c.handle.eventPoll(c.Events, 100, 1000, termChan)
340+
_, term := c.handle.eventPoll(c.events, 100, 1000, termChan)
336341
if term {
337342
break out
338343
}

kafka/consumer_performance_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func handleEvent(c *Consumer, rd *ratedisp, expCnt int, ev Event) bool {
102102

103103
// consume messages through the Events channel
104104
func eventChannelConsumer(c *Consumer, rd *ratedisp, expCnt int) {
105-
for ev := range c.Events {
105+
for ev := range c.Events() {
106106
if !handleEvent(c, rd, expCnt, ev) {
107107
break
108108
}
@@ -148,7 +148,7 @@ func testconsumerInit(b *testing.B) int {
148148
producerPerfTest(b, "Priming producer", msgcnt, false, false,
149149
true,
150150
func(p *Producer, m *Message, drChan chan Event) {
151-
p.ProduceChannel <- m
151+
p.ProduceChannel() <- m
152152
})
153153
}
154154

kafka/producer.go

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ import "C"
3838

3939
// Producer implements a High-level Apache Kafka Producer instance
4040
type Producer struct {
41-
Events chan Event
42-
ProduceChannel chan *Message
41+
events chan Event
42+
produceChannel chan *Message
4343
handle handle
4444

4545
// Terminates the poller() goroutine
@@ -114,7 +114,7 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
114114
// This is an asynchronous call that enqueues the message on the internal
115115
// transmit queue, thus returning immediately.
116116
// The delivery report will be sent on the provided deliveryChan if specified,
117-
// or on the Producer object's Events channel if not.
117+
// or on the Producer object's Events() channel if not.
118118
// Returns an error if message could not be enqueued.
119119
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {
120120
return p.produce(msg, 0, deliveryChan)
@@ -140,11 +140,21 @@ func (p *Producer) produceBatch(topic string, msgs []*Message, msgFlags int) err
140140
return nil
141141
}
142142

143+
// Events returns the Events channel (read)
144+
func (p *Producer) Events() chan Event {
145+
return p.events
146+
}
147+
148+
// ProduceChannel returns the produce *Message channel (write)
149+
func (p *Producer) ProduceChannel() chan *Message {
150+
return p.produceChannel
151+
}
152+
143153
// Len returns the number of messages and requests waiting to be transmitted to the broker
144154
// as well as delivery reports queued for the application.
145155
// Includes messages on ProduceChannel.
146156
func (p *Producer) Len() int {
147-
return len(p.ProduceChannel) + len(p.Events) + int(C.rd_kafka_outq_len(p.handle.rk))
157+
return len(p.produceChannel) + len(p.events) + int(C.rd_kafka_outq_len(p.handle.rk))
148158
}
149159

150160
// Flush and wait for outstanding messages and requests to complete delivery.
@@ -162,7 +172,7 @@ func (p *Producer) Flush(timeoutMs int) int {
162172
return p.Len()
163173
}
164174

165-
p.handle.eventPoll(p.Events,
175+
p.handle.eventPoll(p.events,
166176
int(math.Min(100, remain*1000)), 1000, termChan)
167177
}
168178

@@ -175,10 +185,10 @@ func (p *Producer) Close() {
175185
// Wait for poller() (signaled by closing pollerTermChan)
176186
// and channel_producer() (signaled by closing ProduceChannel)
177187
close(p.pollerTermChan)
178-
close(p.ProduceChannel)
188+
close(p.produceChannel)
179189
p.handle.waitTerminated(2)
180190

181-
close(p.Events)
191+
close(p.events)
182192

183193
p.handle.cleanup()
184194

@@ -197,8 +207,8 @@ func (p *Producer) Close() {
197207
// go.batch.producer (bool, false) - Enable batch producer (experimental for increased performance).
198208
// These batches do not relate to Kafka message batches in any way.
199209
// go.delivery.reports (bool, true) - Forward per-message delivery reports to the
200-
// Events channel.
201-
// go.produce.channel.size (int, 1000000) - ProduceChannel buffer size (in number of messages)
210+
// Events() channel.
211+
// go.produce.channel.size (int, 1000000) - ProduceChannel() buffer size (in number of messages)
202212
//
203213
func NewProducer(conf *ConfigMap) (*Producer, error) {
204214
p := &Producer{}
@@ -241,8 +251,8 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
241251
p.handle.p = p
242252
p.handle.setup()
243253
p.handle.rkq = C.rd_kafka_queue_get_main(p.handle.rk)
244-
p.Events = make(chan Event, 1000000)
245-
p.ProduceChannel = make(chan *Message, produceChannelSize)
254+
p.events = make(chan Event, 1000000)
255+
p.produceChannel = make(chan *Message, produceChannelSize)
246256
p.pollerTermChan = make(chan bool)
247257

248258
go poller(p, p.pollerTermChan)
@@ -260,11 +270,11 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
260270
// channel_producer serves the ProduceChannel channel
261271
func channelProducer(p *Producer) {
262272

263-
for m := range p.ProduceChannel {
273+
for m := range p.produceChannel {
264274
err := p.produce(m, C.RD_KAFKA_MSG_F_BLOCK, nil)
265275
if err != nil {
266276
m.TopicPartition.Error = err
267-
p.Events <- m
277+
p.events <- m
268278
}
269279
}
270280

@@ -280,14 +290,14 @@ func channelBatchProducer(p *Producer) {
280290
totMsgCnt := 0
281291
totBatchCnt := 0
282292

283-
for m := range p.ProduceChannel {
293+
for m := range p.produceChannel {
284294
buffered[*m.TopicPartition.Topic] = append(buffered[*m.TopicPartition.Topic], m)
285295
bufferedCnt++
286296

287297
loop2:
288298
for true {
289299
select {
290-
case m, ok := <-p.ProduceChannel:
300+
case m, ok := <-p.produceChannel:
291301
if !ok {
292302
break loop2
293303
}
@@ -315,7 +325,7 @@ func channelBatchProducer(p *Producer) {
315325
if err != nil {
316326
for _, m = range buffered2 {
317327
m.TopicPartition.Error = err
318-
p.Events <- m
328+
p.events <- m
319329
}
320330
}
321331
}
@@ -335,7 +345,7 @@ out:
335345
break out
336346

337347
default:
338-
_, term := p.handle.eventPoll(p.Events, 100, 1000, termChan)
348+
_, term := p.handle.eventPoll(p.events, 100, 1000, termChan)
339349
if term {
340350
break out
341351
}

kafka/producer_performance_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ func producerPerfTest(b *testing.B, testname string, msgcnt int, withDr bool, ba
9090

9191
if withDr {
9292
doneChan = make(chan int64)
93-
drChan = p.Events
94-
go deliveryHandler(b, int64(msgcnt), p.Events, doneChan)
93+
drChan = p.Events()
94+
go deliveryHandler(b, int64(msgcnt), p.Events(), doneChan)
9595
}
9696

9797
if !silent {
@@ -171,15 +171,15 @@ func BenchmarkProducerChannel(b *testing.B) {
171171
producerPerfTest(b, "Channel producer (without DR)",
172172
0, false, false, false,
173173
func(p *Producer, m *Message, drChan chan Event) {
174-
p.ProduceChannel <- m
174+
p.ProduceChannel() <- m
175175
})
176176
}
177177

178178
func BenchmarkProducerChannelDR(b *testing.B) {
179179
producerPerfTest(b, "Channel producer (with DR)",
180180
testconf.PerfMsgCount, true, false, false,
181181
func(p *Producer, m *Message, drChan chan Event) {
182-
p.ProduceChannel <- m
182+
p.ProduceChannel() <- m
183183
})
184184

185185
}
@@ -188,15 +188,15 @@ func BenchmarkProducerBatchChannel(b *testing.B) {
188188
producerPerfTest(b, "Channel producer (without DR, batch channel)",
189189
0, false, true, false,
190190
func(p *Producer, m *Message, drChan chan Event) {
191-
p.ProduceChannel <- m
191+
p.ProduceChannel() <- m
192192
})
193193
}
194194

195195
func BenchmarkProducerBatchChannelDR(b *testing.B) {
196196
producerPerfTest(b, "Channel producer (DR, batch channel)",
197197
0, true, true, false,
198198
func(p *Producer, m *Message, drChan chan Event) {
199-
p.ProduceChannel <- m
199+
p.ProduceChannel() <- m
200200
})
201201
}
202202

kafka/producer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestProducerAPIs(t *testing.T) {
5656
// Produce through ProducerChannel, uses default DR channel (Events),
5757
// pass Opaque object.
5858
myOpq := "My opaque"
59-
p.ProduceChannel <- &Message{TopicPartition: TopicPartition{Topic: &topic2, Partition: 0},
59+
p.ProduceChannel() <- &Message{TopicPartition: TopicPartition{Topic: &topic2, Partition: 0},
6060
Opaque: &myOpq,
6161
Value: []byte("ProducerChannel"), Key: []byte("This is my key")}
6262

@@ -85,7 +85,7 @@ func TestProducerAPIs(t *testing.T) {
8585

8686
// Events chan (2 messages and possibly events)
8787
for msgCnt := 0; msgCnt < 2; {
88-
ev = <-p.Events
88+
ev = <-p.Events()
8989
switch e := ev.(type) {
9090
case *Message:
9191
msgCnt++

kafkatest/go_verifiable_producer/go_verifiable_producer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func runProducer(config *kafka.ConfigMap, topic string) {
151151
fmt.Fprintf(os.Stderr, "%% Terminating on signal %v\n", sig)
152152
run = false
153153

154-
case ev := <-p.Events:
154+
case ev := <-p.Events():
155155
switch e := ev.(type) {
156156
case *kafka.Message:
157157
run = handleDr(e)

0 commit comments

Comments
 (0)