Skip to content

Commit 80d7f30

Browse files
committed
Corrected produce w/ timestamp version check to >0.9.3. (confluentinc#41)
1 parent b97615b commit 80d7f30

File tree

3 files changed

+7
-7
lines changed

3 files changed

+7
-7
lines changed

kafka/integration_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -709,11 +709,11 @@ func TestConsumerPollRebalance(t *testing.T) {
709709

710710
// TestProducerConsumerTimestamps produces messages with timestamps
711711
// and verifies them on consumption.
712-
// Requires librdkafka >=0.9.3 and Kafka >=0.10.0.0
712+
// Requires librdkafka >=0.9.4 and Kafka >=0.10.0.0
713713
func TestProducerConsumerTimestamps(t *testing.T) {
714714
numver, strver := LibraryVersion()
715-
if numver < 0x00090300 {
716-
t.Skipf("Requires librdkafka >=0.9.3 (currently on %s)", strver)
715+
if numver < 0x00090400 {
716+
t.Skipf("Requires librdkafka >=0.9.4 (currently on %s)", strver)
717717
}
718718

719719
if !testconfRead() {

kafka/producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ rd_kafka_resp_err_t do_produce (rd_kafka_t *rk,
3434
int64_t timestamp,
3535
uintptr_t cgoid) {
3636
37-
#if RD_KAFKA_VERSION >= 0x00090300
37+
#ifdef RD_KAFKA_V_TIMESTAMP
3838
return rd_kafka_producev(rk,
3939
RD_KAFKA_V_RKT(rkt),
4040
RD_KAFKA_V_PARTITION(partition),
@@ -147,7 +147,7 @@ func (p *Producer) produce(msg *Message, msgFlags int, deliveryChan chan Event)
147147
// transmit queue, thus returning immediately.
148148
// The delivery report will be sent on the provided deliveryChan if specified,
149149
// or on the Producer object's Events() channel if not.
150-
// msg.Timestamp requires librdkafka >= 0.9.3 (else returns ErrNotImplemented),
150+
// msg.Timestamp requires librdkafka >= 0.9.4 (else returns ErrNotImplemented),
151151
// api.version.request=true, and broker >= 0.10.0.0.
152152
// Returns an error if message could not be enqueued.
153153
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error {

kafka/producer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ func TestProducerAPIs(t *testing.T) {
6262
err = p.Produce(&Message{TopicPartition: TopicPartition{Topic: &topic2, Partition: 0}, Timestamp: time.Now()}, nil)
6363
numver, strver := LibraryVersion()
6464
t.Logf("Produce with timestamp on %s returned: %s", strver, err)
65-
if numver < 0x00090300 {
65+
if numver < 0x00090400 {
6666
if err == nil || err.(Error).Code() != ErrNotImplemented {
67-
t.Errorf("Expected Produce with timestamp to fail with ErrNotImplemented on %s, got: %s", strver, err)
67+
t.Errorf("Expected Produce with timestamp to fail with ErrNotImplemented on %s (0x%x), got: %s", strver, numver, err)
6868
}
6969
} else {
7070
if err != nil {

0 commit comments

Comments
 (0)