-
Notifications
You must be signed in to change notification settings - Fork 28
transforms: add FilterByFieldValue transformation #82
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
e1f9a53 to
9e6871b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @sebinsunny! I think this could be a useful SMT to add.
I left some comments to discuss.
src/main/java/io/aiven/kafka/connect/transforms/FilterFields.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/kafka/connect/transforms/FilterFields.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/kafka/connect/transforms/FilterFields.java
Outdated
Show resolved
Hide resolved
0a27203 to
0b19bc9
Compare
adding support for regex-based field value matching and configurable filter modes (equality and inequality). This allows filtering records based on whether a specific field value matches or does not match a given regex pattern
0b19bc9 to
5cec48a
Compare
Implement FilterByFieldValue transformation to filter records based on field value or regex pattern
e986948 to
f6c0880
Compare
f6c0880 to
2eb9477
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to give this a quick pass and leave some thoughts; feel free to take them or leave them. I probably won't have time to come back and give this another round so please don't block on my approval.
| public void configure(final Map<String, ?> configs) { | ||
| final AbstractConfig config = new AbstractConfig(config(), configs); | ||
| this.fieldName = config.getString("field.name"); | ||
| this.fieldExpectedValue = Optional.ofNullable(config.getString("field.value")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be useful to allow users to match more than only primitive values. We could use the Values class to parse this property which would allow us to read in more complex types. In addition, this would save unnecessary calls to toString on field values.
This also follows the precedent set in other transforms like InsertHeader and how it parses its "value.literal" property.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated the getStructFieldValue and getSchemalessFieldValue methods to parse more complex types using the Values class
cc @jeqo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion! Didn't know about Values class, looks like a great match here.
I guess the only potential issue I foresee is when there is a mismatch between parsed schema and the desired one, e.g. if "123" is used as expected value, the inferred schema is INT8, when you may want STRING. Though, to start with this looks good to me. If an issue arise around this we could add a config for expected type.
src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java
Outdated
Show resolved
Hide resolved
src/main/java/io/aiven/kafka/connect/transforms/FilterByFieldValue.java
Outdated
Show resolved
Hide resolved
6073212 to
ea593c0
Compare
ea593c0 to
6452aeb
Compare
Implement FilterByFieldValue transformation to filter records based on field value or a given regex pattern