Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ A collection of [Single Message Transformations (SMTs)](https://kafka.apache.org

## Transformations

See [the Kafka documentation](https://kafka.apache.org/documentation/#connect_transforms) for more details about configuring transformations.
See [the Kafka documentation](https://kafka.apache.org/documentation/#connect_transforms) for more details about configuring transformations
or [demo](./demo) on how to install transforms.

### `ExtractTimestamp`

Expand Down
10 changes: 10 additions & 0 deletions demo/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM eclipse-temurin:11-jdk AS base

ADD ./ transforms
WORKDIR transforms

RUN ./gradlew installDist

FROM confluentinc/cp-kafka-connect:7.3.3

COPY --from=base /transforms/build/install/transforms-for-apache-kafka-connect /transforms
11 changes: 11 additions & 0 deletions demo/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
all: build

build:
docker compose build

up:
docker compose up -d

test:
curl -sSL 'http://localhost:8083/connector-plugins?connectorsOnly=false' \
| jq '.[] | select(.type=="transformation") | select (.class | startswith("io.aiven"))'
116 changes: 116 additions & 0 deletions demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Demo

How to load plugins to Kafka Connect installations.

## Requirements

To run the demo, you need:
- Docker compose
- `curl`
- `jq`

## Demo

To install transforms into a Kafka Connect installation, it needs:

- Build `transforms-for-apache-kafka-connect` libraries
- Add libraries to Kafka Connect nodes
- Configure Kafka Connect `plugin.path`

### Build `transforms-for-apache-kafka-connect` libraries

To build libraries, use `gradlew installDist` command.
e.g. Dockerfile build:

```dockerfile
FROM eclipse-temurin:11-jdk AS base

ADD ./ transforms
WORKDIR transforms

RUN ./gradlew installDist
```

This generates the set of libraries to be installed in Kafka Connect workers.

### Add libraries to Kafka Connect nodes

Copy the directory with libraries into your Kafka Connect nodes.
e.g. add directory to Docker images:

```dockerfile
FROM confluentinc/cp-kafka-connect:7.3.3

COPY --from=base /transforms/build/install/transforms-for-apache-kafka-connect /transforms
```

### Configure Kafka Connect `plugin.path`

On Kafka Connect configuration file, set `plugin.path` to indicate where to load plugins from,
e.g. with Docker compose:

```yaml
connect:
# ...
environment:
# ...
CONNECT_PLUGIN_PATH: /usr/share/java,/transforms # /transforms added on Dockerfile build
```

## Running

1. Build docker images: `make build` or `docker compose build`
2. Start environment: `make up` or `docker compose up -d`
3. Test connect plugins are loaded: `make test`

Sample response:
```json lines
{
"class": "io.aiven.kafka.connect.transforms.ConcatFields$Key",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.ConcatFields$Value",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.ExtractTimestamp$Key",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.ExtractTimestamp$Value",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.ExtractTopic$Key",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.ExtractTopic$Value",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.FilterByFieldValue$Key",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.FilterByFieldValue$Value",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.Hash$Key",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.Hash$Value",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.MakeTombstone",
"type": "transformation"
}
{
"class": "io.aiven.kafka.connect.transforms.TombstoneHandler",
"type": "transformation"
}
```
45 changes: 45 additions & 0 deletions demo/compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
version: '3.8'
services:
zookeeper:
image: "confluentinc/cp-zookeeper:7.3.3"
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: "confluentinc/cp-kafka:7.3.3"
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

connect:
build:
context: ../.
dockerfile: demo/Dockerfile
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:19092
CONNECT_REST_ADVERTISED_HOST_NAME: localhost
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: connect-demo
CONNECT_CONFIG_STORAGE_TOPIC: connect-demo-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: connect-demo-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: connect-demo-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_KEY_CONVERTER_SCHEMA_ENABLE: "false"
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_PLUGIN_PATH: /usr/share/java,/transforms # /transforms added on Dockerfile build