Open

Description
Describe the bug
&kafka.Writer then w.WriteMessages then w.Close()
leaves go routine open, leaks memory.
To verify issue you can run your code at vs code in
DEBUG MODE then check "CALL STACK" area
multi-able kakf-go(*conn).run will appear.
OR
run code in docker, check memory usage.
Kafka Version
- What version(s) of Kafka are you testing against?
confluentinc/cp-kafka:7.2.1 / 3.2-IV0- What version of kafka-go are you using?
latest release
To Reproduce
Resources to reproduce the behavior:
vscode, any
//##############################################################3
func Write(assetPair AssetPairBase, tlsConfig *tls.Config) {
var testMessage []kafka.Message
for _, value := range assetPair.Klines {
byteValue, errJSON := json.Marshal(value)
key := strconv.FormatInt(value.StartTime, 10) // s == "97" (decimal)
if errJSON != nil {
fmt.Println(errJSON)
}
testMessage = append(testMessage, kafka.Message{
Key: []byte(key),
Value: []byte(byteValue),
})
}
w := &kafka.Writer{
//AllowAutoTopicCreation: true,
Addr: kafka.TCP(Address),
Topic: assetPair.TopicName,
Balancer: &kafka.LeastBytes{},
Transport: &kafka.Transport{
TLS: tlsConfig,
},
}
// ISSUE APPEAR AFTER RUNNING THIS LINE######
err := w.WriteMessages(context.TODO(),
testMessage...,
)
if err != nil {
fmt.Println("failed to write messages")
} else {
fmt.Println("write completed")
}
// ISSUE APPERS AFTER THIS LION
if err := w.Close(); err != nil {
fmt.Println("failed to close writer")
tlsConfig = nil
//log.WithField("Reason", err.Error()).Fatal("failed to close writer")
} else {
fmt.Println("closed writer")
testMessage = nil
}
}
#################################################3
func GetTLSConfig() *tls.Config {
tlsConfig, err := NewTLSConfig("api/keys/client.cer.pem",
"api/keys/client.key.pem",
"api/keys/server.cer.pem")
if err != nil {
log.WithField("Reason", err.Error()).Fatal("connection failed")
}
// This can be used on test server if domain does not match cert:
tlsConfig.InsecureSkipVerify = true
return tlsConfig
}