Skip to content

Commit 9fa8bee

Browse files
srinipunuruxiliu
authored andcommitted
Samza SQL implementation for basic projects, filtering and UDFs
## Samza SQL implementation for basic projects, filtering and ## Design document: https://docs.google.com/document/d/1bE-ZuPfTpntm1hT3GwQEShYDiTqU3IkxeP4-3ZcGHgU/edit?usp=sharing Author: Srinivasulu Punuru <[email protected]> Reviewers: Yi Pan <[email protected]> Closes apache#295 from srinipunuru/samza-sql.1
1 parent 2e2e00e commit 9fa8bee

File tree

64 files changed

+5218
-7
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+5218
-7
lines changed

build.gradle

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,12 @@ allprojects {
104104
}
105105
}
106106

107+
idea {
108+
project {
109+
languageLevel = 1.8
110+
}
111+
}
112+
107113
subprojects {
108114
apply plugin: 'eclipse'
109115
apply plugin: 'project-report'
@@ -185,7 +191,6 @@ project(":samza-core_$scalaVersion") {
185191
}
186192
}
187193

188-
189194
project(':samza-azure') {
190195
apply plugin: 'java'
191196
apply plugin: 'checkstyle'
@@ -215,7 +220,6 @@ project(':samza-azure') {
215220
}
216221
}
217222

218-
219223
project(":samza-autoscaling_$scalaVersion") {
220224
apply plugin: 'scala'
221225
apply plugin: 'checkstyle'
@@ -269,7 +273,24 @@ project(':samza-elasticsearch') {
269273
testCompile "junit:junit:$junitVersion"
270274
testCompile "org.mockito:mockito-all:$mockitoVersion"
271275

272-
// Logging in tests is good.
276+
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
277+
}
278+
}
279+
280+
project(':samza-sql') {
281+
apply plugin: 'java'
282+
283+
dependencies {
284+
compile project(':samza-api')
285+
compile project(":samza-kafka_$scalaVersion")
286+
compile "org.apache.avro:avro:$avroVersion"
287+
compile "org.apache.calcite:calcite-core:$calciteVersion"
288+
compile "org.slf4j:slf4j-api:$slf4jVersion"
289+
290+
testCompile project(":samza-test_$scalaVersion")
291+
testCompile "junit:junit:$junitVersion"
292+
testCompile "org.mockito:mockito-all:$mockitoVersion"
293+
273294
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
274295
}
275296
}
@@ -449,7 +470,7 @@ project(":samza-shell") {
449470
}
450471

