Skip to content

Commit ecbf174

Browse files
eramitmittalkou
authored andcommitted
apacheGH-38255: [Java] Implement Flight SQL Bulk Ingestion (apache#43551)
Please look at apache#38255 for details on this functionality. Support for Go and C++ was added as part of apache#38385. This pull request is to add the required support for Java. * GitHub Issue: apache#38255 Lead-authored-by: Amit Mittal <[email protected]> Co-authored-by: Amit Mittal <[email protected]> Signed-off-by: David Li <[email protected]>
1 parent 6a4e804 commit ecbf174

File tree

14 files changed

+989
-102
lines changed

14 files changed

+989
-102
lines changed

flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlExtensionScenario.java

Lines changed: 26 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,17 @@
1616
*/
1717
package org.apache.arrow.flight.integration.tests;
1818

19-
import java.util.HashMap;
2019
import java.util.Map;
2120
import org.apache.arrow.flight.FlightClient;
2221
import org.apache.arrow.flight.FlightInfo;
23-
import org.apache.arrow.flight.FlightStream;
2422
import org.apache.arrow.flight.Location;
2523
import org.apache.arrow.flight.SchemaResult;
26-
import org.apache.arrow.flight.Ticket;
2724
import org.apache.arrow.flight.sql.CancelResult;
2825
import org.apache.arrow.flight.sql.FlightSqlClient;
2926
import org.apache.arrow.flight.sql.FlightSqlProducer;
3027
import org.apache.arrow.flight.sql.impl.FlightSql;
3128
import org.apache.arrow.memory.BufferAllocator;
32-
import org.apache.arrow.util.Preconditions;
33-
import org.apache.arrow.vector.UInt4Vector;
3429
import org.apache.arrow.vector.VectorSchemaRoot;
35-
import org.apache.arrow.vector.complex.DenseUnionVector;
36-
import org.apache.arrow.vector.types.pojo.Schema;
3730

3831
/**
3932
* Integration test scenario for validating Flight SQL specs across multiple implementations. This
@@ -53,69 +46,32 @@ public void client(BufferAllocator allocator, Location location, FlightClient cl
5346
}
5447

5548
private void validateMetadataRetrieval(FlightSqlClient sqlClient) throws Exception {
56-
FlightInfo info = sqlClient.getSqlInfo();
57-
Ticket ticket = info.getEndpoints().get(0).getTicket();
58-
59-
Map<Integer, Object> infoValues = new HashMap<>();
60-
try (FlightStream stream = sqlClient.getStream(ticket)) {
61-
Schema actualSchema = stream.getSchema();
62-
IntegrationAssertions.assertEquals(
63-
FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA, actualSchema);
64-
65-
while (stream.next()) {
66-
UInt4Vector infoName = (UInt4Vector) stream.getRoot().getVector(0);
67-
DenseUnionVector value = (DenseUnionVector) stream.getRoot().getVector(1);
68-
69-
for (int i = 0; i < stream.getRoot().getRowCount(); i++) {
70-
final int code = infoName.get(i);
71-
if (infoValues.containsKey(code)) {
72-
throw new AssertionError("Duplicate SqlInfo value: " + code);
73-
}
74-
Object object;
75-
byte typeId = value.getTypeId(i);
76-
switch (typeId) {
77-
case 0: // string
78-
object =
79-
Preconditions.checkNotNull(
80-
value.getVarCharVector(typeId).getObject(value.getOffset(i)))
81-
.toString();
82-
break;
83-
case 1: // bool
84-
object = value.getBitVector(typeId).getObject(value.getOffset(i));
85-
break;
86-
case 2: // int64
87-
object = value.getBigIntVector(typeId).getObject(value.getOffset(i));
88-
break;
89-
case 3: // int32
90-
object = value.getIntVector(typeId).getObject(value.getOffset(i));
91-
break;
92-
default:
93-
throw new AssertionError("Decoding SqlInfo of type code " + typeId);
94-
}
95-
infoValues.put(code, object);
96-
}
97-
}
98-
}
99-
100-
IntegrationAssertions.assertEquals(
101-
Boolean.FALSE, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SQL_VALUE));
102-
IntegrationAssertions.assertEquals(
103-
Boolean.TRUE, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SUBSTRAIT_VALUE));
104-
IntegrationAssertions.assertEquals(
105-
"min_version",
106-
infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SUBSTRAIT_MIN_VERSION_VALUE));
107-
IntegrationAssertions.assertEquals(
108-
"max_version",
109-
infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SUBSTRAIT_MAX_VERSION_VALUE));
110-
IntegrationAssertions.assertEquals(
111-
FlightSql.SqlSupportedTransaction.SQL_SUPPORTED_TRANSACTION_SAVEPOINT_VALUE,
112-
infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_TRANSACTION_VALUE));
113-
IntegrationAssertions.assertEquals(
114-
Boolean.TRUE, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_CANCEL_VALUE));
115-
IntegrationAssertions.assertEquals(
116-
42, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_STATEMENT_TIMEOUT_VALUE));
117-
IntegrationAssertions.assertEquals(
118-
7, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT_VALUE));
49+
validate(
50+
FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA,
51+
sqlClient.getSqlInfo(),
52+
sqlClient,
53+
s -> {
54+
Map<Integer, Object> infoValues = readSqlInfoStream(s);
55+
IntegrationAssertions.assertEquals(
56+
Boolean.FALSE, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SQL_VALUE));
57+
IntegrationAssertions.assertEquals(
58+
Boolean.TRUE, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SUBSTRAIT_VALUE));
59+
IntegrationAssertions.assertEquals(
60+
"min_version",
61+
infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SUBSTRAIT_MIN_VERSION_VALUE));
62+
IntegrationAssertions.assertEquals(
63+
"max_version",
64+
infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_SUBSTRAIT_MAX_VERSION_VALUE));
65+
IntegrationAssertions.assertEquals(
66+
FlightSql.SqlSupportedTransaction.SQL_SUPPORTED_TRANSACTION_SAVEPOINT_VALUE,
67+
infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_TRANSACTION_VALUE));
68+
IntegrationAssertions.assertEquals(
69+
Boolean.TRUE, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_CANCEL_VALUE));
70+
IntegrationAssertions.assertEquals(
71+
42, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_STATEMENT_TIMEOUT_VALUE));
72+
IntegrationAssertions.assertEquals(
73+
7, infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_TRANSACTION_TIMEOUT_VALUE));
74+
});
11975
}
12076

12177
private void validateStatementExecution(FlightSqlClient sqlClient) throws Exception {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.flight.integration.tests;
18+
19+
import com.google.common.collect.ImmutableMap;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
import org.apache.arrow.flight.FlightClient;
23+
import org.apache.arrow.flight.FlightProducer;
24+
import org.apache.arrow.flight.Location;
25+
import org.apache.arrow.flight.sql.FlightSqlClient;
26+
import org.apache.arrow.flight.sql.FlightSqlClient.ExecuteIngestOptions;
27+
import org.apache.arrow.flight.sql.FlightSqlProducer;
28+
import org.apache.arrow.flight.sql.impl.FlightSql;
29+
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementIngest.TableDefinitionOptions;
30+
import org.apache.arrow.memory.BufferAllocator;
31+
import org.apache.arrow.vector.VectorSchemaRoot;
32+
import org.apache.arrow.vector.types.pojo.Schema;
33+
34+
/**
35+
* Integration test scenario for validating Flight SQL specs across multiple implementations. This
36+
* should ensure that RPC objects are being built and parsed correctly for multiple languages and
37+
* that the Arrow schemas are returned as expected.
38+
*/
39+
public class FlightSqlIngestionScenario extends FlightSqlScenario {
40+
41+
@Override
42+
public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception {
43+
FlightSqlScenarioProducer producer =
44+
(FlightSqlScenarioProducer) super.producer(allocator, location);
45+
producer
46+
.getSqlInfoBuilder()
47+
.withFlightSqlServerBulkIngestionTransaction(true)
48+
.withFlightSqlServerBulkIngestion(true);
49+
return producer;
50+
}
51+
52+
@Override
53+
public void client(BufferAllocator allocator, Location location, FlightClient client)
54+
throws Exception {
55+
try (final FlightSqlClient sqlClient = new FlightSqlClient(client)) {
56+
validateMetadataRetrieval(sqlClient);
57+
validateIngestion(allocator, sqlClient);
58+
}
59+
}
60+
61+
private void validateMetadataRetrieval(FlightSqlClient sqlClient) throws Exception {
62+
validate(
63+
FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA,
64+
sqlClient.getSqlInfo(
65+
FlightSql.SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED,
66+
FlightSql.SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION),
67+
sqlClient,
68+
s -> {
69+
Map<Integer, Object> infoValues = readSqlInfoStream(s);
70+
IntegrationAssertions.assertEquals(
71+
Boolean.TRUE,
72+
infoValues.get(
73+
FlightSql.SqlInfo.FLIGHT_SQL_SERVER_INGEST_TRANSACTIONS_SUPPORTED_VALUE));
74+
IntegrationAssertions.assertEquals(
75+
Boolean.TRUE,
76+
infoValues.get(FlightSql.SqlInfo.FLIGHT_SQL_SERVER_BULK_INGESTION_VALUE));
77+
});
78+
}
79+
80+
private VectorSchemaRoot getIngestVectorRoot(BufferAllocator allocator) {
81+
Schema schema = FlightSqlScenarioProducer.getIngestSchema();
82+
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
83+
root.setRowCount(3);
84+
return root;
85+
}
86+
87+
private void validateIngestion(BufferAllocator allocator, FlightSqlClient sqlClient) {
88+
try (VectorSchemaRoot data = getIngestVectorRoot(allocator)) {
89+
TableDefinitionOptions tableDefinitionOptions =
90+
TableDefinitionOptions.newBuilder()
91+
.setIfExists(TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE)
92+
.setIfNotExist(
93+
TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE)
94+
.build();
95+
Map<String, String> options = new HashMap<>(ImmutableMap.of("key1", "val1", "key2", "val2"));
96+
ExecuteIngestOptions executeIngestOptions =
97+
new ExecuteIngestOptions(
98+
"test_table", tableDefinitionOptions, true, "test_catalog", "test_schema", options);
99+
FlightSqlClient.Transaction transaction =
100+
new FlightSqlClient.Transaction(BULK_INGEST_TRANSACTION_ID);
101+
long updatedRows = sqlClient.executeIngest(data, executeIngestOptions, transaction);
102+
103+
IntegrationAssertions.assertEquals(3L, updatedRows);
104+
}
105+
}
106+
}

flight/flight-integration-tests/src/main/java/org/apache/arrow/flight/integration/tests/FlightSqlScenario.java

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,14 @@
1616
*/
1717
package org.apache.arrow.flight.integration.tests;
1818

19+
import static java.util.Objects.isNull;
20+
21+
import com.google.protobuf.Any;
1922
import java.nio.charset.StandardCharsets;
2023
import java.util.Arrays;
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
import java.util.function.Consumer;
2127
import org.apache.arrow.flight.CallOption;
2228
import org.apache.arrow.flight.FlightClient;
2329
import org.apache.arrow.flight.FlightInfo;
@@ -29,10 +35,14 @@
2935
import org.apache.arrow.flight.Ticket;
3036
import org.apache.arrow.flight.sql.FlightSqlClient;
3137
import org.apache.arrow.flight.sql.FlightSqlProducer;
38+
import org.apache.arrow.flight.sql.FlightSqlUtils;
3239
import org.apache.arrow.flight.sql.impl.FlightSql;
3340
import org.apache.arrow.flight.sql.util.TableRef;
3441
import org.apache.arrow.memory.BufferAllocator;
42+
import org.apache.arrow.util.Preconditions;
43+
import org.apache.arrow.vector.UInt4Vector;
3544
import org.apache.arrow.vector.VectorSchemaRoot;
45+
import org.apache.arrow.vector.complex.DenseUnionVector;
3646
import org.apache.arrow.vector.types.pojo.Schema;
3747

3848
/**
@@ -52,6 +62,7 @@ public class FlightSqlScenario implements Scenario {
5262
public static final FlightSqlClient.SubstraitPlan SUBSTRAIT_PLAN =
5363
new FlightSqlClient.SubstraitPlan(SUBSTRAIT_PLAN_TEXT, SUBSTRAIT_VERSION);
5464
public static final byte[] TRANSACTION_ID = "transaction_id".getBytes(StandardCharsets.UTF_8);
65+
public static final byte[] BULK_INGEST_TRANSACTION_ID = "123".getBytes(StandardCharsets.UTF_8);
5566

5667
@Override
5768
public FlightProducer producer(BufferAllocator allocator, Location location) throws Exception {
@@ -150,15 +161,23 @@ private void validateMetadataRetrieval(FlightSqlClient sqlClient) throws Excepti
150161
validateSchema(
151162
FlightSqlProducer.Schemas.GET_TYPE_INFO_SCHEMA, sqlClient.getXdbcTypeInfoSchema(options));
152163

153-
validate(
154-
FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA,
164+
FlightInfo sqlInfoFlightInfo =
155165
sqlClient.getSqlInfo(
156166
new FlightSql.SqlInfo[] {
157167
FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME,
158168
FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY
159169
},
160-
options),
161-
sqlClient);
170+
options);
171+
172+
Ticket ticket = sqlInfoFlightInfo.getEndpoints().get(0).getTicket();
173+
FlightSql.CommandGetSqlInfo requestSqlInfoCommand =
174+
FlightSqlUtils.unpackOrThrow(
175+
Any.parseFrom(ticket.getBytes()), FlightSql.CommandGetSqlInfo.class);
176+
IntegrationAssertions.assertEquals(
177+
requestSqlInfoCommand.getInfo(0), FlightSql.SqlInfo.FLIGHT_SQL_SERVER_NAME_VALUE);
178+
IntegrationAssertions.assertEquals(
179+
requestSqlInfoCommand.getInfo(1), FlightSql.SqlInfo.FLIGHT_SQL_SERVER_READ_ONLY_VALUE);
180+
validate(FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA, sqlInfoFlightInfo, sqlClient);
162181
validateSchema(
163182
FlightSqlProducer.Schemas.GET_SQL_INFO_SCHEMA, sqlClient.getSqlInfoSchema(options));
164183
}
@@ -194,14 +213,64 @@ private void validatePreparedStatementExecution(
194213

195214
protected void validate(Schema expectedSchema, FlightInfo flightInfo, FlightSqlClient sqlClient)
196215
throws Exception {
216+
validate(expectedSchema, flightInfo, sqlClient, null);
217+
}
218+
219+
protected void validate(
220+
Schema expectedSchema,
221+
FlightInfo flightInfo,
222+
FlightSqlClient sqlClient,
223+
Consumer<FlightStream> streamConsumer)
224+
throws Exception {
197225
Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();
198226
try (FlightStream stream = sqlClient.getStream(ticket)) {
199227
Schema actualSchema = stream.getSchema();
200228
IntegrationAssertions.assertEquals(expectedSchema, actualSchema);
229+
if (!isNull(streamConsumer)) {
230+
streamConsumer.accept(stream);
231+
}
201232
}
202233
}
203234

204235
protected void validateSchema(Schema expected, SchemaResult actual) {
205236
IntegrationAssertions.assertEquals(expected, actual.getSchema());
206237
}
238+
239+
protected Map<Integer, Object> readSqlInfoStream(FlightStream stream) {
240+
Map<Integer, Object> infoValues = new HashMap<>();
241+
while (stream.next()) {
242+
UInt4Vector infoName = (UInt4Vector) stream.getRoot().getVector(0);
243+
DenseUnionVector value = (DenseUnionVector) stream.getRoot().getVector(1);
244+
245+
for (int i = 0; i < stream.getRoot().getRowCount(); i++) {
246+
final int code = infoName.get(i);
247+
if (infoValues.containsKey(code)) {
248+
throw new AssertionError("Duplicate SqlInfo value: " + code);
249+
}
250+
Object object;
251+
byte typeId = value.getTypeId(i);
252+
switch (typeId) {
253+
case 0: // string
254+
object =
255+
Preconditions.checkNotNull(
256+
value.getVarCharVector(typeId).getObject(value.getOffset(i)))
257+
.toString();
258+
break;
259+
case 1: // bool
260+
object = value.getBitVector(typeId).getObject(value.getOffset(i));
261+
break;
262+
case 2: // int64
263+
object = value.getBigIntVector(typeId).getObject(value.getOffset(i));
264+
break;
265+
case 3: // int32
266+
object = value.getIntVector(typeId).getObject(value.getOffset(i));
267+
break;
268+
default:
269+
throw new AssertionError("Decoding SqlInfo of type code " + typeId);
270+
}
271+
infoValues.put(code, object);
272+
}
273+
}
274+
return infoValues;
275+
}
207276
}

0 commit comments

Comments
 (0)