Skip to content

Commit 509bee8

Browse files
strkkkjmakes
authored andcommitted
SAMZA-2103: [samza-aws] code cleanup and refactoring (apache#914)
1 parent fab6356 commit 509bee8

File tree

8 files changed

+55
-74
lines changed

8 files changed

+55
-74
lines changed

samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class KinesisRecordProcessor implements IRecordProcessor {
6363
private final SystemStreamPartition ssp;
6464

6565
private String shardId;
66-
private KinesisRecordProcessorListener listener;
66+
private final KinesisRecordProcessorListener listener;
6767
private IRecordProcessorCheckpointer checkpointer;
6868
private ExtendedSequenceNumber initSeqNumber;
6969

@@ -85,7 +85,7 @@ public class KinesisRecordProcessor implements IRecordProcessor {
8585
*/
8686
@Override
8787
public void initialize(InitializationInput initializationInput) {
88-
Validate.isTrue(listener != null, "There is no listener set for the processor.");
88+
Validate.notNull(listener, "There is no listener set for the processor.");
8989
initSeqNumber = initializationInput.getExtendedSequenceNumber();
9090
shardId = initializationInput.getShardId();
9191
LOG.info("Initialization done for {} with sequence {}", this,

samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,8 @@ IRecordProcessorFactory createRecordProcessorFactory(String stream) {
212212
}
213213

214214
@Override
215-
public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
216-
LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
215+
public void afterCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
216+
LOG.info("afterCheckpoint called with sspOffsets {}", sspOffsets);
217217
sspOffsets.forEach((ssp, offset) -> {
218218
KinesisRecordProcessor processor = processors.get(ssp);
219219
KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);

samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
public class KinesisSystemConsumerOffset {
4141

4242
@JsonProperty("shardId")
43-
private String shardId;
43+
private final String shardId;
4444
@JsonProperty("seqNumber")
45-
private String seqNumber;
45+
private final String seqNumber;
4646

4747
@JsonCreator
4848
KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
@@ -91,10 +91,7 @@ public boolean equals(Object o) {
9191
return false;
9292
}
9393
String thatSeqNumber = ((KinesisSystemConsumerOffset) o).getSeqNumber();
94-
if (!(seqNumber == null ? thatSeqNumber == null : seqNumber.equals(thatSeqNumber))) {
95-
return false;
96-
}
97-
return true;
94+
return seqNumber == null ? thatSeqNumber == null : seqNumber.equals(thatSeqNumber);
9895
}
9996

10097
@Override

samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ class SSPAllocator {
4646
private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();
4747

4848
synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
49-
Validate.isTrue(availableSsps.get(stream) != null,
49+
Validate.notNull(availableSsps.get(stream),
5050
String.format("availableSsps is null for stream %s", stream));
5151

52-
if (availableSsps.get(stream).size() <= 0) {
52+
if (availableSsps.get(stream).isEmpty()) {
5353
// Set a flag in system consumer so that it could throw an exception in the subsequent poll.
5454
throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
5555
+ " registered. Could be the result of dynamic resharding.", stream));

samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ <T> KinesisInputDescriptor(String streamId, Serde<T> valueSerde, SystemDescripto
6767
* @return this input descriptor
6868
*/
6969
public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
70-
this.region = Optional.of(StringUtils.stripToNull(region));
70+
this.region = Optional.ofNullable(StringUtils.stripToNull(region));
7171
return this;
7272
}
7373

@@ -77,7 +77,7 @@ public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
7777
* @return this input descriptor
7878
*/
7979
public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey) {
80-
this.accessKey = Optional.of(StringUtils.stripToNull(accessKey));
80+
this.accessKey = Optional.ofNullable(StringUtils.stripToNull(accessKey));
8181
return this;
8282
}
8383

@@ -87,7 +87,7 @@ public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey)
8787
* @return this input descriptor
8888
*/
8989
public KinesisInputDescriptor<StreamMessageType> withSecretKey(String secretKey) {
90-
this.secretKey = Optional.of(StringUtils.stripToNull(secretKey));
90+
this.secretKey = Optional.ofNullable(StringUtils.stripToNull(secretKey));
9191
return this;
9292
}
9393

@@ -110,13 +110,13 @@ public Map<String, String> toConfig() {
110110
String clientConfigPrefix =
111111
String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);
112112

113-
this.region.ifPresent(
113+
region.ifPresent(
114114
val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
115-
this.accessKey.ifPresent(
115+
accessKey.ifPresent(
116116
val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
117-
this.secretKey.ifPresent(
117+
secretKey.ifPresent(
118118
val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
119-
this.kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
119+
kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
120120

121121
return config;
122122
}

samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public <ValueType> KinesisInputDescriptor<KV<String, ValueType>> getInputDescrip
7272
* @return this system descriptor
7373
*/
7474
public KinesisSystemDescriptor withRegion(String region) {
75-
this.region = Optional.of(StringUtils.stripToNull(region));
75+
this.region = Optional.ofNullable(StringUtils.stripToNull(region));
7676
return this;
7777
}
7878

@@ -102,7 +102,7 @@ public KinesisSystemDescriptor withKCLConfig(Map<String, String> kclConfig) {
102102
* @return this system descriptor
103103
*/
104104
public KinesisSystemDescriptor withProxyHost(String proxyHost) {
105-
this.proxyHost = Optional.of(StringUtils.stripToNull(proxyHost));
105+
this.proxyHost = Optional.ofNullable(StringUtils.stripToNull(proxyHost));
106106
return this;
107107
}
108108

@@ -121,18 +121,18 @@ public Map<String, String> toConfig() {
121121
Map<String, String> config = new HashMap<>(super.toConfig());
122122
String systemName = getSystemName();
123123

124-
this.region.ifPresent(
124+
region.ifPresent(
125125
val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
126-
this.proxyHost.ifPresent(
126+
proxyHost.ifPresent(
127127
val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
128-
this.proxyPort.ifPresent(
128+
proxyPort.ifPresent(
129129
val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
130130

131-
final String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
132-
this.kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
131+
String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
132+
kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
133133

134-
final String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
135-
this.awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
134+
String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
135+
awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
136136

137137
return config;
138138
}

samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,16 @@ public class TestKinesisRecordProcessor {
5656
KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
5757

5858
@Test
59-
public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
60-
NoSuchFieldException, IllegalAccessException {
59+
public void testLifeCycleWithEvents() {
6160
testLifeCycleHelper(5);
6261
}
6362

6463
@Test
65-
public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
66-
NoSuchFieldException, IllegalAccessException {
64+
public void testLifeCycleWithNoEvents() {
6765
testLifeCycleHelper(0);
6866
}
6967

70-
private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
71-
InvalidStateException, NoSuchFieldException,
72-
IllegalAccessException {
68+
private void testLifeCycleHelper(int numRecords) {
7369
String system = "kinesis";
7470
String stream = "stream";
7571
final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -102,7 +98,7 @@ public void onShutdown(SystemStreamPartition ssp) {
10298
// Verification steps
10399

104100
// Verify there is a receivedRecords call to listener.
105-
Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
101+
Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
106102

107103
if (numRecords > 0) {
108104
// Call checkpoint on last record
@@ -114,7 +110,7 @@ public void onShutdown(SystemStreamPartition ssp) {
114110
shutDownProcessor(processor, ShutdownReason.ZOMBIE);
115111

116112
// Verify that the processor is shutdown.
117-
Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
113+
Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
118114
}
119115

120116
/**
@@ -125,8 +121,7 @@ public void onShutdown(SystemStreamPartition ssp) {
125121
* before it processed any records.
126122
*/
127123
@Test
128-
public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
129-
NoSuchFieldException, IllegalAccessException {
124+
public void testCheckpointAfterInit() {
130125
String system = "kinesis";
131126
String stream = "stream";
132127
final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -160,26 +155,21 @@ public void onShutdown(SystemStreamPartition ssp) {
160155
shutDownProcessor(processor, ShutdownReason.ZOMBIE);
161156

162157
// Verify that the processor is shutdown.
163-
Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
158+
Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
164159
}
165160

166161
@Test
167-
public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
168-
InvalidStateException, NoSuchFieldException,
169-
IllegalAccessException {
162+
public void testShutdownDuringReshardWithEvents() throws InterruptedException {
170163
testShutdownDuringReshardHelper(5);
171164
}
172165

173166
@Test
174-
public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
175-
InvalidStateException, NoSuchFieldException,
176-
IllegalAccessException {
167+
public void testShutdownDuringReshardWithNoEvents() throws InterruptedException {
177168
testShutdownDuringReshardHelper(0);
178169
}
179170

180171
private void testShutdownDuringReshardHelper(int numRecords)
181-
throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
182-
IllegalAccessException {
172+
throws InterruptedException {
183173
String system = "kinesis";
184174
String stream = "stream";
185175
final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -212,7 +202,7 @@ public void onShutdown(SystemStreamPartition ssp) {
212202
// Verification steps
213203

214204
// Verify there is a receivedRecords call to listener.
215-
Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
205+
Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
216206

217207
// Call shutdown (with TERMINATE reason) on processor and verify that the processor does not call shutdown on the
218208
// listener until checkpoint is called for the last record consumed from shard.
@@ -240,7 +230,7 @@ public void onShutdown(SystemStreamPartition ssp) {
240230
}
241231

242232
static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
243-
List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
233+
List<KinesisRecordProcessor> processors) {
244234
Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
245235
processors.forEach(processor -> {
246236
try {
@@ -298,4 +288,4 @@ private static List<Record> createRecords(int numRecords) {
298288
}
299289
return records;
300290
}
301-
}
291+
}

samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,7 @@ public class TestKinesisSystemConsumer {
6565
private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
6666

6767
@Test
68-
public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
69-
NoSuchFieldException, IllegalAccessException {
68+
public void testProcessRecords() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
7069
String system = "kinesis";
7170
String stream = "stream";
7271
int numShards = 2;
@@ -76,9 +75,7 @@ public void testProcessRecords() throws InterruptedException, ShutdownException,
7675
}
7776

7877
@Test
79-
public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
80-
InvalidStateException, NoSuchFieldException,
81-
IllegalAccessException {
78+
public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
8279
String system = "kinesis";
8380
String stream = "stream";
8481
int numShards = 1;
@@ -96,8 +93,7 @@ public void testProcessRecordsWithEmptyRecordList() throws InterruptedException,
9693
* 5. Shutting down (due to re-assignment or lease expiration) record processors.
9794
*/
9895
private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
99-
throws InterruptedException, ShutdownException, InvalidStateException,
100-
NoSuchFieldException, IllegalAccessException {
96+
throws InterruptedException, NoSuchFieldException, IllegalAccessException {
10197

10298
KinesisConfig kConfig = new KinesisConfig(new MapConfig());
10399
// Create consumer
@@ -140,24 +136,22 @@ private void testProcessRecordsHelper(String system, String stream, int numShard
140136
try {
141137
KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
142138

143-
if (numRecordsPerShard > 0) {
144-
// Verify that the read messages are received in order and are the same as input records
145-
Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
146-
List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
147-
List<Record> inputRecords = inputRecordMap.get(processor);
148-
verifyRecords(envelopes, inputRecords, processor.getShardId());
149-
150-
// Call checkpoint on consumer and verify that the checkpoint is called with the right offset
151-
IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
152-
consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
153-
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
154-
verify(getCheckpointer(processor)).checkpoint(argument.capture());
155-
Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
156-
}
139+
// Verify that the read messages are received in order and are the same as input records
140+
Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
141+
List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
142+
List<Record> inputRecords = inputRecordMap.get(processor);
143+
verifyRecords(envelopes, inputRecords, processor.getShardId());
144+
145+
// Call checkpoint on consumer and verify that the checkpoint is called with the right offset
146+
IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
147+
consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
148+
ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
149+
verify(getCheckpointer(processor)).checkpoint(argument.capture());
150+
Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
157151

158152
// Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
159153
shutDownProcessor(processor, ShutdownReason.ZOMBIE);
160-
Assert.assertTrue(!sspToProcessorMap.containsValue(processor));
154+
Assert.assertFalse(sspToProcessorMap.containsValue(processor));
161155
Assert.assertTrue(isSspAvailable(consumer, ssp));
162156
} catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
163157
throw new RuntimeException(ex);
@@ -221,7 +215,7 @@ private void verifyRecords(List<IncomingMessageEnvelope> outputRecords, List<Rec
221215
Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
222216
ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
223217
record.getData().rewind();
224-
Assert.assertTrue(outputData.equals(record.getData()));
218+
Assert.assertEquals(outputData, record.getData());
225219
verifyOffset(envelope.getOffset(), record, shardId);
226220
});
227221
}

0 commit comments

Comments
 (0)