Skip to content

Commit 4bf8ab6

Browse files
prateekmjagadish-northguard
authored and committed
SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.
Removed the MessageEnvelope and OutputStream interfaces from public operator APIs. Moved the creation of SinkFunction for output streams to SinkOperatorSpec. Moved StreamSpec from a public API to an internal class. Additionally, 1. Removed references to StreamGraph in OperatorSpecs. It was being used to getNextOpId(). MessageStreamsImpl now gets the ID and gives it to OperatorSpecs itself. 2. Updated and cleaned up the StreamGraphBuilder examples. 3. Renamed SinkOperatorSpec to OutputOperatorSpec since it's used by sink, sendTo and partitionBy. nickpan47 and xinyuiscool, please take a look. Author: Prateek Maheshwari <[email protected]> Reviewers: Jagadish <[email protected]>, Yi Pan <[email protected]> Closes apache#92 from prateekm/message-envelope-removal
1 parent 65af13d commit 4bf8ab6

File tree

51 files changed

+1046
-1938
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1046
-1938
lines changed

samza-api/src/main/java/org/apache/samza/operators/MessageStream.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -75,22 +75,24 @@ public interface MessageStream<M> {
7575
MessageStream<M> filter(FilterFunction<M> filterFn);
7676

7777
/**
78-
* Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}.
78+
* Allows sending messages in this {@link MessageStream} to an output system using the provided {@link SinkFunction}.
7979
*
80-
* NOTE: the output may not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc.
80+
* NOTE: If the output is for a {@link org.apache.samza.system.SystemStream}, use
81+
* {@link #sendTo(OutputStream)} instead. This transform should only be used to output to
82+
* non-stream systems (e.g., an external database).
8183
*
82-
* @param sinkFn the function to send messages in this stream to output
84+
* @param sinkFn the function to send messages in this stream to an external system
8385
*/
8486
void sink(SinkFunction<M> sinkFn);
8587

8688
/**
8789
* Allows sending messages in this {@link MessageStream} to an output {@link OutputStream}.
8890
*
89-
* NOTE: the {@code stream} has to be a {@link MessageStream}.
90-
*
91-
* @param stream the output {@link MessageStream}
91+
* @param outputStream the output stream to send messages to
92+
* @param <K> the type of key in the outgoing message
93+
* @param <V> the type of message in the outgoing message
9294
*/
93-
void sendTo(OutputStream<M> stream);
95+
<K, V> void sendTo(OutputStream<K, V, M> outputStream);
9496

9597
/**
9698
* Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
@@ -128,19 +130,20 @@ public interface MessageStream<M> {
128130
* <p>
129131
* The merging streams must have the same messages of type {@code M}.
130132
*
131-
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
132-
* @return the merged {@link MessageStream}
133+
* @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
134+
* @return the merged {@link MessageStream}
133135
*/
134136
MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
135137

136138
/**
137-
* Send the input message to an output {@link org.apache.samza.system.SystemStream} and consume it as input {@link MessageStream} again.
138-
*
139-
* Note: this is an transform function only used in logic DAG. In a physical DAG, this is either translated to a NOOP function, or a {@code MessageStream#sendThrough} function.
139+
* Sends the messages of type {@code M} in this {@link MessageStream} to a repartitioned output stream and consumes
140+
* them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key.
140141
*
141-
* @param parKeyExtractor a {@link Function} that extract the partition key from a message in this {@link MessageStream}
142-
* @param <K> the type of partition key
143-
* @return a {@link MessageStream} object after the re-partition
142+
* @param keyExtractor the {@link Function} to extract the output message key and partition key from
143+
* the input message
144+
* @param <K> the type of output message key and partition key
145+
* @return the repartitioned {@link MessageStream}
144146
*/
145-
<K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor);
147+
<K> MessageStream<M> partitionBy(Function<M, K> keyExtractor);
148+
146149
}

samza-api/src/main/java/org/apache/samza/operators/OutputStream.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,15 @@
1919
package org.apache.samza.operators;
2020

2121
import org.apache.samza.annotation.InterfaceStability;
22-
import org.apache.samza.operators.functions.SinkFunction;
23-
2422

