Skip to content

Commit 98355db

Browse files
dnishimurashanthoosh
authored andcommitted
SAMZA-2132: Flatten startpoint key when serialized. (apache#956)
* Flatten startpoint key when serialized. * Provide custom JsonSerializer for StartpointKey.
1 parent 0967398 commit 98355db

File tree

3 files changed

+101
-2
lines changed

3 files changed

+101
-2
lines changed

samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,10 @@
2222
import com.google.common.base.Objects;
2323
import org.apache.samza.container.TaskName;
2424
import org.apache.samza.system.SystemStreamPartition;
25-
import org.codehaus.jackson.annotate.JsonAutoDetect;
25+
import org.codehaus.jackson.map.annotate.JsonSerialize;
2626

2727

28-
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
28+
@JsonSerialize(using = StartpointKeySerializer.class)
2929
class StartpointKey {
3030
private final SystemStreamPartition systemStreamPartition;
3131
private final TaskName taskName;
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*//*
19+
* Licensed to the Apache Software Foundation (ASF) under one
20+
* or more contributor license agreements. See the NOTICE file
21+
* distributed with this work for additional information
22+
* regarding copyright ownership. The ASF licenses this file
23+
* to you under the Apache License, Version 2.0 (the
24+
* "License"); you may not use this file except in compliance
25+
* with the License. You may obtain a copy of the License at
26+
*
27+
* http://www.apache.org/licenses/LICENSE-2.0
28+
*
29+
* Unless required by applicable law or agreed to in writing,
30+
* software distributed under the License is distributed on an
31+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
32+
* KIND, either express or implied. See the License for the
33+
* specific language governing permissions and limitations
34+
* under the License.
35+
*//*
36+
* Licensed to the Apache Software Foundation (ASF) under one
37+
* or more contributor license agreements. See the NOTICE file
38+
* distributed with this work for additional information
39+
* regarding copyright ownership. The ASF licenses this file
40+
* to you under the Apache License, Version 2.0 (the
41+
* "License"); you may not use this file except in compliance
42+
* with the License. You may obtain a copy of the License at
43+
*
44+
* http://www.apache.org/licenses/LICENSE-2.0
45+
*
46+
* Unless required by applicable law or agreed to in writing,
47+
* software distributed under the License is distributed on an
48+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
49+
* KIND, either express or implied. See the License for the
50+
* specific language governing permissions and limitations
51+
* under the License.
52+
*/
53+
package org.apache.samza.startpoint;
54+
55+
import java.io.IOException;
56+
import java.util.HashMap;
57+
import java.util.Map;
58+
import org.apache.samza.container.TaskName;
59+
import org.apache.samza.system.SystemStreamPartition;
60+
import org.codehaus.jackson.JsonGenerator;
61+
import org.codehaus.jackson.map.JsonSerializer;
62+
import org.codehaus.jackson.map.SerializerProvider;
63+
64+
65+
final class StartpointKeySerializer extends JsonSerializer<StartpointKey> {
66+
@Override
67+
public void serialize(StartpointKey startpointKey, JsonGenerator jsonGenerator, SerializerProvider provider) throws
68+
IOException {
69+
Map<String, Object> systemStreamPartitionMap = new HashMap<>();
70+
SystemStreamPartition systemStreamPartition = startpointKey.getSystemStreamPartition();
71+
TaskName taskName = startpointKey.getTaskName();
72+
systemStreamPartitionMap.put("system", systemStreamPartition.getSystem());
73+
systemStreamPartitionMap.put("stream", systemStreamPartition.getStream());
74+
systemStreamPartitionMap.put("partition", systemStreamPartition.getPartition().getPartitionId());
75+
if (taskName != null) {
76+
systemStreamPartitionMap.put("taskName", taskName.getTaskName());
77+
}
78+
jsonGenerator.writeObject(systemStreamPartitionMap);
79+
}
80+
}

samza-core/src/test/java/org/apache/samza/startpoint/TestStartpointKey.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818
*/
1919
package org.apache.samza.startpoint;
2020

21+
import java.io.IOException;
22+
import java.util.LinkedHashMap;
2123
import org.apache.samza.Partition;
2224
import org.apache.samza.container.TaskName;
2325
import org.apache.samza.serializers.JsonSerdeV2;
2426
import org.apache.samza.system.SystemStreamPartition;
27+
import org.codehaus.jackson.map.ObjectMapper;
2528
import org.junit.Assert;
2629
import org.junit.Test;
2730

2831

2932
public class TestStartpointKey {
33+
3034
@Test
3135
public void testStartpointKey() {
3236
SystemStreamPartition ssp1 = new SystemStreamPartition("system", "stream", new Partition(2));
@@ -61,4 +65,19 @@ public void testStartpointKey() {
6165
Assert.assertNotEquals(new String(new JsonSerdeV2<>().toBytes(startpointKeyWithTask1)),
6266
new String(new JsonSerdeV2<>().toBytes(startpointKeyWithDifferentTask)));
6367
}
68+
69+
@Test
70+
public void testStartpointKeyFormat() throws IOException {
71+
SystemStreamPartition ssp = new SystemStreamPartition("system1", "stream1", new Partition(2));
72+
StartpointKey startpointKeyWithTask = new StartpointKey(ssp, new TaskName("t1"));
73+
ObjectMapper objectMapper = new ObjectMapper();
74+
byte[] jsonBytes = new JsonSerdeV2<>().toBytes(startpointKeyWithTask);
75+
LinkedHashMap<String, String> deserialized = objectMapper.readValue(jsonBytes, LinkedHashMap.class);
76+
77+
Assert.assertEquals(4, deserialized.size());
78+
Assert.assertEquals("system1", deserialized.get("system"));
79+
Assert.assertEquals("stream1", deserialized.get("stream"));
80+
Assert.assertEquals(2, deserialized.get("partition"));
81+
Assert.assertEquals("t1", deserialized.get("taskName"));
82+
}
6483
}

0 commit comments

Comments
 (0)