Skip to content

Commit 18b5a55

Browse files
hqinedenhill
authored andcommitted
Expose the librdkafka stats as events in the go client (confluentinc#57)
1 parent 181a992 commit 18b5a55

File tree

5 files changed

+148
-2
lines changed

5 files changed

+148
-2
lines changed

kafka/consumer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ func NewConsumer(conf *ConfigMap) (*Consumer, error) {
317317
cErrstr := (*C.char)(C.malloc(C.size_t(256)))
318318
defer C.free(unsafe.Pointer(cErrstr))
319319

320-
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT)
320+
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_REBALANCE|C.RD_KAFKA_EVENT_OFFSET_COMMIT|C.RD_KAFKA_EVENT_STATS)
321321

322322
c.handle.rk = C.rd_kafka_new(C.RD_KAFKA_CONSUMER, cConf, cErrstr, 256)
323323
if c.handle.rk == nil {

kafka/event.go

+12
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,15 @@ type Event interface {
5555

5656
// Specific event types
5757

58+
// Stats statistics event
59+
type Stats struct {
60+
statsJSON string
61+
}
62+
63+
func (e Stats) String() string {
64+
return e.statsJSON
65+
}
66+
5867
// AssignedPartitions consumer group rebalance event: assigned partition set
5968
type AssignedPartitions struct {
6069
Partitions []TopicPartition
@@ -190,6 +199,9 @@ out:
190199
retval = newErrorFromCString(cErr, C.rd_kafka_event_error_string(rkev))
191200
}
192201

202+
case C.RD_KAFKA_EVENT_STATS:
203+
retval = &Stats{C.GoString(C.rd_kafka_event_stats(rkev))}
204+
193205
case C.RD_KAFKA_EVENT_DR:
194206
// Producer Delivery Report event
195207
// Each such event contains delivery reports for all

kafka/event_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,7 @@ func TestEventAPIs(t *testing.T) {
3737

3838
committedOffsets := OffsetsCommitted{}
3939
t.Logf("%s\n", committedOffsets.String())
40+
41+
stats := Stats{"{\"name\": \"Producer-1\"}"}
42+
t.Logf("Stats: %s\n", stats.String())
4043
}

kafka/producer.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
275275
cErrstr := (*C.char)(C.malloc(C.size_t(256)))
276276
defer C.free(unsafe.Pointer(cErrstr))
277277

278-
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR)
278+
C.rd_kafka_conf_set_events(cConf, C.RD_KAFKA_EVENT_DR|C.RD_KAFKA_EVENT_STATS)
279279

280280
// Create librdkafka producer instance
281281
p.handle.rk = C.rd_kafka_new(C.RD_KAFKA_PRODUCER, cConf, cErrstr, 256)

kafka/stats_event_test.go

+131
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright 2017 Confluent Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kafka
18+
19+
import (
20+
"encoding/json"
21+
"testing"
22+
"time"
23+
)
24+
25+
// handleStatsEvent checks for stats event and signals on statsReceived
26+
func handleStatsEvent(t *testing.T, eventCh chan Event, statsReceived chan bool) {
27+
for ev := range eventCh {
28+
switch e := ev.(type) {
29+
case *Stats:
30+
t.Logf("Stats: %v", e)
31+
32+
// test if the stats string can be decoded into JSON
33+
var raw map[string]interface{}
34+
err := json.Unmarshal([]byte(e.String()), &raw) // convert string to json
35+
if err != nil {
36+
t.Fatalf("json unmarshall error: %s", err)
37+
}
38+
t.Logf("Stats['name']: %s", raw["name"])
39+
close(statsReceived)
40+
return
41+
default:
42+
t.Logf("Ignored event: %v", e)
43+
}
44+
}
45+
}
46+
47+
// TestStatsEventProducerFunc dry-test stats event, no broker is needed.
48+
func TestStatsEventProducerFunc(t *testing.T) {
49+
testProducerFunc(t, false)
50+
}
51+
52+
func TestStatsEventProducerChannel(t *testing.T) {
53+
testProducerFunc(t, true)
54+
}
55+
56+
func testProducerFunc(t *testing.T, withProducerChannel bool) {
57+
58+
p, err := NewProducer(&ConfigMap{
59+
"statistics.interval.ms": 50,
60+
"socket.timeout.ms": 10,
61+
"default.topic.config": ConfigMap{"message.timeout.ms": 10}})
62+
if err != nil {
63+
t.Fatalf("%s", err)
64+
}
65+
defer p.Close()
66+
67+
t.Logf("Producer %s", p)
68+
69+
topic1 := "gotest"
70+
71+
// go routine to check for stats event
72+
statsReceived := make(chan bool)
73+
go handleStatsEvent(t, p.Events(), statsReceived)
74+
75+
if withProducerChannel {
76+
err = p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic1, Partition: 0},
77+
Value: []byte("Own drChan"), Key: []byte("This is my key")},
78+
nil)
79+
if err != nil {
80+
t.Errorf("Produce failed: %s", err)
81+
}
82+
} else {
83+
p.ProduceChannel() <- &Message{TopicPartition: TopicPartition{Topic: &topic1, Partition: 0},
84+
Value: []byte("Own drChan"), Key: []byte("This is my key")}
85+
86+
}
87+
88+
select {
89+
case <-statsReceived:
90+
t.Logf("Stats recevied")
91+
case <-time.After(time.Second * 3):
92+
t.Fatalf("Excepted stats but got none")
93+
}
94+
95+
return
96+
}
97+
98+
// TestStatsEventConsumerChannel dry-tests stats event for consumer, no broker is needed.
99+
func TestStatsEventConsumerChannel(t *testing.T) {
100+
101+
c, err := NewConsumer(&ConfigMap{
102+
"group.id": "gotest",
103+
"statistics.interval.ms": 50,
104+
"go.events.channel.enable": true,
105+
"socket.timeout.ms": 10,
106+
"session.timeout.ms": 10})
107+
if err != nil {
108+
t.Fatalf("%s", err)
109+
}
110+
111+
defer c.Close()
112+
113+
t.Logf("Consumer %s", c)
114+
115+
// go routine to check for stats event
116+
statsReceived := make(chan bool)
117+
go handleStatsEvent(t, c.Events(), statsReceived)
118+
119+
err = c.Subscribe("gotest", nil)
120+
if err != nil {
121+
t.Errorf("Subscribe failed: %s", err)
122+
}
123+
124+
select {
125+
case <-statsReceived:
126+
t.Logf("Stats recevied")
127+
case <-time.After(time.Second * 3):
128+
t.Fatalf("Excepted stats but got none")
129+
}
130+
131+
}

0 commit comments

Comments
 (0)