[SPARK-30462][SS] Streamline the logic on file stream source and sink metadata log to avoid memory issue #28904
Changes from all commits
CompactibleFileStreamLog.scala (org.apache.spark.sql.execution.streaming)
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| package org.apache.spark.sql.execution.streaming | ||
|
|
||
| import java.io.{InputStream, IOException, OutputStream} | ||
| import java.io.{FileNotFoundException, InputStream, IOException, OutputStream} | ||
| import java.nio.charset.StandardCharsets.UTF_8 | ||
|
|
||
| import scala.io.{Source => IOSource} | ||
|
|
@@ -28,7 +28,7 @@ import org.json4s.NoTypeHints | |
| import org.json4s.jackson.Serialization | ||
|
|
||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.util.{SizeEstimator, Utils} | ||
| import org.apache.spark.util.Utils | ||
|
|
||
| /** | ||
| * An abstract class for compactible metadata logs. It will write one log file for each batch. | ||
|
|
@@ -107,9 +107,12 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( | |
| } | ||
|
|
||
| /** | ||
| * Filter out the obsolete logs. | ||
| * Determine whether the log should be retained or not. | ||
| * | ||
| * Default implementation retains all log entries. Implementations should override the method | ||
| * to change the behavior. | ||
| */ | ||
| def compactLogs(logs: Seq[T]): Seq[T] | ||
| def shouldRetain(log: T): Boolean = true | ||
|
|
||
| override def batchIdToPath(batchId: Long): Path = { | ||
| if (isCompactionBatch(batchId, compactInterval)) { | ||
|
|
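The new `shouldRetain` hook replaces `compactLogs`: rather than receiving the whole batch as a collection, an implementation is asked about each entry individually, which lets compaction stream entries to the output without holding them all in memory. Below is a minimal sketch of what an override could look like; the entry type, field names, and retention rule are hypothetical, not Spark's actual source/sink log behavior.

```scala
// Hypothetical entry type and override, for illustration only.
case class ExampleEntry(path: String, modificationTime: Long, action: String)

trait ExampleRetention {
  // Assumed retention window of seven days.
  private val retentionMs: Long = 7L * 24 * 60 * 60 * 1000

  // Mirrors CompactibleFileStreamLog.shouldRetain: called once per entry
  // during compaction; returning false drops the entry from the compact file.
  def shouldRetain(log: ExampleEntry): Boolean = {
    log.action != "delete" &&
      log.modificationTime >= System.currentTimeMillis() - retentionMs
  }
}
```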
@@ -132,12 +135,18 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( | |
| } | ||
| } | ||
|
|
||
| private def serializeEntry(entry: T, out: OutputStream): Unit = { | ||
| out.write(Serialization.write(entry).getBytes(UTF_8)) | ||
| } | ||
|
|
||
| private def deserializeEntry(line: String): T = Serialization.read[T](line) | ||
|
|
||
| override def serialize(logData: Array[T], out: OutputStream): Unit = { | ||
| // called inside a try-finally where the underlying stream is closed in the caller | ||
| out.write(("v" + metadataLogVersion).getBytes(UTF_8)) | ||
| logData.foreach { data => | ||
| out.write('\n') | ||
| out.write(Serialization.write(data).getBytes(UTF_8)) | ||
| serializeEntry(data, out) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -147,7 +156,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( | |
| throw new IllegalStateException("Incomplete log file") | ||
| } | ||
| validateVersion(lines.next(), metadataLogVersion) | ||
| lines.map(Serialization.read[T]).toArray | ||
| lines.map(deserializeEntry).toArray | ||
| } | ||
|
|
||
| override def add(batchId: Long, logs: Array[T]): Boolean = { | ||
|
|
@@ -173,37 +182,64 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( | |
| override def purge(thresholdBatchId: Long): Unit = throw new UnsupportedOperationException( | ||
| s"Cannot purge as it might break internal state.") | ||
|
|
||
| /** | ||
| * Apply function on all entries in the specific batch. The method will throw | ||
| * FileNotFoundException if the metadata log file doesn't exist. | ||
| * | ||
| * NOTE: This doesn't fail early on corruption. The caller should handle the exception | ||
| * properly and make sure the logic is not affected by failing in the middle. | ||
|
Member
If the caller does not handle the exception properly, what is the consequence? Do we have a test case to cover it?

Member
How to ensure all the callers handle it properly? Do we need to introduce a test to cover it?

Contributor (Author)
I'm not sure how this class could ensure callers follow the guide. Did you mean we'd like to test this behavior with the derived classes (the file stream source/sink logs), or with a test-purpose implementation of CompactibleFileStreamLog?
||
| */ | ||
| def foreachInBatch(batchId: Long)(fn: T => Unit): Unit = applyFnInBatch(batchId)(_.foreach(fn)) | ||
|
|
||
| /** | ||
| * Apply filter on all entries in the specific batch. | ||
|
Member
ditto. Shall we mention

Contributor (Author)
ditto. Would we want to mention all possible exceptions per method?
||
| */ | ||
| def filterInBatch(batchId: Long)(predicate: T => Boolean): Option[Array[T]] = { | ||
| try { | ||
| Some(applyFnInBatch(batchId)(_.filter(predicate).toArray)) | ||
| } catch { | ||
| case _: FileNotFoundException => None | ||
| } | ||
| } | ||
|
|
||
| private def applyFnInBatch[RET](batchId: Long)(fn: Iterator[T] => RET): RET = { | ||
| applyFnToBatchByStream(batchId) { input => | ||
| val lines = IOSource.fromInputStream(input, UTF_8.name()).getLines() | ||
| if (!lines.hasNext) { | ||
| throw new IllegalStateException("Incomplete log file") | ||
| } | ||
| validateVersion(lines.next(), metadataLogVersion) | ||
| fn(lines.map(deserializeEntry)) | ||
| } | ||
| } | ||
|
|
||
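A hedged usage sketch of the two new batch-level helpers. The `log` parameter stands in for whatever concrete `CompactibleFileStreamLog` implementation the caller has; the `String` entry type is an assumption chosen only to keep the example self-contained.

```scala
import java.io.FileNotFoundException

import org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog

// Illustrative only: consumes one batch entry by entry instead of calling
// get(batchId), which would materialize the whole batch as an Array.
def summarizeBatch(log: CompactibleFileStreamLog[String], batchId: Long): Unit = {
  var count = 0
  try {
    // Streams entry by entry; nothing beyond the current entry is kept in memory.
    log.foreachInBatch(batchId) { _ => count += 1 }
  } catch {
    case e: FileNotFoundException =>
      // foreachInBatch propagates the missing-file case to the caller.
      println(s"batch $batchId not found: ${e.getMessage}")
  }

  // filterInBatch returns None instead of throwing when the batch file is absent.
  val retained: Option[Array[String]] = log.filterInBatch(batchId)(_.nonEmpty)
  println(s"batch $batchId: $count entries read, ${retained.map(_.length).getOrElse(0)} retained")
}
```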
| /** | ||
| * Compacts all logs before `batchId` plus the provided `logs`, and writes them into the | ||
| * corresponding `batchId` file. It will delete expired files as well if enabled. | ||
| */ | ||
| private def compact(batchId: Long, logs: Array[T]): Boolean = { | ||
| val (allLogs, loadElapsedMs) = Utils.timeTakenMs { | ||
| val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) | ||
| validBatches.flatMap { id => | ||
| super.get(id).getOrElse { | ||
| throw new IllegalStateException( | ||
| s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " + | ||
| s"(compactInterval: $compactInterval)") | ||
| } | ||
| } ++ logs | ||
| def writeEntry(entry: T, output: OutputStream): Unit = { | ||
| if (shouldRetain(entry)) { | ||
| output.write('\n') | ||
| serializeEntry(entry, output) | ||
| } | ||
| } | ||
| val compactedLogs = compactLogs(allLogs) | ||
|
|
||
| // Return false as there is another writer. | ||
| val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs { | ||
| super.add(batchId, compactedLogs.toArray) | ||
| val (writeSucceed, elapsedMs) = Utils.timeTakenMs { | ||
| addNewBatchByStream(batchId) { output => | ||
| output.write(("v" + metadataLogVersion).getBytes(UTF_8)) | ||
| val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval) | ||
| validBatches.foreach { id => | ||
| foreachInBatch(id) { entry => writeEntry(entry, output) } | ||
| } | ||
| logs.foreach { entry => writeEntry(entry, output) } | ||
| } | ||
| } | ||
|
|
||
| val elapsedMs = loadElapsedMs + writeElapsedMs | ||
| if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) { | ||
| logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," + | ||
| s" write: $writeElapsedMs ms) for compact batch $batchId") | ||
| logWarning(s"Loaded ${allLogs.size} entries (estimated ${SizeEstimator.estimate(allLogs)} " + | ||
| s"bytes in memory), and wrote ${compactedLogs.size} entries for compact batch $batchId") | ||
| logWarning(s"Compacting took $elapsedMs ms for compact batch $batchId") | ||
| } else { | ||
| logDebug(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," + | ||
| s" write: $writeElapsedMs ms) for compact batch $batchId") | ||
| logDebug(s"Compacting took $elapsedMs ms for compact batch $batchId") | ||
| } | ||
|
|
||
| writeSucceed | ||
|
|
@@ -222,21 +258,22 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag]( | |
| try { | ||
| val logs = | ||
| getAllValidBatches(latestId, compactInterval).flatMap { id => | ||
| super.get(id).getOrElse { | ||
| filterInBatch(id)(shouldRetain).getOrElse { | ||
|
Contributor (Author)
This would help when we introduce a new condition on exclusion of entries.
||
| throw new IllegalStateException( | ||
| s"${batchIdToPath(id)} doesn't exist " + | ||
| s"(latestId: $latestId, compactInterval: $compactInterval)") | ||
| } | ||
| } | ||
| return compactLogs(logs).toArray | ||
| return logs.toArray | ||
| } catch { | ||
| case e: IOException => | ||
| // Another process using `CompactibleFileStreamLog` may delete the batch files when | ||
| // `StreamFileIndex` are reading. However, it only happens when a compaction is | ||
| // deleting old files. If so, let's try the next compaction batch and we should find it. | ||
| // Otherwise, this is a real IO issue and we should throw it. | ||
| latestId = nextCompactionBatchId(latestId, compactInterval) | ||
| super.get(latestId).getOrElse { | ||
| val expectedMinLatestId = nextCompactionBatchId(latestId, compactInterval) | ||
|
Contributor (Author)
This new approach avoids reading the next compact log file, which would materialize all of its entries into memory. It should be an extreme case, so it's also OK to keep this as it was if someone strongly thinks the previous approach is better.
||
| latestId = super.getLatestBatchId().getOrElse(-1) | ||
| if (latestId < expectedMinLatestId) { | ||
| throw e | ||
| } | ||
| } | ||
|
|
||
HDFSMetadataLog.scala (org.apache.spark.sql.execution.streaming)
|
|
@@ -115,42 +115,34 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: | |
| */ | ||
| override def add(batchId: Long, metadata: T): Boolean = { | ||
| require(metadata != null, "'null' metadata cannot written to a metadata log") | ||
| get(batchId).map(_ => false).getOrElse { | ||
| // Only write metadata when the batch has not yet been written | ||
| writeBatchToFile(metadata, batchIdToPath(batchId)) | ||
| true | ||
| } | ||
| addNewBatchByStream(batchId) { output => serialize(metadata, output) } | ||
| } | ||
|
|
||
| /** Write a batch to a temp file then rename it to the batch file. | ||
| * | ||
| * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a | ||
| * valid behavior, we still need to prevent it from destroying the files. | ||
| */ | ||
| private def writeBatchToFile(metadata: T, path: Path): Unit = { | ||
| val output = fileManager.createAtomic(path, overwriteIfPossible = false) | ||
| override def get(batchId: Long): Option[T] = { | ||
| try { | ||
| serialize(metadata, output) | ||
| output.close() | ||
| applyFnToBatchByStream(batchId) { input => Some(deserialize(input)) } | ||
| } catch { | ||
| case e: FileAlreadyExistsException => | ||
| output.cancel() | ||
| // If next batch file already exists, then another concurrently running query has | ||
| // written it. | ||
| throw new ConcurrentModificationException( | ||
| s"Multiple streaming queries are concurrently using $path", e) | ||
| case e: Throwable => | ||
| output.cancel() | ||
| throw e | ||
| case fne: FileNotFoundException => | ||
| logDebug(fne.getMessage) | ||
| None | ||
| } | ||
| } | ||
|
|
||
| override def get(batchId: Long): Option[T] = { | ||
| /** | ||
| * Apply provided function to each entry in the specific batch metadata log. | ||
| * | ||
| * Unlike get which will materialize all entries into memory, this method streamlines the process | ||
| * via READ-AND-PROCESS. This helps to avoid the memory issue on huge metadata log file. | ||
| * | ||
| * NOTE: This no longer fails early on corruption. The caller should handle the exception | ||
| * properly and make sure the logic is not affected by failing in the middle. | ||
| */ | ||
| def applyFnToBatchByStream[RET](batchId: Long)(fn: InputStream => RET): RET = { | ||
|
Member
nit: duplicated code with
||
| val batchMetadataFile = batchIdToPath(batchId) | ||
| if (fileManager.exists(batchMetadataFile)) { | ||
| val input = fileManager.open(batchMetadataFile) | ||
| try { | ||
| Some(deserialize(input)) | ||
| fn(input) | ||
| } catch { | ||
| case ise: IllegalStateException => | ||
| // re-throw the exception with the log file path added | ||
|
|
@@ -160,8 +152,42 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path: | |
| IOUtils.closeQuietly(input) | ||
| } | ||
| } else { | ||
| logDebug(s"Unable to find batch $batchMetadataFile") | ||
| None | ||
| throw new FileNotFoundException(s"Unable to find batch $batchMetadataFile") | ||
| } | ||
| } | ||
|
|
||
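A sketch of how the streaming read path could be used directly. `HDFSMetadataLog[String]` and a line-oriented file layout are assumptions made for illustration; concrete subclasses define their own on-disk format.

```scala
import java.nio.charset.StandardCharsets.UTF_8

import scala.io.{Source => IOSource}

import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

// Illustrative only: counts the lines of one batch file without materializing
// its content. Throws FileNotFoundException if the batch file is missing;
// the callee closes the stream after the function returns.
def countBatchLines(log: HDFSMetadataLog[String], batchId: Long): Long = {
  log.applyFnToBatchByStream(batchId) { input =>
    IOSource.fromInputStream(input, UTF_8.name()).getLines().foldLeft(0L)((n, _) => n + 1)
  }
}
```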
| /** | ||
| * Store the metadata for the specified batchId and return `true` if successful. This method | ||
| * fills the content of metadata via executing function. If the function throws an exception, | ||
| * writing will be automatically cancelled and this method will propagate the exception. | ||
| * | ||
| * If the batchId's metadata has already been stored, this method will return `false`. | ||
| * | ||
| * Writing the metadata is done by writing a batch to a temp file then rename it to the batch | ||
| * file. | ||
| * | ||
| * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a | ||
| * valid behavior, we still need to prevent it from destroying the files. | ||
| */ | ||
| def addNewBatchByStream(batchId: Long)(fn: OutputStream => Unit): Boolean = { | ||
dongjoon-hyun marked this conversation as resolved.
|
||
| get(batchId).map(_ => false).getOrElse { | ||
| // Only write metadata when the batch has not yet been written | ||
| val output = fileManager.createAtomic(batchIdToPath(batchId), overwriteIfPossible = false) | ||
| try { | ||
| fn(output) | ||
| output.close() | ||
| } catch { | ||
| case e: FileAlreadyExistsException => | ||
| output.cancel() | ||
| // If next batch file already exists, then another concurrently running query has | ||
| // written it. | ||
| throw new ConcurrentModificationException( | ||
| s"Multiple streaming queries are concurrently using $path", e) | ||
| case e: Throwable => | ||
| output.cancel() | ||
| throw e | ||
| } | ||
| true | ||
| } | ||
| } | ||
|
|
||
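And a sketch for the write side: the caller streams entries straight into the atomically created batch file instead of assembling an array first. The version header and entry format below mirror CompactibleFileStreamLog; the `log` instance and the entry iterator are assumptions for illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.spark.sql.execution.streaming.HDFSMetadataLog

// Illustrative only: returns false if another writer already created the batch,
// true once the temp file has been written and renamed into place.
def writeBatchStreaming(log: HDFSMetadataLog[String],
                        batchId: Long,
                        entries: Iterator[String]): Boolean = {
  log.addNewBatchByStream(batchId) { output =>
    output.write("v1".getBytes(UTF_8)) // assumed version header, as in the diff above
    entries.foreach { entry =>
      output.write('\n')
      output.write(entry.getBytes(UTF_8))
    }
  }
}
```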
|
|
||
Test resource: file sink metadata log `.compact` file
| @@ -1,3 +1,2 @@ | ||
| v1 | ||
| {"path":"/a/b/8","size":800,"isDir":false,"modificationTime":800,"blockReplication":1,"blockSize":100,"action":"add"} | ||
| {"path":"/a/b/0","size":100,"isDir":false,"modificationTime":100,"blockReplication":1,"blockSize":100,"action":"delete"} | ||
|
Contributor (Author)
This should be removed - it gave the false impression that DELETE_ACTION was supported previously, but in reality it was never implemented.
||