2523
/**
26-
* The interface class defining the specific {@link SinkFunction} for a system {@link OutputStream}.
24+
* An output stream to send messages to.
2725
*
28-
* @param <M> The type of message to be send to this output stream
26+
* @param <K> the type of key in the outgoing message
27+
* @param <V> the type of message in the outgoing message
28+
* @param <M> the type of message in this {@link OutputStream}
2929
*/
3030
@InterfaceStability.Unstable
31-
public interface OutputStream<M> {
31+
public interface OutputStream<K, V, M> {
3232

33-
/**
34-
* Returns the specific {@link SinkFunction} for this output stream. The {@link OutputStream} is created
35-
* via {@link StreamGraph#createOutStream(StreamSpec, Serde, Serde)} or {@link StreamGraph#createIntStream(StreamSpec, Serde, Serde)}.
36-
* Hence, the proper types of serdes for key and value are instantiated and are used in the {@link SinkFunction} returned.
37-
*
38-
* @return The pre-defined {@link SinkFunction} to apply proper serdes before sending the message to the output stream.
39-
*/
40-
SinkFunction<M> getSinkFunction();
4133
}

samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java

Lines changed: 28 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -19,76 +19,52 @@
1919
package org.apache.samza.operators;
2020

2121
import org.apache.samza.annotation.InterfaceStability;
22-
import org.apache.samza.operators.data.MessageEnvelope;
23-
import org.apache.samza.serializers.Serde;
24-
import org.apache.samza.system.StreamSpec;
2522

26-
import java.util.Map;
23+
import java.util.function.BiFunction;
24+
import java.util.function.Function;
2725

2826
/**
29-
* Job-level programming interface to create an operator DAG and run in various different runtime environments.
27+
* Provides APIs for accessing {@link MessageStream}s to be used to create the DAG of transforms.
3028
*/
3129
@InterfaceStability.Unstable
3230
public interface StreamGraph {
33-
/**
34-
* Method to add an input {@link MessageStream} from the system
35-
*
36-
* @param streamSpec the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream}
37-
* @param keySerde the serde used to serialize/deserialize the message key from the input {@link MessageStream}
38-
* @param msgSerde the serde used to serialize/deserialize the message body from the input {@link MessageStream}
39-
* @param <K> the type of key in the input message
40-
* @param <V> the type of message in the input message
41-
* @param <M> the type of {@link MessageEnvelope} in the input {@link MessageStream}
42-
* @return the input {@link MessageStream} object
43-
*/
44-
<K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
45-
46-
/**
47-
* Method to add an output {@link MessageStream} from the system
48-
*
49-
* @param streamSpec the {@link StreamSpec} describing the physical characteristics of the output {@link MessageStream}
50-
* @param keySerde the serde used to serialize/deserialize the message key from the output {@link MessageStream}
51-
* @param msgSerde the serde used to serialize/deserialize the message body from the output {@link MessageStream}
52-
* @param <K> the type of key in the output message
53-
* @param <V> the type of message in the output message
54-
* @param <M> the type of {@link MessageEnvelope} in the output {@link MessageStream}
55-
* @return the output {@link MessageStream} object
56-
*/
57-
<K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
58-
59-
/**
60-
* Method to add an intermediate {@link MessageStream} from the system
61-
*
62-
* @param streamSpec the {@link StreamSpec} describing the physical characteristics of the intermediate {@link MessageStream}
63-
* @param keySerde the serde used to serialize/deserialize the message key from the intermediate {@link MessageStream}
64-
* @param msgSerde the serde used to serialize/deserialize the message body from the intermediate {@link MessageStream}
65-
* @param <K> the type of key in the intermediate message
66-
* @param <V> the type of message in the intermediate message
67-
* @param <M> the type of {@link MessageEnvelope} in the intermediate {@link MessageStream}
68-
* @return the intermediate {@link MessageStream} object
69-
*/
70-
<K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
7131

7232
/**
73-
* Method to get the input {@link MessageStream}s
33+
* Gets the input {@link MessageStream} corresponding to the logical {@code streamId}.
7434
*
35+
* @param streamId the unique logical ID for the stream
36+
* @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
37+
* in the input {@link MessageStream}
38+
* @param <K> the type of key in the incoming message
39+
* @param <V> the type of message in the incoming message
40+
* @param <M> the type of message in the input {@link MessageStream}
7541
* @return the input {@link MessageStream}
7642
*/
77-
Map<StreamSpec, MessageStream> getInStreams();
43+
<K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder);
7844

7945
/**
80-
* Method to get the {@link OutputStream}s
46+
* Gets the {@link OutputStream} corresponding to the logical {@code streamId}.
8147
*
82-
* @return the map of all {@link OutputStream}s
48+
* @param streamId the unique logical ID for the stream
49+
* @param keyExtractor the {@link Function} to extract the outgoing key from the output message
50+
* @param msgExtractor the {@link Function} to extract the outgoing message from the output message
51+
* @param <K> the type of key in the outgoing message
52+
* @param <V> the type of message in the outgoing message
53+
* @param <M> the type of message in the {@link OutputStream}
54+
* @return the output {@link OutputStream}
8355
*/
84-
Map<StreamSpec, OutputStream> getOutStreams();
56+
<K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
57+
Function<M, K> keyExtractor, Function<M, V> msgExtractor);
8558

8659
/**
87-
* Method to set the {@link ContextManager} for this {@link StreamGraph}
60+
* Sets the {@link ContextManager} for this {@link StreamGraph}.
61+
*
62+
* The provided {@code contextManager} will be initialized before the transformation functions
63+
* and can be used to setup shared context between them.
8864
*
89-
* @param manager the {@link ContextManager} object
90-
* @return this {@link StreamGraph} object
65+
* @param contextManager the {@link ContextManager} to use for the {@link StreamGraph}
66+
* @return the {@link StreamGraph} with the {@code contextManager} as its {@link ContextManager}
9167
*/
92-
StreamGraph withContextManager(ContextManager manager);
68+
StreamGraph withContextManager(ContextManager contextManager);
9369

9470
}

samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java

Lines changed: 0 additions & 63 deletions
This file was deleted.

samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public interface SinkFunction<M> extends InitableFunction {
3737
* or shut the container down.
3838
*
3939
* @param message the input message to be sent to an output {@link org.apache.samza.system.SystemStream}
40-
* @param messageCollector the {@link MessageCollector} to send the {@link org.apache.samza.operators.data.MessageEnvelope}
40+
* @param messageCollector the {@link MessageCollector} to send the message
4141
* @param taskCoordinator the {@link TaskCoordinator} to request commits or shutdown
4242
*/
4343
void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator);

samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
*/
1919
package org.apache.samza.runtime;
2020

21-
import java.lang.reflect.Constructor;
2221
import org.apache.samza.annotation.InterfaceStability;
2322
import org.apache.samza.application.StreamApplication;
2423
import org.apache.samza.config.Config;
2524
import org.apache.samza.config.ConfigException;
2625
import org.apache.samza.job.ApplicationStatus;
2726
import org.apache.samza.system.StreamSpec;
2827

28+
import java.lang.reflect.Constructor;
29+
2930

3031
/**
3132
* A physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}

0 commit comments

Comments
 (0)