Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
- image: circleci/golang
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:0.10.1.0
- image: wurstmeister/kafka:0.10.1.1
ports: ['9092:9092']
environment:
KAFKA_BROKER_ID: '1'
Expand All @@ -18,6 +18,12 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand All @@ -32,8 +38,8 @@ jobs:
- image: circleci/golang
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:0.11.0.1
ports: ['9092:9092']
- image: wurstmeister/kafka:2.11-0.11.0.3
ports: ['9092:9092','9093:9093']
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
Expand All @@ -42,6 +48,13 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand All @@ -57,7 +70,7 @@ jobs:
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:2.11-1.1.1
ports: ['9092:9092']
ports: ['9092:9092','9093:9093']
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
Expand All @@ -66,6 +79,13 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand All @@ -81,7 +101,7 @@ jobs:
- image: wurstmeister/zookeeper
ports: ['2181:2181']
- image: wurstmeister/kafka:2.12-2.1.0
ports: ['9092:9092']
ports: ['9092:9092','9093:9093']
environment:
KAFKA_BROKER_ID: '1'
KAFKA_CREATE_TOPICS: 'test-writer-0:3:1,test-writer-1:3:1'
Expand All @@ -90,6 +110,13 @@ jobs:
KAFKA_ADVERTISED_PORT: '9092'
KAFKA_ZOOKEEPER_CONNECT: 'localhost:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
steps:
- checkout
- setup_remote_docker: { reusable: true, docker_layer_caching: true }
Expand Down
85 changes: 85 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,3 +1242,88 @@ func (d *connDeadline) unsetConnWriteDeadline() {
d.wconn = nil
d.mutex.Unlock()
}

// saslHandshake sends the SASL handshake message. This will determine whether
// the Mechanism is supported by the cluster. If it's not, this function will
// error out with UnsupportedSASLMechanism.
//
// If the mechanism is unsupported, the handshake request will reply with the
// list of the cluster's configured mechanisms, which could potentially be used
// to facilitate negotiation. At the moment, we are not negotiating the
// mechanism as we believe that brokers are usually known to the client, and
// therefore the client should already know which mechanisms are supported.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
func (c *Conn) saslHandshake(mechanism string) error {
// The wire format for V0 and V1 is identical, but the version
// number will affect how the SASL authentication
// challenge/responses are sent
var resp saslHandshakeResponseV0
version := v0
if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 {
version = v1
}

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(saslHandshakeRequest, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (int, error) {
return (&resp).readFrom(&c.rbuf, size)
}())
},
)
if err == nil && resp.ErrorCode != 0 {
err = Error(resp.ErrorCode)
}
return err
}

// saslAuthenticate sends the SASL authenticate message. This function must
// be immediately preceded by a successful saslHandshake.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
// if we sent a v1 handshake, then we must encapsulate the authentication
// request in a saslAuthenticateRequest. otherwise, we read and write raw
// bytes.
if c.apiVersions[saslHandshakeRequest].MaxVersion >= 1 {
var request = saslAuthenticateRequestV0{Data: data}
var response saslAuthenticateResponseV0

err := c.writeOperation(
func(deadline time.Time, id int32) error {
return c.writeRequest(saslAuthenticateRequest, v0, id, request)
},
func(deadline time.Time, size int) error {
return expectZeroSize(func() (remain int, err error) {
return (&response).readFrom(&c.rbuf, size)
}())
},
)
if err == nil && response.ErrorCode != 0 {
err = Error(response.ErrorCode)
}
return response.Data, err
}

// fall back to opaque bytes on the wire. the broker is expecting these if
// it just processed a v0 sasl handshake.
writeInt32(&c.wbuf, int32(len(data)))
if _, err := c.wbuf.Write(data); err != nil {
return nil, err
}
if err := c.wbuf.Flush(); err != nil {
return nil, err
}

var respLen int32
_, err := readInt32(&c.rbuf, 4, &respLen)
if err != nil {
return nil, err
}

resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen))
return resp, err
}
20 changes: 19 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"
"time"

ktesting "github.com/segmentio/kafka-go/testing"
"golang.org/x/net/nettest"
)

Expand Down Expand Up @@ -257,7 +258,7 @@ func TestConn(t *testing.T) {
)

