1616
1717package io .aiven .kafka .connect .transforms ;
1818
19+ import java .util .Arrays ;
20+ import java .util .HashSet ;
1921import java .util .Map ;
2022import java .util .Optional ;
23+ import java .util .Set ;
2124import java .util .function .Predicate ;
2225
2326import org .apache .kafka .common .config .AbstractConfig ;
@@ -49,12 +52,12 @@ public R apply(final R record) {
4952 }
5053
5154 private R handleStruct (final R record ) {
52- Struct struct = (Struct ) record .value ();
55+ final Struct struct = (Struct ) record .value ();
5356 final Optional <String > fieldValue = extractStructFieldValue (struct , fieldName );
5457 return filterCondition .test (fieldValue ) ? record : null ;
5558 }
5659
57- private Optional <String > extractStructFieldValue (Struct struct , String fieldName ) {
60+ private Optional <String > extractStructFieldValue (final Struct struct , final String fieldName ) {
5861 final Schema schema = struct .schema ();
5962 final Field field = schema .field (fieldName );
6063 final Object fieldValue = struct .get (field );
@@ -75,39 +78,50 @@ private R handleMap(final R record) {
7578 return filterCondition .test (fieldValue ) ? record : null ;
7679 }
7780
78- private Optional <String > extractMapFieldValue (Map <String , Object > map , String fieldName ) {
79- if (!map .containsKey (fieldName )) return Optional .empty ();
81+ private Optional <String > extractMapFieldValue (final Map <String , Object > map , final String fieldName ) {
82+ if (!map .containsKey (fieldName )) {
83+ return Optional .empty ();
84+ }
8085
8186 final Object fieldValue = map .get (fieldName );
8287
8388 Optional <String > text = Optional .empty ();
84- if (fieldValue instanceof String
85- || fieldValue instanceof Long
86- || fieldValue instanceof Integer
87- || fieldValue instanceof Short
88- || fieldValue instanceof Double
89- || fieldValue instanceof Float
90- || fieldValue instanceof Boolean ) {
89+ if (isSupportedType (fieldValue )) {
9190 text = Optional .of (fieldValue .toString ());
9291 }
9392 return text ;
9493 }
9594
95+ private boolean isSupportedType (final Object fieldValue ) {
96+ Set <Class <?>> supportedTypes = new HashSet <>(
97+ Arrays .asList (
98+ String .class , Long .class , Integer .class , Short .class ,
99+ Double .class , Float .class , Boolean .class
100+ )
101+ );
102+
103+ return supportedTypes .contains (fieldValue .getClass ());
104+ }
105+
96106 @ Override
97107 public ConfigDef config () {
98108 return new ConfigDef ()
99109 .define ("field.name" ,
100110 ConfigDef .Type .STRING ,
101- ConfigDef .Importance .HIGH , "The field name to filter by" )
111+ ConfigDef .Importance .HIGH ,
112+ "The field name to filter by" )
102113 .define ("field.value" ,
103114 ConfigDef .Type .STRING , null ,
104- ConfigDef .Importance .HIGH , "Expected value to match. Either define this, or a regex pattern" )
115+ ConfigDef .Importance .HIGH ,
116+ "Expected value to match. Either define this, or a regex pattern" )
105117 .define ("field.value.pattern" ,
106118 ConfigDef .Type .STRING , null ,
107- ConfigDef .Importance .HIGH , "The pattern to match. Either define this, or an expected value" )
119+ ConfigDef .Importance .HIGH ,
120+ "The pattern to match. Either define this, or an expected value" )
108121 .define ("field.value.matches" ,
109122 ConfigDef .Type .BOOLEAN , true ,
110- ConfigDef .Importance .MEDIUM , "The filter mode, 'true' for matching or 'false' for non-matching" );
123+ ConfigDef .Importance .MEDIUM ,
124+ "The filter mode, 'true' for matching or 'false' for non-matching" );
111125 }
112126
113127 @ Override
@@ -120,19 +134,20 @@ public void configure(final Map<String, ?> configs) {
120134 this .fieldName = config .getString ("field.name" );
121135 this .fieldExpectedValue = Optional .ofNullable (config .getString ("field.value" ));
122136 this .fieldValuePattern = Optional .ofNullable (config .getString ("field.value.pattern" ));
123- boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
124- boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
137+ final boolean expectedValuePresent = fieldExpectedValue .map (s -> !s .isEmpty ()).orElse (false );
138+ final boolean regexPatternPresent = fieldValuePattern .map (s -> !s .isEmpty ()).orElse (false );
125139 if ((expectedValuePresent && regexPatternPresent )
126140 || (!expectedValuePresent && !regexPatternPresent )) {
127- throw new ConfigException ("Either field.value or field.value.pattern have to be set to apply filter transform" );
141+ throw new ConfigException (
142+ "Either field.value or field.value.pattern have to be set to apply filter transform" );
128143 }
129- Predicate <Optional <String >> matchCondition = fieldValue -> fieldValue
130- .filter (value -> expectedValuePresent ?
131- fieldExpectedValue .get ().equals (value ) :
132- value .matches (fieldValuePattern .get ()))
144+ final Predicate <Optional <String >> matchCondition = fieldValue -> fieldValue
145+ .filter (value -> expectedValuePresent
146+ ? fieldExpectedValue .get ().equals (value )
147+ : value .matches (fieldValuePattern .get ()))
133148 .isPresent ();
134149 this .filterCondition = config .getBoolean ("field.value.matches" )
135- ? ( matchCondition )
150+ ? matchCondition
136151 : (result -> !matchCondition .test (result ));
137152 }
138153}
0 commit comments