Skip to content

Commit 0a27203

Browse files
committed
transforms: add FilterByValueRegex transformation
Adds support for regex-based field value matching with a configurable filter mode (`matches=true|false`). This allows filtering records based on whether a specific field's value matches or does not match a given regex pattern.
1 parent 7e876a0 commit 0a27203

File tree

3 files changed

+368
-0
lines changed

3 files changed

+368
-0
lines changed

README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,19 @@ transforms=MakeTombstone
141141
transforms.MakeTombstone.type=io.aiven.kafka.connect.transforms.MakeTombstone
142142
```
143143

144+
### `FilterByValueRegex`
145+
146+
This transformation allows filtering records based on a specific field value and a given regex pattern. The filter supports two modes: matching (`matches=true`) keeps records whose field value matches the pattern, and non-matching (`matches=false`) keeps records whose field value does not match it.
147+
148+
Here is an example of this transformation configuration:
149+
```properties
150+
transforms=Filter
151+
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByValueRegex
152+
transforms.Filter.fieldName=<field_name>
153+
transforms.Filter.pattern=<regex_pattern>
154+
transforms.Filter.matches=<true|false>
155+
```
156+
144157
## License
145158

146159
This project is licensed under the [Apache License, Version 2.0](LICENSE).
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.function.Predicate;
23+
import java.util.regex.Matcher;
24+
import java.util.regex.Pattern;
25+
26+
import org.apache.kafka.common.config.AbstractConfig;
27+
import org.apache.kafka.common.config.ConfigDef;
28+
import org.apache.kafka.connect.connector.ConnectRecord;
29+
import org.apache.kafka.connect.data.Schema;
30+
import org.apache.kafka.connect.data.Struct;
31+
import org.apache.kafka.connect.transforms.Transformation;
32+
33+
public class FilterByValueRegex<R extends ConnectRecord<R>> implements Transformation<R> {
34+
35+
private String fieldName;
36+
private String pattern;
37+
private Pattern fieldValuePattern;
38+
private boolean matches;
39+
private Predicate<Boolean> recordFilterCondition;
40+
41+
public FilterByValueRegex() {
42+
this.fieldName = "";
43+
this.pattern = "";
44+
this.fieldValuePattern = Pattern.compile("");
45+
this.matches = true;
46+
}
47+
48+
@Override
49+
public R apply(final R record) {
50+
if (record.value() instanceof Struct) {
51+
return handleStruct(record);
52+
} else if (record.value() instanceof Map) {
53+
return handleMap(record);
54+
} else if (record.value() instanceof List || record.value().getClass().isArray()) {
55+
return handleListOrArray(record);
56+
}
57+
return null;
58+
}
59+
60+
private R handleStruct(final R record) {
61+
final Struct value = (Struct) record.value();
62+
final Object fieldValueObj = value.get(fieldName);
63+
64+
if (fieldValueObj instanceof List || fieldValueObj instanceof Object[]) {
65+
if (handleArrayOrListField(fieldValueObj)) {
66+
return record;
67+
}
68+
} else {
69+
return checkAndReturnRecord(value.schema().field(fieldName).schema(), fieldValueObj, record);
70+
}
71+
72+
return null;
73+
}
74+
75+
private R handleMap(final R record) {
76+
final Map<String, Object> value = (Map<String, Object>) record.value();
77+
final Object fieldValueObj = value.get(fieldName);
78+
79+
return checkAndReturnRecord(null, fieldValueObj, record);
80+
}
81+
82+
private R handleListOrArray(final R record) {
83+
if (handleArrayOrListField(record.value())) {
84+
return record;
85+
}
86+
return null;
87+
}
88+
89+
private R checkAndReturnRecord(final Schema schema, final Object fieldValueObj, final R record) {
90+
if (fieldValueObj != null) {
91+
final String fieldValueStr = schema != null
92+
? convertToString(schema, fieldValueObj) : fieldValueObj.toString();
93+
94+
if (fieldValueStr != null) {
95+
final Matcher matcher = fieldValuePattern.matcher(fieldValueStr);
96+
final boolean matches = matcher.matches();
97+
98+
if (recordFilterCondition.test(matches)) {
99+
return record;
100+
}
101+
}
102+
}
103+
return null;
104+
}
105+
106+
private String convertToString(final Schema schema, final Object fieldValueObj) {
107+
if (schema.type() == Schema.Type.STRING) {
108+
return (String) fieldValueObj;
109+
} else if (schema.type().isPrimitive()) {
110+
return fieldValueObj.toString();
111+
}
112+
return null;
113+
}
114+
115+
private boolean handleArrayOrListField(final Object fieldValueObj) {
116+
final List<?> valueList = fieldValueObj
117+
instanceof List ? (List<?>) fieldValueObj : Arrays.asList((Object[]) fieldValueObj);
118+
boolean foundMatchingFieldValue = false;
119+
120+
for (final Object value : valueList) {
121+
final String fieldValueStr = value.toString();
122+
final Matcher matcher = fieldValuePattern.matcher(fieldValueStr);
123+
final boolean matches = matcher.matches();
124+
125+
if (recordFilterCondition.test(matches)) {
126+
foundMatchingFieldValue = true;
127+
break;
128+
}
129+
}
130+
131+
return foundMatchingFieldValue;
132+
}
133+
134+
@Override
135+
public ConfigDef config() {
136+
return new ConfigDef()
137+
.define("fieldName",
138+
ConfigDef.Type.STRING,
139+
ConfigDef.Importance.HIGH, "The field name to filter by")
140+
.define("pattern",
141+
ConfigDef.Type.STRING,
142+
ConfigDef.Importance.HIGH, "The pattern to match")
143+
.define("matches",
144+
ConfigDef.Type.BOOLEAN, true,
145+
ConfigDef.Importance.MEDIUM, "The filter mode, 'true' for matching or 'false' for non-matching");
146+
}
147+
148+
@Override
149+
public void close() {
150+
}
151+
152+
@Override
153+
public void configure(final Map<String, ?> configs) {
154+
final AbstractConfig config = new AbstractConfig(config(), configs);
155+
this.fieldName = config.getString("fieldName");
156+
this.pattern = config.getString("pattern");
157+
this.fieldValuePattern = Pattern.compile(this.pattern);
158+
this.matches = config.getBoolean("matches");
159+
recordFilterCondition = this.matches
160+
? (result -> result)
161+
: (result -> !result);
162+
}
163+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2023 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.transforms;
18+
19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaBuilder;
26+
import org.apache.kafka.connect.data.Struct;
27+
import org.apache.kafka.connect.source.SourceRecord;
28+
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.junit.jupiter.api.Assertions.*;
32+
33+
class FilterByValueRegexTest {
34+
35+
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
36+
.field("before", Schema.OPTIONAL_STRING_SCHEMA)
37+
.field("after", SchemaBuilder.struct()
38+
.field("pk", Schema.STRING_SCHEMA)
39+
.field("value", Schema.STRING_SCHEMA)
40+
.build())
41+
.field("source", SchemaBuilder.struct().optional())
42+
.field("op", Schema.STRING_SCHEMA)
43+
.field("ts_ms", Schema.STRING_SCHEMA)
44+
.field("transaction", Schema.OPTIONAL_STRING_SCHEMA)
45+
.build();
46+
47+
@Test
48+
void shouldFilterOutRecordsEqualsToReadEvents() {
49+
final FilterByValueRegex<SourceRecord> filter = new FilterByValueRegex<>();
50+
filter.configure(Map.of(
51+
"fieldName", "op",
52+
"pattern", "r",
53+
"matches", "false"
54+
));
55+
56+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
57+
.put("pk", "1")
58+
.put("value", "New data");
59+
60+
final Struct value = new Struct(VALUE_SCHEMA)
61+
.put("before", null)
62+
.put("after", after)
63+
.put("source", null)
64+
.put("op", "r")
65+
.put("ts_ms", "1620393591654")
66+
.put("transaction", null);
67+
68+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
69+
70+
final var actual = filter.apply(record);
71+
assertNull(actual, "Record with op 'r' should be filtered out");
72+
}
73+
74+
@Test
75+
void shouldKeepRecordsNotEqualsToReadEvents() {
76+
final FilterByValueRegex<SourceRecord> filter = new FilterByValueRegex<>();
77+
filter.configure(Map.of(
78+
"fieldName", "op",
79+
"pattern", "r",
80+
"matches", "false"
81+
));
82+
83+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
84+
.put("pk", "1")
85+
.put("value", "New data");
86+
87+
final Struct value = new Struct(VALUE_SCHEMA)
88+
.put("before", null)
89+
.put("after", after)
90+
.put("source", null)
91+
.put("op", "u")
92+
.put("ts_ms", "1620393591654")
93+
.put("transaction", null);
94+
95+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
96+
97+
final var actual = filter.apply(record);
98+
assertEquals(record, actual, "Record with op not equal to 'r' should be kept");
99+
}
100+
101+
@Test
102+
void shouldFilterOutRecordsNotEqualsReadEvents() {
103+
final FilterByValueRegex<SourceRecord> filter = new FilterByValueRegex<>();
104+
filter.configure(Map.of(
105+
"fieldName", "op",
106+
"pattern", "r",
107+
"matches", "true"
108+
));
109+
110+
final Struct after = new Struct(VALUE_SCHEMA.field("after").schema())
111+
.put("pk", "1")
112+
.put("value", "New data");
113+
114+
final Struct value = new Struct(VALUE_SCHEMA)
115+
.put("before", null)
116+
.put("after", after)
117+
.put("source", null)
118+
.put("op", "u")
119+
.put("ts_ms", "1620393591654")
120+
.put("transaction", null);
121+
122+
final var record = new SourceRecord(null, null, "some_topic", VALUE_SCHEMA, value);
123+
124+
final var actual = filter.apply(record);
125+
assertNull(actual, "Record with op not equal to 'r' should be filtered out");
126+
}
127+
128+
@Test
129+
void shouldFilterMatchingArrayFieldValue() {
130+
final FilterByValueRegex<SourceRecord> filterByValueRegex = new FilterByValueRegex<>();
131+
final Map<String, String> configs = new HashMap<>();
132+
configs.put("fieldName", "tags");
133+
configs.put("pattern", ".*apple.*");
134+
configs.put("matches", "true");
135+
filterByValueRegex.configure(configs);
136+
137+
Schema schema = SchemaBuilder.struct()
138+
.field("name", Schema.STRING_SCHEMA)
139+
.field("tags", SchemaBuilder.array(Schema.STRING_SCHEMA))
140+
.build();
141+
List<String> tags = Arrays.asList("apple", "orange", "mango");
142+
Struct value = new Struct(schema)
143+
.put("name", "John Doe")
144+
.put("tags", tags);
145+
146+
final var record = new SourceRecord(null, null, "some_topic", schema, value);
147+
148+
final var actual = filterByValueRegex.apply(record);
149+
assertEquals(record, actual, "The record contains the matching pattern");
150+
}
151+
152+
@Test
153+
void shouldFilterOutMapFieldValue() {
154+
final FilterByValueRegex<SourceRecord> filterByValueRegex = new FilterByValueRegex<>();
155+
final Map<String, String> configs = new HashMap<>();
156+
configs.put("fieldName", "language");
157+
configs.put("pattern", ".*Java.*");
158+
configs.put("matches", "false");
159+
filterByValueRegex.configure(configs);
160+
161+
Map<String, Object> value = new HashMap<>();
162+
value.put("name", "John Doe");
163+
value.put("language", "Java");
164+
165+
final var record = new SourceRecord(null, null, "some_topic", Schema.STRING_SCHEMA, value);
166+
167+
final var actual = filterByValueRegex.apply(record);
168+
assertNull(actual, "The record should be filtered out");
169+
}
170+
171+
@Test
172+
void shouldFilterArrayFieldValue() {
173+
final FilterByValueRegex<SourceRecord> filterByValueRegex = new FilterByValueRegex<>();
174+
final Map<String, String> configs = new HashMap<>();
175+
configs.put("fieldName", "tags");
176+
configs.put("pattern", ".*apple.*");
177+
configs.put("matches", "true");
178+
filterByValueRegex.configure(configs);
179+
180+
// Test with a list
181+
List<String> tagsList = Arrays.asList("apple", "orange", "mango");
182+
final var listRecord = new SourceRecord(null, null, "some_topic", null, tagsList);
183+
final var listActual = filterByValueRegex.apply(listRecord);
184+
assertNotNull(listActual, "The record should not be filtered out and return the record itself");
185+
186+
// Test with an array
187+
String[] tagsArray = {"apple", "orange", "mango"};
188+
final var arrayRecord = new SourceRecord(null, null, "some_topic", null, tagsArray);
189+
final var arrayActual = filterByValueRegex.apply(arrayRecord);
190+
assertNotNull(arrayActual, "The record should not be filtered out and return the record itself");
191+
}
192+
}

0 commit comments

Comments
 (0)