Skip to content

Commit e47edbe

Browse files
DEEPTHIKORATatoomula
authored andcommitted
SAMZA-2025: InputOperatorImpl should work with filtering InputTransformer
InputOperatorImpl should handle the case where InputTransformer returns null record. It makes having simple filtering operation as part of the transformer easy. Author: Deepthi Sridharan <[email protected]> Reviewers: atoomula, prateekm Closes apache#841 from DEEPTHIKORAT/tranformer
1 parent 0dc9dd2 commit e47edbe

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ public Collection<Object> handleMessage(IncomingMessageEnvelope ime, MessageColl
5555
} else {
5656
message = this.inputOpSpec.isKeyed() ? KV.of(ime.getKey(), ime.getMessage()) : ime.getMessage();
5757
}
58-
return Collections.singletonList(message);
58+
if (message != null) {
59+
return Collections.singletonList(message);
60+
}
61+
return Collections.emptyList();
5962
}
6063

6164
@Override

samza-core/src/test/java/org/apache/samza/operators/impl/TestInputOperatorImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.junit.Test;
2929

3030
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.assertTrue;
3132
import static org.mockito.Mockito.mock;
3233

3334
public class TestInputOperatorImpl {
@@ -77,4 +78,18 @@ public void testWithInputTransformer() {
7778
Object result = results.iterator().next();
7879
assertEquals("123", result);
7980
}
81+
82+
@Test
83+
public void testWithFilteringInputTransformer() {
84+
InputOperatorSpec inputOpSpec =
85+
new InputOperatorSpec("stream-id", null, null, (ime) -> null, true, "input-op-id");
86+
InputOperatorImpl inputOperator = new InputOperatorImpl(inputOpSpec);
87+
88+
IncomingMessageEnvelope ime =
89+
new IncomingMessageEnvelope(mock(SystemStreamPartition.class), "123", "key", "msg");
90+
91+
Collection<Object> results =
92+
inputOperator.handleMessage(ime, mock(MessageCollector.class), mock(TaskCoordinator.class));
93+
assertTrue("Transformer doesn't return any record. Expected an empty collection", results.isEmpty());
94+
}
8095
}

0 commit comments

Comments
 (0)