@@ -37,12 +37,12 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp
3737 case class RelationSetting (
3838 cols : Seq [Attribute ],
3939 numBuckets : Int ,
40- expectedCoalescedNumBuckets : Option [Int ])
40+ expectedNumBuckets : Option [Int ])
4141
4242 object RelationSetting {
43- def apply (numBuckets : Int , expectedCoalescedNumBuckets : Option [Int ]): RelationSetting = {
43+ def apply (numBuckets : Int , expectedNumBuckets : Option [Int ]): RelationSetting = {
4444 val cols = Seq (AttributeReference (" i" , IntegerType )())
45- RelationSetting (cols, numBuckets, expectedCoalescedNumBuckets )
45+ RelationSetting (cols, numBuckets, expectedNumBuckets )
4646 }
4747 }
4848
@@ -80,13 +80,14 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp
8080 leftKeys = setting.rightKeys,
8181 rightKeys = setting.leftKeys,
8282 leftRelation = setting.rightRelation,
83- rightRelation = setting.leftRelation)
83+ rightRelation = setting.leftRelation,
84+ shjBuildSide = setting.shjBuildSide.map {
85+ case BuildLeft => BuildRight
86+ case BuildRight => BuildLeft
87+ })
88+
89+ val settings = Seq (setting, swappedSetting)
8490
85- val settings = if (setting.joinOperator != SHUFFLED_HASH_JOIN ) {
86- Seq (setting, swappedSetting)
87- } else {
88- Seq (setting)
89- }
9091 settings.foreach { s =>
9192 val lScan = newFileSourceScanExec(s.leftRelation)
9293 val rScan = newFileSourceScanExec(s.rightRelation)
@@ -102,19 +103,19 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp
102103 val plan = CoalesceOrRepartitionBucketsInJoin (spark.sessionState.conf)(join)
103104
104105 def verify (expected : Option [Int ], subPlan : SparkPlan ): Unit = {
105- val coalesced = subPlan.collect {
106+ val optionalNewNumBuckets = subPlan.collect {
106107 case f : FileSourceScanExec if f.optionalNewNumBuckets.nonEmpty =>
107108 f.optionalNewNumBuckets.get
108109 }
109110 if (expected.isDefined) {
110- assert(coalesced .size == 1 && coalesced .head == expected.get)
111+ assert(optionalNewNumBuckets .size == 1 && optionalNewNumBuckets .head == expected.get)
111112 } else {
112- assert(coalesced .isEmpty)
113+ assert(optionalNewNumBuckets .isEmpty)
113114 }
114115 }
115116
116- verify(s.leftRelation.expectedCoalescedNumBuckets , plan.asInstanceOf [BinaryExecNode ].left)
117- verify(s.rightRelation.expectedCoalescedNumBuckets , plan.asInstanceOf [BinaryExecNode ].right)
117+ verify(s.leftRelation.expectedNumBuckets , plan.asInstanceOf [BinaryExecNode ].left)
118+ verify(s.rightRelation.expectedNumBuckets , plan.asInstanceOf [BinaryExecNode ].right)
118119 }
119120 }
120121
@@ -136,9 +137,32 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp
136137 }
137138 }
138139
139- test(" bucket coalescing should work only for sort merge join and shuffled hash join" ) {
140- Seq (true , false ).foreach { enabled =>
141- withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> enabled.toString) {
140+ test(" bucket repartitioning - basic" ) {
141+ withSQLConf(SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
142+ run(JoinSetting (
143+ RelationSetting (8 , None ), RelationSetting (4 , Some (8 )), joinOperator = SORT_MERGE_JOIN ))
144+ Seq (BuildLeft , BuildRight ).foreach { buildSide =>
145+ run(JoinSetting (
146+ RelationSetting (8 , None ), RelationSetting (4 , Some (8 )), joinOperator = SHUFFLED_HASH_JOIN ,
147+ shjBuildSide = Some (buildSide)))
148+ }
149+ }
150+
151+ withSQLConf(SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " false" ) {
152+ run(JoinSetting (
153+ RelationSetting (8 , None ), RelationSetting (4 , None ), joinOperator = SORT_MERGE_JOIN ))
154+ Seq (BuildLeft , BuildRight ).foreach { buildSide =>
155+ run(JoinSetting (
156+ RelationSetting (8 , None ), RelationSetting (4 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
157+ shjBuildSide = Some (buildSide)))
158+ }
159+ }
160+ }
161+
162+ test(" bucket coalesce/repartition should work only for sort merge join and shuffled hash join" ) {
163+ Seq (SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ,
164+ SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " true" ).foreach { enableConfig =>
165+ withSQLConf(enableConfig) {
142166 run(JoinSetting (
143167 RelationSetting (4 , None ), RelationSetting (8 , None ), joinOperator = BROADCAST_HASH_JOIN ))
144168 }
@@ -153,120 +177,142 @@ class CoalesceOrRepartitionBucketsInJoinSuite extends SQLTestUtils with SharedSp
153177 }
154178 }
155179
156- test(" bucket coalescing shouldn't be applied when the number of buckets are the same" ) {
157- withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
158- run(JoinSetting (
159- RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = SORT_MERGE_JOIN ))
160- run(JoinSetting (
161- RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
162- shjBuildSide = Some (BuildLeft )))
180+ test(" bucket coalesce/repartition shouldn't be applied when the number of buckets are the same" ) {
181+ Seq (SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ,
182+ SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " true" ).foreach { enableConfig =>
183+ withSQLConf(enableConfig) {
184+ run(JoinSetting (
185+ RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = SORT_MERGE_JOIN ))
186+ run(JoinSetting (
187+ RelationSetting (8 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
188+ shjBuildSide = Some (BuildLeft )))
189+ }
163190 }
164191 }
165192
166193 test(" number of bucket is not divisible by other number of bucket" ) {
167- withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
168- run(JoinSetting (
169- RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = SORT_MERGE_JOIN ))
170- run(JoinSetting (
171- RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
172- shjBuildSide = Some (BuildLeft )))
194+ Seq (SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ,
195+ SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " true" ).foreach { enableConfig =>
196+ withSQLConf(enableConfig) {
197+ run(JoinSetting (
198+ RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = SORT_MERGE_JOIN ))
199+ run(JoinSetting (
200+ RelationSetting (3 , None ), RelationSetting (8 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
201+ shjBuildSide = Some (BuildLeft )))
202+ }
173203 }
174204 }
175205
176206 test(" the ratio of the number of buckets is greater than max allowed" ) {
177- withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ,
178- SQLConf .COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO .key -> " 2" ) {
179- run(JoinSetting (
180- RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = SORT_MERGE_JOIN ))
181- run(JoinSetting (
182- RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
183- shjBuildSide = Some (BuildLeft )))
207+ Seq (SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ,
208+ SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " true" ).foreach { enableConfig =>
209+ withSQLConf(enableConfig,
210+ SQLConf .COALESCE_OR_REPARTITION_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO .key -> " 2" ) {
211+ run(JoinSetting (
212+ RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = SORT_MERGE_JOIN ))
213+ run(JoinSetting (
214+ RelationSetting (4 , None ), RelationSetting (16 , None ), joinOperator = SHUFFLED_HASH_JOIN ,
215+ shjBuildSide = Some (BuildLeft )))
216+ }
184217 }
185218 }
186219
187220 test(" join keys should match with output partitioning" ) {
188- withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
189- val lCols = Seq (
190- AttributeReference (" l1" , IntegerType )(),
191- AttributeReference (" l2" , IntegerType )())
192- val rCols = Seq (
193- AttributeReference (" r1" , IntegerType )(),
194- AttributeReference (" r2" , IntegerType )())
195-
196- val lRel = RelationSetting (lCols, 4 , None )
197- val rRel = RelationSetting (rCols, 8 , None )
198-
199- // The following should not be coalesced because join keys do not match with output
200- // partitioning (missing one expression).
201- run(JoinSetting (
202- leftKeys = Seq (lCols.head),
203- rightKeys = Seq (rCols.head),
204- leftRelation = lRel,
205- rightRelation = rRel,
206- joinOperator = SORT_MERGE_JOIN ,
207- shjBuildSide = None ))
221+ val lCols = Seq (
222+ AttributeReference (" l1" , IntegerType )(),
223+ AttributeReference (" l2" , IntegerType )())
224+ val rCols = Seq (
225+ AttributeReference (" r1" , IntegerType )(),
226+ AttributeReference (" r2" , IntegerType )())
208227
209- run(JoinSetting (
210- leftKeys = Seq (lCols.head),
211- rightKeys = Seq (rCols.head),
212- leftRelation = lRel,
213- rightRelation = rRel,
214- joinOperator = SHUFFLED_HASH_JOIN ,
215- shjBuildSide = Some (BuildLeft )))
228+ val lRel = RelationSetting (lCols, 4 , None )
229+ val rRel = RelationSetting (rCols, 8 , None )
216230
217- // The following should not be coalesced because join keys do not match with output
218- // partitioning (more expressions).
219- run(JoinSetting (
220- leftKeys = lCols :+ AttributeReference (" l3" , IntegerType )(),
221- rightKeys = rCols :+ AttributeReference (" r3" , IntegerType )(),
222- leftRelation = lRel,
223- rightRelation = rRel,
224- joinOperator = SORT_MERGE_JOIN ,
225- shjBuildSide = None ))
231+ Seq (SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ,
232+ SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " true" ).foreach { enableConfig =>
233+ withSQLConf(enableConfig) {
234+ // The following should not be coalesced because join keys do not match with output
235+ // partitioning (missing one expression).
236+ run(JoinSetting (
237+ leftKeys = Seq (lCols.head),
238+ rightKeys = Seq (rCols.head),
239+ leftRelation = lRel,
240+ rightRelation = rRel,
241+ joinOperator = SORT_MERGE_JOIN ,
242+ shjBuildSide = None ))
226243
227- run(JoinSetting (
228- leftKeys = lCols :+ AttributeReference (" l3" , IntegerType )(),
229- rightKeys = rCols :+ AttributeReference (" r3" , IntegerType )(),
230- leftRelation = lRel,
231- rightRelation = rRel,
232- joinOperator = SHUFFLED_HASH_JOIN ,
233- shjBuildSide = Some (BuildLeft )))
244+ run(JoinSetting (
245+ leftKeys = Seq (lCols.head),
246+ rightKeys = Seq (rCols.head),
247+ leftRelation = lRel,
248+ rightRelation = rRel,
249+ joinOperator = SHUFFLED_HASH_JOIN ,
250+ shjBuildSide = Some (BuildLeft )))
251+
252+ // The following should not be coalesced because join keys do not match with output
253+ // partitioning (more expressions).
254+ run(JoinSetting (
255+ leftKeys = lCols :+ AttributeReference (" l3" , IntegerType )(),
256+ rightKeys = rCols :+ AttributeReference (" r3" , IntegerType )(),
257+ leftRelation = lRel,
258+ rightRelation = rRel,
259+ joinOperator = SORT_MERGE_JOIN ,
260+ shjBuildSide = None ))
234261
262+ run(JoinSetting (
263+ leftKeys = lCols :+ AttributeReference (" l3" , IntegerType )(),
264+ rightKeys = rCols :+ AttributeReference (" r3" , IntegerType )(),
265+ leftRelation = lRel,
266+ rightRelation = rRel,
267+ joinOperator = SHUFFLED_HASH_JOIN ,
268+ shjBuildSide = Some (BuildLeft )))
269+ }
270+ }
271+
272+ withSQLConf(SQLConf .COALESCE_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
235273 // The following will be coalesced since ordering should not matter because it will be
236274 // adjusted in `EnsureRequirements`.
237- run( JoinSetting (
275+ val setting = JoinSetting (
238276 leftKeys = lCols.reverse,
239277 rightKeys = rCols.reverse,
240278 leftRelation = lRel,
241279 rightRelation = RelationSetting (rCols, 8 , Some (4 )),
242- joinOperator = SORT_MERGE_JOIN ,
243- shjBuildSide = None ))
280+ joinOperator = " " ,
281+ shjBuildSide = None )
244282
245- run(JoinSetting (
283+ run(setting.copy(joinOperator = SORT_MERGE_JOIN ))
284+ run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN , shjBuildSide = Some (BuildLeft )))
285+ }
286+
287+ withSQLConf(SQLConf .REPARTITION_BUCKETS_IN_JOIN_ENABLED .key -> " true" ) {
288+ // The following will be repartitioned since ordering should not matter because it will be
289+ // adjusted in `EnsureRequirements`.
290+ val setting = JoinSetting (
246291 leftKeys = lCols.reverse,
247292 rightKeys = rCols.reverse,
248- leftRelation = lRel ,
249- rightRelation = RelationSetting (rCols, 8 , Some ( 4 )) ,
250- joinOperator = SHUFFLED_HASH_JOIN ,
251- shjBuildSide = Some ( BuildLeft )) )
293+ leftRelation = RelationSetting (lCols, 4 , Some ( 8 )) ,
294+ rightRelation = rRel ,
295+ joinOperator = " " ,
296+ shjBuildSide = None )
252297
253- run(JoinSetting (
254- leftKeys = rCols.reverse,
255- rightKeys = lCols.reverse,
256- leftRelation = RelationSetting (rCols, 8 , Some (4 )),
257- rightRelation = lRel,
258- joinOperator = SHUFFLED_HASH_JOIN ,
259- shjBuildSide = Some (BuildRight )))
298+ run(setting.copy(joinOperator = SORT_MERGE_JOIN ))
299+ Seq (BuildLeft , BuildRight ).foreach { buildSide =>
300+ run(setting.copy(joinOperator = SHUFFLED_HASH_JOIN , shjBuildSide = Some (buildSide)))
301+ }
260302 }
261303 }
262304
263- test(" FileSourceScanExec's metadata should be updated with coalesced info" ) {
305+ test(" FileSourceScanExec's metadata should be updated with coalesced/repartitioned info" ) {
264306 val scan = newFileSourceScanExec(RelationSetting (8 , None ))
265307 val value = scan.metadata(" SelectedBucketsCount" )
266308 assert(value === " 8 out of 8" )
267309
268310 val scanWithCoalescing = scan.copy(optionalNewNumBuckets = Some (4 ))
269- val valueWithCoalescing = scanWithCoalescing.metadata(" SelectedBucketsCount" )
270- assert(valueWithCoalescing == " 8 out of 8 (Coalesced to 4)" )
311+ val metadataWithCoalescing = scanWithCoalescing.metadata(" SelectedBucketsCount" )
312+ assert(metadataWithCoalescing == " 8 out of 8 (Coalesced to 4)" )
313+
314+ val scanWithRepartitioning = scan.copy(optionalNewNumBuckets = Some (16 ))
315+ val metadataWithRepartitioning = scanWithRepartitioning.metadata(" SelectedBucketsCount" )
316+ assert(metadataWithRepartitioning == " 8 out of 8 (Repartitioned to 16)" )
271317 }
272318}
0 commit comments