diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index ba51f8d3bc05..17928dd6faad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -46,6 +46,7 @@
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
 
@@ -85,9 +86,10 @@ public class RollingBlobFileWriter implements RollingFileWriter
     , DataFileMeta>> writerFactory;
 
-    private final PeojectedFileWriter<
-                    RollingFileWriterImpl, List>
-            blobWriter;
+    private final Supplier<
+                    PeojectedFileWriter<
+                            RollingFileWriterImpl, List>>
+            blobWriterFactory;
 
     private final long targetFileSize;
     private final long blobTargetFileSize;
@@ -96,6 +98,9 @@ public class RollingBlobFileWriter implements RollingFileWriter
     results;
     private PeojectedFileWriter, DataFileMeta> currentWriter;
+    private PeojectedFileWriter<
+                    RollingFileWriterImpl, List>
+            blobWriter;
 
     private long recordCount = 0;
     private boolean closed = false;
@@ -144,18 +149,19 @@ public RollingBlobFileWriter(
                         statsDenseStore);
 
         // Initialize blob writer
-        this.blobWriter =
-                createBlobWriter(
-                        fileIO,
-                        schemaId,
-                        blobType,
-                        writeSchema,
-                        pathFactory,
-                        seqNumCounter,
-                        fileSource,
-                        asyncFileWrite,
-                        statsDenseStore,
-                        blobTargetFileSize);
+        this.blobWriterFactory =
+                () ->
+                        createBlobWriter(
+                                fileIO,
+                                schemaId,
+                                blobType,
+                                writeSchema,
+                                pathFactory,
+                                seqNumCounter,
+                                fileSource,
+                                asyncFileWrite,
+                                statsDenseStore,
+                                blobTargetFileSize);
     }
 
     /** Creates a factory for normal data writers. */
@@ -265,6 +271,9 @@ public void write(InternalRow row) throws IOException {
         if (currentWriter == null) {
             currentWriter = writerFactory.get();
         }
+        if (blobWriter == null) {
+            blobWriter = blobWriterFactory.get();
+        }
         currentWriter.write(row);
         blobWriter.write(row);
         recordCount++;
@@ -322,7 +331,10 @@ public void abort() {
         for (FileWriterAbortExecutor abortExecutor : closedWriters) {
             abortExecutor.abort();
         }
-        blobWriter.abort();
+        if (blobWriter != null) {
+            blobWriter.abort();
+            blobWriter = null;
+        }
     }
 
     /** Checks if the current file should be rolled based on size and record count. */
@@ -369,8 +381,13 @@ private DataFileMeta closeMainWriter() throws IOException {
 
     /** Closes the blob writer and processes blob metadata with appropriate tags. */
     private List<DataFileMeta> closeBlobWriter() throws IOException {
+        if (blobWriter == null) {
+            return Collections.emptyList();
+        }
         blobWriter.close();
-        return blobWriter.result();
+        List<DataFileMeta> results = blobWriter.result();
+        blobWriter = null;
+        return results;
     }
 
     /** Validates that the row counts match between main and blob files. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index ab37159648ef..334fe8d01535 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -57,7 +57,7 @@ public class BlobTableTest extends TableTestBase {
     public void testBasic() throws Exception {
         createTableDefault();
 
-        commitDefault(writeDataDefault(100, 1));
+        commitDefault(writeDataDefault(1000, 1));
 
         AtomicInteger integer = new AtomicInteger(0);
 
@@ -71,7 +71,7 @@ public void testBasic() throws Exception {
 
         assertThat(fieldGroups.size()).isEqualTo(2);
         assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
-        assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
+        assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
 
         readDefault(
                 row -> {
@@ -81,7 +81,7 @@ public void testBasic() throws Exception {
                     }
                 });
 
-        assertThat(integer.get()).isEqualTo(100);
+        assertThat(integer.get()).isEqualTo(1000);
     }
 
     @Test
@@ -104,7 +104,7 @@ public void testWriteByInputStream() throws Exception {
     public void testMultiBatch() throws Exception {
         createTableDefault();
 
-        commitDefault(writeDataDefault(100, 2));
+        commitDefault(writeDataDefault(1000, 2));
 
         AtomicInteger integer = new AtomicInteger(0);
 
@@ -120,7 +120,7 @@ public void testMultiBatch() throws Exception {
                     DataEvolutionSplitRead.splitFieldBunches(batch, file -> 0);
             assertThat(fieldGroups.size()).isEqualTo(2);
             assertThat(fieldGroups.get(0).files().size()).isEqualTo(1);
-            assertThat(fieldGroups.get(1).files().size()).isEqualTo(8);
+            assertThat(fieldGroups.get(1).files().size()).isEqualTo(10);
         }
 
         readDefault(
@@ -129,7 +129,7 @@ public void testMultiBatch() throws Exception {
                         assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
                     }
                 });
-        assertThat(integer.get()).isEqualTo(200);
+        assertThat(integer.get()).isEqualTo(2000);
     }
 
     @Test
@@ -179,6 +179,21 @@ public InternalRow next() {
         assertThat(i.get()).isEqualTo(1);
     }
 
+    @Test
+    public void testRolling() throws Exception {
+        createTableDefault();
+        commitDefault(writeDataDefault(1025, 1));
+        AtomicInteger integer = new AtomicInteger(0);
+        readDefault(
+                row -> {
+                    integer.incrementAndGet();
+                    if (integer.get() % 50 == 0) {
+                        assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+                    }
+                });
+        assertThat(integer.get()).isEqualTo(1025);
+    }
+
     protected Schema schemaDefault() {
         Schema.Builder schemaBuilder = Schema.newBuilder();
         schemaBuilder.column("f0", DataTypes.INT());
@@ -191,12 +206,13 @@ protected Schema schemaDefault() {
     }
 
     protected InternalRow dataDefault(int time, int size) {
-        return GenericRow.of(RANDOM.nextInt(), randomString(), new BlobData(blobBytes));
+        return GenericRow.of(
+                RANDOM.nextInt(), BinaryString.fromBytes(randomBytes()), new BlobData(blobBytes));
     }
 
     @Override
     protected byte[] randomBytes() {
-        byte[] binary = new byte[2 * 1024 * 1024];
+        byte[] binary = new byte[2 * 1024 * 124];
         RANDOM.nextBytes(binary);
         return binary;
     }
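The patch above replaces the eagerly constructed `blobWriter` with a `Supplier`-backed field that is only materialized on the first `write()`, and it teaches `abort()` and `closeBlobWriter()` to tolerate a writer that was never created. Below is a minimal, standalone sketch of that lazy-initialization pattern, not the Paimon implementation itself: every name in it (`LazyDelegatingWriter`, `Delegate`, `closeAndListFiles`) is a hypothetical stand-in introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/**
 * Illustration of the pattern used in the patch: defer creating a delegate writer
 * until the first record arrives, and let the close/abort paths handle the case
 * where the delegate was never created, so no empty output is ever produced.
 */
class LazyDelegatingWriter<T> {

    /** Hypothetical stand-in for the real blob file writer. */
    interface Delegate<T> {
        void write(T record);

        List<String> closeAndListFiles();
    }

    private final Supplier<Delegate<T>> delegateFactory; // creation deferred until first write
    private Delegate<T> delegate; // stays null if no record is ever written

    LazyDelegatingWriter(Supplier<Delegate<T>> delegateFactory) {
        this.delegateFactory = delegateFactory;
    }

    void write(T record) {
        if (delegate == null) {
            delegate = delegateFactory.get(); // same shape as the null check added in write()
        }
        delegate.write(record);
    }

    /** Nothing was written: nothing to close, no file metadata to report. */
    List<String> close() {
        if (delegate == null) {
            return Collections.emptyList();
        }
        List<String> files = delegate.closeAndListFiles();
        delegate = null; // reset so a later close/abort does not touch a closed delegate
        return files;
    }

    // Tiny demo: a writer that never sees data reports no files at all.
    public static void main(String[] args) {
        Supplier<Delegate<String>> factory =
                () ->
                        new Delegate<String>() {
                            private final List<String> rows = new ArrayList<>();

                            @Override
                            public void write(String record) {
                                rows.add(record);
                            }

                            @Override
                            public List<String> closeAndListFiles() {
                                return Collections.singletonList(
                                        "file-with-" + rows.size() + "-rows");
                            }
                        };

        LazyDelegatingWriter<String> empty = new LazyDelegatingWriter<>(factory);
        System.out.println(empty.close()); // []

        LazyDelegatingWriter<String> used = new LazyDelegatingWriter<>(factory);
        used.write("r1");
        System.out.println(used.close()); // [file-with-1-rows]
    }
}
```

In the patch itself, `abort()` gets the same null guard and `closeBlobWriter()` resets the field, so a subsequent `write()` recreates the blob writer through the factory; the enlarged row counts, the smaller random payload, and the new `testRolling` case (1025 rows) presumably exercise exactly that roll-and-recreate path.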