Merged
Changes from all commits (20 commits)
52 changes: 52 additions & 0 deletions .github/workflows/docker-tests.yaml
@@ -0,0 +1,52 @@
name: Docker Tests
on:
pull_request:
paths:
- 'java/**/Dockerfile'
- '.github/workflows/docker-tests.yaml'

env:
RUST_BUILD_IMAGE_X86_64: "ghcr.io/gchq/sleeper-rust-builder-x86_64-sccache:latest"
RUST_BUILD_IMAGE_AARCH64: "ghcr.io/gchq/sleeper-rust-builder-aarch64-sccache:latest"
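# sccache shares compiler output through the GitHub Actions cache, speeding up repeat builds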
SCCACHE_GHA_ENABLED: "true"
SCCACHE_CACHE_SIZE: "2G"

jobs:
build:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
with:
driver-opts: network=host
- uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'corretto'
- uses: mozilla-actions/sccache-action
- name: Cache dependencies
uses: actions/cache@v4
with:
path: ${{ runner.temp }}/.m2/repository
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
restore-keys: |
${{ runner.os }}-maven-
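# Download all Maven dependencies up front so the cache above stays fully populated between runs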
- name: Resolve dependencies
run: mvn de.qaware.maven:go-offline-maven-plugin:resolve-dependencies -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
working-directory: ./java
- name: Build with Maven
run: mvn --batch-mode install -Pquick -T 1C -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
working-directory: ./java
- name: Copy build output
working-directory: ./scripts/build
run: ./copyBuildOutput.sh
- name: Build compaction task Docker image
working-directory: ./scripts/docker/compaction-job-execution
run: docker build -t compaction-job-execution:test .
- name: Docker image tests
working-directory: ./java
run: mvn verify -PsystemTest -DskipRust=true -pl clients "-DrunIT=*DockerImageST" -Dmaven.repo.local=${{ runner.temp }}/.m2/repository
4 changes: 4 additions & 0 deletions NOTICES
@@ -388,6 +388,10 @@ AspectJ (org.aspectj:aspectjrt:1.*, org.aspectj:aspectjtools:1.*)

- Eclipse Public License (EPL) 2.0

Pty4J (org.jetbrains.pty4j:pty4j:0.*)

- Eclipse Public License (EPL) 1.0

AspectJ Maven Plugin (dev.aspectj:aspectj-maven-plugin:1.*)

- MIT License
14 changes: 13 additions & 1 deletion java/bulk-export/bulk-export-task-execution/docker/Dockerfile
@@ -11,7 +11,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM amazoncorretto:17-al2023-headless
FROM ubuntu:24.04

# Install Amazon Corretto JDK
RUN apt-get update && apt-get install -y \
ca-certificates \
curl \
gnupg
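# Import the Corretto signing key into a keyring apt trusts, then register the Corretto repository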
RUN curl -fsSL https://apt.corretto.aws/corretto.key | gpg --dearmor --yes -o /etc/apt/keyrings/corretto-keyring.gpg
RUN echo "deb [signed-by=/etc/apt/keyrings/corretto-keyring.gpg] https://apt.corretto.aws stable main" \
| tee /etc/apt/sources.list.d/corretto.list > /dev/null
RUN apt-get update && apt-get install -y \
java-17-amazon-corretto-jdk \
&& rm -rf /var/lib/apt/lists/*

COPY bulk-export-task-execution.jar /bulk-export-task-execution.jar
COPY run.sh /run.sh
4 changes: 4 additions & 0 deletions java/clients/pom.xml
@@ -75,6 +75,10 @@
<artifactId>Java-WebSocket</artifactId>
<version>${java-websocket.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.pty4j</groupId>
<artifactId>pty4j</artifactId>
</dependency>
<!-- Sleeper dependencies -->
<dependency>
<groupId>sleeper</groupId>
java/clients/src/main/java/sleeper/clients/util/command/Command.java
@@ -16,6 +16,8 @@

package sleeper.clients.util.command;

import com.pty4j.PtyProcessBuilder;

import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
@@ -91,4 +93,10 @@ public ProcessBuilder toProcessBuilderInheritIO(int index, int pipelineSize) {
}
return builder;
}

public PtyProcessBuilder toPtyProcessBuilder() {
PtyProcessBuilder builder = new PtyProcessBuilder(command);
builder.setEnvironment(envVars);
return builder;
}
}
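For context, pty4j's PtyProcessBuilder mirrors java.lang.ProcessBuilder, but the process it starts is attached to a pseudo-terminal, so commands that insist on a TTY (such as docker run -it) behave as if run interactively. A minimal standalone sketch of the same API that toPtyProcessBuilder wraps; the ls command is illustrative:

import com.pty4j.PtyProcess;
import com.pty4j.PtyProcessBuilder;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class PtySketch {
    public static void main(String[] args) throws Exception {
        // Command array plus environment map, as in Command.toPtyProcessBuilder above
        PtyProcessBuilder builder = new PtyProcessBuilder(new String[]{"ls", "-l"})
                .setEnvironment(Map.copyOf(System.getenv()));
        // PtyProcess extends java.lang.Process, so the usual stream and waitFor API applies
        PtyProcess process = builder.start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            reader.lines().forEach(System.out::println);
        }
        System.out.println("Exit code: " + process.waitFor());
    }
}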
java/clients/src/main/java/sleeper/clients/util/command/CommandUtils.java
@@ -15,6 +15,7 @@
*/
package sleeper.clients.util.command;

import com.pty4j.PtyProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -55,10 +56,30 @@ public static CommandPipelineResult runCommandLogOutput(CommandPipeline pipeline
return result;
}

public static void runCommandLogOutputWithPty(Command command) throws IOException, InterruptedException {
LOGGER.info("Running command: {}", command);
PtyProcess process = command.toPtyProcessBuilder().start();
CompletableFuture<Void> logOutput = logOutput(process);
int exitCode = process.waitFor();
logOutput.join();
LOGGER.info("Exit code: {}", exitCode);
if (exitCode != 0) {
throw new CommandFailedException(pipeline(command), exitCode);
}
}

private static CompletableFuture<Void> logOutput(Process process) {
return logOutput(process.getInputStream(), process.getErrorStream());
}

private static CompletableFuture<Void> logOutput(PtyProcess process) {
return logOutput(process.getInputStream(), process.getErrorStream());
}

private static CompletableFuture<Void> logOutput(InputStream inputStream, InputStream errorStream) {
return CompletableFuture.allOf(
CompletableFuture.runAsync(() -> logTo(inputStream, LOGGER::info)),
CompletableFuture.runAsync(() -> logTo(errorStream, LOGGER::error)));
}

private static void logTo(InputStream stream, Consumer<String> logLine) {
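A minimal sketch of calling the new pty variant, assuming the existing envAndCommand(Map, String...) factory on Command that the test below imports; the command and endpoint here are illustrative. Running under a pseudo-terminal is what lets docker run -it succeed in a headless test or CI environment:

import static sleeper.clients.util.command.Command.envAndCommand;

Map<String, String> env = new HashMap<>(System.getenv());
env.put("AWS_ENDPOINT_URL", "http://localhost:4566"); // illustrative LocalStack endpoint
CommandUtils.runCommandLogOutputWithPty(
        envAndCommand(env, "docker", "run", "--rm", "-it", "hello-world"));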
168 changes: 168 additions & 0 deletions java/clients/src/test/java/sleeper/clients/images/CompactionTaskDockerImageST.java
@@ -0,0 +1,168 @@
/*
* Copyright 2022-2025 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.clients.images;

import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

import sleeper.clients.util.command.CommandUtils;
import sleeper.compaction.core.job.CompactionJob;
import sleeper.compaction.core.job.CompactionJobFactory;
import sleeper.compaction.core.job.CompactionJobSerDe;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.properties.S3TableProperties;
import sleeper.configuration.table.index.DynamoDBTableIndexCreator;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.model.DataEngine;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.row.Row;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.FileReferenceFactory;
import sleeper.core.statestore.StateStore;
import sleeper.core.util.EnvironmentUtils;
import sleeper.localstack.test.LocalStackTestBase;
import sleeper.parquet.row.ParquetReaderIterator;
import sleeper.parquet.row.ParquetRowReaderFactory;
import sleeper.parquet.row.ParquetRowWriterFactory;
import sleeper.statestore.StateStoreFactory;
import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.clients.util.command.Command.envAndCommand;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_WAIT_TIME_IN_SECONDS;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TRACKER_ENABLED;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_DATA_ENGINE;
import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties;
import static sleeper.core.schema.SchemaTestHelper.createSchemaWithKey;
import static sleeper.core.statestore.testutils.StateStoreUpdatesWrapper.update;

public class CompactionTaskDockerImageST extends LocalStackTestBase {

String dockerImage = "compaction-job-execution:test";
InstanceProperties instanceProperties = createTestInstanceProperties();
Schema schema = createSchemaWithKey("key", new LongType());
TableProperties tableProperties = createTestTableProperties(instanceProperties, schema);
StateStore stateStore;

@BeforeEach
void setUp() {
createBucket(instanceProperties.get(CONFIG_BUCKET));
createBucket(instanceProperties.get(DATA_BUCKET));
instanceProperties.set(COMPACTION_JOB_QUEUE_URL, createSqsQueueGetUrl());
instanceProperties.setEnum(DEFAULT_DATA_ENGINE, DataEngine.DATAFUSION);
instanceProperties.set(COMPACTION_TRACKER_ENABLED, "false");
instanceProperties.set(COMPACTION_TASK_WAIT_TIME_IN_SECONDS, "0");
instanceProperties.set(COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS, "0");
instanceProperties.set(COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS, "0");
instanceProperties.set(COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES, "1");
S3InstanceProperties.saveToS3(s3Client, instanceProperties);
DynamoDBTableIndexCreator.create(dynamoClient, instanceProperties);
S3TableProperties.createStore(instanceProperties, s3Client, dynamoClient).save(tableProperties);
new TransactionLogStateStoreCreator(instanceProperties, dynamoClient).create();
stateStore = new StateStoreFactory(instanceProperties, s3Client, dynamoClient).getStateStore(tableProperties);
update(stateStore).initialise(tableProperties);
}

@Test
void shouldRunDataFusionCompactionWithDockerImage() throws Exception {
// Given
List<Row> rows = List.of(
new Row(Map.of("key", 10L)),
new Row(Map.of("key", 20L)));
FileReference file = addFileAtRoot("test", rows);
CompactionJob job = sendCompactionJobAtRoot("test-job", List.of(file));
update(stateStore).assignJobId("test-job", List.of(file));

// When
runDockerImage();

// Then
assertThat(readOutputFile(job)).containsExactlyElementsOf(rows);
}

private void runDockerImage() throws Exception {
List<String> command = new ArrayList<>();
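// The -it flag asks for a terminal, which is why this command is run through a pty below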
command.addAll(List.of("docker", "run", "--rm", "-it", "--network=host"));

Map<String, String> environment = EnvironmentUtils.createDefaultEnvironment(instanceProperties);
environment.put("AWS_ENDPOINT_URL", localStackContainer.getEndpoint().toString());
environment.keySet().forEach(variable -> command.addAll(List.of("--env", variable)));
environment.putAll(System.getenv());

command.addAll(List.of(dockerImage, instanceProperties.get(CONFIG_BUCKET)));

CommandUtils.runCommandLogOutputWithPty(envAndCommand(environment, command.toArray(String[]::new)));
}

private FileReference addFileAtRoot(String name, List<Row> rows) {
FileReference reference = fileFactory().rootFile(name, rows.size());
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(reference.getFilename());
try (ParquetWriter<Row> writer = ParquetRowWriterFactory.createParquetRowWriter(path, tableProperties, hadoopConf)) {
for (Row row : rows) {
writer.write(row);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
update(stateStore).addFile(reference);
return reference;
}

private CompactionJob sendCompactionJobAtRoot(String jobId, List<FileReference> files) {
CompactionJobFactory factory = new CompactionJobFactory(instanceProperties, tableProperties);
CompactionJob job = factory.createCompactionJob(jobId, files, "root");
CompactionJobSerDe serDe = new CompactionJobSerDe();
sqsClient.sendMessage(SendMessageRequest.builder()
.queueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL))
.messageBody(serDe.toJson(job))
.build());
return job;
}

private List<Row> readOutputFile(CompactionJob job) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(job.getOutputFile());
try (ParquetReaderIterator reader = new ParquetReaderIterator(
ParquetRowReaderFactory.parquetRowReaderBuilder(path, schema).withConf(hadoopConf).build())) {
List<Row> rows = new ArrayList<>();
reader.forEachRemaining(rows::add);
return rows;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private FileReferenceFactory fileFactory() {
return FileReferenceFactory.from(instanceProperties, tableProperties, stateStore);
}

}
35 changes: 35 additions & 0 deletions java/clients/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
#
# Copyright 2022-2025 Crown Copyright
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# These are set at runtime by the CDK via system properties. You can change them by setting the
# Sleeper properties of the same name
sleeper.logging.level=INFO
sleeper.logging.root.level=INFO
sleeper.logging.apache.level=INFO
sleeper.logging.parquet.level=WARN
sleeper.logging.aws.level=INFO

log4j.rootLogger=${sleeper.logging.root.level}, consoleAppender
log4j.category.sleeper=${sleeper.logging.level}
log4j.category.sleeper.core.metrics.MetricsLogger=INFO
log4j.category.org.apache=${sleeper.logging.apache.level}
log4j.category.org.apache.parquet=${sleeper.logging.parquet.level}
log4j.category.com.amazonaws=${sleeper.logging.aws.level}
log4j.category.software.amazon=${sleeper.logging.aws.level}

log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=[%t] %c{3} %p %x - %m%n