
Commit 5e6f5fb

Author: Boris Shkolnik (committed)
Merge branch 'master' of https://github.com/apache/samza
2 parents: 410ce78 + 5f81b8d

File tree: 19 files changed, +303 −216 lines

build.gradle

Lines changed: 1 addition & 0 deletions
@@ -89,6 +89,7 @@ rat {
     'samza-hdfs/src/main/resources/**',
     'samza-hdfs/src/test/resources/**',
     'out/**',
+    'hs_err_pid*.log',
     'state/**'
   ]
 }

samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java

Lines changed: 18 additions & 51 deletions
@@ -46,7 +46,19 @@ public interface KeyValueStore<K, V> {
    * @return a map of the keys that were found and their respective values.
    * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
    */
-  Map<K, V> getAll(List<K> keys);
+  default Map<K, V> getAll(List<K> keys) {
+    Map<K, V> map = new HashMap<>(keys.size());
+
+    for (K key : keys) {
+      V value = get(key);
+
+      if (value != null) {
+        map.put(key, value);
+      }
+    }
+
+    return map;
+  }

   /**
    * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.

@@ -79,7 +91,11 @@ public interface KeyValueStore<K, V> {
    * @param keys the keys for which the mappings are to be deleted.
    * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
    */
-  void deleteAll(List<K> keys);
+  default void deleteAll(List<K> keys) {
+    for (K key : keys) {
+      delete(key);
+    }
+  }

   /**
    * Returns an iterator for a sorted range of entries specified by [{@code from}, {@code to}).

@@ -111,53 +127,4 @@ public interface KeyValueStore<K, V> {
    * Flushes this key-value store, if applicable.
    */
   void flush();
-
-  /**
-   * Represents an extension for classes that implement {@link KeyValueStore}.
-   */
-  // TODO replace with default interface methods when we can use Java 8 features.
-  class Extension {
-    private Extension() {
-      // This class cannot be instantiated
-    }
-
-    /**
-     * Gets the values with which the specified {@code keys} are associated.
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys with which the associated values are to be fetched.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @return a map of the keys that were found and their respective values.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> Map<K, V> getAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      final Map<K, V> map = new HashMap<>(keys.size());
-
-      for (final K key : keys) {
-        final V value = store.get(key);
-
-        if (value != null) {
-          map.put(key, value);
-        }
-      }
-
-      return map;
-    }
-
-    /**
-     * Deletes the mappings for the specified {@code keys} from this key-value store (if such mappings exist).
-     *
-     * @param store the key-value store for which this operation is to be performed.
-     * @param keys the keys for which the mappings are to be deleted.
-     * @param <K> the type of keys maintained by the specified {@code store}.
-     * @param <V> the type of values maintained by the specified {@code store}.
-     * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-     */
-    public static <K, V> void deleteAll(final KeyValueStore<K, V> store, final List<K> keys) {
-      for (final K key : keys) {
-        store.delete(key);
-      }
-    }
-  }
 }
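
This change resolves the old TODO: now that Java 8 default methods are available, the static Extension helpers are inlined into the interface and every implementation inherits the bulk operations. A minimal sketch of the new call style (the store variable and key values are illustrative, not taken from the Samza code base):

import java.util.Arrays;
import java.util.Map;

import org.apache.samza.storage.kv.KeyValueStore;

public class BulkOpsExample {
  static void example(KeyValueStore<String, Integer> store) {
    // Formerly: KeyValueStore.Extension.getAll(store, keys)
    Map<String, Integer> found = store.getAll(Arrays.asList("a", "b"));

    // Formerly: KeyValueStore.Extension.deleteAll(store, keys)
    store.deleteAll(Arrays.asList("a", "b"));

    System.out.println(found);
  }
}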

samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@
  * A {@link StreamTask} implementation that brings all the operator API implementation components together and
  * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
  */
-public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+public class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
   private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);

   private final StreamApplication streamApplication;
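
Dropping the final modifier makes the class subclassable and mockable. The commit does not state the motivation; one plausible use, sketched under the assumption that tests use plain Mockito (whose default mock maker cannot mock final classes):

import static org.mockito.Mockito.mock;

import org.apache.samza.task.StreamOperatorTask;

public class MockTaskExample {
  public static void main(String[] args) {
    // This call fails for final classes under Mockito's default mock maker,
    // so removing the modifier unblocks test doubles like this one.
    StreamOperatorTask task = mock(StreamOperatorTask.class);
    System.out.println(task);
  }
}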

samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java

Lines changed: 4 additions & 4 deletions
@@ -177,12 +177,12 @@ private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
     OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");

     messageStream1.map(m -> m)
-        .filter(m->true)
-        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)));
+        .filter(m -> true)
+        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)));

     messageStream2.map(m -> m)
-        .filter(m->true)
-        .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)));
+        .filter(m -> true)
+        .window(Windows.<KV<Object, Object>, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)));

     messageStream1
         .join(messageStream2, mock(JoinFunction.class),
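
The explicit <KV<Object, Object>, Object> witness is needed because the raw mock(Serde.class) arguments give the compiler nothing to infer keyedTumblingWindow's type parameters from. A self-contained illustration of the same inference problem (the class and method names are illustrative):

import java.util.Collections;
import java.util.List;

public class TypeWitnessExample {
  static <T> List<T> repeat(T value, int n) {
    return Collections.nCopies(n, value);
  }

  public static void main(String[] args) {
    // When an argument is raw or ambiguous, inference can resolve T to
    // Object; an explicit type witness pins the parameter down.
    List<String> xs = TypeWitnessExample.<String>repeat("hi", 2);
    System.out.println(xs);
  }
}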

samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java

Lines changed: 9 additions & 0 deletions
@@ -167,6 +167,15 @@ public StreamSpec copyWithPartitionCount(int partitionCount) {
     return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), partitionCount, getReplicationFactor(), getProperties());
   }

+  /**
+   * Make a copy of the spec with new properties
+   * @param properties properties of the Kafka stream
+   * @return new instance of {@link KafkaStreamSpec}
+   */
+  public KafkaStreamSpec copyWithProperties(Properties properties) {
+    return new KafkaStreamSpec(getId(), getPhysicalName(), getSystemName(), getPartitionCount(), getReplicationFactor(), properties);
+  }
+
   public int getReplicationFactor() {
     return replicationFactor;
   }
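
copyWithProperties follows the same immutable copy pattern as the existing copyWithPartitionCount: every field is carried over except the one being replaced. A minimal usage sketch (the helper name and property value are illustrative):

import java.util.Properties;

import org.apache.samza.system.kafka.KafkaStreamSpec;

public class CopyWithPropertiesExample {
  // Hypothetical helper: derive a compacted variant of an existing spec
  // without mutating the original.
  static KafkaStreamSpec compacted(KafkaStreamSpec original) {
    Properties props = new Properties();
    props.put("cleanup.policy", "compact"); // standard Kafka topic config key
    return original.copyWithProperties(props);
  }
}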

samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala

Lines changed: 39 additions & 64 deletions
@@ -43,6 +43,9 @@ import scala.collection.mutable
  * keyed to that taskName. If there is no such message, no checkpoint data
  * exists. The underlying log has a single partition into which all
  * checkpoints and TaskName to changelog partition mappings are written.
+ *
+ * This class is thread safe for writing but not for reading checkpoints.
+ * This is currently OK since checkpoints are only read on the main thread.
  */
 class KafkaCheckpointManager(
   clientId: String,

@@ -64,14 +67,14 @@ class KafkaCheckpointManager(
   checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {

   var taskNames = Set[TaskName]()
-  @volatile var systemProducer: SystemProducer = null
+  @volatile var systemProducer: SystemProducer = null
+  var systemConsumer: SystemConsumer = null
   var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
   val systemAdmin = getSystemAdmin()

   val kafkaUtil: KafkaUtil = new KafkaUtil(retryBackoff, connectZk)


-
   KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)

   info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))

@@ -181,73 +184,40 @@ class KafkaCheckpointManager(
    */
   private def readLog(shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
                       handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
-
-    val UNKNOWN_OFFSET = "-1"
-    var attempts = 10
-    val POLL_TIMEOUT = 1000L
+    info("Reading from checkpoint system:%s topic:%s" format(systemName, checkpointTopic))

     val ssp: SystemStreamPartition = new SystemStreamPartition(systemName, checkpointTopic, new Partition(0))
-    val systemConsumer = getSystemConsumer()
-    val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
-    // offsets returned are strings
-    val newestOffset = if (partitionMetadata.getNewestOffset == null) UNKNOWN_OFFSET else partitionMetadata.getNewestOffset
-    val oldestOffset = partitionMetadata.getOldestOffset
-    systemConsumer.register(ssp, oldestOffset) // checkpoint stream should always be read from the beginning
-    systemConsumer.start()

-    var msgCount = 0
-    try {
-      val emptyEnvelopes = util.Collections.emptyMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]
-      // convert offsets to long
-      var currentOffset = UNKNOWN_OFFSET.toLong
-      val newestOffsetLong = newestOffset.toLong
-      val sspToPoll = Collections.singleton(ssp)
-      while (currentOffset < newestOffsetLong) {
-
-        val envelopes: java.util.Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]] =
-          try {
-            systemConsumer.poll(sspToPoll, POLL_TIMEOUT)
-          } catch {
-            case e: Exception => {
-              // these exceptions are most likely intermediate
-              warn("Got %s exception while polling the consumer for checkpoints." format e)
-              if (attempts == 0) throw new SamzaException("Multiple attempts failed while reading the checkpoints. Giving up.", e)
-              attempts -= 1
-              emptyEnvelopes
-            }
-          }
+    if (systemConsumer == null) {
+      val partitionMetadata = getSSPMetadata(checkpointTopic, new Partition(0))
+      val oldestOffset = partitionMetadata.getOldestOffset

-        val messages: util.List[IncomingMessageEnvelope] = envelopes.get(ssp)
-        val messagesNum = if (messages != null) messages.size else 0
-        info("CheckpointMgr read %s envelopes (%s messages) from ssp %s. Current offset is %s, newest is %s"
-          format (envelopes.size(), messagesNum, ssp, currentOffset, newestOffset))
-        if (envelopes.isEmpty || messagesNum <= 0) {
-          info("Got empty/null list of messages")
-        } else {
-          msgCount += messages.size()
-          // check the key
-          for (msg: IncomingMessageEnvelope <- messages) {
-            val key = msg.getKey.asInstanceOf[Array[Byte]]
-            currentOffset = msg.getOffset().toLong
-            if (key == null) {
-              throw new KafkaUtilException("While reading checkpoint (currentOffset=%s) stream encountered message without key."
-                format currentOffset)
-            }
+      systemConsumer = getSystemConsumer()
+      systemConsumer.register(ssp, oldestOffset)
+      systemConsumer.start()
+    }

-            val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+    val iterator = new SystemStreamPartitionIterator(systemConsumer, ssp);
+    var msgCount = 0
+    while (iterator.hasNext) {
+      val msg = iterator.next
+      msgCount += 1
+
+      val offset = msg.getOffset
+      val key = msg.getKey.asInstanceOf[Array[Byte]]
+      if (key == null) {
+        throw new KafkaUtilException(
+          "While reading checkpoint (currentOffset=%s) stream encountered message without key." format offset)
+      }

-            if (!shouldHandleEntry(checkpointKey)) {
-              info("Skipping checkpoint log entry at offset %s with key %s." format(currentOffset, checkpointKey))
-            } else {
-              // handleEntry requires ByteBuffer
-              val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
-              handleEntry(checkpointPayload, checkpointKey)
-            }
-          }
-        }
+      val checkpointKey = KafkaCheckpointLogKey.fromBytes(key)
+
+      if (!shouldHandleEntry(checkpointKey)) {
+        info("Skipping checkpoint log entry at offset %s with key %s." format(offset, checkpointKey))
+      } else {
+        val checkpointPayload = ByteBuffer.wrap(msg.getMessage.asInstanceOf[Array[Byte]])
+        handleEntry(checkpointPayload, checkpointKey)
       }
-    } finally {
-      systemConsumer.stop()
     }
     info("Done reading %s messages from checkpoint system:%s topic:%s" format(msgCount, systemName, checkpointTopic))
   }

@@ -282,12 +252,17 @@ class KafkaCheckpointManager(


   def stop = {
-    synchronized (
+    synchronized {
       if (systemProducer != null) {
         systemProducer.stop
         systemProducer = null
       }
-    )
+
+      if (systemConsumer != null) {
+        systemConsumer.stop
+        systemConsumer = null
+      }
+    }

   }

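The rewrite replaces the hand-rolled poll/retry loop with SystemStreamPartitionIterator, which polls and buffers internally, and keeps the consumer open as a long-lived field (stopped in stop()) instead of creating and stopping one per read. A minimal Java sketch of the new consumption pattern, assuming a consumer that was already registered at the topic's oldest offset and started, as in the diff (system/topic names would be caller-supplied):

import org.apache.samza.Partition;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemStreamPartitionIterator;

public class CheckpointReadExample {
  static int countMessages(SystemConsumer consumer, String system, String topic) {
    SystemStreamPartition ssp = new SystemStreamPartition(system, topic, new Partition(0));
    SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(consumer, ssp);

    int msgCount = 0;
    while (iterator.hasNext()) {
      // hasNext() returns false once the underlying poll comes back empty,
      // so no manual offset bookkeeping or retry counting is needed.
      IncomingMessageEnvelope envelope = iterator.next();
      System.out.println(envelope.getOffset());
      msgCount++;
    }
    return msgCount;
  }
}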
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala

Lines changed: 4 additions & 17 deletions
@@ -21,11 +21,13 @@ package org.apache.samza.checkpoint.kafka

 import java.util.Properties

+import com.google.common.collect.ImmutableMap
 import kafka.utils.ZkUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.{CheckpointManager, CheckpointManagerFactory}
+import org.apache.samza.config.ApplicationConfig.ApplicationMode
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.{Config, KafkaConfig, SystemConfig}
+import org.apache.samza.config._
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, Logging, Util, _}

@@ -38,21 +40,6 @@ object KafkaCheckpointManagerFactory {
   // on log compacted topics. Details in SAMZA-586.
   "compression.type" -> "none")

-  // Set the checkpoint topic configs to have a very small segment size and
-  // enable log compaction. This keeps job startup time small since there
-  // are fewer useless (overwritten) messages to read from the checkpoint
-  // topic.
-  def getCheckpointTopicProperties(config: Config) = {
-    val segmentBytes: Int = if (config == null) {
-      KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES
-    } else {
-      new KafkaConfig(config).getCheckpointSegmentBytes()
-    }
-    (new Properties /: Map(
-      "cleanup.policy" -> "compact",
-      "segment.bytes" -> String.valueOf(segmentBytes))) { case (props, (k, v)) => props.put(k, v); props }
-  }
-
   /**
    * Get the checkpoint system and system factory from the configuration
    * @param config

@@ -113,6 +100,6 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       connectZk,
       config.getSystemStreamPartitionGrouperFactory, // To find out the SSPGrouperFactory class so it can be included/verified in the key
       config.failOnCheckpointValidation,
-      checkpointTopicProperties = getCheckpointTopicProperties(config))
+      checkpointTopicProperties = new KafkaConfig(config).getCheckpointTopicProperties())
   }
 }
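
The helper is relocated from this factory into KafkaConfig, but the configuration it builds is the same: log compaction plus a small segment size, so job startup reads few overwritten checkpoint messages. A Java sketch of equivalent properties, grounded in the removed Scala code (the segment size is caller-supplied here, whereas KafkaConfig reads it from job config with a default):

import java.util.Properties;

public class CheckpointTopicPropsExample {
  static Properties checkpointTopicProperties(int segmentBytes) {
    Properties props = new Properties();
    props.put("cleanup.policy", "compact"); // retain only the newest message per key
    props.put("segment.bytes", String.valueOf(segmentBytes)); // small segments keep startup reads short
    return props;
  }
}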
