Skip to content

Commit 1ad58d4

Browse files
committed
Merge branch 'master' of https://github.com/apache/samza
2 parents 06b1ac3 + 2d10732 commit 1ad58d4

File tree

70 files changed

+3595
-326
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+3595
-326
lines changed

build.gradle

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,16 +192,27 @@ project(':samza-azure') {
192192

193193
dependencies {
194194
compile "com.microsoft.azure:azure-storage:5.3.1"
195+
compile "com.microsoft.azure:azure-eventhubs:0.14.5"
195196
compile "com.fasterxml.jackson.core:jackson-core:2.8.8"
197+
compile "io.dropwizard.metrics:metrics-core:3.1.2"
196198
compile project(':samza-api')
197199
compile project(":samza-core_$scalaVersion")
198200
compile "org.slf4j:slf4j-api:$slf4jVersion"
199201
testCompile "junit:junit:$junitVersion"
202+
testCompile "org.mockito:mockito-all:$mockitoVersion"
203+
testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
204+
testCompile "org.powermock:powermock-core:$powerMockVersion"
205+
testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
200206
}
201207
checkstyle {
202208
configFile = new File(rootDir, "checkstyle/checkstyle.xml")
203209
toolVersion = "$checkstyleVersion"
204210
}
211+
test {
212+
// Exclude integration tests that require connection to EventHub
213+
exclude 'org/apache/samza/system/eventhub/producer/*ITest*'
214+
exclude 'org/apache/samza/system/eventhub/consumer/*ITest*'
215+
}
205216
}
206217

207218

samza-api/src/main/java/org/apache/samza/operators/MessageStream.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public interface MessageStream<M> {
7373
* should be retained in the filtered {@link MessageStream}.
7474
*
7575
* @param filterFn the predicate to filter messages from this {@link MessageStream}.
76-
* @return the transformed {@link MessageStream}
76+
* @return the filtered {@link MessageStream}
7777
*/
7878
MessageStream<M> filter(FilterFunction<? super M> filterFn);
7979

@@ -105,15 +105,19 @@ public interface MessageStream<M> {
105105
* <p>
106106
* Use the {@link org.apache.samza.operators.windows.Windows} helper methods to create the appropriate windows.
107107
* <p>
108-
* <b>Warning:</b> As of version 0.13.0, messages in windows are kept in memory and will be lost during restarts.
108+
* The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
109+
* for any state stores and streams created by this operator (the full ID also contains the job name, job id and
110+
* operator type). If the application logic is changed, this ID must be reused in the new operator to retain
111+
* state from the previous version, and changed for the new operator to discard the state from the previous version.
109112
*
110113
* @param window the window to group and process messages from this {@link MessageStream}
114+
* @param id the unique id of this operator in this application
111115
* @param <K> the type of key in the message in this {@link MessageStream}. If a key is specified,
112116
* panes are emitted per-key.
113117
* @param <WV> the type of value in the {@link WindowPane} in the transformed {@link MessageStream}
114-
* @return the transformed {@link MessageStream}
118+
* @return the windowed {@link MessageStream}
115119
*/
116-
<K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window);
120+
<K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String id);
117121

118122
/**
119123
* Joins this {@link MessageStream} with another {@link MessageStream} using the provided
@@ -124,22 +128,27 @@ public interface MessageStream<M> {
124128
* <p>
125129
* Both inputs being joined must have the same number of partitions, and should be partitioned by the join key.
126130
* <p>
127-
* <b>Warning:</b> As of version 0.13.0, messages in joins are kept in memory and will be lost during restarts.
131+
* The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
132+
* for any state stores and streams created by this operator (the full ID also contains the job name, job id and
133+
* operator type). If the application logic is changed, this ID must be reused in the new operator to retain
134+
* state from the previous version, and changed for the new operator to discard the state from the previous version.
128135
*
129136
* @param otherStream the other {@link MessageStream} to be joined with
130137
* @param joinFn the function to join messages from this and the other {@link MessageStream}
131-
* @param ttl the ttl for messages in each stream
132138
* @param keySerde the serde for the join key
133139
* @param messageSerde the serde for messages in this stream
134140
* @param otherMessageSerde the serde for messages in the other stream
141+
* @param ttl the ttl for messages in each stream
142+
* @param id the unique id of this operator in this application
135143
* @param <K> the type of join key
136144
* @param <OM> the type of messages in the other stream
137145
* @param <JM> the type of messages resulting from the {@code joinFn}
138146
* @return the joined {@link MessageStream}
139147
*/
140148
<K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
141149
JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
142-
Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl);
150+
Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
151+
Duration ttl, String id);
143152