for _, test := range tests {
if !KafkaIsAtLeast(test.minVersion) {
if !ktesting.KafkaIsAtLeast(test.minVersion) {
t.Log("skipping " + test.scenario + " because broker is not at least version " + test.minVersion)
continue
}
Expand Down Expand Up @@ -980,6 +981,23 @@ func testBrokers(t *testing.T, conn *Conn) {
}
}

func TestUnsupportedSASLMechanism(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

conn, err := (&Dialer{
Resolver: &net.Resolver{},
}).DialContext(ctx, "tcp", "127.0.0.1:9093")
if err != nil {
t.Fatal("failed to open a new kafka connection:", err)
}
defer conn.Close()

if err := conn.saslHandshake("FOO"); err != UnsupportedSASLMechanism {
t.Errorf("Expected UnsupportedSASLMechanism but got %v", err)
}
}

const benchmarkMessageCount = 100

func BenchmarkConn(b *testing.B) {
Expand Down
82 changes: 70 additions & 12 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package kafka
import (
"context"
"crypto/tls"
"io"
"net"
"strconv"
"strings"
"time"

"github.com/segmentio/kafka-go/sasl"
)

// The Dialer type mirrors the net.Dialer API but is designed to open kafka
Expand Down Expand Up @@ -61,6 +64,10 @@ type Dialer struct {
// TLS enables Dialer to open secure connections. If nil, standard net.Conn
// will be used.
TLS *tls.Config

// SASLMechanism configures the Dialer to use SASL authentication. If nil,
// no authentication will be performed.
SASLMechanism sasl.Mechanism
}

// Dial connects to the address on the named network.
Expand Down Expand Up @@ -94,11 +101,7 @@ func (d *Dialer) DialContext(ctx context.Context, network string, address string
defer cancel()
}

c, err := d.dialContext(ctx, network, address)
if err != nil {
return nil, err
}
return NewConnWith(c, ConnConfig{ClientID: d.ClientID}), nil
return d.connect(ctx, network, address, ConnConfig{ClientID: d.ClientID})
}

// DialLeader opens a connection to the leader of the partition for a given
Expand All @@ -121,16 +124,11 @@ func (d *Dialer) DialLeader(ctx context.Context, network string, address string,
// descriptor. It's strongly advised to use descriptor of the partition that comes out of
// functions LookupPartition or LookupPartitions.
func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error) {
c, err := d.dialContext(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)))
if err != nil {
return nil, err
}

return NewConnWith(c, ConnConfig{
return d.connect(ctx, network, net.JoinHostPort(partition.Leader.Host, strconv.Itoa(partition.Leader.Port)), ConnConfig{
ClientID: d.ClientID,
Topic: partition.Topic,
Partition: partition.ID,
}), nil
})
}

// LookupLeader searches for the kafka broker that is the leader of the
Expand Down Expand Up @@ -242,6 +240,66 @@ func (d *Dialer) connectTLS(ctx context.Context, conn net.Conn, config *tls.Conf
return
}

// connect opens a socket connection to the broker, wraps it to create a
// kafka connection, and performs SASL authentication if configured to do so.
func (d *Dialer) connect(ctx context.Context, network, address string, connCfg ConnConfig) (*Conn, error) {

c, err := d.dialContext(ctx, network, address)
if err != nil {
return nil, err
}

conn := NewConnWith(c, connCfg)

if d.SASLMechanism != nil {
if err := d.authenticateSASL(ctx, conn); err != nil {
_ = conn.Close()
return nil, err
}
}

return conn, nil
}

// authenticateSASL performs all of the required requests to authenticate this
// connection. If any step fails, this function returns with an error. A nil
// error indicates successful authentication.
//
// In case of error, this function *does not* close the connection. That is the
// responsibility of the caller.
func (d *Dialer) authenticateSASL(ctx context.Context, conn *Conn) error {
mech, state, err := d.SASLMechanism.Start(ctx)
if err != nil {
return err
}
err = conn.saslHandshake(mech)
if err != nil {
return err
}

var completed bool
for !completed {
challenge, err := conn.saslAuthenticate(state)
switch err {
case nil:
case io.EOF:
// the broker may communicate a failed exchange by closing the
// connection (esp. in the case where we're passing opaque sasl
// data over the wire since there's no protocol info).
return SASLAuthenticationFailed
default:
return err
}

completed, state, err = d.SASLMechanism.Next(ctx, challenge)
if err != nil {
return err
}
}

return nil
}

func (d *Dialer) dialContext(ctx context.Context, network string, address string) (net.Conn, error) {
if r := d.Resolver; r != nil {
host, port := splitHostPort(address)
Expand Down
11 changes: 9 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
version: "3"
services:
kafka:
image: wurstmeister/kafka:0.11.0.1
image: wurstmeister/kafka:2.11-0.11.0.3
restart: on-failure:3
links:
- zookeeper
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_VERSION: '0.11.0.1'
KAFKA_BROKER_ID: 1
Expand All @@ -17,7 +18,13 @@ services:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_MESSAGE_MAX_BYTES: 200000000

KAFKA_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093'
KAFKA_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512'
KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf"
CUSTOM_INIT_SCRIPT: |-
echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/kafka/config/kafka_server_jaas.conf;
/opt/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]' --entity-type users --entity-name adminscram
zookeeper:
image: wurstmeister/zookeeper
ports:
Expand Down
2 changes: 2 additions & 0 deletions protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ const (
syncGroupRequest apiKey = 14
describeGroupsRequest apiKey = 15
listGroupsRequest apiKey = 16
saslHandshakeRequest apiKey = 17
apiVersionsRequest apiKey = 18
createTopicsRequest apiKey = 19
deleteTopicsRequest apiKey = 20
saslAuthenticateRequest apiKey = 36
)

type apiVersion int16
Expand Down
Loading