Skip to content

Commit cc314be

Browse files
committed
SAMZA-2021: Adding an API to rel converter to filter out system messages.
Author: Aditya Toomula <[email protected]> Reviewers: srinipunuru Closes apache#839 from atoomula/system and squashes the following commits: 0dcba87b [Aditya Toomula] Adding an API to rel converter to filter out system messages. 2bee3ba4 [Aditya Toomula] Adding an API to rel converter to filter out system messages.
1 parent e47edbe commit cc314be

File tree

4 files changed

+124
-4
lines changed

4 files changed

+124
-4
lines changed

samza-sql/src/main/java/org/apache/samza/sql/interfaces/SamzaRelConverter.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@
3030
* {@link SamzaRelConverter} for different systems.
3131
*/
3232
public interface SamzaRelConverter {
33+
/**
34+
* Determine if the input samza message is a system message.
35+
* This API is temporary and will be removed once descriptor creation is done by SamzaRelConverter.
36+
* @param message input samza message.
37+
* @return true if the input message is a system message.
38+
*/
39+
default boolean isSystemMessage(KV<Object, Object> message) {
40+
return false;
41+
}
42+
3343
/**
3444
* Converts the object to relational message corresponding to the tableName with relational schema.
3545
* @param message samza message that needs to be converted.
@@ -43,4 +53,5 @@ public interface SamzaRelConverter {
4353
* @return the key and value of the Samza message
4454
*/
4555
KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage);
56+
4657
}

samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.commons.lang.Validate;
2626
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
2727
import org.apache.samza.context.Context;
28+
import org.apache.samza.operators.functions.FilterFunction;
2829
import org.apache.samza.system.descriptors.GenericInputDescriptor;
2930
import org.apache.samza.operators.KV;
3031
import org.apache.samza.operators.MessageStream;
@@ -49,6 +50,30 @@ class ScanTranslator {
4950
private final Map<String, SqlIOConfig> systemStreamConfig;
5051
private final int queryId;
5152

53+
// FilterFunction that drops any message the source's rel converter identifies as a system message.
54+
private static class FilterSystemMessageFunction implements FilterFunction<KV<Object, Object>> {
55+
private transient SamzaRelConverter relConverter;
56+
private final String source;
57+
private final int queryId;
58+
59+
FilterSystemMessageFunction(String source, int queryId) {
60+
this.source = source;
61+
this.queryId = queryId;
62+
}
63+
64+
@Override
65+
public void init(Context context) {
66+
TranslatorContext translatorContext =
67+
((SamzaSqlApplicationContext) context.getApplicationTaskContext()).getTranslatorContexts().get(queryId);
68+
relConverter = translatorContext.getMsgConverter(source);
69+
}
70+
71+
@Override
72+
public boolean apply(KV<Object, Object> message) {
73+
return !relConverter.isSystemMessage(message);
74+
}
75+
}
76+
5277
ScanTranslator(Map<String, SamzaRelConverter> converters, Map<String, SqlIOConfig> ssc, int queryId) {
5378
relMsgConverters = converters;
5479
this.systemStreamConfig = ssc;
@@ -109,8 +134,12 @@ void translate(final TableScan tableScan, final TranslatorContext context,
109134
sd = systemDescriptors.computeIfAbsent(systemName, DelegatingSystemDescriptor::new);
110135
GenericInputDescriptor<KV<Object, Object>> isd = sd.getInputDescriptor(streamId, noOpKVSerde);
111136

112-
MessageStream<KV<Object, Object>> inputStream = inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
113-
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream = inputStream.map(new ScanMapFunction(sourceName, queryId));
137+
MessageStream<KV<Object, Object>> inputStream =
138+
inputMsgStreams.computeIfAbsent(source, v -> streamAppDesc.getInputStream(isd));
139+
MessageStream<KV<Object, Object>> outputStream =
140+
inputStream.filter(new FilterSystemMessageFunction(sourceName, queryId));
141+
MessageStream<SamzaSqlRelMessage> samzaSqlRelMessageStream =
142+
outputStream.map(new ScanMapFunction(sourceName, queryId));
114143
context.registerMessageStream(tableScan.getId(), samzaSqlRelMessageStream);
115144
}
116145
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.sql.testutil;
21+
22+
import org.apache.samza.config.Config;
23+
import org.apache.samza.operators.KV;
24+
import org.apache.samza.sql.avro.AvroRelConverter;
25+
import org.apache.samza.sql.avro.AvroRelSchemaProvider;
26+
import org.apache.samza.sql.interfaces.RelSchemaProvider;
27+
import org.apache.samza.sql.interfaces.SamzaRelConverter;
28+
import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
29+
import org.apache.samza.system.SystemStream;
30+
31+
32+
/**
33+
* SampleRelConverterFactory creates {@link SampleRelConverter}s: {@link AvroRelConverter}s that identify every other message as a system message.
34+
* This is used purely for testing system messages.
35+
*/
36+
public class SampleRelConverterFactory implements SamzaRelConverterFactory {
37+
38+
private int i = 0;
39+
40+
@Override
41+
public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider relSchemaProvider, Config config) {
42+
return new SampleRelConverter(systemStream, (AvroRelSchemaProvider) relSchemaProvider, config);
43+
}
44+
45+
public class SampleRelConverter extends AvroRelConverter {
46+
public SampleRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
47+
super(systemStream, schemaProvider, config);
48+
}
49+
50+
@Override
51+
public boolean isSystemMessage(KV<Object, Object> kv) {
52+
// Flag every other call, starting with the first, as a system message.
53+
return (i++) % 2 == 0;
54+
}
55+
}
56+
}

samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.samza.sql.system.TestAvroSystemFactory;
4040
import org.apache.samza.sql.testutil.JsonUtil;
4141
import org.apache.samza.sql.testutil.MyTestUdf;
42+
import org.apache.samza.sql.testutil.SampleRelConverterFactory;
4243
import org.apache.samza.sql.testutil.SamzaSqlTestConfig;
4344
import org.apache.samza.system.OutgoingMessageEnvelope;
4445
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
@@ -90,6 +91,29 @@ public void testEndToEnd() {
9091
Assert.assertTrue(IntStream.range(0, numMessages).boxed().collect(Collectors.toList()).equals(outMessages));
9192
}
9293

94+
@Test
95+
public void testEndToEndWithSystemMessages() {
96+
int numMessages = 20;
97+
98+
TestAvroSystemFactory.messages.clear();
99+
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
100+
String avroSamzaToRelMsgConverterDomain =
101+
String.format(SamzaSqlApplicationConfig.CFG_FMT_SAMZA_REL_CONVERTER_DOMAIN, "avro");
102+
staticConfigs.put(avroSamzaToRelMsgConverterDomain + SamzaSqlApplicationConfig.CFG_FACTORY,
103+
SampleRelConverterFactory.class.getName());
104+
String sql = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
105+
List<String> sqlStmts = Arrays.asList(sql);
106+
staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
107+
SamzaSqlApplicationRunner runner = new SamzaSqlApplicationRunner(true, new MapConfig(staticConfigs));
108+
runner.runAndWaitForFinish();
109+
110+
List<Integer> outMessages = TestAvroSystemFactory.messages.stream()
111+
.map(x -> Integer.valueOf(((GenericRecord) x.getMessage()).get("id").toString()))
112+
.sorted()
113+
.collect(Collectors.toList());
114+
Assert.assertEquals((numMessages + 1) / 2, outMessages.size());
115+
}
116+
93117
@Test
94118
public void testEndToEndWithNullRecords() {
95119
int numMessages = 20;
@@ -138,7 +162,7 @@ public void testEndToEndWithDifferentSystemSameStream() {
138162

139163
@Test
140164
public void testEndToEndMultiSqlStmts() {
141-
int numMessages = 4;
165+
int numMessages = 20;
142166
TestAvroSystemFactory.messages.clear();
143167
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
144168
String sql1 = "Insert into testavro.simpleOutputTopic select * from testavro.SIMPLE1";
@@ -205,7 +229,7 @@ public void testEndToEndFanIn() {
205229

206230
@Test
207231
public void testEndToEndFanOut() {
208-
int numMessages = 4;
232+
int numMessages = 20;
209233
TestAvroSystemFactory.messages.clear();
210234
Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(configs, numMessages);
211235
String sql1 = "Insert into testavro.SIMPLE2 select * from testavro.SIMPLE1";

0 commit comments

Comments
 (0)