Skip to content

Commit 9e6871b

Browse files
committed
transforms: add FilterFields transformation
Adds support for regex-based field value matching and configurable filter modes (equality and inequality). This allows filtering records based on whether a specific field value matches or does not match a given regex pattern.
1 parent 7e876a0 commit 9e6871b

File tree

3 files changed

+230
-0
lines changed

3 files changed

+230
-0
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,19 @@ transforms=MakeTombstone
141141
transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
142142
```
143143

144+
### `FilterFields`
145+
146+
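This transformation filters records by matching the string form of one field of the record value against a regular expression; the pattern must match the entire field value. The `filterMode` setting selects which records are kept: `equals` keeps records whose field value matches the pattern, while `not_equals` keeps records whose field value does not match it (the mode is compared case-insensitively and defaults to `equals`). Records whose value is not a `Struct` are passed through unchanged, and records in which the field is `null` are dropped in both modes.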
147+
148+
Here is an example of this transformation configuration:
149+
```properties
150+
transforms=Filter
151+
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterFields
152+
transforms.Filter.fieldName=<field_name>
153+
transforms.Filter.fieldValue=<regex_pattern>
154+
transforms.Filter.filterMode=<equals|not_equals>
155+
```
156+
144157
## License
145158

146159
This project is licensed under the [Apache License, Version 2.0](LICENSE).
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
28+
29+
public class FilterFields<R extends ConnectRecord<R>> implements Transformation<R> {
30+
31+
private String fieldName;
32+
private String fieldValue;
33+
private Pattern fieldValuePattern;
34+
private String filterMode;
35+
36+
public FilterFields() {
37+
this.fieldName = "";
38+
this.fieldValue = "";
39+
this.fieldValuePattern = Pattern.compile("");
40+
this.filterMode = "equals";
41+
}
42+
43+
@Override
44+
public R apply(final R record) {
45+
46+
if (!(record.value() instanceof Struct)) {
47+
return record;
48+
}
49+
50+
final Struct value = (Struct) record.value();
51+
final Object fieldValueObj = value.get(fieldName);
52+
53+
if (fieldValueObj != null) {
54+
final Matcher matcher = fieldValuePattern.matcher(fieldValueObj.toString());
55+
final boolean matches = matcher.matches();
56+
57+
if (("equals".equalsIgnoreCase(filterMode) && matches)
58+
|| ("not_equals".equalsIgnoreCase(filterMode) && !matches)) {
59+
return record;
60+
}
61+
}
62+
63+
return null;
64+
}
65+
66+
@Override
67+
public ConfigDef config() {
68+
return new ConfigDef()
69+
.define("fieldName",
70+
ConfigDef.Type.STRING,
71+
ConfigDef.Importance.HIGH, "The field name to filter by")
72+
.define("fieldValue",
73+
ConfigDef.Type.STRING,
74+
ConfigDef.Importance.HIGH, "The field value to match")
75+
.define("filterMode",
76+
ConfigDef.Type.STRING, "equals",
77+
ConfigDef.Importance.MEDIUM, "The filter mode, 'equals' or 'not_equals'");
78+
}
79+
80+
@Override
81+
public void close() {
82+
}
83+
84+
@Override
85+
public void configure(final Map<String, ?> configs) {
86+
final AbstractConfig config = new AbstractConfig(config(), configs);
87+
this.fieldName = config.getString("fieldName");
88+
this.fieldValue = config.getString("fieldValue");
89+
this.fieldValuePattern = Pattern.compile(this.fieldValue);
90+
this.filterMode = config.getString("filterMode");
91+
}
92+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.connect.data.Schema;
22+
import org.apache.kafka.connect.data.SchemaBuilder;
23+
import org.apache.kafka.connect.data.Struct;
24+
import org.apache.kafka.connect.source.SourceRecord;
25+
26+
import org.junit.jupiter.api.Test;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertNull;
30+
31+
class FilterFieldsTest {
32+
33+
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
34+
.field("before", Schema.OPTIONAL_STRING_SCHEMA)
35+
.field("after", SchemaBuilder.struct()
36+
.field("pk", Schema.STRING_SCHEMA)
37+
.field("value", Schema.STRING_SCHEMA)
38+
.build())
39+
.field("source", SchemaBuilder.struct().optional())
40+
.field("op", Schema.STRING_SCHEMA)
41+
.field("ts_ms", Schema.STRING_SCHEMA)
42+
.field("transaction", Schema.OPTIONAL_STRING_SCHEMA)
43+
.build();
44+
45+
@Test
46+
void shouldFilterOutRecordsEqualsToReadEvents() {
47+
final FilterFields<SourceRecord> filter = new FilterFields<>();
48+
filter.configure(Map.of(
49+
"fieldName", "op",
50+
"fieldValue", "r",
51+
"filterMode", "not_equals"
52+
));
53+
54+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
55+
.put("pk", "1")
56+
.put("value", "New data");
57+
58+
final Struct value = new Struct(VALUE_SCHEMA)
59+
.put("before", null)
60+
.put("after", after)
61+
.put("source", null)
62+
.put("op", "r")
63+
.put("ts_ms", "1620393591654")
64+
.put("transaction", null);
65+
66+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
67+
68+
final var actual = filter.apply(record);
69+
assertNull(actual, "Record with op 'r' should be filtered out");
70+
}
71+
72+
@Test
73+
void shouldKeepRecordsNotEqualsToReadEvents() {
74+
final FilterFields<SourceRecord> filter = new FilterFields<>();
75+
filter.configure(Map.of(
76+
"fieldName", "op",
77+
"fieldValue", "r",
78+
"filterMode", "not_equals"
79+
));
80+
81+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
82+
.put("pk", "1")
83+
.put("value", "New data");
84+
85+
final Struct value = new Struct(VALUE_SCHEMA)
86+
.put("before", null)
87+
.put("after", after)
88+
.put("source", null)
89+
.put("op", "u")
90+
.put("ts_ms", "1620393591654")
91+
.put("transaction", null);
92+
93+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
94+
95+
final var actual = filter.apply(record);
96+
assertEquals(record, actual, "Record with op not equal to 'r' should be kept");
97+
}
98+
99+
@Test
100+
void shouldFilterOutRecordsNotEqualsReadEvents() {
101+
final FilterFields<SourceRecord> filter = new FilterFields<>();
102+
filter.configure(Map.of(
103+
"fieldName", "op",
104+
"fieldValue", "r",
105+
"filterMode", "equals"
106+
));
107+
108+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
109+
.put("pk", "1")
110+
.put("value", "New data");
111+
112+
final Struct value = new Struct(VALUE_SCHEMA)
113+
.put("before", null)
114+
.put("after", after)
115+
.put("source", null)
116+
.put("op", "u")
117+
.put("ts_ms", "1620393591654")
118+
.put("transaction", null);
119+
120+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
121+
122+
final var actual = filter.apply(record);
123+
assertNull(actual, "Record with op not equal to 'r' should be filtered out");
124+
}
125+
}

0 commit comments

Comments
 (0)