@@ -467,7 +467,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
467467 AddData (inputStream, (6 , 6L ), (7 , 7L ), (8 , 8L ), (9 , 9L ), (10 , 10L )),
468468 // batch 2: same result as above test
469469 CheckNewAnswer ((6 , 6L , 6 , 6L ), (8 , 8L , 8 , 8L ), (10 , 10L , 10 , 10L )),
470- assertNumStateRows(11 , 6 ),
470+ assertNumStateRows(11 , 6 , 0 ),
471471 Execute { query =>
472472 // Verify state format = 1
473473 val f = query.lastExecution.executedPlan.collect {
@@ -804,7 +804,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
804804 // left: (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)
805805 // right: (2, 2L), (4, 4L)
806806 CheckNewAnswer ((2 , 2L , 2 , 2L ), (4 , 4L , 4 , 4L )),
807- assertNumStateRows(7 , 7 ),
807+ assertNumStateRows(7 , 7 , 0 ),
808808
809809 AddData (inputStream, (6 , 6L ), (7 , 7L ), (8 , 8L ), (9 , 9L ), (10 , 10L )),
810810 // batch 2 - global watermark = 5
@@ -818,7 +818,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
818818 // NOTE: look for evicted rows in right which are not evicted from left - they were
819819 // properly joined in batch 1
820820 CheckNewAnswer ((6 , 6L , 6 , 6L ), (8 , 8L , 8 , 8L ), (10 , 10L , 10 , 10L )),
821- assertNumStateRows(13 , 8 ),
821+ assertNumStateRows(13 , 8 , 0 ),
822822
823823 AddData (inputStream, (11 , 11L ), (12 , 12L ), (13 , 13L ), (14 , 14L ), (15 , 15L )),
824824 // batch 3
@@ -833,7 +833,7 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
833833 CheckNewAnswer (
834834 Row (12 , 12L , 12 , 12L ), Row (14 , 14L , 14 , 14L ),
835835 Row (1 , 1L , null , null ), Row (3 , 3L , null , null )),
836- assertNumStateRows(15 , 7 )
836+ assertNumStateRows(15 , 7 , 0 )
837837 )
838838 }
839839
@@ -867,17 +867,17 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
867867 testStream(query)(
868868 AddData (inputStream, (1 , 1L ), (2 , 2L ), (3 , 3L ), (4 , 4L ), (5 , 5L )),
869869 CheckNewAnswer ((2 , 2L , 2 , 2L ), (4 , 4L , 4 , 4L )),
870- assertNumStateRows(7 , 7 ),
870+ assertNumStateRows(7 , 7 , 0 ),
871871
872872 AddData (inputStream, (6 , 6L ), (7 , 7L ), (8 , 8L ), (9 , 9L ), (10 , 10L )),
873873 CheckNewAnswer ((6 , 6L , 6 , 6L ), (8 , 8L , 8 , 8L ), (10 , 10L , 10 , 10L )),
874- assertNumStateRows(13 , 8 ),
874+ assertNumStateRows(13 , 8 , 0 ),
875875
876876 AddData (inputStream, (11 , 11L ), (12 , 12L ), (13 , 13L ), (14 , 14L ), (15 , 15L )),
877877 CheckNewAnswer (
878878 Row (12 , 12L , 12 , 12L ), Row (14 , 14L , 14 , 14L ),
879879 Row (null , null , 1 , 1L ), Row (null , null , 3 , 3L )),
880- assertNumStateRows(15 , 7 )
880+ assertNumStateRows(15 , 7 , 0 )
881881 )
882882 }
883883
0 commit comments