ExtractTopic, key schema can't be null #90

@arnitolog

Description

Hello,

I'm trying to use the ExtractTopic transformation. It works fine when I produce messages and explicitly set the key, but when I try to set the key from a field in the value, the connector fails with the error:
key schema can't be null

Here is a working config where I explicitly set the key.
Command to produce messages:
kafkactl produce test-topic --file elasticsearch/data.json --key test12

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "my-elasticsearch.sink"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  autoRestart:
    enabled: true
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  tasksMax: 2
  config:
    client.id: "connect-my-elasticsearch-sink"
    consumer.override.client.rack: "${env::STRIMZI_RACK_ID}"
    consumer.override.auto.offset.reset: "earliest"
    topics.regex: "test-topic"
    connection.url: "http://elasticsearch.svc.cluster.local:9200"
    connection.username: "xxxx"
    connection.password: "xxxx"
    write.method: "UPSERT"
    key.ignore: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    transforms: "ExtractTopicFromKey,IndexRouter"
    transforms.ExtractTopicFromKey.type: "io.aiven.kafka.connect.transforms.ExtractTopic$Key"
    transforms.IndexRouter.type: "org.apache.kafka.connect.transforms.RegexRouter"
    transforms.IndexRouter.regex: "(.*)"
    transforms.IndexRouter.replacement: "test_index_$1"
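
This config works because StringConverter never yields a null key schema: every key it decodes is tagged with an optional string schema, which is what ExtractTopic$Key checks for. A minimal sketch of that behavior, using the stock Kafka Connect classes (illustration only, not part of the connector config):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.StringConverter;

public class StringKeyDemo {
    public static void main(String[] args) {
        // StringConverter attaches Schema.OPTIONAL_STRING_SCHEMA to every key
        // it reads, so ExtractTopic$Key always sees a non-null key schema here.
        StringConverter converter = new StringConverter();
        converter.configure(Map.of(), true); // configure as a key converter

        SchemaAndValue key = converter.toConnectData(
                "test-topic", "test12".getBytes(StandardCharsets.UTF_8));

        System.out.println(key.schema()); // Schema{STRING} (optional, non-null)
        System.out.println(key.value());  // test12
    }
}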

And here is the non-working config, where I try to read ClientId from the message, use it as the key, and then use that key as the topic name.
kafkactl produce test-topic --file elasticsearch/data.json --key test12

data.json content:

{
  "Action": 2,
  "ClientId": "testclient",
  "RawId": 179645,
  "Message": "A message from testclient"
}
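
Note that data.json carries no embedded schema. Assuming the worker's value converter is JsonConverter with schemas.enable=false (a common setup for payloads like this, and consistent with valueSchema=null in the error further down), the record reaches the transform chain as a schemaless Map. A sketch of that decoding step:

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class SchemalessValueDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        converter.configure(Map.of("schemas.enable", "false"), false); // value converter

        byte[] raw = ("{\"Action\":2,\"ClientId\":\"testclient\","
                + "\"RawId\":179645,\"Message\":\"A message from testclient\"}")
                .getBytes(StandardCharsets.UTF_8);

        SchemaAndValue value = converter.toConnectData("test-topic", raw);
        System.out.println(value.schema()); // null -- the record is schemaless
        System.out.println(value.value());  // {Action=2, ClientId=testclient, ...}
    }
}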

The connector's config:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "my-elasticsearch.sink"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  autoRestart:
    enabled: true
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  tasksMax: 2
  config:
    client.id: "connect-my-elasticsearch-sink"
    consumer.override.client.rack: "${env::STRIMZI_RACK_ID}"
    consumer.override.auto.offset.reset: "earliest"
    topics.regex: "test-topic"
    connection.url: "http://elasticsearch.svc.cluster.local:9200"
    connection.username: "xxxx"
    connection.password: "xxxx"
    write.method: "UPSERT"
    key.ignore: "false"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    transforms: "CreateTopicID,ExtractTopicID,ExtractTopicFromKey,IndexRouter"
    transforms.CreateTopicID.type: "org.apache.kafka.connect.transforms.ValueToKey"
    transforms.CreateTopicID.fields: "ClientId"
    transforms.ExtractTopicID.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
    transforms.ExtractTopicID.field: "ClientId"
    transforms.ExtractTopicFromKey.type: "io.aiven.kafka.connect.transforms.ExtractTopic$Key"
    transforms.IndexRouter.type: "org.apache.kafka.connect.transforms.RegexRouter"
    transforms.IndexRouter.regex: "(.*)"
    transforms.IndexRouter.replacement: "test_index_$1"
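
On a schemaless record, the first two transforms both leave the key schema null: ValueToKey builds the new key as a plain Map with no schema, and ExtractField$Key pulls the raw string out of that Map, again without a schema. So by the time ExtractTopicFromKey runs, keySchema is still null. A sketch of the chain using the stock SMT classes (record contents taken from the error below):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.ExtractField;
import org.apache.kafka.connect.transforms.ValueToKey;

public class ChainDemo {
    public static void main(String[] args) {
        Map<String, Object> value = new HashMap<>();
        value.put("Action", 2);
        value.put("ClientId", "testclient");
        value.put("RawId", 179645);
        value.put("Message", "A message from testclient");

        // Schemaless record: both keySchema and valueSchema are null.
        SinkRecord record = new SinkRecord("test-topic", 0, null, "test12", null, value, 48);

        ValueToKey<SinkRecord> createTopicId = new ValueToKey<>();
        createTopicId.configure(Map.of("fields", "ClientId"));
        record = createTopicId.apply(record);
        // key = {ClientId=testclient}, keySchema still null

        ExtractField.Key<SinkRecord> extractTopicId = new ExtractField.Key<>();
        extractTopicId.configure(Map.of("field", "ClientId"));
        record = extractTopicId.apply(record);
        // key = "testclient", keySchema still null

        System.out.println(record.key());       // testclient
        System.out.println(record.keySchema()); // null -> ExtractTopic$Key throws
    }
}

Neither stock SMT synthesizes a schema on the schemaless path, so the key value is correct but the schema never appears.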

Here is the error:

org.apache.kafka.connect.errors.DataException: key schema can't be null: SinkRecord{kafkaOffset=48, timestampType=CreateTime} ConnectRecord{topic='test-topic', kafkaPartition=0, key=testclient, keySchema=null, value={Action=2, RawId=179645, Message=A message from testclient, ClientId=testclient}, valueSchema=null, timestamp=1683821808209, headers=ConnectHeaders(headers=)}
	at io.aiven.kafka.connect.transforms.ExtractTopic.apply(ExtractTopic.java:66)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:540)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
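
Based on the stack trace (ExtractTopic.java:66), the transform rejects any record whose key schema is null before it even looks at the key value. An inferred sketch of that guard (not copied from the Aiven source, just the logic the trace implies):

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;

public class KeySchemaGuard {
    // Inferred from the stack trace, not the actual Aiven code: the transform
    // bails out whenever the key schema is missing, which is exactly the state
    // the ValueToKey + ExtractField chain produces on schemaless records.
    static <R extends ConnectRecord<R>> void requireKeySchema(R record) {
        if (record.keySchema() == null) {
            throw new DataException("key schema can't be null: " + record);
        }
    }
}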