Skip to content

Commit ea052c1

Browse files
committed
feat(extract-topic): add support for schemaless values
1 parent fbfbafe commit ea052c1

File tree

2 files changed

+254
-62
lines changed

2 files changed

+254
-62
lines changed

src/main/java/io/aiven/kafka/connect/transforms/ExtractTopic.java

Lines changed: 98 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232

3333
public abstract class ExtractTopic<R extends ConnectRecord<R>> implements Transformation<R> {
3434

35-
private static final List<Schema.Type> SUPPORTED_TYPES_TO_CONVERT_FROM = Arrays.asList(
35+
private static final List<Schema.Type> SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM = Arrays.asList(
3636
Schema.Type.INT8,
3737
Schema.Type.INT16,
3838
Schema.Type.INT32,
@@ -43,6 +43,17 @@ public abstract class ExtractTopic<R extends ConnectRecord<R>> implements Transf
4343
Schema.Type.STRING
4444
);
4545

46+
// Java classes of values that the schemaless code paths accept as a topic name source.
// The schemaless helpers test membership with value.getClass(), i.e. an exact class
// match, and convert an accepted value to the topic name with toString().
private static final List<Class<?>> SUPPORTED_VALUE_TYPES_TO_CONVERT_FROM = Arrays.asList(
47+
Byte.class,
48+
Short.class,
49+
Integer.class,
50+
Long.class,
51+
Double.class,
52+
Float.class,
53+
Boolean.class,
54+
String.class
55+
);
56+
4657
private ExtractTopicConfig config;
4758

4859
@Override
@@ -58,19 +69,28 @@ public void configure(final Map<String, ?> settings) {
5869
@Override
5970
public R apply(final R record) {
6071
final SchemaAndValue schemaAndValue = getSchemaAndValue(record);
61-
if (schemaAndValue.schema() == null) {
62-
throw new DataException(dataPlace() + " schema can't be null: " + record);
63-
}
6472

6573
final Optional<String> newTopic;
66-
if (config.fieldName().isPresent()) {
67-
newTopic = getNewTopicForNamedField(
68-
record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
74+
75+
if (schemaAndValue.schema() == null) {
76+
if (config.fieldName().isPresent()) {
77+
newTopic = topicNameFromNamedFieldSchemaless(
78+
record.toString(), schemaAndValue.value(), config.fieldName().get());
79+
} else {
80+
newTopic = topicNameWithoutFieldNameSchemaless(
81+
record.toString(), schemaAndValue.value());
82+
}
6983
} else {
70-
newTopic = getNewTopicWithoutFieldName(
71-
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
84+
if (config.fieldName().isPresent()) {
85+
newTopic = topicNameFromNamedFieldWithSchema(
86+
record.toString(), schemaAndValue.schema(), schemaAndValue.value(), config.fieldName().get());
87+
} else {
88+
newTopic = topicNameWithoutFieldNameWithSchema(
89+
record.toString(), schemaAndValue.schema(), schemaAndValue.value());
90+
}
7291
}
7392

93+
7494
if (newTopic.isPresent()) {
7595
return record.newRecord(
7696
newTopic.get(),
@@ -91,10 +111,68 @@ public R apply(final R record) {
91111

92112
protected abstract SchemaAndValue getSchemaAndValue(final R record);
93113

94-
private Optional<String> getNewTopicForNamedField(final String recordStr,
95-
final Schema schema,
96-
final Object value,
97-
final String fieldName) {
114+
private Optional<String> topicNameFromNamedFieldSchemaless(final String recordStr,
115+
final Object value,
116+
final String fieldName) {
117+
if (value == null) {
118+
throw new DataException(dataPlace() + " can't be null if field name is specified: " + recordStr);
119+
}
120+
121+
if (!(value instanceof Map)) {
122+
throw new DataException(dataPlace() + " type must be Map if field name is specified: " + recordStr);
123+
}
124+
125+
@SuppressWarnings("unchecked") final Map<String, Object> valueMap = (Map<String, Object>) value;
126+
127+
128+
final Optional<String> result = Optional.ofNullable(valueMap.get(fieldName))
129+
.map(field -> {
130+
if (!SUPPORTED_VALUE_TYPES_TO_CONVERT_FROM.contains(field.getClass())) {
131+
throw new DataException(fieldName + " type in " + dataPlace()
132+
+ " " + value
133+
+ " must be " + SUPPORTED_VALUE_TYPES_TO_CONVERT_FROM
134+
+ ": " + recordStr);
135+
}
136+
return field;
137+
})
138+
.map(Object::toString);
139+
if (result.isPresent() && !result.get().equals("")) {
140+
return result;
141+
} else {
142+
if (config.skipMissingOrNull()) {
143+
return Optional.empty();
144+
} else {
145+
throw new DataException(fieldName + " in " + dataPlace() + " can't be null or empty: " + recordStr);
146+
}
147+
}
148+
}
149+
150+
private Optional<String> topicNameWithoutFieldNameSchemaless(final String recordStr,
151+
final Object value) {
152+
153+
if (value == null || "".equals(value)) {
154+
if (config.skipMissingOrNull()) {
155+
return Optional.empty();
156+
} else {
157+
throw new DataException(dataPlace() + " can't be null or empty: " + recordStr);
158+
}
159+
}
160+
161+
if (!SUPPORTED_VALUE_TYPES_TO_CONVERT_FROM.contains(value.getClass())) {
162+
throw new DataException("type in " + dataPlace()
163+
+ " " + value
164+
+ " must be " + SUPPORTED_VALUE_TYPES_TO_CONVERT_FROM
165+
+ ": " + recordStr);
166+
}
167+
168+
169+
return Optional.of(value.toString());
170+
}
171+
172+
private Optional<String> topicNameFromNamedFieldWithSchema(final String recordStr,
173+
final Schema schema,
174+
final Object value,
175+
final String fieldName) {
98176
if (Schema.Type.STRUCT != schema.type()) {
99177
throw new DataException(dataPlace() + " schema type must be STRUCT if field name is specified: "
100178
+ recordStr);
@@ -113,9 +191,9 @@ private Optional<String> getNewTopicForNamedField(final String recordStr,
113191
}
114192
}
115193

116-
if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
194+
if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM.contains(field.schema().type())) {
117195
throw new DataException(fieldName + " schema type in " + dataPlace()
118-
+ " must be " + SUPPORTED_TYPES_TO_CONVERT_FROM
196+
+ " must be " + SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM
119197
+ ": " + recordStr);
120198
}
121199

@@ -134,12 +212,12 @@ private Optional<String> getNewTopicForNamedField(final String recordStr,
134212
}
135213
}
136214

137-
private Optional<String> getNewTopicWithoutFieldName(final String recordStr,
138-
final Schema schema,
139-
final Object value) {
140-
if (!SUPPORTED_TYPES_TO_CONVERT_FROM.contains(schema.type())) {
215+
private Optional<String> topicNameWithoutFieldNameWithSchema(final String recordStr,
216+
final Schema schema,
217+
final Object value) {
218+
if (!SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM.contains(schema.type())) {
141219
throw new DataException(dataPlace() + " schema type must be "
142-
+ SUPPORTED_TYPES_TO_CONVERT_FROM
220+
+ SUPPORTED_SCHEMA_TYPES_TO_CONVERT_FROM
143221
+ " if field name is not specified: "
144222
+ recordStr);
145223
}

0 commit comments

Comments
 (0)