451472
// Usage: ./gradlew samza-shell:runJob \
452-
// -PconfigPath=file:///path/to/job/config.properties
473+
// -PconfigPath=file:///path/to/job/config.properties
453474
task runJob(type:JavaExec) {
454475
description 'To run a job (defined in a properties file)'
455476
main = 'org.apache.samza.job.JobRunner'
@@ -459,7 +480,7 @@ project(":samza-shell") {
459480
}
460481

461482
// Usage: ./gradlew samza-shell:checkpointTool \
462-
// -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
483+
// -PconfigPath=file:///path/to/job/config.properties -PnewOffsets=file:///path/to/new/offsets.properties
463484
task checkpointTool(type:JavaExec) {
464485
description 'Command-line tool to inspect and manipulate the job’s checkpoint'
465486
main = 'org.apache.samza.checkpoint.CheckpointTool'
@@ -470,7 +491,7 @@ project(":samza-shell") {
470491
}
471492

472493
// Usage: ./gradlew samza-shell:kvPerformanceTest
473-
// -PconfigPath=file:///path/to/job/config.properties
494+
// -PconfigPath=file:///path/to/job/config.properties
474495
task kvPerformanceTest(type:JavaExec) {
475496
description 'Command-line tool to run key-value performance tests'
476497
main = 'org.apache.samza.test.performance.TestKeyValuePerformance'

gradle/dependency-versions.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
ext {
2020
apacheCommonsCollections4Version = "4.0"
21+
avroVersion = "1.7.0"
22+
calciteVersion = "1.14.0"
2123
commonsCodecVersion = "1.9"
2224
commonsCollectionVersion = "3.2.1"
2325
commonsHttpClientVersion = "3.1"
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.sql.udfs;
21+
22+
import org.apache.samza.config.Config;
23+
24+
25+
/**
26+
* The base class for the Scalar UDFs. All the scalar UDF classes needs to extend this and implement a method named
27+
* "execute". The number and type of arguments for the execute method in the UDF class should match the number and type of fields
28+
* used while invoking this UDF in SQL statement.
29+
* Say for e.g. User creates a UDF class with signature int execute(int var1, String var2). It can be used in a SQL query
30+
* select myudf(id, name) from profile
31+
* In the above query, Profile should contain fields named 'id' of INTEGER/NUMBER type and 'name' of type VARCHAR/CHARACTER
32+
*/
33+
public interface ScalarUdf {
34+
/**
35+
* Udfs can implement this method to perform any initialization that they may need.
36+
* @param udfConfig Config specific to the udf.
37+
*/
38+
void init(Config udfConfig);
39+
40+
/**
41+
* Actual implementation of the udf function
42+
* @param args
43+
* list of all arguments that the udf needs
44+
* @return
45+
* Return value from the scalar udf.
46+
*/
47+
Object execute(Object... args);
48+
}
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.sql.avro;
21+
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.stream.Collectors;
27+
import java.util.stream.IntStream;
28+
import org.apache.avro.Schema;
29+
import org.apache.avro.generic.GenericData;
30+
import org.apache.avro.generic.GenericRecord;
31+
import org.apache.avro.generic.IndexedRecord;
32+
import org.apache.calcite.rel.type.RelDataType;
33+
import org.apache.samza.SamzaException;
34+
import org.apache.samza.config.Config;
35+
import org.apache.samza.operators.KV;
36+
import org.apache.samza.sql.data.SamzaSqlRelMessage;
37+
import org.apache.samza.sql.interfaces.SamzaRelConverter;
38+
import org.apache.samza.system.SystemStream;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
41+
42+
43+
/**
44+
* This class converts a Samza Avro messages to Relational messages and vice versa.
45+
* This supports Samza messages where Key is a string and Value is an avro record.
46+
*
47+
* Conversion from Samza to Relational Message :
48+
* The key part of the samza message is represented as a special column {@link SamzaSqlRelMessage#KEY_NAME}
49+
* in relational message.
50+
*
51+
* The value part of the samza message is expected to be {@link IndexedRecord}, All the fields in the IndexedRecord form
52+
* the corresponding fields of the relational message.
53+
*
54+
* Conversion from Relational to Samza Message :
55+
* This converts the Samza relational message into Avro {@link GenericRecord}.
56+
* All the fields of the relational message is become fields of the Avro GenericRecord except of the field with name
57+
* {@link SamzaSqlRelMessage#KEY_NAME}. This special field becomes the Key in the output Samza message.
58+
*/
59+
public class AvroRelConverter implements SamzaRelConverter {
60+
61+
protected final Config config;
62+
private final Schema avroSchema;
63+
private final RelDataType relationalSchema;
64+
65+
/**
66+
* Class that converts the avro field to their corresponding relational fields
67+
* Array fields are converted from Avro {@link org.apache.avro.generic.GenericData.Array} to {@link ArrayList}
68+
*/
69+
public enum AvroToRelObjConverter {
70+
71+
/**
72+
* If the relational field type is ArraySqlType, We expect the avro field to be of type either
73+
* {@link GenericData.Array} or {@link List} which then is converted to Rel field of type {@link ArrayList}
74+
*/
75+
ArraySqlType {
76+
@Override
77+
Object convert(Object avroObj) {
78+
ArrayList<Object> retVal = new ArrayList<>();
79+
if (avroObj != null) {
80+
if (avroObj instanceof GenericData.Array) {
81+
retVal.addAll(((GenericData.Array) avroObj));
82+
} else if (avroObj instanceof List) {
83+
retVal.addAll((List) avroObj);
84+
}
85+
}
86+
87+
return retVal;
88+
}
89+
},
90+
91+
/**
92+
* If the relational field type is MapSqlType, We expect the avro field to be of type
93+
* {@link Map}
94+
*/
95+
MapSqlType {
96+
@Override
97+
Object convert(Object obj) {
98+
Map<String, Object> retVal = new HashMap<>();
99+
if (obj != null) {
100+
retVal.putAll((Map<String, ?>) obj);
101+
}
102+
return retVal;
103+
}
104+
},
105+
106+
/**
107+
* If the relational field type is RelRecordType, The field is considered an object
108+
* and moved to rel field without any translation.
109+
*/
110+
RelRecordType {
111+
@Override
112+
Object convert(Object obj) {
113+
return obj;
114+
}
115+
},
116+
117+
/**
118+
* If the relational field type is BasicSqlType, The field is moved to rel field without any translation.
119+
*/
120+
BasicSqlType {
121+
@Override
122+
Object convert(Object obj) {
123+
return obj;
124+
}
125+
};
126+
127+
abstract Object convert(Object obj);
128+
}
129+
130+
private static final Logger LOG = LoggerFactory.getLogger(AvroRelConverter.class);
131+
132+
private final Schema arraySchema = Schema.parse(
133+
"{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]},\"java-class\":\"java.util.List\"}");
134+
private final Schema mapSchema = Schema.parse(
135+
"{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"Object\",\"namespace\":\"java.lang\",\"fields\":[]}}");
136+
137+
138+
public AvroRelConverter(SystemStream systemStream, AvroRelSchemaProvider schemaProvider, Config config) {
139+
this.config = config;
140+
this.relationalSchema = schemaProvider.getRelationalSchema();
141+
this.avroSchema = Schema.parse(schemaProvider.getSchema(systemStream));
142+
}
143+
144+
@Override
145+
public SamzaSqlRelMessage convertToRelMessage(KV<Object, Object> samzaMessage) {
146+
List<Object> values = new ArrayList<>();
147+
List<String> fieldNames = new ArrayList<>();
148+
Object value = samzaMessage.getValue();
149+
if (value instanceof IndexedRecord) {
150+
IndexedRecord record = (IndexedRecord) value;
151+
fieldNames.addAll(relationalSchema.getFieldNames());
152+
values.addAll(relationalSchema.getFieldList()
153+
.stream()
154+
.map(x -> getRelField(x.getType(), record.get(this.avroSchema.getField(x.getName()).pos())))
155+
.collect(Collectors.toList()));
156+
} else if (value == null) {
157+
fieldNames.addAll(relationalSchema.getFieldNames());
158+
IntStream.range(0, fieldNames.size() - 1).forEach(x -> values.add(null));
159+
} else {
160+
String msg = "Avro message converter doesn't support messages of type " + value.getClass();
161+
LOG.error(msg);
162+
throw new SamzaException(msg);
163+
}
164+
165+
return new SamzaSqlRelMessage(samzaMessage.getKey(), fieldNames, values);
166+
}
167+
168+
@Override
169+
public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
170+
GenericRecord record = new GenericData.Record(this.avroSchema);
171+
List<String> fieldNames = relMessage.getFieldNames();
172+
List<Object> values = relMessage.getFieldValues();
173+
for (int index = 0; index < fieldNames.size(); index++) {
174+
record.put(fieldNames.get(index), values.get(index));
175+
}
176+
177+
return new KV<>(relMessage.getKey(), record);
178+
}
179+
180+
private Object getRelField(RelDataType relType, Object avroObj) {
181+
return AvroToRelObjConverter.valueOf(relType.getClass().getSimpleName()).convert(avroObj);
182+
}
183+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.sql.avro;
21+
22+
import java.util.HashMap;
23+
import java.util.Properties;
24+
import org.apache.samza.config.Config;
25+
import org.apache.samza.sql.interfaces.RelSchemaProvider;
26+
import org.apache.samza.sql.interfaces.SamzaRelConverter;
27+
import org.apache.samza.sql.interfaces.SamzaRelConverterFactory;
28+
import org.apache.samza.system.SystemStream;
29+
30+
31+
/**
32+
* Avro Schema Resolver that uses static config to return a schema for a SystemStream.
33+
* Schemas are configured using the config of format {systemName}.{streamName}.schema.
34+
*/
35+
public class AvroRelConverterFactory implements SamzaRelConverterFactory {
36+
37+
private final HashMap<SystemStream, SamzaRelConverter> relConverters = new HashMap<>();
38+
39+
@Override
40+
public SamzaRelConverter create(SystemStream systemStream, RelSchemaProvider schemaProvider, Config config) {
41+
return relConverters.computeIfAbsent(systemStream,
42+
ss -> new AvroRelConverter(ss, (AvroRelSchemaProvider) schemaProvider, config));
43+
}
44+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.sql.avro;
21+
22+
import org.apache.samza.sql.interfaces.RelSchemaProvider;
23+
import org.apache.samza.system.SystemStream;
24+
25+
26+
public interface AvroRelSchemaProvider extends RelSchemaProvider {
27+
String getSchema(SystemStream systemStream);
28+
}

0 commit comments

Comments
 (0)