3232
3333public abstract class ExtractTopic <R extends ConnectRecord <R >> implements Transformation <R > {
3434
35- private static final List <Schema .Type > SUPPORTED_TYPES_TO_CONVERT_FROM = Arrays .asList (
35+ private static final List <Schema .Type > SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM = Arrays .asList (
3636 Schema .Type .INT8 ,
3737 Schema .Type .INT16 ,
3838 Schema .Type .INT32 ,
@@ -43,6 +43,17 @@ public abstract class ExtractTopic<R extends ConnectRecord<R>> implements Transf
4343 Schema .Type .STRING
4444 );
4545
46+ private static final List <Class <?>> SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM = Arrays .asList (
47+ Byte .class ,
48+ Short .class ,
49+ Integer .class ,
50+ Long .class ,
51+ Double .class ,
52+ Float .class ,
53+ Boolean .class ,
54+ String .class
55+ );
56+
4657 private ExtractTopicConfig config ;
4758
4859 @ Override
@@ -58,17 +69,25 @@ public void configure(final Map<String, ?> settings) {
5869 @ Override
5970 public R apply (final R record ) {
6071 final SchemaAndValue schemaAndValue = getSchemaAndValue (record );
61- if (schemaAndValue .schema () == null ) {
62- throw new DataException (dataPlace () + " schema can't be null: " + record );
63- }
6472
6573 final Optional <String > newTopic ;
66- if (config .fieldName ().isPresent ()) {
67- newTopic = getNewTopicForNamedField (
68- record .toString (), schemaAndValue .schema (), schemaAndValue .value (), config .fieldName ().get ());
69- } else {
70- newTopic = getNewTopicWithoutFieldName (
71- record .toString (), schemaAndValue .schema (), schemaAndValue .value ());
74+
75+ if (schemaAndValue .schema () == null ) { // schemaless values (Map)
76+ if (config .fieldName ().isPresent ()) {
77+ newTopic = topicNameFromNamedFieldSchemaless (
78+ record .toString (), schemaAndValue .value (), config .fieldName ().get ());
79+ } else {
80+ newTopic = topicNameWithoutFieldNameSchemaless (
81+ record .toString (), schemaAndValue .value ());
82+ }
83+ } else { // schema-based values (Struct)
84+ if (config .fieldName ().isPresent ()) {
85+ newTopic = topicNameFromNamedFieldWithSchema (
86+ record .toString (), schemaAndValue .schema (), schemaAndValue .value (), config .fieldName ().get ());
87+ } else {
88+ newTopic = topicNameWithoutFieldNameWithSchema (
89+ record .toString (), schemaAndValue .schema (), schemaAndValue .value ());
90+ }
7291 }
7392
7493 if (newTopic .isPresent ()) {
@@ -91,10 +110,66 @@ public R apply(final R record) {
91110
92111 protected abstract SchemaAndValue getSchemaAndValue (final R record );
93112
94- private Optional <String > getNewTopicForNamedField (final String recordStr ,
95- final Schema schema ,
96- final Object value ,
97- final String fieldName ) {
113+ private Optional <String > topicNameFromNamedFieldSchemaless (final String recordStr ,
114+ final Object value ,
115+ final String fieldName ) {
116+ if (value == null ) {
117+ throw new DataException (dataPlace () + " can't be null if field name is specified: " + recordStr );
118+ }
119+
120+ if (!(value instanceof Map )) {
121+ throw new DataException (dataPlace () + " type must be Map if field name is specified: " + recordStr );
122+ }
123+
124+ @ SuppressWarnings ("unchecked" ) final Map <String , Object > valueMap = (Map <String , Object >) value ;
125+
126+ final Optional <String > result = Optional .ofNullable (valueMap .get (fieldName ))
127+ .map (field -> {
128+ if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM .contains (field .getClass ())) {
129+ throw new DataException (fieldName + " type in " + dataPlace ()
130+ + " " + value
131+ + " must be " + SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM
132+ + ": " + recordStr );
133+ }
134+ return field ;
135+ })
136+ .map (Object ::toString );
137+
138+ if (result .isPresent () && !result .get ().equals ("" )) {
139+ return result ;
140+ } else {
141+ if (config .skipMissingOrNull ()) {
142+ return Optional .empty ();
143+ } else {
144+ throw new DataException (fieldName + " in " + dataPlace () + " can't be null or empty: " + recordStr );
145+ }
146+ }
147+ }
148+
149+ private Optional <String > topicNameWithoutFieldNameSchemaless (final String recordStr ,
150+ final Object value ) {
151+ if (value == null || "" .equals (value )) {
152+ if (config .skipMissingOrNull ()) {
153+ return Optional .empty ();
154+ } else {
155+ throw new DataException (dataPlace () + " can't be null or empty: " + recordStr );
156+ }
157+ }
158+
159+ if (!SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM .contains (value .getClass ())) {
160+ throw new DataException ("type in " + dataPlace ()
161+ + " " + value
162+ + " must be " + SUPPORTED_VALUE_CLASS_TO_CONVERT_FROM
163+ + ": " + recordStr );
164+ }
165+
166+ return Optional .of (value .toString ());
167+ }
168+
169+ private Optional <String > topicNameFromNamedFieldWithSchema (final String recordStr ,
170+ final Schema schema ,
171+ final Object value ,
172+ final String fieldName ) {
98173 if (Schema .Type .STRUCT != schema .type ()) {
99174 throw new DataException (dataPlace () + " schema type must be STRUCT if field name is specified: "
100175 + recordStr );
@@ -113,9 +188,9 @@ private Optional<String> getNewTopicForNamedField(final String recordStr,
113188 }
114189 }
115190
116- if (!SUPPORTED_TYPES_TO_CONVERT_FROM .contains (field .schema ().type ())) {
191+ if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM .contains (field .schema ().type ())) {
117192 throw new DataException (fieldName + " schema type in " + dataPlace ()
118- + " must be " + SUPPORTED_TYPES_TO_CONVERT_FROM
193+ + " must be " + SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM
119194 + ": " + recordStr );
120195 }
121196
@@ -134,12 +209,12 @@ private Optional<String> getNewTopicForNamedField(final String recordStr,
134209 }
135210 }
136211
137- private Optional <String > getNewTopicWithoutFieldName (final String recordStr ,
138- final Schema schema ,
139- final Object value ) {
140- if (!SUPPORTED_TYPES_TO_CONVERT_FROM .contains (schema .type ())) {
212+ private Optional <String > topicNameWithoutFieldNameWithSchema (final String recordStr ,
213+ final Schema schema ,
214+ final Object value ) {
215+ if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM .contains (schema .type ())) {
141216 throw new DataException (dataPlace () + " schema type must be "
142- + SUPPORTED_TYPES_TO_CONVERT_FROM
217+ + SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM
143218 + " if field name is not specified: "
144219 + recordStr );
145220 }
0 commit comments