Skip to content

Commit 435d4fe

Browse files
prateekmJacob Maes
authored andcommitted
SAMZA-1247: MessageStreamImpl#merge shouldn't mutate input collection
Also fixes SAMZA-1253: MessageStream.merge operator broken for nested types Author: Prateek Maheshwari <[email protected]> Reviewers: Jacob Maes <[email protected]>, Jagadish <[email protected]> Closes apache#159 from prateekm/merge-fixes
1 parent f99de20 commit 435d4fe

File tree

3 files changed

+36
-13
lines changed

3 files changed

+36
-13
lines changed

samza-api/src/main/java/org/apache/samza/operators/MessageStream.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,11 @@ public interface MessageStream<M> {
127127

128128
/**
129129
* Merge all {@code otherStreams} with this {@link MessageStream}.
130-
* <p>
131-
* The merging streams must have the same messages of type {@code M}.
132130
*
133131
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
134132
* @return the merged {@link MessageStream}
135133
*/
136-
MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams);
134+
MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams);
137135

138136
/**
139137
* Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes

samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838
import org.apache.samza.task.TaskContext;
3939

4040
import java.time.Duration;
41+
import java.util.ArrayList;
4142
import java.util.Collection;
4243
import java.util.Collections;
4344
import java.util.HashSet;
45+
import java.util.List;
4446
import java.util.Set;
4547
import java.util.function.Function;
4648

@@ -183,14 +185,15 @@ public void init(Config config, TaskContext taskContext) {
183185
}
184186

185187
@Override
186-
public MessageStream<M> merge(Collection<MessageStream<? extends M>> otherStreams) {
188+
public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
187189
MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
188-
189-
otherStreams.add(this);
190-
otherStreams.forEach(other -> {
191-
OperatorSpec mergeOperatorSepc =
190+
List<MessageStream<M>> streamsToMerge = new ArrayList<>((Collection<MessageStream<M>>) otherStreams);
191+
streamsToMerge.add(this);
192+
193+
streamsToMerge.forEach(stream -> {
194+
OperatorSpec mergeOperatorSpec =
192195
OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId());
193-
((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(mergeOperatorSepc);
196+
((MessageStreamImpl<M>) stream).registeredOperatorSpecs.add(mergeOperatorSpec);
194197
});
195198
return nextStream;
196199
}

samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.samza.operators;
2020

21+
import com.google.common.collect.ImmutableList;
2122
import org.apache.samza.config.Config;
2223
import org.apache.samza.config.JobConfig;
2324
import org.apache.samza.config.MapConfig;
@@ -260,16 +261,37 @@ public String getSecondKey(TestMessageEnvelope message) {
260261
@Test
261262
public void testMerge() {
262263
MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
263-
Collection<MessageStream<? extends TestMessageEnvelope>> others = new ArrayList<MessageStream<? extends TestMessageEnvelope>>() { {
264-
this.add(new MessageStreamImpl<>(mockGraph));
265-
this.add(new MessageStreamImpl<>(mockGraph));
266-
} };
264+
Collection<MessageStream<TestMessageEnvelope>> others = ImmutableList.of(
265+
new MessageStreamImpl<>(mockGraph), new MessageStreamImpl<>(mockGraph));
267266
MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
268267
validateMergeOperator(merge1, mergeOutput);
269268

269+
others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
270+
}
271+
272+
@Test
273+
public void testMergeWithRelaxedTypes() {
274+
MessageStream<TestMessageEnvelope> input1 = new MessageStreamImpl<>(mockGraph);
275+
Collection<MessageStream<? extends TestMessageEnvelope>> others = ImmutableList.of(
276+
new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
277+
new MessageStreamImpl<TestMessageEnvelope>(mockGraph));
278+
279+
MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
280+
validateMergeOperator(input1, mergeOutput);
281+
270282
others.forEach(merge -> validateMergeOperator((MessageStream<TestMessageEnvelope>) merge, mergeOutput));
271283
}
272284

285+
@Test
286+
public <T> void testMergeWithNestedTypes() {
287+
class MessageEnvelope<TM> { }
288+
MessageStream<MessageEnvelope<T>> ms1 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
289+
MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
290+
MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
291+
Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);
292+
ms1.merge(otherStreams);
293+
}
294+
273295
private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
274296
Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
275297
assertEquals(subs.size(), 1);

0 commit comments

Comments
 (0)