From cec8296ebcb005b1397b3810d5ea0a0b4cd11282 Mon Sep 17 00:00:00 2001 From: bartosz25 Date: Sun, 15 Sep 2019 17:24:58 +0200 Subject: [PATCH] adapt comment to the reality previous comment was true for Apache Spark prior 2.4.0. The 2.4.0 release brought multiple watermark policy and therefore stating that the 'min' is always chosen is misleading. --- .../spark/sql/execution/streaming/WatermarkTracker.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala index 76ab1284633b..b0f8cf9cd184 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/WatermarkTracker.scala @@ -63,7 +63,7 @@ case object MinWatermark extends MultipleWatermarkPolicy { } /** - * Policy to choose the *min* of the operator watermark values as the global watermark value. So the + * Policy to choose the *max* of the operator watermark values as the global watermark value. So the * global watermark will advance if any of the individual operator watermarks has advanced. * In other words, in a streaming query with multiple input streams and watermarks defined on all * of them, the global watermark will advance as fast as the fastest input. So if there is watermark @@ -108,10 +108,9 @@ case class WatermarkTracker(policy: MultipleWatermarkPolicy) extends Logging { } } - // Update the global watermark to the minimum of all watermark nodes. - // This is the safest option, because only the global watermark is fault-tolerant. Making - // it the minimum of all individual watermarks guarantees it will never advance past where - // any individual watermark operator would be if it were in a plan by itself. + // Update the global watermark accordingly to the chosen policy. To find all available policies + // and their semantics, please check the comments of + // `org.apache.spark.sql.execution.streaming.MultipleWatermarkPolicy` implementations. val chosenGlobalWatermark = policy.chooseGlobalWatermark(operatorToWatermarkMap.values.toSeq) if (chosenGlobalWatermark > globalWatermarkMs) { logInfo(s"Updating event-time watermark from $globalWatermarkMs to $chosenGlobalWatermark ms")