@@ -26,6 +26,13 @@ import (
26
26
/*
27
27
#include <stdlib.h>
28
28
#include <librdkafka/rdkafka.h>
29
+
30
+ rd_kafka_resp_err_t do_produce (rd_kafka_topic_t *rkt, int32_t partition,
31
+ int msg_flags,
32
+ void *val, size_t val_len, void *key, size_t key_len, uintptr_t cgoid) {
33
+ return rd_kafka_produce(rkt, partition, msg_flags, val, val_len, key, key_len,
34
+ (void *)cgoid);
35
+ }
29
36
*/
30
37
import "C"
31
38
@@ -81,7 +88,7 @@ func (p *Producer) produce(msg *Message, msg_flags int, delivery_chan chan Event
81
88
}
82
89
}
83
90
84
- var cgoid uintptr
91
+ var cgoid int
85
92
86
93
// Per-message state that needs to be retained through the C code:
87
94
// delivery channel (if specified)
@@ -93,10 +100,11 @@ func (p *Producer) produce(msg *Message, msg_flags int, delivery_chan chan Event
93
100
cgoid = p .handle .cgo_put (cgo_dr {delivery_chan : delivery_chan , opaque : opaque })
94
101
}
95
102
96
- r := int (C .rd_kafka_produce (c_rkt , C .int32_t (msg .TopicPartition .Partition ),
103
+ r := int (C .do_produce (c_rkt , C .int32_t (msg .TopicPartition .Partition ),
97
104
C .int (msg_flags )| C .RD_KAFKA_MSG_F_COPY ,
98
105
unsafe .Pointer (valp ), C .size_t (val_len ),
99
- unsafe .Pointer (keyp ), C .size_t (key_len ), unsafe .Pointer (cgoid )))
106
+ unsafe .Pointer (keyp ), C .size_t (key_len ),
107
+ (C .uintptr_t )(cgoid )))
100
108
if r == - 1 {
101
109
if cgoid != 0 {
102
110
p .handle .cgo_get (cgoid )
@@ -232,7 +240,7 @@ func NewProducer(conf *ConfigMap) (*Producer, error) {
232
240
p .handle .p = p
233
241
p .handle .setup ()
234
242
p .handle .rkq = C .rd_kafka_queue_get_main (p .handle .rk )
235
- p .handle .cgomap = make (map [uintptr ]cgoif )
243
+ p .handle .cgomap = make (map [int ]cgoif )
236
244
p .Events = make (chan Event , 1000000 )
237
245
p .ProduceChannel = make (chan * Message , produce_channel_size )
238
246
p .poller_term_chan = make (chan bool )
0 commit comments