Skip to content

Commit 7ab955a

Browse files
author
bosiew.tian
committed
[core] Support reading sequence_number in AuditLogTable and BinlogTable
1 parent 816a76f commit 7ab955a

File tree

21 files changed

+922
-238
lines changed

21 files changed

+922
-238
lines changed

docs/content/concepts/system-tables.md

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ SELECT * FROM my_table$options;
109109

110110
### Audit log Table
111111

112-
If you need to audit the changelog of the table, you can use the `audit_log` system table. Through `audit_log` table, you can get the `rowkind` column when you get the incremental data of the table. You can use this column for
112+
If you need to audit the changelog of the table, you can use the `audit_log` system table. Through the `audit_log` table, you can get the `rowkind` column and the `_SEQUENCE_NUMBER` column when you get the incremental data of the table. You can use these columns for
113113
filtering and other operations to complete the audit.
114114

115115
There are four values for `rowkind`:
@@ -123,15 +123,15 @@ There are four values for `rowkind`:
123123
SELECT * FROM my_table$audit_log;
124124

125125
/*
126-
+------------------+-----------------+-----------------+
127-
| rowkind | column_0 | column_1 |
128-
+------------------+-----------------+-----------------+
129-
| +I | ... | ... |
130-
+------------------+-----------------+-----------------+
131-
| -U | ... | ... |
132-
+------------------+-----------------+-----------------+
133-
| +U | ... | ... |
134-
+------------------+-----------------+-----------------+
126+
+------------------+------------------+-----------------+-----------------+
127+
| rowkind | _SEQUENCE_NUMBER | column_0 | column_1 |
128+
+------------------+------------------+-----------------+-----------------+
129+
| +I | 0 | ... | ... |
130+
+------------------+------------------+-----------------+-----------------+
131+
| -U | 0 | ... | ... |
132+
+------------------+------------------+-----------------+-----------------+
133+
| +U | 1 | ... | ... |
134+
+------------------+------------------+-----------------+-----------------+
135135
3 rows in set
136136
*/
137137
```
@@ -146,15 +146,15 @@ Currently, the binlog table is unable to display Flink's computed columns.
146146
SELECT * FROM T$binlog;
147147

148148
/*
149-
+------------------+----------------------+-----------------------+
150-
| rowkind | column_0 | column_1 |
151-
+------------------+----------------------+-----------------------+
152-
| +I | [col_0] | [col_1] |
153-
+------------------+----------------------+-----------------------+
154-
| +U | [col_0_ub, col_0_ua] | [col_1_ub, col_1_ua] |
155-
+------------------+----------------------+-----------------------+
156-
| -D | [col_0] | [col_1] |
157-
+------------------+----------------------+-----------------------+
149+
+------------------+------------------+----------------------+-----------------------+
150+
| rowkind | _SEQUENCE_NUMBER | column_0 | column_1 |
151+
+------------------+------------------+----------------------+-----------------------+
152+
| +I | 0 | [col_0] | [col_1] |
153+
+------------------+------------------+----------------------+-----------------------+
154+
| +U | 1 | [col_0_ub, col_0_ua] | [col_1_ub, col_1_ua] |
155+
+------------------+------------------+----------------------+-----------------------+
156+
| -D | 2 | [col_0] | [col_1] |
157+
+------------------+------------------+----------------------+-----------------------+
158158
*/
159159
```
160160

