
Conversation

@Kkkaneki-k
Contributor

Purpose

This PR adds row id push down support for Spark, following #6483.

Linked issue: None

Tests

org.apache.paimon.spark.sql.RowIdPushDownTestBase

API and Format

Documentation

@JingsongLi closed this Dec 2, 2025
@JingsongLi reopened this Dec 2, 2025
@JingsongLi
Contributor

It seems your modification breaks some tests.

@Kkkaneki-k
Contributor Author

It seems your modification breaks some tests.

Yes, I am working on it.

@Kkkaneki-k
Contributor Author

I have discovered some issues in Paimon core and opened #6747 to describe them. These issues affect Spark's ability to read data based on rowIds, so I will temporarily move this PR to draft status.

@Kkkaneki-k marked this pull request as draft December 4, 2025 14:49
@JingsongLi
Contributor

Thanks @Kkkaneki-k

@JingsongLi
Contributor

Hello, row id pushdown currently only works for data evolution tables. Can you continue to finish this?

@JingsongLi
Contributor

@Kkkaneki-k

@Kkkaneki-k
Contributor Author

Hello, row id pushdown currently only works for data evolution tables. Can you continue to finish this?

OK, I will continue to complete this PR.

@Kkkaneki-k force-pushed the spark-rowid-push-down branch from 75220e5 to 44bfadf on December 12, 2025 14:53
@Kkkaneki-k marked this pull request as ready for review December 12, 2025 15:44
@Kkkaneki-k
Contributor Author

@JingsongLi I've finished the modifications. PTAL if you have some time, thanks!

@JingsongLi
Contributor

@Kkkaneki-k Can you rebase onto the latest master?

+ "By default is the number of processors available to the Java virtual machine.");

public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
key("row-id-push-down.enabled")
Contributor


Remove this option, it is useless.

Contributor Author


Remove this option, it is useless.

I think this option is necessary: it disables rowId pushdown for non-data-evolution tables. Currently, due to the issue described in #6747, rowId pushdown should not be enabled for those tables.
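
For reference, a complete definition of such an option would look roughly like the sketch below, assuming Paimon's usual ConfigOptions builder; the default value and description text here are assumptions, not necessarily what the PR ends up with.

public static final ConfigOption<Boolean> ROW_ID_PUSH_DOWN_ENABLED =
        key("row-id-push-down.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether to push row id filters down to the table scan. "
                                + "Can be set to false for non-data-evolution tables "
                                + "until the issue described in #6747 is resolved.");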

table: InnerTable,
requiredSchema: StructType,
filters: Seq[Predicate],
override val rowIds: Seq[JLong],
Contributor


rowIds -> pushedRowIds

Contributor Author


rowIds -> pushedRowIds

done

* AND _ROW_ID IN (1, 2)}).
* </ul>
*/
public class RowIdPredicateVisitor implements PredicateVisitor<Set<Long>> {
Contributor


Can we return Set<Range>? In this way, we can support min max pushdown.

Contributor Author


Can we return Set<Range>? In this way, we can support min max pushdown.

done
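
To illustrate why returning ranges instead of individual ids helps with min/max pushdown, here is a small self-contained sketch; the Range record and the toRanges helper are hypothetical illustrations, not Paimon's API. It collapses a set of pushed row ids into contiguous [min, max] intervals, so a scan can skip any file whose row id span does not overlap at least one interval.

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class RowIdRanges {

    /** A closed row id interval [min, max]; hypothetical, for illustration only. */
    public record Range(long min, long max) {
        public boolean overlaps(long fileMinRowId, long fileMaxRowId) {
            return fileMinRowId <= max && fileMaxRowId >= min;
        }
    }

    /** Merges discrete row ids into contiguous ranges, e.g. {1, 2, 3, 7} -> [1, 3], [7, 7]. */
    public static List<Range> toRanges(Set<Long> rowIds) {
        List<Range> ranges = new ArrayList<>();
        long start = 0;
        long prev = 0;
        boolean open = false;
        for (long id : new TreeSet<>(rowIds)) {
            if (!open) {
                start = prev = id;
                open = true;
            } else if (id == prev + 1) {
                prev = id;
            } else {
                ranges.add(new Range(start, prev));
                start = prev = id;
            }
        }
        if (open) {
            ranges.add(new Range(start, prev));
        }
        return ranges;
    }
}

With the overlaps check, per-file min/max row id statistics are enough to prune files without inspecting individual ids.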

@Kkkaneki-k force-pushed the spark-rowid-push-down branch from 44bfadf to e58cf42 on December 18, 2025 08:36
@Kkkaneki-k closed this Dec 18, 2025
@Kkkaneki-k reopened this Dec 18, 2025
@Kkkaneki-k force-pushed the spark-rowid-push-down branch 5 times, most recently from 2971128 to 92d285e on December 18, 2025 15:27
false,
Collections.singletonList(new DataField(-1, ROW_ID.name(), DataTypes.BIGINT())))
val converterWithRowId = new SparkFilterConverter(rowTypeWithRowId)
val newPredicate = converterWithRowId.convertIgnoreFailure(filter)
Contributor


Can we pass the filter containing RowId into PaimonScanBuilder, and let Paimon Core parse it and generate the corresponding range itself? It feels redundant to reimplement this logic in every engine.

Contributor Author


Can we pass the filter containing RowId into PaimonScanBuilder, and let Paimon Core parse it and generate the corresponding range itself? It feels redundant to reimplement this logic in every engine.

Thanks for your review! I've carefully considered your suggestion and think that this change might introduce the following two problems:

  1. When a filter containing _ROW_ID cannot be consumed, we need to return it to the engine as a post-scan filter. This may be difficult to achieve if Paimon Core itself consumes it and generates the corresponding range.
  2. Currently, ReadBuilder requires separate inputs of filters containing _ROW_ID and filters without _ROW_ID during the build process. This means we need to differentiate between these two types of filters in the engine and input them separately (unless we modify ReadBuilder to handle this differentiation automatically during the build process).

Contributor

@Zouxxyy Dec 19, 2025


  1. It should pass the filter containing _ROW_ID as pushedDataFilters to Paimon, while still adding it to the postFilter for Spark to handle. Therefore, almost no changes are needed in Paimon's Spark connector, except possibly updating new SparkFilterConverter(rowType) to use requiredSchema or a rowType that includes the row id.

  2. Yes, that’s exactly where I intend to put it. CC @JingsongLi

ReadBuilderImpl:

private InnerTableScan configureScan(InnerTableScan scan) {
    return scan.withFilter(filter)
            .withReadType(readType)
            .withPartitionFilter(partitionFilter)
            // calculate rowRanges from filter
            .withRowRanges(rowRanges);
}

I believe users would much prefer writing filters that include _ROW_ID when using the ReadBuilder API, rather than having to understand what the rowRanges list is and how to convert to it.
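
As a rough illustration of what the "calculate rowRanges from filter" step above could involve, the sketch below uses a simplified, hypothetical predicate model (not Paimon's real Predicate API): predicates that reference _ROW_ID are set aside to be turned into row ranges, while the remaining predicates continue as ordinary data filters.

import java.util.ArrayList;
import java.util.List;

public class RowIdFilterSplitter {

    /** Simplified stand-in for a pushed-down predicate; hypothetical, for illustration only. */
    public interface SimplePredicate {
        List<String> referencedFields();
    }

    public static final String ROW_ID_FIELD = "_ROW_ID";

    /** Predicates on _ROW_ID (to become row ranges) versus ordinary data predicates. */
    public record Split(List<SimplePredicate> rowIdPredicates, List<SimplePredicate> dataPredicates) {}

    public static Split split(List<SimplePredicate> pushed) {
        List<SimplePredicate> rowId = new ArrayList<>();
        List<SimplePredicate> data = new ArrayList<>();
        for (SimplePredicate p : pushed) {
            if (p.referencedFields().contains(ROW_ID_FIELD)) {
                rowId.add(p);
            } else {
                data.add(p);
            }
        }
        return new Split(rowId, data);
    }
}

Doing this split inside the builder is what would let engine connectors simply forward their filters, as suggested above.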

Contributor Author


  1. It should pass the filter containing _ROW_ID as pushedDataFilters to Paimon, while still adding it to the postFilter for Spark to handle. Therefore, almost no changes are needed in Paimon's Spark connector, except possibly updating new SparkFilterConverter(rowType) to use requiredSchema or a rowType that includes the row id.
  2. Yes, that’s exactly where I intend to put it. CC @JingsongLi

ReadBuilderImpl:

private InnerTableScan configureScan(InnerTableScan scan) {
    return scan.withFilter(filter)
            .withReadType(readType)
            .withPartitionFilter(partitionFilter)
            // calculate rowRanges from filter
            .withRowRanges(rowRanges);
}

I believe users would much prefer writing filters that include _ROW_ID when using the ReadBuilder API, rather than having to understand what the rowRanges list is and how to convert to it.

Thanks for your reply! I think your point is correct, and I will modify my code based on your suggestions.

@Kkkaneki-k force-pushed the spark-rowid-push-down branch 2 times, most recently from 7268230 to 976a4df on December 23, 2025 16:09
@Kkkaneki-k
Contributor Author

@Zouxxyy I've modified my code based on your suggestions. PTAL if you have some time, thanks!

@Kkkaneki-k requested a review from Zouxxyy December 24, 2025 06:25
@JingsongLi force-pushed the spark-rowid-push-down branch from 976a4df to 4b34e36 on December 27, 2025 10:36
Contributor

@JingsongLi left a comment


+1

@JingsongLi merged commit b6d2302 into apache:master Dec 27, 2025
24 checks passed
jerry-024 added a commit to jerry-024/paimon that referenced this pull request Dec 29, 2025
* upstream/master: (51 commits)
  [test] Fix unstable test: handle MiniCluster shutdown gracefully in collect method (apache#6913)
  [python] fix ray dataset not lazy loading issue when parallelism = 1 (apache#6916)
  [core] Refactor ExternalPathProviders abstraction
  [spark] fix Merge Into unstable tests (apache#6912)
  [core] Enable Entropy Inject for data file path to prevent being throttled by object storage (apache#6832)
  [iceberg] support millisecond timestamps in iceberg compatibility mode (apache#6352)
  [spark] Handle NPE for pushdown aggregate when a datasplit has a null max/min value (apache#6611)
  [test] Fix unstable case testLimitPushDown
  [core] Refactor row id pushdown to DataEvolutionFileStoreScan
  [spark] paimon-spark supports row id push down (apache#6697)
  [spark] Support compact_database procedure (apache#6328) (apache#6910)
  [lucene] Fix row count in IndexManifestEntry
  [test] Remove unstable test: AppendTableITCase.testFlinkMemoryPool
  [core] Refactor Global index writer and reader for Btree
  [core] Minor refactor to magic number into footer
  [core] Support btree global index in paimon-common (apache#6869)
  [spark] Optimize compact for data-evolution table, commit multiple times to avoid out of memory (apache#6907)
  [rest] Add fromSnapshot to rollback (apache#6905)
  [test] Fix unstable RowTrackingTestBase test
  [core] Simplify FileStoreCommitImpl to extract some classes (apache#6904)
  ...
