Skip to content

Commit 8b85266

Browse files
dxichenprateekm
authored andcommitted
SAMZA-1956: Update value only descriptor serde
Changed the KVSerde to only value Serde for the Eventhubs input and output descriptors. Since the key is always a `String`, the key serde should always be `NoOpSerde` and will lead to an error otherwise since the Samza `serializers.SerdeManager.scala` expectes a `byte[]` Author: Daniel Chen <[email protected]> Reviewers: Prateek Maheshwari <[email protected]> Closes apache#733 from dxichen/eventhubs-example-cleanup
1 parent 4d6ff98 commit 8b85266

File tree

6 files changed

+73
-50
lines changed

6 files changed

+73
-50
lines changed

samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.samza.config.ConfigException;
2626
import org.apache.samza.system.descriptors.InputDescriptor;
2727
import org.apache.samza.system.descriptors.SystemDescriptor;
28+
import org.apache.samza.serializers.KVSerde;
29+
import org.apache.samza.serializers.NoOpSerde;
2830
import org.apache.samza.serializers.Serde;
2931
import org.apache.samza.system.eventhub.EventHubConfig;
3032

@@ -52,12 +54,12 @@ public class EventHubsInputDescriptor<StreamMessageType>
5254
* @param streamId id of the stream
5355
* @param namespace namespace for the Event Hubs entity to consume from, not null
5456
* @param entityPath entity path for the Event Hubs entity to consume from, not null
55-
* @param serde serde for messages in the stream
57+
* @param valueSerde serde the values in the messages in the stream
5658
* @param systemDescriptor system descriptor this stream descriptor was obtained from
5759
*/
58-
EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
60+
EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde,
5961
SystemDescriptor systemDescriptor) {
60-
super(streamId, serde, systemDescriptor, null);
62+
super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null);
6163
this.namespace = StringUtils.stripToNull(namespace);
6264
this.entityPath = StringUtils.stripToNull(entityPath);
6365
if (this.namespace == null || this.entityPath == null) {

samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.samza.config.ConfigException;
2626
import org.apache.samza.system.descriptors.OutputDescriptor;
2727
import org.apache.samza.system.descriptors.SystemDescriptor;
28+
import org.apache.samza.serializers.KVSerde;
29+
import org.apache.samza.serializers.NoOpSerde;
2830
import org.apache.samza.serializers.Serde;
2931
import org.apache.samza.system.eventhub.EventHubConfig;
3032

@@ -50,12 +52,12 @@ public class EventHubsOutputDescriptor<StreamMessageType>
5052
* @param streamId id of the stream
5153
* @param namespace namespace for the Event Hubs entity to produce to, not null
5254
* @param entityPath entity path for the Event Hubs entity to produce to, not null
53-
* @param serde serde for messages in the stream
55+
* @param valueSerde serde the values in the messages in the stream
5456
* @param systemDescriptor system descriptor this stream descriptor was obtained from
5557
*/
56-
EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
58+
EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde valueSerde,
5759
SystemDescriptor systemDescriptor) {
58-
super(streamId, serde, systemDescriptor);
60+
super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor);
5961
this.namespace = StringUtils.stripToNull(namespace);
6062
this.entityPath = StringUtils.stripToNull(entityPath);
6163
if (this.namespace == null || this.entityPath == null) {

samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Optional;
26+
import org.apache.samza.operators.KV;
2627
import org.apache.samza.system.descriptors.SystemDescriptor;
2728
import org.apache.samza.serializers.Serde;
2829
import org.apache.samza.system.eventhub.EventHubConfig;
@@ -57,40 +58,40 @@ public EventHubsSystemDescriptor(String systemName) {
5758

5859
/**
5960
* Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
60-
* namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
61+
* namespace and entity name of the associated Event Hubs entity and the provided stream level value serde.
6162
* <p>
62-
* The type of messages in the stream is the type of the provided stream level serde.
63+
* The message in the stream will have {@link String} keys and {@code ValueType} values.
6364
*
6465
* @param streamId id of the input stream
6566
* @param namespace namespace of the Event Hubs entity to consume from
6667
* @param entityPath entity path of the Event Hubs entity to consume from
67-
* @param serde stream level serde for the input stream
68-
* @param <StreamMessageType> type of messages in this stream
68+
* @param valueSerde stream level serde for the values in the messages in the input stream
69+
* @param <ValueType> type of the value in the messages in this stream
6970
* @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
7071
*/
71-
public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
72-
String entityPath, Serde<StreamMessageType> serde) {
72+
public <ValueType> EventHubsInputDescriptor<KV<String, ValueType>> getInputDescriptor(String streamId, String namespace,
73+
String entityPath, Serde<ValueType> valueSerde) {
7374
streamIds.add(streamId);
74-
return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
75+
return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, valueSerde, this);
7576
}
7677

7778
/**
7879
* Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
79-
* namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
80+
* namespace and entity name of the associated Event Hubs entity and the provided stream level value serde.
8081
* <p>
81-
* The type of messages in the stream is the type of the provided stream level serde.
82+
* The message in the stream will have {@link String} keys and {@code ValueType} values.
8283
*
8384
* @param streamId id of the output stream
8485
* @param namespace namespace of the Event Hubs entity to produce to
8586
* @param entityPath entity path of the Event Hubs entity to produce to
86-
* @param serde stream level serde for the output stream
87-
* @param <StreamMessageType> type of the messages in this stream
87+
* @param valueSerde stream level serde for the values in the messages to the output stream
88+
* @param <ValueType> type of the value in the messages in this stream
8889
* @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
8990
*/
90-
public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
91-
String entityPath, Serde<StreamMessageType> serde) {
91+
public <ValueType> EventHubsOutputDescriptor<KV<String, ValueType>> getOutputDescriptor(String streamId, String namespace,
92+
String entityPath, Serde<ValueType> valueSerde) {
9293
streamIds.add(streamId);
93-
return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
94+
return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, valueSerde, this);
9495
}
9596

