From 16106c0ea063a9716907c21338e5b74e7672f39e Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Fri, 26 Jul 2024 14:01:02 +0100 Subject: [PATCH 1/9] feat(proc): added os.proc overloadings for timeoutGracefulPeriod and stubs for SubProcess --- os/src/ProcessOps.scala | 106 +++++++++++++++++++++++++++------------- os/src/SubProcess.scala | 17 +++++-- 2 files changed, 86 insertions(+), 37 deletions(-) diff --git a/os/src/ProcessOps.scala b/os/src/ProcessOps.scala index 5983aaa8..583f3721 100644 --- a/os/src/ProcessOps.scala +++ b/os/src/ProcessOps.scala @@ -45,17 +45,22 @@ case class proc(command: Shellable*) { * `call` provides a number of parameters that let you configure how the subprocess * is run: * - * @param cwd the working directory of the subprocess - * @param env any additional environment variables you wish to set in the subprocess - * @param stdin any data you wish to pass to the subprocess's standard input - * @param stdout How the process's output stream is configured. - * @param stderr How the process's error stream is configured. - * @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout - * @param timeout how long to wait in milliseconds for the subprocess to complete - * @param check disable this to avoid throwing an exception if the subprocess - * fails with a non-zero exit code - * @param propagateEnv disable this to avoid passing in this parent process's - * environment variables to the subprocess + * @param cwd the working directory of the subprocess + * @param env any additional environment variables you wish to set in the subprocess + * @param stdin any data you wish to pass to the subprocess's standard input + * @param stdout How the process's output stream is configured. + * @param stderr How the process's error stream is configured. + * @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout + * @param timeout how long to wait in milliseconds for the subprocess to complete + * (-1 for no timeout) + * @param check disable this to avoid throwing an exception if the subprocess + * fails with a non-zero exit code + * @param propagateEnv disable this to avoid passing in this parent process's + * environment variables to the subprocess + * @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the + * subprocess to gracefully terminate before attempting to + * forcibly kill it + * (-1 for no kill, 0 for always kill immediately) */ def call( cwd: Path = null, @@ -66,7 +71,9 @@ case class proc(command: Shellable*) { mergeErrIntoOut: Boolean = false, timeout: Long = -1, check: Boolean = true, - propagateEnv: Boolean = true + propagateEnv: Boolean = true, + // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) + timeoutGracePeriod: Long = 1000, ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -87,7 +94,7 @@ case class proc(command: Shellable*) { propagateEnv ) - sub.join(timeout) + sub.join(timeout, timeoutGracePeriod) val chunksSeq = chunks.iterator.asScala.toIndexedSeq val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq) @@ -95,6 +102,19 @@ case class proc(command: Shellable*) { else throw SubprocessException(res) } + // forwarder for the new timeoutGracePeriod flag + private [proc] def call( + cwd: Path, + env: Map[String, String], + stdin: ProcessInput, + stdout: ProcessOutput, + stderr: ProcessOutput, + mergeErrIntoOut: Boolean, + timeout: Long, + check: Boolean, + propagateEnv: Boolean + ): CommandResult = call(cwd, env, stdin, stdout, stderr, mergeErrIntoOut, timeout, check, propagateEnv, timeoutGracePeriod = 1000) + /** * The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures * and starts a subprocess, and returns it as a `java.lang.Process` for you to @@ -181,24 +201,28 @@ case class ProcGroup private[os] (commands: Seq[proc]) { * `call` provides a number of parameters that let you configure how the pipeline * is run: * - * @param cwd the working directory of the pipeline - * @param env any additional environment variables you wish to set in the pipeline - * @param stdin any data you wish to pass to the pipelines's standard input (to the first process) - * @param stdout How the pipelines's output stream is configured (the last process stdout) - * @param stderr How the process's error stream is configured (set for all processes) - * @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the - * stderr will be forwarded with stdout to subsequent processes in the pipeline. - * @param timeout how long to wait in milliseconds for the pipeline to complete - * @param check disable this to avoid throwing an exception if the pipeline - * fails with a non-zero exit code - * @param propagateEnv disable this to avoid passing in this parent process's - * environment variables to the pipeline - * @param pipefail if true, the pipeline's exitCode will be the exit code of the first - * failing process. If no process fails, the exit code will be 0. - * @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process - * will be caught and handled by killing the writing process. This behaviour - * is consistent with handlers of SIGPIPE signals in most programs - * supporting interruptable piping. Disabled by default on Windows. + * @param cwd the working directory of the pipeline + * @param env any additional environment variables you wish to set in the pipeline + * @param stdin any data you wish to pass to the pipelines's standard input (to the first process) + * @param stdout How the pipelines's output stream is configured (the last process stdout) + * @param stderr How the process's error stream is configured (set for all processes) + * @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the + * stderr will be forwarded with stdout to subsequent processes in the pipeline. + * @param timeout how long to wait in milliseconds for the pipeline to complete + * @param check disable this to avoid throwing an exception if the pipeline + * fails with a non-zero exit code + * @param propagateEnv disable this to avoid passing in this parent process's + * environment variables to the pipeline + * @param pipefail if true, the pipeline's exitCode will be the exit code of the first + * failing process. If no process fails, the exit code will be 0. + * @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process + * will be caught and handled by killing the writing process. This behaviour + * is consistent with handlers of SIGPIPE signals in most programs + * supporting interruptable piping. Disabled by default on Windows. + * @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the + * subprocess to gracefully terminate before attempting to + * forcibly kill it + * (-1 for no kill, 0 for always kill immediately) */ def call( cwd: Path = null, @@ -211,7 +235,9 @@ case class ProcGroup private[os] (commands: Seq[proc]) { check: Boolean = true, propagateEnv: Boolean = true, pipefail: Boolean = true, - handleBrokenPipe: Boolean = !isWindows + handleBrokenPipe: Boolean = !isWindows, + // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) + timeoutGracePeriod: Long = 1000, ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -232,7 +258,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) { pipefail ) - sub.join(timeout) + sub.join(timeout, timeoutGracePeriod) val chunksSeq = chunks.iterator.asScala.toIndexedSeq val res = @@ -241,6 +267,20 @@ case class ProcGroup private[os] (commands: Seq[proc]) { else throw SubprocessException(res) } + private [ProcGroup] def call( + cwd: Path, + env: Map[String, String], + stdin: ProcessInput, + stdout: ProcessOutput, + stderr: ProcessOutput, + mergeErrIntoOut: Boolean, + timeout: Long, + check: Boolean, + propagateEnv: Boolean, + pipefail: Boolean, + handleBrokenPipe: Boolean, + ): CommandResult = call(cwd, env, stdin, stdout, stderr, mergeErrIntoOut, timeout, check, propagateEnv, pipefail, handleBrokenPipe, timeoutGracePeriod = 1000) + /** * The most flexible of the [[os.ProcGroup]] calls. It sets-up a pipeline of processes, * and returns a [[ProcessPipeline]] for you to interact with however you like. diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index 61961619..5534df3d 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -49,19 +49,25 @@ sealed trait ProcessLike extends java.lang.AutoCloseable { */ def waitFor(timeout: Long = -1): Boolean + // FIXME: docs /** * Wait up to `millis` for the [[ProcessLike]] to terminate and all stdout and stderr * from the subprocess to be handled. By default waits indefinitely; if a time * limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by * the time the timeout has occurred */ - def join(timeout: Long = -1): Boolean + def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean + + @deprecated("do not use this", "0.10.4") + @deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4") + private [os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 1000) } /** * Represents a spawn subprocess that has started and may or may not have * completed. */ +@deprecatedInheritance("this class will be made final: if you are using it be aware that `join` has a new overloading", "0.10.4") class SubProcess( val wrapped: java.lang.Process, val inputPumperThread: Option[Thread], @@ -120,7 +126,8 @@ class SubProcess( * limit is given, explicitly destroys the subprocess if it has not completed by * the time the timeout has occurred */ - def join(timeout: Long = -1): Boolean = { + def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { + // FIXME: val exitedCleanly = waitFor(timeout) if (!exitedCleanly) { destroy() @@ -222,6 +229,7 @@ object SubProcess { } } +@deprecatedInheritance("this class will be made final: if you are using it be aware that `join` has a new overloading", "0.10.4") class ProcessPipeline( val processes: Seq[SubProcess], pipefail: Boolean, @@ -344,7 +352,8 @@ class ProcessPipeline( * in pipeline. By default waits indefinitely; if a time limit is given, explicitly * destroys each process if it has not completed by the time the timeout has occurred. */ - override def join(timeout: Long = -1): Boolean = { + override def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { + // FIXME: @tailrec def joinRec(startedAt: Long, processesLeft: Seq[SubProcess], result: Boolean): Boolean = processesLeft match { @@ -352,7 +361,7 @@ class ProcessPipeline( case head :: tail => val elapsed = System.currentTimeMillis() - startedAt val timeoutLeft = Math.max(0, timeout - elapsed) - val exitedCleanly = head.join(timeoutLeft) + val exitedCleanly = head.join(timeoutLeft) // FIXME: joinRec(startedAt, tail, result && exitedCleanly) } From c691dd13eff905e85e2bebc74d325222cb840abf Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Fri, 26 Jul 2024 14:17:09 +0100 Subject: [PATCH 2/9] fix: removed dummy deprecation warning --- os/src/SubProcess.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index 5534df3d..c55c0c45 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -58,7 +58,6 @@ sealed trait ProcessLike extends java.lang.AutoCloseable { */ def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean - @deprecated("do not use this", "0.10.4") @deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4") private [os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 1000) } From f0dea066ef33208e73aa46fae0652281b9024e02 Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Fri, 26 Jul 2024 17:13:03 +0100 Subject: [PATCH 3/9] feat: implementation of single-process join --- os/src/SubProcess.scala | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index c55c0c45..d3e60ee5 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -119,6 +119,7 @@ class SubProcess( } } + // FIXME: documentation /** * Wait up to `millis` for the subprocess to terminate and all stdout and stderr * from the subprocess to be handled. By default waits indefinitely; if a time @@ -126,11 +127,16 @@ class SubProcess( * the time the timeout has occurred */ def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { - // FIXME: val exitedCleanly = waitFor(timeout) if (!exitedCleanly) { - destroy() - destroyForcibly() + if (timeoutGracePeriod == -1) destroy() + else if (timeoutGracePeriod == 0) destroyForcibly() + else { + destroy() + if (!waitFor(timeoutGracePeriod)) { + destroyForcibly() + } + } waitFor(-1) } outputPumperThread.foreach(_.join()) @@ -370,6 +376,11 @@ class ProcessPipeline( val timeNow = System.currentTimeMillis() joinRec(timeNow, processes, true) } + + /* + outputPumperThread.foreach(_.join()) + errorPumperThread.foreach(_.join()) + */ } } From 6dc9926cc7fae8b75df047440dd318f17cef00d4 Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Fri, 26 Jul 2024 17:32:18 +0100 Subject: [PATCH 4/9] feat(SubProcess): implement the piplined join --- os/src/SubProcess.scala | 43 ++++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index d3e60ee5..16c47223 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -129,6 +129,7 @@ class SubProcess( def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { val exitedCleanly = waitFor(timeout) if (!exitedCleanly) { + assume(timeout != -1, "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable") if (timeoutGracePeriod == -1) destroy() else if (timeoutGracePeriod == 0) destroyForcibly() else { @@ -352,35 +353,41 @@ class ProcessPipeline( } } + // FIXME: documentation /** * Wait up to `millis` for the [[ProcessPipeline]] to terminate all the processes * in pipeline. By default waits indefinitely; if a time limit is given, explicitly * destroys each process if it has not completed by the time the timeout has occurred. */ override def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { - // FIXME: - @tailrec - def joinRec(startedAt: Long, processesLeft: Seq[SubProcess], result: Boolean): Boolean = - processesLeft match { - case Nil => result - case head :: tail => - val elapsed = System.currentTimeMillis() - startedAt - val timeoutLeft = Math.max(0, timeout - elapsed) - val exitedCleanly = head.join(timeoutLeft) // FIXME: - joinRec(startedAt, tail, result && exitedCleanly) - } + // previously, this was implemented in a similar way to the waitFor above, but with + // join logic. This is much harder to make work with the grace period, because we + // want to evenly give all threads a chance to terminate gracefully. As such, this + // implementation interleaves the single-process join implementation more fairly + // in this case, the grace period does not apply, so fine if (timeout == -1) { processes.forall(_.join()) } else { - val timeNow = System.currentTimeMillis() - joinRec(timeNow, processes, true) + // timeout is active, so the grace period must be accounted for + val exitedCleanly = waitFor(timeout) + if (!exitedCleanly) { + if (timeoutGracePeriod == -1) destroy() + else if (timeoutGracePeriod == 0) destroyForcibly() + else { + destroy() + if (!waitFor(timeoutGracePeriod)) { + destroyForcibly() + } + } + waitFor(-1) + // note that this is the only part that isn't shared with the other implementation of join... they could be unified if this is made an + // abstract method: then this implementation can move to the superclass (except for the default -1 case above, but that could be done via an override) + processes.flatMap(_.outputPumperThread).foreach(_.join()) + processes.flatMap(_.errorPumperThread).foreach(_.join()) + } + exitedCleanly } - - /* - outputPumperThread.foreach(_.join()) - errorPumperThread.foreach(_.join()) - */ } } From d0d3df6996d14bca269d219a0a716772ee6288cf Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Mon, 29 Jul 2024 10:32:32 +0100 Subject: [PATCH 5/9] chore: added MiMA exception for ProcessLike.join --- build.sc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/build.sc b/build.sc index ced4926c..3e70ee74 100644 --- a/build.sc +++ b/build.sc @@ -53,7 +53,9 @@ trait SafeDeps extends ScalaModule { trait MiMaChecks extends Mima { def mimaPreviousVersions = Seq("0.9.0", "0.9.1", "0.9.2", "0.9.3", "0.10.0") override def mimaBinaryIssueFilters: T[Seq[ProblemFilter]] = Seq( - ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs") + ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs"), + // this is fine, because ProcessLike is sealed (and its subclasses should be final) + ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.join") ) } From 6c2678e0bfb61e1cfa4d2b873385ede0260080bc Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Mon, 29 Jul 2024 10:35:01 +0100 Subject: [PATCH 6/9] fixed formatting --- os/src/ProcessOps.scala | 38 +++++++++++++++++++++++++++++++------- os/src/SubProcess.scala | 19 ++++++++++++++----- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/os/src/ProcessOps.scala b/os/src/ProcessOps.scala index 583f3721..fdcfd681 100644 --- a/os/src/ProcessOps.scala +++ b/os/src/ProcessOps.scala @@ -73,7 +73,7 @@ case class proc(command: Shellable*) { check: Boolean = true, propagateEnv: Boolean = true, // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) - timeoutGracePeriod: Long = 1000, + timeoutGracePeriod: Long = 1000 ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -103,7 +103,7 @@ case class proc(command: Shellable*) { } // forwarder for the new timeoutGracePeriod flag - private [proc] def call( + private[proc] def call( cwd: Path, env: Map[String, String], stdin: ProcessInput, @@ -113,7 +113,18 @@ case class proc(command: Shellable*) { timeout: Long, check: Boolean, propagateEnv: Boolean - ): CommandResult = call(cwd, env, stdin, stdout, stderr, mergeErrIntoOut, timeout, check, propagateEnv, timeoutGracePeriod = 1000) + ): CommandResult = call( + cwd, + env, + stdin, + stdout, + stderr, + mergeErrIntoOut, + timeout, + check, + propagateEnv, + timeoutGracePeriod = 1000 + ) /** * The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures @@ -237,7 +248,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) { pipefail: Boolean = true, handleBrokenPipe: Boolean = !isWindows, // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) - timeoutGracePeriod: Long = 1000, + timeoutGracePeriod: Long = 1000 ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -267,7 +278,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) { else throw SubprocessException(res) } - private [ProcGroup] def call( + private[ProcGroup] def call( cwd: Path, env: Map[String, String], stdin: ProcessInput, @@ -278,8 +289,21 @@ case class ProcGroup private[os] (commands: Seq[proc]) { check: Boolean, propagateEnv: Boolean, pipefail: Boolean, - handleBrokenPipe: Boolean, - ): CommandResult = call(cwd, env, stdin, stdout, stderr, mergeErrIntoOut, timeout, check, propagateEnv, pipefail, handleBrokenPipe, timeoutGracePeriod = 1000) + handleBrokenPipe: Boolean + ): CommandResult = call( + cwd, + env, + stdin, + stdout, + stderr, + mergeErrIntoOut, + timeout, + check, + propagateEnv, + pipefail, + handleBrokenPipe, + timeoutGracePeriod = 1000 + ) /** * The most flexible of the [[os.ProcGroup]] calls. It sets-up a pipeline of processes, diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index 16c47223..760235c9 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -59,14 +59,17 @@ sealed trait ProcessLike extends java.lang.AutoCloseable { def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean @deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4") - private [os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 1000) + private[os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 1000) } /** * Represents a spawn subprocess that has started and may or may not have * completed. */ -@deprecatedInheritance("this class will be made final: if you are using it be aware that `join` has a new overloading", "0.10.4") +@deprecatedInheritance( + "this class will be made final: if you are using it be aware that `join` has a new overloading", + "0.10.4" +) class SubProcess( val wrapped: java.lang.Process, val inputPumperThread: Option[Thread], @@ -129,13 +132,16 @@ class SubProcess( def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { val exitedCleanly = waitFor(timeout) if (!exitedCleanly) { - assume(timeout != -1, "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable") + assume( + timeout != -1, + "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable" + ) if (timeoutGracePeriod == -1) destroy() else if (timeoutGracePeriod == 0) destroyForcibly() else { destroy() if (!waitFor(timeoutGracePeriod)) { - destroyForcibly() + destroyForcibly() } } waitFor(-1) @@ -235,7 +241,10 @@ object SubProcess { } } -@deprecatedInheritance("this class will be made final: if you are using it be aware that `join` has a new overloading", "0.10.4") +@deprecatedInheritance( + "this class will be made final: if you are using it be aware that `join` has a new overloading", + "0.10.4" +) class ProcessPipeline( val processes: Seq[SubProcess], pipefail: Boolean, From 5c6bae79c535ef9579c1986247032ecae7ffdd3a Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Mon, 29 Jul 2024 11:24:22 +0100 Subject: [PATCH 7/9] fix: fixed forwarder scope, which results in private bytecode --- os/src/ProcessOps.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/os/src/ProcessOps.scala b/os/src/ProcessOps.scala index fdcfd681..952c5b73 100644 --- a/os/src/ProcessOps.scala +++ b/os/src/ProcessOps.scala @@ -103,7 +103,7 @@ case class proc(command: Shellable*) { } // forwarder for the new timeoutGracePeriod flag - private[proc] def call( + private[os] def call( cwd: Path, env: Map[String, String], stdin: ProcessInput, @@ -278,7 +278,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) { else throw SubprocessException(res) } - private[ProcGroup] def call( + private[os] def call( cwd: Path, env: Map[String, String], stdin: ProcessInput, From 40ec7165d5e346411124504b8b1a0ddeb7c9a19b Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Tue, 30 Jul 2024 11:02:04 +0100 Subject: [PATCH 8/9] adjusted default timeout to 100ms and factored common code --- build.sc | 2 +- os/src/ProcessOps.scala | 8 ++-- os/src/SubProcess.scala | 92 ++++++++++++++++------------------------- 3 files changed, 41 insertions(+), 61 deletions(-) diff --git a/build.sc b/build.sc index 3e70ee74..c7eb122c 100644 --- a/build.sc +++ b/build.sc @@ -55,7 +55,7 @@ trait MiMaChecks extends Mima { override def mimaBinaryIssueFilters: T[Seq[ProblemFilter]] = Seq( ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs"), // this is fine, because ProcessLike is sealed (and its subclasses should be final) - ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.join") + ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.joinPumperThreadsHook") ) } diff --git a/os/src/ProcessOps.scala b/os/src/ProcessOps.scala index 952c5b73..0dbdac5b 100644 --- a/os/src/ProcessOps.scala +++ b/os/src/ProcessOps.scala @@ -73,7 +73,7 @@ case class proc(command: Shellable*) { check: Boolean = true, propagateEnv: Boolean = true, // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) - timeoutGracePeriod: Long = 1000 + timeoutGracePeriod: Long = 100 ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -123,7 +123,7 @@ case class proc(command: Shellable*) { timeout, check, propagateEnv, - timeoutGracePeriod = 1000 + timeoutGracePeriod = 100 ) /** @@ -248,7 +248,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) { pipefail: Boolean = true, handleBrokenPipe: Boolean = !isWindows, // this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode) - timeoutGracePeriod: Long = 1000 + timeoutGracePeriod: Long = 100 ): CommandResult = { val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]] @@ -302,7 +302,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) { propagateEnv, pipefail, handleBrokenPipe, - timeoutGracePeriod = 1000 + timeoutGracePeriod = 100 ) /** diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index 760235c9..722e5004 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -56,10 +56,34 @@ sealed trait ProcessLike extends java.lang.AutoCloseable { * limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by * the time the timeout has occurred */ - def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean + def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = { + val exitedCleanly = waitFor(timeout) + if (!exitedCleanly) { + assume( + timeout != -1, + "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable" + ) + if (timeoutGracePeriod == -1) destroy() + else if (timeoutGracePeriod == 0) destroyForcibly() + else { + destroy() + if (!waitFor(timeoutGracePeriod)) { + destroyForcibly() + } + } + waitFor(-1) + } + joinPumperThreadsHook() + exitedCleanly + } @deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4") - private[os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 1000) + private[os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 100) + + /** + * A hook method used by `join` to close the input and output streams associated with the process, not for public consumption. + */ + private[os] def joinPumperThreadsHook(): Unit } /** @@ -122,33 +146,9 @@ class SubProcess( } } - // FIXME: documentation - /** - * Wait up to `millis` for the subprocess to terminate and all stdout and stderr - * from the subprocess to be handled. By default waits indefinitely; if a time - * limit is given, explicitly destroys the subprocess if it has not completed by - * the time the timeout has occurred - */ - def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { - val exitedCleanly = waitFor(timeout) - if (!exitedCleanly) { - assume( - timeout != -1, - "if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable" - ) - if (timeoutGracePeriod == -1) destroy() - else if (timeoutGracePeriod == 0) destroyForcibly() - else { - destroy() - if (!waitFor(timeoutGracePeriod)) { - destroyForcibly() - } - } - waitFor(-1) - } + private[os] def joinPumperThreadsHook(): Unit = { outputPumperThread.foreach(_.join()) errorPumperThread.foreach(_.join()) - exitedCleanly } } @@ -335,12 +335,12 @@ class ProcessPipeline( } /** - * Wait up to `millis` for the [[ProcessPipeline]] to terminate, by default waits + * Wait up to `timeout` for the [[ProcessPipeline]] to terminate, by default waits * indefinitely. Returns `true` if the [[ProcessPipeline]] has terminated by the time * this method returns. * * Waits for each process one by one, while aggregating the total time waited. If - * [[timeout]] has passed before all processes have terminated, returns `false`. + * `timeout` has passed before all processes have terminated, returns `false`. */ override def waitFor(timeout: Long = -1): Boolean = { @tailrec @@ -364,39 +364,19 @@ class ProcessPipeline( // FIXME: documentation /** - * Wait up to `millis` for the [[ProcessPipeline]] to terminate all the processes + * Wait up to `timeout` for the [[ProcessPipeline]] to terminate all the processes * in pipeline. By default waits indefinitely; if a time limit is given, explicitly * destroys each process if it has not completed by the time the timeout has occurred. */ - override def join(timeout: Long = -1, timeoutGracePeriod: Long = 1000): Boolean = { - // previously, this was implemented in a similar way to the waitFor above, but with - // join logic. This is much harder to make work with the grace period, because we - // want to evenly give all threads a chance to terminate gracefully. As such, this - // implementation interleaves the single-process join implementation more fairly - + override def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = { // in this case, the grace period does not apply, so fine if (timeout == -1) { processes.forall(_.join()) - } else { - // timeout is active, so the grace period must be accounted for - val exitedCleanly = waitFor(timeout) - if (!exitedCleanly) { - if (timeoutGracePeriod == -1) destroy() - else if (timeoutGracePeriod == 0) destroyForcibly() - else { - destroy() - if (!waitFor(timeoutGracePeriod)) { - destroyForcibly() - } - } - waitFor(-1) - // note that this is the only part that isn't shared with the other implementation of join... they could be unified if this is made an - // abstract method: then this implementation can move to the superclass (except for the default -1 case above, but that could be done via an override) - processes.flatMap(_.outputPumperThread).foreach(_.join()) - processes.flatMap(_.errorPumperThread).foreach(_.join()) - } - exitedCleanly - } + } else super.join(timeout, timeoutGracePeriod) + } + + private[os] def joinPumperThreadsHook(): Unit = { + processes.foreach(_.joinPumperThreadsHook()) } } From fdc5a3f9e53036ee07c1e421614ed2bbd337a6e4 Mon Sep 17 00:00:00 2001 From: Jamie Willis Date: Mon, 5 Aug 2024 09:33:15 +0100 Subject: [PATCH 9/9] added documentation --- os/src/ProcessOps.scala | 6 ++++++ os/src/SubProcess.scala | 22 +++++++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/os/src/ProcessOps.scala b/os/src/ProcessOps.scala index 0dbdac5b..fa8cc973 100644 --- a/os/src/ProcessOps.scala +++ b/os/src/ProcessOps.scala @@ -61,6 +61,9 @@ case class proc(command: Shellable*) { * subprocess to gracefully terminate before attempting to * forcibly kill it * (-1 for no kill, 0 for always kill immediately) + * + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. */ def call( cwd: Path = null, @@ -234,6 +237,9 @@ case class ProcGroup private[os] (commands: Seq[proc]) { * subprocess to gracefully terminate before attempting to * forcibly kill it * (-1 for no kill, 0 for always kill immediately) + * + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. */ def call( cwd: Path = null, diff --git a/os/src/SubProcess.scala b/os/src/SubProcess.scala index 722e5004..075b2a83 100644 --- a/os/src/SubProcess.scala +++ b/os/src/SubProcess.scala @@ -49,12 +49,20 @@ sealed trait ProcessLike extends java.lang.AutoCloseable { */ def waitFor(timeout: Long = -1): Boolean - // FIXME: docs /** * Wait up to `millis` for the [[ProcessLike]] to terminate and all stdout and stderr * from the subprocess to be handled. By default waits indefinitely; if a time * limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by - * the time the timeout has occurred + * the time the timeout has occurred. + * + * By default, a process is destroyed by sending a `SIGTERM` signal, which allows an opportunity + * for it to clean up any resources it was using. If the process is unresponsive to this, a + * `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is + * `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent. + * + * @returns `true` when the process did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise. + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. */ def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = { val exitedCleanly = waitFor(timeout) @@ -362,11 +370,19 @@ class ProcessPipeline( } } - // FIXME: documentation /** * Wait up to `timeout` for the [[ProcessPipeline]] to terminate all the processes * in pipeline. By default waits indefinitely; if a time limit is given, explicitly * destroys each process if it has not completed by the time the timeout has occurred. + * + * By default, the processes are destroyed by sending `SIGTERM` signals, which allows an opportunity + * for them to clean up any resources it. If any process is unresponsive to this, a + * `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is + * `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent. + * + * @returns `true` when the processes did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise. + * @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be + * issued. Check the documentation for your JDK's `Process.destroy`. */ override def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = { // in this case, the grace period does not apply, so fine