Skip to content

Commit aae0f38

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents a15a7c9 + 13a38f2 commit aae0f38

File tree

45 files changed

+2164
-483
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+2164
-483
lines changed

samza-api/src/main/java/org/apache/samza/config/Config.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ public List<String> getList(String k, List<String> defaultValue) {
154154
return defaultValue;
155155

156156
String value = get(k);
157+
if (value.trim().isEmpty()) {
158+
return defaultValue;
159+
}
157160
String[] pieces = value.split("\\s*,\\s*");
158161
return Arrays.asList(pieces);
159162
}

samza-api/src/main/java/org/apache/samza/metrics/ListGauge.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,9 @@ public void visit(MetricsVisitor visitor) {
109109

110110
/**
111111
* Evicts entries from the elements list, based on the given item-size and durationThreshold.
112+
* Concurrent eviction threads can cause incorrectness (when reading elements.size or elements.peek).
112113
*/
113-
private void evict() {
114+
private synchronized void evict() {
114115
this.evictBasedOnSize();
115116
this.evictBasedOnTimestamp();
116117
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.runtime;
20+
21+
import java.util.Objects;
22+
23+
/**
24+
* Represents the physical execution environment of the StreamProcessor.
25+
* All the stream processors which run from a LocationId should be able to share (read/write)
26+
* their local state stores.
27+
*/
28+
public class LocationId {
29+
private final String locationId;
30+
31+
public LocationId(String locationId) {
32+
if (locationId == null) {
33+
throw new IllegalArgumentException("LocationId cannot be null");
34+
}
35+
this.locationId = locationId;
36+
}
37+
38+
public String getId() {
39+
return this.locationId;
40+
}
41+
42+
@Override
43+
public boolean equals(Object o) {
44+
if (this == o) return true;
45+
if (o == null || getClass() != o.getClass()) return false;
46+
LocationId that = (LocationId) o;
47+
48+
return Objects.equals(locationId, that.locationId);
49+
}
50+
51+
@Override
52+
public int hashCode() {
53+
return locationId.hashCode();
54+
}
55+
56+
@Override
57+
public String toString() {
58+
return locationId;
59+
}
60+
}

samza-core/src/main/java/org/apache/samza/table/remote/CreditFunction.java renamed to samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,13 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
20-
package org.apache.samza.table.remote;
21-
22-
import java.io.Serializable;
23-
import java.util.function.Function;
24-
25-
import org.apache.samza.operators.KV;
26-
19+
package org.apache.samza.runtime;
2720

2821
/**
29-
* Function interface for providing rate limiting credits for each table record.
30-
* This interface allows callers to pass in lambda expressions which are otherwise
31-
* non-serializable as-is.
32-
* @param <K> the type of the key
33-
* @param <V> the type of the value
22+
* Generates {@link LocationId} that uniquely identifies the
23+
* execution environment of a stream processor.
3424
*/
35-
public interface CreditFunction<K, V> extends Function<KV<K, V>, Integer>, Serializable {
36-
}
25+
public interface LocationIdProvider {
26+
27+
LocationId getLocationId();
28+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.samza.runtime;
20+
21+
import org.apache.samza.config.Config;
22+
23+
/**
24+
* Builds the {@link LocationIdProvider}.
25+
*/
26+
public interface LocationIdProviderFactory {
27+
LocationIdProvider getLocationIdProvider(Config config);
28+
}

samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.samza.table;
2020

2121
import java.util.List;
22+
import java.util.concurrent.CompletableFuture;
23+
2224
import org.apache.samza.annotation.InterfaceStability;
2325
import org.apache.samza.storage.kv.Entry;
2426

@@ -33,7 +35,8 @@
3335
public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
3436

3537
/**
36-
* Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
38+
* Updates the mapping of the specified key-value pair;
39+
* Associates the specified {@code key} with the specified {@code value}.
3740
*
3841
* The key is deleted from the table if value is {@code null}.
3942
*
@@ -43,6 +46,18 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
4346
*/
4447
void put(K key, V value);
4548

49+
/**
50+
* Asynchronously updates the mapping of the specified key-value pair;
51+
* Associates the specified {@code key} with the specified {@code value}.
52+
* The key is deleted from the table if value is {@code null}.
53+
*
54+
* @param key the key with which the specified {@code value} is to be associated.
55+
* @param value the value with which the specified {@code key} is to be associated.
56+
* @throws NullPointerException if the specified {@code key} is {@code null}.
57+
* @return CompletableFuture for the operation
58+
*/
59+
CompletableFuture<Void> putAsync(K key, V value);
60+
4661
/**
4762
* Updates the mappings of the specified key-value {@code entries}.
4863
*
@@ -53,6 +68,16 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
5368
*/
5469
void putAll(List<Entry<K, V>> entries);
5570

71+
/**
72+
* Asynchronously updates the mappings of the specified key-value {@code entries}.
73+
* A key is deleted from the table if its corresponding value is {@code null}.
74+
*
75+
* @param entries the updated mappings to put into this table.
76+
* @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
77+
* @return CompletableFuture for the operation
78+
*/
79+
CompletableFuture<Void> putAllAsync(List<Entry<K, V>> entries);
80+
5681
/**
5782
* Deletes the mapping for the specified {@code key} from this table (if such mapping exists).
5883
*
@@ -61,6 +86,14 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
6186
*/
6287
void delete(K key);
6388

89+
/**
90+
* Asynchronously deletes the mapping for the specified {@code key} from this table (if such mapping exists).
91+
* @param key the key for which the mapping is to be deleted.
92+
* @throws NullPointerException if the specified {@code key} is {@code null}.
93+
* @return CompletableFuture for the operation
94+
*/
95+
CompletableFuture<Void> deleteAsync(K key);
96+
6497
/**
6598
* Deletes the mappings for the specified {@code keys} from this table.
6699
*
@@ -69,10 +102,16 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
69102
*/
70103
void deleteAll(List<K> keys);
71104

105+
/**
106+
* Asynchronously deletes the mappings for the specified {@code keys} from this table.
107+
* @param keys the keys for which the mappings are to be deleted.
108+
* @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
109+
* @return CompletableFuture for the operation
110+
*/
111+
CompletableFuture<Void> deleteAllAsync(List<K> keys);
72112

73113
/**
74114
* Flushes the underlying store of this table, if applicable.
75115
*/
76116
void flush();
77-
78117
}

samza-api/src/main/java/org/apache/samza/table/ReadableTable.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
2324

2425
import org.apache.samza.annotation.InterfaceStability;
2526
import org.apache.samza.container.SamzaContainerContext;
@@ -54,6 +55,15 @@ default void init(SamzaContainerContext containerContext, TaskContext taskContex
5455
*/
5556
V get(K key);
5657

58+
/**
59+
* Asynchronously gets the value associated with the specified {@code key}.
60+
*
61+
* @param key the key with which the associated value is to be fetched.
62+
* @return completableFuture for the requested value
63+
* @throws NullPointerException if the specified {@code key} is {@code null}.
64+
*/
65+
CompletableFuture<V> getAsync(K key);
66+
5767
/**
5868
* Gets the values with which the specified {@code keys} are associated.
5969
*
@@ -63,6 +73,15 @@ default void init(SamzaContainerContext containerContext, TaskContext taskContex
6373
*/
6474
Map<K, V> getAll(List<K> keys);
6575

76+
/**
77+
* Asynchronously gets the values with which the specified {@code keys} are associated.
78+
*
79+
* @param keys the keys with which the associated values are to be fetched.
80+
* @return completableFuture for the requested entries
81+
* @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
82+
*/
83+
CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
84+
6685
/**
6786
* Close the table and release any resources acquired
6887
*/

samza-api/src/test/java/org/apache/samza/config/TestConfig.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.samza.config;
2121

22+
import java.util.Collections;
23+
import java.util.List;
2224
import org.junit.Test;
2325

2426
import java.util.HashMap;
@@ -85,4 +87,43 @@ public void testSanitize() {
8587
assertEquals(Config.SENSITIVE_MASK, sanitized.get("sensitive.key3"));
8688
assertEquals(Config.SENSITIVE_MASK, sanitized.get("sensitive.key4"));
8789
}
90+
91+
@Test
92+
public void testGetList() {
93+
Map<String, String> m = new HashMap<String, String>() {
94+
{
95+
put("key1", " ");
96+
put("key2", "");
97+
put("key3", " value1 ");
98+
put("key4", "value1,value2");
99+
put("key5", "value1, value2");
100+
put("key6", "value1 , value2");
101+
}
102+
};
103+
104+
Config config = new MapConfig(m);
105+
List<String> list = config.getList("key1", Collections.<String>emptyList());
106+
assertEquals(0, list.size());
107+
108+
list = config.getList("key2", Collections.<String>emptyList());
109+
assertEquals(0, list.size());
110+
111+
list = config.getList("key3");
112+
assertEquals(" value1 ", list.get(0));
113+
114+
list = config.getList("key4");
115+
assertEquals("value1", list.get(0));
116+
assertEquals("value2", list.get(1));
117+
118+
list = config.getList("key5");
119+
assertEquals("value1", list.get(0));
120+
assertEquals("value2", list.get(1));
121+
122+
list = config.getList("key6");
123+
assertEquals("value1", list.get(0));
124+
assertEquals("value2", list.get(1));
125+
126+
list = config.getList("UndefinedKey", Collections.<String>emptyList());
127+
assertEquals(0, list.size());
128+
}
88129
}

samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Map;
2929
import java.util.Queue;
3030
import java.util.Set;
31-
3231
import org.apache.samza.SamzaException;
3332
import org.apache.samza.config.ApplicationConfig;
3433
import org.apache.samza.config.ClusterManagerConfig;
@@ -186,11 +185,12 @@ private void validateConfig() {
186185
// The visited set keeps track of the join specs that have been already inserted in the queue before
187186
Set<OperatorSpec> visited = new HashSet<>();
188187

189-
jobGraph.getSpecGraph().getInputOperators().entrySet().forEach(entry -> {
190-
StreamConfig streamConfig = new StreamConfig(config);
191-
StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(entry.getKey(), streamConfig));
188+
StreamConfig streamConfig = new StreamConfig(config);
189+
190+
jobGraph.getSpecGraph().getInputOperators().forEach((key, value) -> {
191+
StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(key, streamConfig));
192192
// Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
193-
findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
193+
findReachableJoins(value, streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
194194
});
195195

196196
// At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
@@ -204,16 +204,21 @@ private void validateConfig() {
204204
if (partitions == StreamEdge.PARTITIONS_UNKNOWN) {
205205
//if the partition is not assigned
206206
partitions = edgePartitions;
207+
log.info("Inferred the partition count {} for the join operator {} from {}."
208+
, new Object[] {partitions, join.getOpId(), edge.getName()});
207209
} else if (partitions != edgePartitions) {
208210
throw new SamzaException(String.format(
209-
"Unable to resolve input partitions of stream %s for join. Expected: %d, Actual: %d",
210-
edge.getName(), partitions, edgePartitions));
211+
"Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d",
212+
edge.getName(), join.getOpId(), partitions, edgePartitions));
211213
}
212214
}
213215
}
216+
214217
// assign the partition count for intermediate streams
215218
for (StreamEdge edge : joinSpecToStreamEdges.get(join)) {
216219
if (edge.getPartitionCount() <= 0) {
220+
log.info("Set the partition count to {} for input stream {} to the join {}.",
221+
new Object[] {partitions, edge.getName(), join.getOpId()});
217222
edge.setPartitionCount(partitions);
218223

219224
// find other joins can be inferred by setting this edge
@@ -278,6 +283,7 @@ private static void calculateIntStreamPartitions(JobGraph jobGraph, Config confi
278283
}
279284
for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
280285
if (edge.getPartitionCount() <= 0) {
286+
log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions);
281287
edge.setPartitionCount(partitions);
282288
}
283289
}

0 commit comments

Comments
 (0)