9697
/**

samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
import java.util.Map;
2222
import org.apache.samza.config.ConfigException;
2323
import org.apache.samza.operators.KV;
24-
import org.apache.samza.serializers.IntegerSerde;
2524
import org.apache.samza.serializers.KVSerde;
25+
import org.apache.samza.serializers.NoOpSerde;
2626
import org.apache.samza.serializers.StringSerde;
2727
import org.apache.samza.system.eventhub.EventHubConfig;
2828
import org.junit.Test;
2929

3030
import static org.junit.Assert.assertEquals;
3131
import static org.junit.Assert.assertNull;
32+
import static org.junit.Assert.assertTrue;
3233
import static org.junit.Assert.fail;
3334

3435

@@ -40,8 +41,8 @@ public void testEntityConnectionConfigs() {
4041

4142
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
4243

43-
EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
44-
.getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
44+
EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor
45+
.getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde())
4546
.withSasKeyName("secretkey")
4647
.withSasKey("sasToken-123")
4748
.withConsumerGroup("$notdefault");
@@ -62,8 +63,8 @@ public void testWithoutEntityConnectionConfigs() {
6263

6364
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
6465

65-
EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
66-
.getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
66+
EventHubsInputDescriptor<KV<String, String>> inputDescriptor = systemDescriptor
67+
.getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
6768

6869
Map<String, String> generatedConfigs = inputDescriptor.toConfig();
6970
assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
@@ -82,11 +83,24 @@ public void testMissingInputDescriptorFields() {
8283

8384
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
8485
try {
85-
systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
86+
systemDescriptor.getInputDescriptor(streamId, null, null, new StringSerde());
8687
fail("Should have thrown Config Exception");
8788
} catch (ConfigException exception) {
8889
assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
8990
+ "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
9091
}
9192
}
93+
94+
@Test
95+
public void testStreamDescriptorContainsKVserde() {
96+
String systemName = "eventHub";
97+
String streamId = "input-stream";
98+
99+
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
100+
EventHubsInputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
101+
.getInputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
102+
assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
103+
assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde);
104+
assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde);
105+
}
92106
}

samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,15 @@
2121
import java.util.Map;
2222
import org.apache.samza.config.ConfigException;
2323
import org.apache.samza.operators.KV;
24-
import org.apache.samza.serializers.IntegerSerde;
2524
import org.apache.samza.serializers.KVSerde;
25+
import org.apache.samza.serializers.NoOpSerde;
2626
import org.apache.samza.serializers.StringSerde;
2727
import org.apache.samza.system.eventhub.EventHubConfig;
2828
import org.junit.Test;
2929

3030
import static org.junit.Assert.assertEquals;
3131
import static org.junit.Assert.assertNull;
32+
import static org.junit.Assert.assertTrue;
3233
import static org.junit.Assert.fail;
3334

3435
public class TestEventHubsOutputDescriptor {
@@ -39,8 +40,8 @@ public void testEntityConnectionConfigs() {
3940

4041
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
4142

42-
EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
43-
.getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
43+
EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
44+
.getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde())
4445
.withSasKeyName("secretkey")
4546
.withSasKey("sasToken-123");
4647

@@ -59,8 +60,8 @@ public void testWithoutEntityConnectionConfigs() {
5960

6061
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
6162

62-
EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
63-
.getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
63+
EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
64+
.getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
6465

6566
Map<String, String> generatedConfigs = outputDescriptor.toConfig();
6667
assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
@@ -79,11 +80,24 @@ public void testMissingOutputDescriptorFields() {
7980

8081
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
8182
try {
82-
systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
83+
systemDescriptor.getOutputDescriptor(streamId, null, null, new StringSerde());
8384
fail("Should have thrown Config Exception");
8485
} catch (ConfigException exception) {
8586
assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
8687
+ "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
8788
}
8889
}
90+
91+
@Test
92+
public void testStreamDescriptorContainsKVserde() {
93+
String systemName = "eventHub";
94+
String streamId = "output-stream";
95+
96+
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
97+
EventHubsOutputDescriptor<KV<String, String>> outputDescriptor = systemDescriptor
98+
.getOutputDescriptor(streamId, "entity-namespace", "entity3", new StringSerde());
99+
assertTrue(outputDescriptor.getSerde() instanceof KVSerde);
100+
assertTrue(((KVSerde) outputDescriptor.getSerde()).getKeySerde() instanceof NoOpSerde);
101+
assertTrue(((KVSerde) outputDescriptor.getSerde()).getValueSerde() instanceof StringSerde);
102+
}
89103
}

samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
package org.apache.samza.system.eventhub.descriptors;
2020

2121
import java.util.Map;
22-
import org.apache.samza.serializers.IntegerSerde;
23-
import org.apache.samza.serializers.KVSerde;
2422
import org.apache.samza.serializers.StringSerde;
2523
import org.apache.samza.system.eventhub.EventHubConfig;
2624
import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
@@ -47,14 +45,10 @@ public void testWithDescriptorOverrides() {
4745
.withRuntimeInfoTimeout(60000)
4846
.withSendKeys(false);
4947

50-
systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
51-
new IntegerSerde()));
52-
systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
53-
new IntegerSerde()));
54-
systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
55-
new IntegerSerde()));
56-
systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
57-
new IntegerSerde()));
48+
systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde());
49+
systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde());
50+
systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde());
51+
systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde());
5852

5953
Map<String, String> generatedConfigs = systemDescriptor.toConfig();
6054
assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
@@ -96,14 +90,10 @@ public void testWithInputOutputStreams() {
9690

9791
EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
9892

99-
systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
100-
new IntegerSerde()));
101-
systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
102-
new IntegerSerde()));
103-
systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
104-
new IntegerSerde()));
105-
systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
106-
new IntegerSerde()));
93+
systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", new StringSerde());
94+
systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", new StringSerde());
95+
systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", new StringSerde());
96+
systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", new StringSerde());
10797

10898
Map<String, String> generatedConfigs = systemDescriptor.toConfig();
10999
assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));

0 commit comments

Comments
 (0)