144153
/**
145154
* Merges all {@code otherStreams} with this {@link MessageStream}.
@@ -186,26 +195,34 @@ static <T> MessageStream<T> mergeAll(Collection<? extends MessageStream<? extend
186195
* configuration, if present.
187196
* Else, the number of partitions is set to to the max of number of partitions for all input and output streams
188197
* (excluding intermediate streams).
198+
* <p>
199+
* The {@code id} must be unique for each operator in this application. It is used as part of the unique ID
200+
* for any state stores and streams created by this operator (the full ID also contains the job name, job id and
201+
* operator type). If the application logic is changed, this ID must be reused in the new operator to retain
202+
* state from the previous version, and changed for the new operator to discard the state from the previous version.
189203
*
190-
* @param <K> the type of output key
191-
* @param <V> the type of output value
192204
* @param keyExtractor the {@link Function} to extract the message and partition key from the input message
193205
* @param valueExtractor the {@link Function} to extract the value from the input message
194206
* @param serde the {@link KVSerde} to use for (de)serializing the key and value.
207+
* @param id the unique id of this operator in this application
208+
* @param <K> the type of output key
209+
* @param <V> the type of output value
195210
* @return the repartitioned {@link MessageStream}
196211
*/
197212
<K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
198-
Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde);
213+
Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde, String id);
214+
199215

200216
/**
201-
* Same as calling {@link #partitionBy(Function, Function, KVSerde)} with a null KVSerde.
217+
* Same as calling {@link #partitionBy(Function, Function, KVSerde, String)} with a null KVSerde.
202218
*
203219
* @param keyExtractor the {@link Function} to extract the message and partition key from the input message
204220
* @param valueExtractor the {@link Function} to extract the value from the input message
221+
* @param id the unique id of this operator in this application
205222
* @param <K> the type of output key
206223
* @param <V> the type of output value
207224
* @return the repartitioned {@link MessageStream}
208225
*/
209226
<K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
210-
Function<? super M, ? extends V> valueExtractor);
227+
Function<? super M, ? extends V> valueExtractor, String id);
211228
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.system.eventhub;
21+
22+
import com.microsoft.azure.eventhubs.EventHubClient;
23+
24+
/**
25+
* <p>
26+
* EventHubClient manager is the interface that must be implemented to wrap the
27+
* {@link EventHubClient} with lifecycle hooks for initialization and close.
28+
* </p>
29+
*
30+
* <p>
31+
* {@link #init()} should be invoked once during the startup and provides a
32+
* hook to perform some initialization before the creation of the underlying
33+
* {@link EventHubClient}. {@link #close(long)} is invoked once during shut-down
34+
* and can be used to perform clean-ups.
35+
* </p>
36+
*/
37+
public interface EventHubClientManager {
38+
/**
39+
* A constant that can be used in the close method's timeout parameter to
40+
* denote that the close invocation should block until all the teardown
41+
* operations for the {@link EventHubClient} are completed
42+
*/
43+
public static int BLOCK_UNTIL_CLOSE = -1;
44+
45+
/**
46+
* Lifecycle hook to perform initializations for the creation of
47+
* the underlying {@link EventHubClient}.
48+
*/
49+
void init();
50+
51+
/**
52+
* Returns the underlying {@link EventHubClient} instance. Multiple invocations
53+
* of this method should return the same instance instead of
54+
* creating new ones.
55+
*
56+
* @return EventHub client instance of the wrapper
57+
*/
58+
EventHubClient getEventHubClient();
59+
60+
/**
61+
* Tries to close the {@link EventHubClient} instance within the provided
62+
* timeout. Use this method to perform clean-ups after the execution of the
63+
* {@link EventHubClient}. Set timeout the {@link #BLOCK_UNTIL_CLOSE} to
64+
* block until the client is closed.
65+
*
66+
* @param timeoutMs Close timeout in Milliseconds
67+
*/
68+
void close(long timeoutMs);
69+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.system.eventhub;
21+
22+
public class EventHubClientManagerFactory {
23+
public EventHubClientManager getEventHubClientManager(String systemName, String streamName, EventHubConfig config) {
24+
25+
String eventHubNamespace = config.getStreamNamespace(systemName, streamName);
26+
String entityPath = config.getStreamEntityPath(systemName, streamName);
27+
String sasKeyName = config.getStreamSasKeyName(systemName, streamName);
28+
String sasToken = config.getStreamSasToken(systemName, streamName);
29+
30+
return new SamzaEventHubClientManager(eventHubNamespace, entityPath, sasKeyName, sasToken);
31+
}
32+
}

0 commit comments

Comments
 (0)