Skip to content

Commit cdddbd2

Browse files
committed
5409 Fix QueryClientIT
1 parent a3b09cf commit cdddbd2

File tree

2 files changed

+41
-45
lines changed

2 files changed

+41
-45
lines changed

java/clients/src/main/java/sleeper/clients/query/QueryClient.java

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -68,45 +68,27 @@
6868
* Allows a user to run a query from the command line. An instance of this class cannot be used concurrently in multiple
6969
* threads, due to how query executors and state store objects are cached. This may be changed in a future version.
7070
*/
71-
public class QueryClient extends QueryCommandLineClient implements AutoCloseable {
71+
public class QueryClient extends QueryCommandLineClient {
7272

7373
private final ObjectFactory objectFactory;
7474
private final StateStoreProvider stateStoreProvider;
75-
private final ExecutorService executorService = Executors.newFixedThreadPool(30);
76-
private final DataFusionAwsConfig awsConfig = DataFusionAwsConfig.getDefault();
77-
private final BufferAllocator allocator = new RootAllocator();
78-
private final FFIContext ffiContext = DataFusionLeafPartitionRowRetriever.createContext();
75+
private final ExecutorService executorService;
76+
private final DataFusionAwsConfig awsConfig;
77+
private final BufferAllocator allocator;
78+
private final FFIContext ffiContext;
7979
private final Map<String, QueryExecutor> cachedQueryExecutors = new HashMap<>();
8080

81-
public QueryClient(InstanceProperties instanceProperties, S3Client s3Client, DynamoDbClient dynamoClient,
82-
ConsoleInput in, ConsoleOutput out) throws ObjectFactoryException {
83-
this(instanceProperties, s3Client, dynamoClient, in, out,
84-
new S3UserJarsLoader(instanceProperties, s3Client, makeTemporaryDirectory()).buildObjectFactory(),
85-
StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient));
86-
}
87-
88-
public QueryClient(InstanceProperties instanceProperties, S3Client s3Client, DynamoDbClient dynamoClient,
89-
ConsoleInput in, ConsoleOutput out, ObjectFactory objectFactory, StateStoreProvider stateStoreProvider) {
90-
this(instanceProperties, new DynamoDBTableIndex(instanceProperties, dynamoClient),
91-
S3TableProperties.createProvider(instanceProperties, s3Client, dynamoClient),
92-
in, out, objectFactory, stateStoreProvider);
93-
}
94-
95-
public QueryClient(InstanceProperties instanceProperties, TableIndex tableIndex, TablePropertiesProvider tablePropertiesProvider,
96-
ConsoleInput in, ConsoleOutput out, ObjectFactory objectFactory, StateStoreProvider stateStoreProvider) {
81+
public QueryClient(
82+
InstanceProperties instanceProperties, TableIndex tableIndex, TablePropertiesProvider tablePropertiesProvider,
83+
ConsoleInput in, ConsoleOutput out, ObjectFactory objectFactory, StateStoreProvider stateStoreProvider,
84+
ExecutorService executorService, DataFusionAwsConfig awsConfig, BufferAllocator allocator, FFIContext ffiContext) {
9785
super(instanceProperties, tableIndex, tablePropertiesProvider, in, out);
9886
this.objectFactory = objectFactory;
9987
this.stateStoreProvider = stateStoreProvider;
100-
}
101-
102-
public static Path makeTemporaryDirectory() {
103-
try {
104-
Path tempDir = Files.createTempDirectory(null);
105-
tempDir.toFile().deleteOnExit();
106-
return tempDir;
107-
} catch (IOException e) {
108-
throw new UncheckedIOException(e);
109-
}
88+
this.executorService = executorService;
89+
this.awsConfig = awsConfig;
90+
this.allocator = allocator;
91+
this.ffiContext = ffiContext;
11092
}
11193

11294
@Override
@@ -149,14 +131,6 @@ protected void submitQuery(TableProperties tableProperties, Query query) {
149131
out.println("Query took " + LoggedDuration.withFullOutput(startTime, Instant.now()) + " to return " + count + " rows");
150132
}
151133

152-
@Override
153-
public void close() {
154-
try (allocator; ffiContext) {
155-
} finally {
156-
executorService.shutdown();
157-
}
158-
}
159-
160134
private CloseableIterator<Row> runQuery(Query query) throws QueryException {
161135
QueryExecutor queryExecutor = cachedQueryExecutors.get(query.getTableName());
162136
return queryExecutor.execute(query);
@@ -168,13 +142,33 @@ public static void main(String[] args) throws ObjectFactoryException, Interrupte
168142
}
169143
String instanceId = args[0];
170144

145+
ExecutorService executorService = Executors.newFixedThreadPool(30);
171146
try (S3Client s3Client = buildAwsV2Client(S3Client.builder());
172147
DynamoDbClient dynamoClient = buildAwsV2Client(DynamoDbClient.builder());
173-
QueryClient queryClient = new QueryClient(
174-
S3InstanceProperties.loadGivenInstanceId(s3Client, instanceId),
175-
s3Client, dynamoClient,
176-
new ConsoleInput(System.console()), new ConsoleOutput(System.out))) {
177-
queryClient.run();
148+
BufferAllocator allocator = new RootAllocator();
149+
FFIContext ffiContext = DataFusionLeafPartitionRowRetriever.createContext()) {
150+
InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, instanceId);
151+
new QueryClient(
152+
instanceProperties,
153+
new DynamoDBTableIndex(instanceProperties, dynamoClient),
154+
S3TableProperties.createProvider(instanceProperties, s3Client, dynamoClient),
155+
new ConsoleInput(System.console()), new ConsoleOutput(System.out),
156+
new S3UserJarsLoader(instanceProperties, s3Client, makeTemporaryDirectory()).buildObjectFactory(),
157+
StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoClient),
158+
executorService, DataFusionAwsConfig.getDefault(), allocator, ffiContext)
159+
.run();
160+
} finally {
161+
executorService.shutdown();
162+
}
163+
}
164+
165+
private static Path makeTemporaryDirectory() {
166+
try {
167+
Path tempDir = Files.createTempDirectory(null);
168+
tempDir.toFile().deleteOnExit();
169+
return tempDir;
170+
} catch (IOException e) {
171+
throw new UncheckedIOException(e);
178172
}
179173
}
180174
}

java/clients/src/test/java/sleeper/clients/query/QueryClientIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.Iterator;
4949
import java.util.List;
5050
import java.util.Map;
51+
import java.util.concurrent.ForkJoinPool;
5152
import java.util.stream.Collectors;
5253
import java.util.stream.LongStream;
5354

@@ -332,7 +333,8 @@ private TableProperties createTable(String tableName, Schema schema) {
332333
private void runQueryClient() throws Exception {
333334
new QueryClient(instanceProperties, tableIndex, new FixedTablePropertiesProvider(tablePropertiesList),
334335
in.consoleIn(), out.consoleOut(), ObjectFactory.noUserJars(),
335-
InMemoryTransactionLogStateStore.createProvider(instanceProperties, transactionLogs))
336+
InMemoryTransactionLogStateStore.createProvider(instanceProperties, transactionLogs),
337+
ForkJoinPool.commonPool(), null, null, null)
336338
.run();
337339
}
338340

0 commit comments

Comments
 (0)