1616
1717package io .aiven .kafka .connect .transforms ;
1818
19- import java .util .Arrays ;
20- import java .util .HashSet ;
2119import java .util .Map ;
2220import java .util .Optional ;
23- import java .util .Set ;
2421import java .util .function .Predicate ;
2522
2623import org .apache .kafka .common .config .AbstractConfig ;
3027import org .apache .kafka .connect .data .Field ;
3128import org .apache .kafka .connect .data .Schema ;
3229import org .apache .kafka .connect .data .Struct ;
30+ import org .apache .kafka .connect .data .Values ;
3331import org .apache .kafka .connect .transforms .Transformation ;
3432
35- import static org .apache .kafka .connect .data .Schema .Type .STRING ;
36-
3733public abstract class FilterByFieldValue <R extends ConnectRecord <R >> implements Transformation <R > {
3834
3935 private String fieldName ;
4036 private Optional <String > fieldExpectedValue ;
4137 private Optional <String > fieldValuePattern ;
42- private Predicate <Optional < String > > filterCondition ;
38+ private Predicate <String > filterCondition ;
4339
4440 @ Override
4541 public R apply (final R record ) {
@@ -61,32 +57,25 @@ public R apply(final R record) {
6157
6258 private R applyWithSchema (final R record ) {
6359 final Struct struct = (Struct ) operatingValue (record );
64- final Optional < String > fieldValue = getStructFieldValue (struct , fieldName );
60+ final String fieldValue = getStructFieldValue (struct , fieldName ). orElse ( null );
6561 return filterCondition .test (fieldValue ) ? record : null ;
6662 }
6763
6864 private Optional <String > getStructFieldValue (final Struct struct , final String fieldName ) {
6965 final Schema schema = struct .schema ();
7066 final Field field = schema .field (fieldName );
7167 final Object fieldValue = struct .get (field );
72-
73- Optional <String > text = Optional .empty ();
74- if (STRING .equals (field .schema ().type ())) {
75- text = Optional .of ((String ) fieldValue );
76- } else if (schema .type ().isPrimitive ()) {
77- text = Optional .of (fieldValue .toString ());
78- }
79- return text ;
68+ return Optional .ofNullable (Values .convertToString (field .schema (), fieldValue ));
8069 }
8170
8271 @ SuppressWarnings ("unchecked" )
8372 private R applySchemaless (final R record ) {
8473 if (fieldName == null || fieldName .isEmpty ()) {
85- final Optional < String > value = getSchemalessFieldValue (operatingValue (record ));
74+ final String value = getSchemalessFieldValue (operatingValue (record )). orElse ( null );
8675 return filterCondition .test (value ) ? record : null ;
8776 } else {
8877 final Map <String , Object > map = (Map <String , Object >) operatingValue (record );
89- final Optional < String > fieldValue = getSchemalessFieldValue (map .get (fieldName ));
78+ final String fieldValue = getSchemalessFieldValue (map .get (fieldName )). orElse ( null );
9079 return filterCondition .test (fieldValue ) ? record : null ;
9180 }
9281 }
@@ -95,49 +84,34 @@ private Optional<String> getSchemalessFieldValue(final Object fieldValue) {
9584 if (fieldValue == null ) {
9685 return Optional .empty ();
9786 }
98- Optional <String > text = Optional .empty ();
99- if (isSupportedType (fieldValue )) {
100- text = Optional .of (fieldValue .toString ());
101- }
102- return text ;
103- }
104-
105- private boolean isSupportedType (final Object fieldValue ) {
106- final Set <Class <?>> supportedTypes = new HashSet <>(
107- Arrays .asList (
108- String .class , Long .class , Integer .class , Short .class ,
109- Double .class , Float .class , Boolean .class
110- )
111- );
112-
113- return supportedTypes .contains (fieldValue .getClass ());
87+ return Optional .ofNullable (Values .convertToString (null , fieldValue ));
11488 }
11589
11690 @ Override
11791 public ConfigDef config () {
11892 return new ConfigDef ()
119- .define ("field.name" ,
120- ConfigDef .Type .STRING ,
121- null ,
122- ConfigDef .Importance .HIGH ,
123- "The field name to filter by."
124- + "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported."
125- + "If empty, the whole key/value record will be filtered." )
126- .define ("field.value" ,
127- ConfigDef .Type .STRING ,
128- null ,
129- ConfigDef .Importance .HIGH ,
130- "Expected value to match. Either define this, or a regex pattern" )
131- .define ("field.value.pattern" ,
132- ConfigDef .Type .STRING ,
133- null ,
134- ConfigDef .Importance .HIGH ,
135- "The pattern to match. Either define this, or an expected value" )
136- .define ("field.value.matches" ,
137- ConfigDef .Type .BOOLEAN ,
138- true ,
139- ConfigDef .Importance .MEDIUM ,
140- "The filter mode, 'true' for matching or 'false' for non-matching" );
93+ .define ("field.name" ,
94+ ConfigDef .Type .STRING ,
95+ null ,
96+ ConfigDef .Importance .HIGH ,
97+ "The field name to filter by." +
98+ "Schema-based records (Avro), schemaless (e.g. JSON), and raw values are supported." +
99+ "If empty, the whole key/value record will be filtered." )
100+ .define ("field.value" ,
101+ ConfigDef .Type .STRING ,
102+ null ,
103+ ConfigDef .Importance .HIGH ,
104+ "Expected value to match. Either define this, or a regex pattern" )
105+ .define ("field.value.pattern" ,
106+ ConfigDef .Type .STRING ,
107+ null ,
108+ ConfigDef .Importance .HIGH ,
109+ "The pattern to match. Either define this, or an expected value" )
110+ .define ("field.value.matches" ,
111+ ConfigDef .Type .BOOLEAN ,
112+ true ,
113+ ConfigDef .Importance .MEDIUM ,
114+ "The filter mode, 'true' for matching or 'false' for non-matching" );
141115 }
142116
143117 @ Override
@@ -152,44 +126,48 @@ public void configure(final Map<String, ?> configs) {
152126 this .fieldValuePattern = Optional .ofNullable (config .getString ("field.value.pattern" ));
153127 final boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
154128 final boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
155- if ((expectedValuePresent && regexPatternPresent )
156- || (!expectedValuePresent && !regexPatternPresent )) {
129+ if (expectedValuePresent == regexPatternPresent ) {
157130 throw new ConfigException (
158- "Either field.value or field.value.pattern have to be set to apply filter transform" );
131+ "Either field.value or field.value.pattern have to be set to apply filter transform" );
159132 }
160- final Predicate <Optional <String >> matchCondition = fieldValue -> fieldValue
161- .filter (value -> expectedValuePresent
162- ? fieldExpectedValue .get ().equals (value )
163- : value .matches (fieldValuePattern .get ()))
164- .isPresent ();
133+ final Predicate <String > matchCondition ;
134+
135+ if (expectedValuePresent ) {
136+ final String expectedValue = fieldExpectedValue .get ();
137+ matchCondition = fieldValue -> fieldValue != null && fieldValue .equals (expectedValue );
138+ } else {
139+ final String pattern = fieldValuePattern .get ();
140+ matchCondition = fieldValue -> fieldValue != null && fieldValue .matches (pattern );
141+ }
142+
165143 this .filterCondition = config .getBoolean ("field.value.matches" )
166- ? matchCondition
167- : (result -> !matchCondition .test (result ));
144+ ? matchCondition
145+ : (result -> !matchCondition .test (result ));
168146 }
169147
170148
171149 public static final class Key <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
172150
173151 @ Override
174- protected Schema operatingSchema (final R record ) {
152+ protected Schema operatingSchema (R record ) {
175153 return record .keySchema ();
176154 }
177155
178156 @ Override
179- protected Object operatingValue (final R record ) {
157+ protected Object operatingValue (R record ) {
180158 return record .key ();
181159 }
182160 }
183161
184162 public static final class Value <R extends ConnectRecord <R >> extends FilterByFieldValue <R > {
185163
186164 @ Override
187- protected Schema operatingSchema (final R record ) {
165+ protected Schema operatingSchema (R record ) {
188166 return record .valueSchema ();
189167 }
190168
191169 @ Override
192- protected Object operatingValue (final R record ) {
170+ protected Object operatingValue (R record ) {
193171 return record .value ();
194172 }
195173 }
0 commit comments