
Commit 45e6004
sezruby (Eunjin Song, from Dev Box) authored and committed
CDC reader optimization
1 parent: 69c9545

9 files changed (+341, -55 lines)

project/SparkMimaExcludes.scala

Lines changed: 6 additions & 1 deletion
@@ -89,8 +89,13 @@ object SparkMimaExcludes {
     // Changes in 4.0.0
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaTable.improveUnsupportedOpError"),
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaMergeBuilder.improveUnsupportedOpError"),
-    ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaMergeBuilder.execute")
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("io.delta.tables.DeltaMergeBuilder.execute"),

+    // CDC Optimization
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.delta.sources.DeltaSQLConfBase.org$apache$spark$sql$delta$sources$DeltaSQLConfBase$_setter_$DELTA_CDF_BATCH_STATIC_READER_="),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.delta.sources.DeltaSQLConfBase.DELTA_CDF_BATCH_STATIC_READER"),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.delta.sources.DeltaSQLConfBase.org$apache$spark$sql$delta$sources$DeltaSQLConfBase$_setter_$DELTA_CDF_BATCH_STATIC_READER_START_ONLY_="),
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.delta.sources.DeltaSQLConfBase.DELTA_CDF_BATCH_STATIC_READER_START_ONLY")
     // scalastyle:on line.size.limit
   )
 }

sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceCMSuite.scala

Lines changed: 37 additions & 26 deletions
@@ -96,46 +96,31 @@ class DeltaSharingDataSourceCMSuite
      deltaTableName: String,
      sharedTablePath: String,
      startingVersion: Int): Unit = {
-    val schema = spark.read
-      .format("deltaSharing")
-      .option("responseFormat", "delta")
-      .option("readChangeFeed", "true")
-      .option("startingVersion", startingVersion)
-      .load(sharedTablePath)
-      .schema
-    val expectedSchema = spark.read
-      .format("delta")
-      .option("readChangeFeed", "true")
-      .option("startingVersion", startingVersion)
-      .table(deltaTableName)
-      .schema
-    assert(expectedSchema == schema)
-
-    val deltaDf = spark.read
-      .format("delta")
-      .option("readChangeFeed", "true")
-      .option("startingVersion", startingVersion)
-      .table(deltaTableName)
-    val sharingDf = spark.read
-      .format("deltaSharing")
-      .option("responseFormat", "delta")
-      .option("readChangeFeed", "true")
-      .option("startingVersion", startingVersion)
-      .load(sharedTablePath)
    if (startingVersion <= 2) {
      Seq(BatchCDFSchemaEndVersion, BatchCDFSchemaLatest, BatchCDFSchemaLegacy).foreach { m =>
        withSQLConf(
          DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key ->
            m.name
        ) {
          val deltaException = intercept[DeltaUnsupportedOperationException] {
+            val deltaDf = spark.read
+              .format("delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .table(deltaTableName)
            deltaDf.collect()
          }
          assert(
            deltaException.getMessage.contains("Retrieving table changes between") &&
            deltaException.getMessage.contains("failed because of an incompatible")
          )
          val sharingException = intercept[DeltaUnsupportedOperationException] {
+            val sharingDf = spark.read
+              .format("deltaSharing")
+              .option("responseFormat", "delta")
+              .option("readChangeFeed", "true")
+              .option("startingVersion", startingVersion)
+              .load(sharedTablePath)
            sharingDf.collect()
          }
          assert(
@@ -145,6 +130,32 @@ class DeltaSharingDataSourceCMSuite
        }
      }
    } else {
+      val schema = spark.read
+        .format("deltaSharing")
+        .option("responseFormat", "delta")
+        .option("readChangeFeed", "true")
+        .option("startingVersion", startingVersion)
+        .load(sharedTablePath)
+        .schema
+      val expectedSchema = spark.read
+        .format("delta")
+        .option("readChangeFeed", "true")
+        .option("startingVersion", startingVersion)
+        .table(deltaTableName)
+        .schema
+      assert(expectedSchema == schema)
+
+      val deltaDf = spark.read
+        .format("delta")
+        .option("readChangeFeed", "true")
+        .option("startingVersion", startingVersion)
+        .table(deltaTableName)
+      val sharingDf = spark.read
+        .format("deltaSharing")
+        .option("responseFormat", "delta")
+        .option("readChangeFeed", "true")
+        .option("startingVersion", startingVersion)
+        .load(sharedTablePath)
      checkAnswer(sharingDf, deltaDf)
      assert(sharingDf.count() > 0)
    }

spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

Lines changed: 3 additions & 0 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
+import org.apache.spark.sql.delta.commands.cdc.CDCReader.DeltaCDFRelationAtAnalysis
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.coordinatedcommits.{CatalogOwnedTableUtils, CoordinatedCommitsUtils}
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
@@ -321,6 +322,8 @@ class DeltaAnalysis(session: SparkSession)

    case tc: TableChanges if tc.child.resolved => tc.toReadQuery

+    case LogicalRelation(cdc: DeltaCDFRelationAtAnalysis, _, _, _) =>
+      cdc.getScanDf.queryExecution.optimizedPlan

    // Here we take advantage of CreateDeltaTableCommand which takes a LogicalPlan for CTAS in order
    // to perform CLONE. We do this by passing the CloneTableCommand as the query in
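For context, a minimal sketch of the kind of batch CDF read that would go through this new analyzer case once a DeltaCDFRelationAtAnalysis is produced (the table name and version numbers are illustrative, not from this commit):

    // Batch change-data-feed read with explicit start/end versions.
    val changes = spark.read
      .format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", 1)   // illustrative versions
      .option("endingVersion", 5)
      .table("events")                // hypothetical table name
    changes.show()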

spark/src/main/scala/org/apache/spark/sql/delta/DeltaTableValueFunctions.scala

Lines changed: 8 additions & 1 deletion
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.cdc.CDCReader
+import org.apache.spark.sql.delta.commands.cdc.CDCReader.DeltaCDFRelationAtAnalysis
import org.apache.spark.sql.delta.sources.DeltaDataSource

import org.apache.spark.sql.SparkSession
@@ -190,7 +191,13 @@ case class TableChanges(
  def toReadQuery: LogicalPlan = child.transformUp {
    case DataSourceV2Relation(d: DeltaTableV2, _, _, _, options) =>
      // withOptions empties the catalog table stats
-      d.withOptions(options.asScala.toMap).toLogicalRelation
+      val rel = d.withOptions(options.asScala.toMap).toLogicalRelation
+      rel.relation match {
+        case cr@DeltaCDFRelationAtAnalysis(_, _, _, _, _) =>
+          cr.getScanDf.queryExecution.optimizedPlan
+        case _ =>
+          rel
+      }
    case r: NamedRelation =>
      throw DeltaErrors.notADeltaTableException(fnName, r.name)
    case l: LogicalRelation =>
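Since toReadQuery above now unwraps DeltaCDFRelationAtAnalysis into its optimized scan plan, the same rewrite also covers reads through the table_changes table-valued function. A minimal sketch of such a query, with a hypothetical table name and versions:

    // Returns CDF rows for versions 1 through 5 of the (hypothetical) table `events`.
    val changes = spark.sql("SELECT * FROM table_changes('events', 1, 5)")
    changes.show()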

spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala

Lines changed: 105 additions & 14 deletions
@@ -193,6 +193,73 @@ object CDCReader extends CDCReaderImpl
    }
  }

+  /**
+   * A special BaseRelation wrapper for CDF reads for optimization at query planning stage.
+   */
+  case class DeltaCDFRelationAtAnalysis(
+      snapshotWithSchemaMode: SnapshotWithSchemaMode,
+      sqlContext: SQLContext,
+      catalogTableOpt: Option[CatalogTable],
+      startingVersion: Option[Long],
+      endingVersion: Option[Long]) extends BaseRelation {
+
+    private val deltaLog = snapshotWithSchemaMode.snapshot.deltaLog
+
+    private lazy val latestVersionOfTableDuringAnalysis: Long =
+      deltaLog.update(catalogTableOpt = catalogTableOpt).version
+
+    /**
+     * There may be a slight divergence here in terms of what schema is in the latest data vs what
+     * schema we have captured during analysis, but this is an inherent limitation of Spark.
+     *
+     * However, if there are schema changes between analysis and execution, since we froze this
+     * schema, our schema incompatibility checks will kick in during the scan so we will always
+     * be safe - Although it is a notable caveat that user should be aware of because the CDC query
+     * may break.
+     */
+    private lazy val endingVersionForBatchSchema: Long = endingVersion.map { v =>
+      // As defined in the method doc, if ending version is greater than the latest version, we will
+      // just use the latest version to find the schema.
+      latestVersionOfTableDuringAnalysis min v
+    }.getOrElse {
+      // Or if endingVersion is not specified, we just use the latest schema.
+      latestVersionOfTableDuringAnalysis
+    }
+
+    // The final snapshot whose schema is going to be used as this CDF relation's schema
+    private val snapshotForBatchSchema: Snapshot = snapshotWithSchemaMode.schemaMode match {
+      case BatchCDFSchemaEndVersion =>
+        // Fetch the ending version and its schema
+        deltaLog.getSnapshotAt(endingVersionForBatchSchema, catalogTableOpt = catalogTableOpt)
+      case _ =>
+        // Apply the default, either latest generated by DeltaTableV2 or specified by Time-travel
+        // options.
+        snapshotWithSchemaMode.snapshot
+    }
+
+    override val schema: StructType = cdcReadSchema(snapshotForBatchSchema.metadata.schema)
+
+    def getScanDf: DataFrame = {
+      startingVersion match {
+        case Some(startingVer) =>
+          val df = changesToBatchDF(
+            deltaLog,
+            startingVer,
+            endingVersion.getOrElse {
+              // ending version is decided at Analyzer phase if not specified.
+              deltaLog.update(catalogTableOpt = catalogTableOpt).version
+            },
+            sqlContext.sparkSession,
+            readSchemaSnapshot = Some(snapshotForBatchSchema))
+          df
+        case None =>
+          // emtpyCDFRelation
+          sqlContext.sparkSession.createDataFrame(
+            sqlContext.sparkSession.sparkContext.emptyRDD[Row], schema)
+      }
+    }
+  }
+
  case class CDCDataSpec[T <: FileAction](
      version: Long,
      timestamp: Timestamp,
@@ -377,12 +444,24 @@ trait CDCReaderImpl extends DeltaLogging {
        s"${DeltaSQLConf.DELTA_CDF_DEFAULT_SCHEMA_MODE_FOR_COLUMN_MAPPING_TABLE.key} " +
          s"cannot be used with time travel options.")
    }
-    DeltaCDFRelation(
-      SnapshotWithSchemaMode(snapshotToUse, schemaMode),
-      spark.sqlContext,
-      catalogTableOpt,
-      Some(startingVersion.version),
-      endingVersionOpt.map(_.version))
+
+    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_BATCH_STATIC_READER) &&
+      (endingVersionOpt.isDefined || spark.sessionState.conf.getConf(
+        DeltaSQLConf.DELTA_CDF_BATCH_STATIC_READER_START_ONLY))) {
+      DeltaCDFRelationAtAnalysis(
+        SnapshotWithSchemaMode(snapshotToUse, schemaMode),
+        spark.sqlContext,
+        catalogTableOpt,
+        Some(startingVersion.version),
+        endingVersionOpt.map(_.version))
+    } else {
+      DeltaCDFRelation(
+        SnapshotWithSchemaMode(snapshotToUse, schemaMode),
+        spark.sqlContext,
+        catalogTableOpt,
+        Some(startingVersion.version),
+        endingVersionOpt.map(_.version))
+    }
  }

  private def verifyStartingVersion(
@@ -444,14 +523,26 @@ trait CDCReaderImpl extends DeltaLogging {
      snapshot: Snapshot,
      catalogTableOpt: Option[CatalogTable],
      schemaMode: DeltaBatchCDFSchemaMode) = {
-    new DeltaCDFRelation(
-      SnapshotWithSchemaMode(snapshot, schemaMode),
-      spark.sqlContext,
-      catalogTableOpt,
-      startingVersion = None,
-      endingVersion = None) {
-      override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] =
-        sqlContext.sparkSession.sparkContext.emptyRDD[Row]
+    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CDF_BATCH_STATIC_READER)) {
+      DeltaCDFRelationAtAnalysis(
+        SnapshotWithSchemaMode(snapshot, schemaMode),
+        spark.sqlContext,
+        catalogTableOpt,
+        startingVersion = None,
+        endingVersion = None)
+    } else {
+      new DeltaCDFRelation(
+        SnapshotWithSchemaMode(snapshot, schemaMode),
+        spark.sqlContext,
+        catalogTableOpt,
+        startingVersion = None,
+        endingVersion = None) {
+        override def buildScan(
+            requiredColumns: Seq[Attribute],
+            filters: Seq[Expression]): RDD[Row] = {
+          sqlContext.sparkSession.sparkContext.emptyRDD[Row]
+        }
+      }
    }
  }
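To make the start-only caveat in the new code concrete, here is a minimal sketch of a batch CDF read with only a startingVersion (table name is hypothetical; an existing SparkSession is assumed). With the start-only flag in effect, the ending version is fixed when the plan is analyzed, so commits that land between analysis and execution would not show up:

    val changes = spark.read
      .format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", 0)   // no endingVersion: resolved during analysis
      .table("events")                // hypothetical table
    // ... further commits to `events` may happen here ...
    changes.count()                   // reflects the version captured at analysis time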

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 29 additions & 0 deletions
@@ -2070,6 +2070,35 @@ trait DeltaSQLConfBase {
      .booleanConf
      .createWithDefault(false)

+  val DELTA_CDF_BATCH_STATIC_READER =
+    buildConf("changeDataFeed.batch.staticReader")
+      .doc(
+        s"""If enabled, Delta uses an explicit query plan to read CDC changes instead of RDD impl.
+           |This enables query plan optimizations via Spark Plan Optimizer when applicable.
+           |The original CDF reader implementation uses Spark PrunedFilteredScan API,
+           |which performs a slower RDD-based scan and does not show jobs in the Spark UI.
+           |This optimization is applied to CDF reads with start and end timestamps/versions
+           |specified.
+           |If changeDataFeed.batch.staticReader.startOnly.enabled is set to true, it will
+           |also apply the optimization to the CDF reads with start version only
+           |and the end version is decided at Spark Query Plan Analyzer, not the execution time.
+         """.stripMargin)
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
+  val DELTA_CDF_BATCH_STATIC_READER_START_ONLY =
+    buildConf("changeDataFeed.batch.staticReader.startOnly.enabled")
+      .doc(
+        s"""If enabled, changeDataFeed.batch.staticReader optimization also applies
+           |CDF read queries with start version only, non streaming scenario.
+           |The main caveat with the optimization is that, if the end version is not specified,
+           |it might not use the latest version if the table is updated after analysis.
+         """.stripMargin)
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
  val DELTA_COLUMN_MAPPING_CHECK_MAX_COLUMN_ID =
    buildConf("columnMapping.checkMaxColumnId")
      .doc(
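A sketch of toggling these flags per session. The names above are the relative keys passed to buildConf; the fully qualified keys below assume the usual "spark.databricks.delta." prefix that DeltaSQLConf applies, so treat them as an assumption rather than something confirmed by this commit:

    // Disable the static CDF reader entirely (assumed fully qualified key).
    spark.conf.set("spark.databricks.delta.changeDataFeed.batch.staticReader", "false")
    // Keep it enabled but opt out of the start-version-only case (assumed key).
    spark.conf.set(
      "spark.databricks.delta.changeDataFeed.batch.staticReader.startOnly.enabled", "false")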

spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.coordinatedcommits.CatalogOwnedTableUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaTestImplicits._

+import org.apache.spark.SparkConf
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
@@ -396,3 +397,10 @@ class DeltaCDCSQLWithCatalogOwnedBatch2Suite extends DeltaCDCSQLSuite {
class DeltaCDCSQLWithCatalogOwnedBatch100Suite extends DeltaCDCSQLSuite {
  override def catalogOwnedCoordinatorBackfillBatchSize: Option[Int] = Some(100)
}
+
+class DeltaCDCSQLWithAtAnalyzerOptSuite
+  extends DeltaCDCSQLSuite {
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(DeltaSQLConf.DELTA_CDF_BATCH_STATIC_READER.key, "true")
+    .set(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")
+}