paimon-core/src/main/java/org/apache/paimon/operation/MergeFileSplitRead.java

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.paimon.table.source.ChainSplit;
5151
import org.apache.paimon.table.source.DataSplit;
5252
import org.apache.paimon.table.source.DeletionFile;
53+
import org.apache.paimon.table.source.KeyValueSystemFieldsRecordReader;
5354
import org.apache.paimon.table.source.Split;
5455
import org.apache.paimon.types.DataField;
5556
import org.apache.paimon.types.RowType;
@@ -62,6 +63,7 @@
6263
import java.io.IOException;
6364
import java.util.ArrayList;
6465
import java.util.Arrays;
66+
import java.util.Collections;
6567
import java.util.Comparator;
6668
import java.util.List;
6769
import java.util.Set;
@@ -97,6 +99,11 @@ public class MergeFileSplitRead implements SplitRead<KeyValue> {
9799
@Nullable private int[][] outerProjection;
98100
@Nullable private VariantAccessInfo[] variantAccess;
99101

102+
private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> systemFieldExtractors =
103+
Collections.emptyList();
104+
105+
@Nullable private int[] projection = null;
106+
100107
private boolean forceKeepDelete = false;
101108

102109
public MergeFileSplitRead(
@@ -137,18 +144,31 @@ public MergeFileSplitRead withReadKeyType(RowType readKeyType) {
137144
return this;
138145
}
139146

147+
public List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> getSystemFieldExtractors() {
148+
return systemFieldExtractors;
149+
}
150+
151+
@Nullable
152+
public int[] getProjection() {
153+
return projection;
154+
}
155+
140156
@Override
141157
public MergeFileSplitRead withReadType(RowType readType) {
158+
this.systemFieldExtractors = collectSystemFieldExtractors(readType);
159+
this.projection = createProjection(readType);
160+
142161
// todo: replace projectedFields with readType
143162
RowType tableRowType = tableSchema.logicalRowType();
163+
List<String> fieldNames = tableSchema.fieldNames();
144164
int[][] projectedFields =
145165
Arrays.stream(tableRowType.getFieldIndices(readType.getFieldNames()))
166+
.filter(i -> i >= 0) // Filter out system fields (index = -1)
146167
.mapToObj(i -> new int[] {i})
147168
.toArray(int[][]::new);
148169
int[][] newProjectedFields = projectedFields;
149170
if (sequenceFields.size() > 0) {
150171
// make sure projection contains sequence fields
151-
List<String> fieldNames = tableSchema.fieldNames();
152172
List<String> projectedNames = Projection.of(projectedFields).project(fieldNames);
153173
int[] lackFields =
154174
sequenceFields.stream()
@@ -408,4 +428,68 @@ public UserDefinedSeqComparator createUdsComparator() {
408428
return UserDefinedSeqComparator.create(
409429
readerFactoryBuilder.readValueType(), sequenceFields, sequenceOrder);
410430
}
431+
432+
/**
433+
* Collects system field extractors for the requested read type.
434+
*
435+
* @param readType the requested read type (may contain system fields)
436+
* @return list of extractors for system fields present in readType
437+
*/
438+
private List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor>
439+
collectSystemFieldExtractors(RowType readType) {
440+
if (readType == null) {
441+
return Collections.emptyList();
442+
}
443+
444+
List<KeyValueSystemFieldsRecordReader.SystemFieldExtractor> extractors = new ArrayList<>();
445+
for (String fieldName : readType.getFieldNames()) {
446+
KeyValueSystemFieldsRecordReader.SystemFieldExtractor extractor =
447+
KeyValueSystemFieldsRecordReader.getExtractor(fieldName);
448+
if (extractor != null) {
449+
extractors.add(extractor);
450+
}
451+
}
452+
return extractors;
453+
}
454+
455+
/**
456+
* Creates a projection array to reorder fields from natural order to requested order.
457+
*
458+
* <p>Example: readType = [pt, rowkind, col1], systemFieldExtractors = [rowkind] Natural order:
459+
* [rowkind(0), pt(1), col1(2)] (physical fields pt, col1 in readType order) Requested order:
460+
* [pt, rowkind, col1] Projection: [1, 0, 2]
461+
*
462+
* @param readType the requested read type (may contain system fields)
463+
* @return projection array, or null if fields are already in natural order
464+
*/
465+
@Nullable
466+
private int[] createProjection(RowType readType) {
467+
if (readType == null || systemFieldExtractors.isEmpty()) {
468+
return null;
469+
}
470+
471+
List<String> readFieldNames = readType.getFieldNames();
472+
int[] projection = new int[readFieldNames.size()];
473+
// System fields are first in natural order
474+
int systemIdx = 0;
475+
// Physical fields follow system fields in natural order
476+
int physicalIdx = systemFieldExtractors.size();
477+
boolean needsProjection = false;
478+
479+
for (int i = 0; i < readFieldNames.size(); i++) {
480+
String fieldName = readFieldNames.get(i);
481+
// Check if it's a system field
482+
if (KeyValueSystemFieldsRecordReader.getExtractor(fieldName) != null) {
483+
projection[i] = systemIdx++;
484+
} else {
485+
projection[i] = physicalIdx++;
486+
}
487+
488+
if (projection[i] != i) {
489+
needsProjection = true;
490+
}
491+
}
492+
493+
return needsProjection ? projection : null;
494+
}
411495
}

0 commit comments

Comments
 (0)