diff --git a/README.md b/README.md index 4164db2..209cfe8 100644 --- a/README.md +++ b/README.md @@ -129,6 +129,18 @@ transforms.ConcatFields.delimiter="-" transforms.ConcatFields.field.replace.missing="*" ``` +### `MakeTombstone` + +This transformation converts a record into a tombstone by setting its value and value schema to `null`. + +It can be used together with predicates, for example, to create a tombstone event from a delete event produced by a source connector. + +Here is an example of this transformation configuration: +```properties +transforms=MakeTombstone +transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone +``` + ## License This project is licensed under the [Apache License, Version 2.0](LICENSE). diff --git a/src/main/java/io/aiven/kafka/connect/transforms/MakeTombstone.java b/src/main/java/io/aiven/kafka/connect/transforms/MakeTombstone.java new file mode 100644 index 0000000..078db8f --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/transforms/MakeTombstone.java @@ -0,0 +1,55 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.transforms; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.Transformation; + +public class MakeTombstone> implements Transformation { + + @Override + public R apply(final R record) { + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + null, + null, + record.timestamp(), + record.headers() + ); + } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } + + @Override + public void close() { + + } + + @Override + public void configure(final Map configs) { + + } +} diff --git a/src/test/java/io/aiven/kafka/connect/transforms/MakeTombstoneTest.java b/src/test/java/io/aiven/kafka/connect/transforms/MakeTombstoneTest.java new file mode 100644 index 0000000..3238785 --- /dev/null +++ b/src/test/java/io/aiven/kafka/connect/transforms/MakeTombstoneTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.transforms; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNull; + +class MakeTombstoneTest { + + @Test + void shouldMakeTombstone() { + final MakeTombstone makeTombstone = new MakeTombstone<>(); + final var record = new SourceRecord(null, null, "some_topic", Schema.STRING_SCHEMA, "dummy value"); + + final var actual = makeTombstone.apply(record); + assertNull(actual.valueSchema(), "value schema should be null"); + assertNull(actual.value(), "value should be null"); + } +}