diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 8d5757dcd3b4..82b0b87439bd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -53,7 +53,6 @@ */ public class TableWriteImpl implements InnerTableWrite, Restorable>> { - private final RowType rowType; private final FileStoreWrite write; private final KeyAndBucketExtractor keyAndBucketExtractor; private final RecordExtractor recordExtractor; @@ -62,8 +61,9 @@ public class TableWriteImpl implements InnerTableWrite, Restorable recordExtractor, @Nullable RowKindGenerator rowKindGenerator, @Nullable RowKindFilter rowKindFilter) { - this.rowType = rowType; + this.writeType = rowType; this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; @@ -114,6 +114,13 @@ public TableWriteImpl withIOManager(IOManager ioManager) { @Override public TableWriteImpl withWriteType(RowType writeType) { write.withWriteType(writeType); + this.writeType = writeType; + List notNullColumnNames = + writeType.getFields().stream() + .filter(field -> !field.type().isNullable()) + .map(DataField::name) + .collect(Collectors.toList()); + this.notNullFieldIndex = writeType.getFieldIndices(notNullColumnNames); return this; } @@ -188,7 +195,7 @@ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { private void checkNullability(InternalRow row) { for (int idx : notNullFieldIndex) { if (row.isNullAt(idx)) { - String columnName = rowType.getFields().get(idx).name(); + String columnName = writeType.getFields().get(idx).name(); throw new RuntimeException( String.format("Cannot write null to non-null column(%s)", columnName)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java index 65cd919e63e5..9b2655f34087 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java @@ -600,6 +600,47 @@ public void testWithRowIds() throws Exception { assertThat(i.get()).isEqualTo(2); } + @Test + public void testNonNullColumn() throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.STRING().copy(false)); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + + Schema schema = schemaBuilder.build(); + + catalog.createTable(identifier(), schema, true); + Table table = catalog.getTable(identifier()); + BatchWriteBuilder builder = table.newBatchWriteBuilder(); + BatchTableWrite write = builder.newWrite(); + write.write(GenericRow.of(1, BinaryString.fromString("a"), BinaryString.fromString("b"))); + BatchTableCommit commit = builder.newCommit(); + List commitables = write.prepareCommit(); + commit.commit(commitables); + + write = + builder.newWrite() + .withWriteType(schema.rowType().project(Collections.singletonList("f2"))); + write.write(GenericRow.of(BinaryString.fromString("c"))); + commit = builder.newCommit(); + commitables = write.prepareCommit(); + setFirstRowId(commitables, 0L); + commit.commit(commitables); + + ReadBuilder readBuilder = table.newReadBuilder(); + RecordReader reader = + readBuilder.newRead().createReader(readBuilder.newScan().plan()); + assertThat(reader).isInstanceOf(DataEvolutionFileReader.class); + reader.forEachRemaining( + r -> { + assertThat(r.getInt(0)).isEqualTo(1); + assertThat(r.getString(1).toString()).isEqualTo("a"); + assertThat(r.getString(2).toString()).isEqualTo("c"); + }); + } + protected Schema schemaDefault() { Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("f0", DataTypes.INT());