Skip to content

Commit 6609238

Browse files
authored
Added proper SIGTERM/SIGKILL handling for sub-processes (#286)
Fixes #284. The idea is to introduce a new flag to both `os.proc.call` and `os.ProcGroup.call`, as well as backing implementations in `os.ProcessLike`. The flag, `timeoutGracePeriod`, allows for a configurable use of `SIGKILL`, which forcibly kills a process, after the issue of a `SIGTERM`, which allows a process to clean up. The values for the flag have the following behaviours: |value | behaviour| |------|-----------| | -1 | When `timeout != -1`, only a `SIGTERM` will be issued, requiring the child process to gracefully terminate. | | 0 | When `timeout != -1`, only a `SIGKILL` is issued, demanding the child terminate immediately with no chance of graceful cleanup. | | n > 0 | `timeout != -1`, first issue a `SIGTERM` and then wait for a further `n` milliseconds before issuing a `SIGKILL`, this provides a reasonable timeframe for process cleanup, but accounts for misbehaving processes. | For now, the default has been set to `timeoutGracePeriod = 1000`.
1 parent 6ab4c65 commit 6609238

File tree

3 files changed

+172
-69
lines changed

3 files changed

+172
-69
lines changed

build.sc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ trait SafeDeps extends ScalaModule {
5353
trait MiMaChecks extends Mima {
5454
def mimaPreviousVersions = Seq("0.9.0", "0.9.1", "0.9.2", "0.9.3", "0.10.0")
5555
override def mimaBinaryIssueFilters: T[Seq[ProblemFilter]] = Seq(
56-
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs")
56+
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.PathConvertible.isCustomFs"),
57+
// this is fine, because ProcessLike is sealed (and its subclasses should be final)
58+
ProblemFilter.exclude[ReversedMissingMethodProblem]("os.ProcessLike.joinPumperThreadsHook")
5759
)
5860
}
5961

os/src/ProcessOps.scala

Lines changed: 103 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,25 @@ case class proc(command: Shellable*) {
4545
* `call` provides a number of parameters that let you configure how the subprocess
4646
* is run:
4747
*
48-
* @param cwd the working directory of the subprocess
49-
* @param env any additional environment variables you wish to set in the subprocess
50-
* @param stdin any data you wish to pass to the subprocess's standard input
51-
* @param stdout How the process's output stream is configured.
52-
* @param stderr How the process's error stream is configured.
53-
* @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout
54-
* @param timeout how long to wait in milliseconds for the subprocess to complete
55-
* @param check disable this to avoid throwing an exception if the subprocess
56-
* fails with a non-zero exit code
57-
* @param propagateEnv disable this to avoid passing in this parent process's
58-
* environment variables to the subprocess
48+
* @param cwd the working directory of the subprocess
49+
* @param env any additional environment variables you wish to set in the subprocess
50+
* @param stdin any data you wish to pass to the subprocess's standard input
51+
* @param stdout How the process's output stream is configured.
52+
* @param stderr How the process's error stream is configured.
53+
* @param mergeErrIntoOut merges the subprocess's stderr stream into it's stdout
54+
* @param timeout how long to wait in milliseconds for the subprocess to complete
55+
* (-1 for no timeout)
56+
* @param check disable this to avoid throwing an exception if the subprocess
57+
* fails with a non-zero exit code
58+
* @param propagateEnv disable this to avoid passing in this parent process's
59+
* environment variables to the subprocess
60+
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
61+
* subprocess to gracefully terminate before attempting to
62+
* forcibly kill it
63+
* (-1 for no kill, 0 for always kill immediately)
64+
*
65+
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
66+
* issued. Check the documentation for your JDK's `Process.destroy`.
5967
*/
6068
def call(
6169
cwd: Path = null,
@@ -66,7 +74,9 @@ case class proc(command: Shellable*) {
6674
mergeErrIntoOut: Boolean = false,
6775
timeout: Long = -1,
6876
check: Boolean = true,
69-
propagateEnv: Boolean = true
77+
propagateEnv: Boolean = true,
78+
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
79+
timeoutGracePeriod: Long = 100
7080
): CommandResult = {
7181

7282
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
@@ -87,14 +97,38 @@ case class proc(command: Shellable*) {
8797
propagateEnv
8898
)
8999

90-
sub.join(timeout)
100+
sub.join(timeout, timeoutGracePeriod)
91101

92102
val chunksSeq = chunks.iterator.asScala.toIndexedSeq
93103
val res = CommandResult(commandChunks, sub.exitCode(), chunksSeq)
94104
if (res.exitCode == 0 || !check) res
95105
else throw SubprocessException(res)
96106
}
97107

108+
// forwarder for the new timeoutGracePeriod flag
109+
private[os] def call(
110+
cwd: Path,
111+
env: Map[String, String],
112+
stdin: ProcessInput,
113+
stdout: ProcessOutput,
114+
stderr: ProcessOutput,
115+
mergeErrIntoOut: Boolean,
116+
timeout: Long,
117+
check: Boolean,
118+
propagateEnv: Boolean
119+
): CommandResult = call(
120+
cwd,
121+
env,
122+
stdin,
123+
stdout,
124+
stderr,
125+
mergeErrIntoOut,
126+
timeout,
127+
check,
128+
propagateEnv,
129+
timeoutGracePeriod = 100
130+
)
131+
98132
/**
99133
* The most flexible of the [[os.proc]] calls, `os.proc.spawn` simply configures
100134
* and starts a subprocess, and returns it as a `java.lang.Process` for you to
@@ -181,24 +215,31 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
181215
* `call` provides a number of parameters that let you configure how the pipeline
182216
* is run:
183217
*
184-
* @param cwd the working directory of the pipeline
185-
* @param env any additional environment variables you wish to set in the pipeline
186-
* @param stdin any data you wish to pass to the pipelines's standard input (to the first process)
187-
* @param stdout How the pipelines's output stream is configured (the last process stdout)
188-
* @param stderr How the process's error stream is configured (set for all processes)
189-
* @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the
190-
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
191-
* @param timeout how long to wait in milliseconds for the pipeline to complete
192-
* @param check disable this to avoid throwing an exception if the pipeline
193-
* fails with a non-zero exit code
194-
* @param propagateEnv disable this to avoid passing in this parent process's
195-
* environment variables to the pipeline
196-
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
197-
* failing process. If no process fails, the exit code will be 0.
198-
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
199-
* will be caught and handled by killing the writing process. This behaviour
200-
* is consistent with handlers of SIGPIPE signals in most programs
201-
* supporting interruptable piping. Disabled by default on Windows.
218+
* @param cwd the working directory of the pipeline
219+
* @param env any additional environment variables you wish to set in the pipeline
220+
* @param stdin any data you wish to pass to the pipelines's standard input (to the first process)
221+
* @param stdout How the pipelines's output stream is configured (the last process stdout)
222+
* @param stderr How the process's error stream is configured (set for all processes)
223+
* @param mergeErrIntoOut merges the pipeline's stderr stream into it's stdout. Note that then the
224+
* stderr will be forwarded with stdout to subsequent processes in the pipeline.
225+
* @param timeout how long to wait in milliseconds for the pipeline to complete
226+
* @param check disable this to avoid throwing an exception if the pipeline
227+
* fails with a non-zero exit code
228+
* @param propagateEnv disable this to avoid passing in this parent process's
229+
* environment variables to the pipeline
230+
* @param pipefail if true, the pipeline's exitCode will be the exit code of the first
231+
* failing process. If no process fails, the exit code will be 0.
232+
* @param handleBrokenPipe if true, every [[java.io.IOException]] when redirecting output of a process
233+
* will be caught and handled by killing the writing process. This behaviour
234+
* is consistent with handlers of SIGPIPE signals in most programs
235+
* supporting interruptable piping. Disabled by default on Windows.
236+
* @param timeoutGracePeriod if the timeout is enabled, how long in milliseconds for the
237+
* subprocess to gracefully terminate before attempting to
238+
* forcibly kill it
239+
* (-1 for no kill, 0 for always kill immediately)
240+
*
241+
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
242+
* issued. Check the documentation for your JDK's `Process.destroy`.
202243
*/
203244
def call(
204245
cwd: Path = null,
@@ -211,7 +252,9 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
211252
check: Boolean = true,
212253
propagateEnv: Boolean = true,
213254
pipefail: Boolean = true,
214-
handleBrokenPipe: Boolean = !isWindows
255+
handleBrokenPipe: Boolean = !isWindows,
256+
// this cannot be next to `timeout` as this will introduce a bin-compat break (default arguments are numbered in the bytecode)
257+
timeoutGracePeriod: Long = 100
215258
): CommandResult = {
216259
val chunks = new java.util.concurrent.ConcurrentLinkedQueue[Either[geny.Bytes, geny.Bytes]]
217260

@@ -232,7 +275,7 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
232275
pipefail
233276
)
234277

235-
sub.join(timeout)
278+
sub.join(timeout, timeoutGracePeriod)
236279

237280
val chunksSeq = chunks.iterator.asScala.toIndexedSeq
238281
val res =
@@ -241,6 +284,33 @@ case class ProcGroup private[os] (commands: Seq[proc]) {
241284
else throw SubprocessException(res)
242285
}
243286

287+
private[os] def call(
288+
cwd: Path,
289+
env: Map[String, String],
290+
stdin: ProcessInput,
291+
stdout: ProcessOutput,
292+
stderr: ProcessOutput,
293+
mergeErrIntoOut: Boolean,
294+
timeout: Long,
295+
check: Boolean,
296+
propagateEnv: Boolean,
297+
pipefail: Boolean,
298+
handleBrokenPipe: Boolean
299+
): CommandResult = call(
300+
cwd,
301+
env,
302+
stdin,
303+
stdout,
304+
stderr,
305+
mergeErrIntoOut,
306+
timeout,
307+
check,
308+
propagateEnv,
309+
pipefail,
310+
handleBrokenPipe,
311+
timeoutGracePeriod = 100
312+
)
313+
244314
/**
245315
* The most flexible of the [[os.ProcGroup]] calls. It sets-up a pipeline of processes,
246316
* and returns a [[ProcessPipeline]] for you to interact with however you like.

os/src/SubProcess.scala

Lines changed: 66 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,55 @@ sealed trait ProcessLike extends java.lang.AutoCloseable {
5353
* Wait up to `millis` for the [[ProcessLike]] to terminate and all stdout and stderr
5454
* from the subprocess to be handled. By default waits indefinitely; if a time
5555
* limit is given, explicitly destroys the [[ProcessLike]] if it has not completed by
56-
* the time the timeout has occurred
56+
* the time the timeout has occurred.
57+
*
58+
* By default, a process is destroyed by sending a `SIGTERM` signal, which allows an opportunity
59+
* for it to clean up any resources it was using. If the process is unresponsive to this, a
60+
* `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is
61+
* `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent.
62+
*
63+
* @returns `true` when the process did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise.
64+
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
65+
* issued. Check the documentation for your JDK's `Process.destroy`.
66+
*/
67+
def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = {
68+
val exitedCleanly = waitFor(timeout)
69+
if (!exitedCleanly) {
70+
assume(
71+
timeout != -1,
72+
"if the waitFor does not complete cleanly, this implies there is a timeout imposed, so the grace period is applicable"
73+
)
74+
if (timeoutGracePeriod == -1) destroy()
75+
else if (timeoutGracePeriod == 0) destroyForcibly()
76+
else {
77+
destroy()
78+
if (!waitFor(timeoutGracePeriod)) {
79+
destroyForcibly()
80+
}
81+
}
82+
waitFor(-1)
83+
}
84+
joinPumperThreadsHook()
85+
exitedCleanly
86+
}
87+
88+
@deprecatedOverriding("this method is now a forwarder, and should not be overriden", "0.10.4")
89+
private[os] def join(timeout: Long): Boolean = join(timeout, timeoutGracePeriod = 100)
90+
91+
/**
92+
* A hook method used by `join` to close the input and output streams associated with the process, not for public consumption.
5793
*/
58-
def join(timeout: Long = -1): Boolean
94+
private[os] def joinPumperThreadsHook(): Unit
5995
}
6096

6197
/**
6298
* Represents a spawn subprocess that has started and may or may not have
6399
* completed.
64100
*/
101+
@deprecatedInheritance(
102+
"this class will be made final: if you are using it be aware that `join` has a new overloading",
103+
"0.10.4"
104+
)
65105
class SubProcess(
66106
val wrapped: java.lang.Process,
67107
val inputPumperThread: Option[Thread],
@@ -114,22 +154,9 @@ class SubProcess(
114154
}
115155
}
116156

117-
/**
118-
* Wait up to `millis` for the subprocess to terminate and all stdout and stderr
119-
* from the subprocess to be handled. By default waits indefinitely; if a time
120-
* limit is given, explicitly destroys the subprocess if it has not completed by
121-
* the time the timeout has occurred
122-
*/
123-
def join(timeout: Long = -1): Boolean = {
124-
val exitedCleanly = waitFor(timeout)
125-
if (!exitedCleanly) {
126-
destroy()
127-
destroyForcibly()
128-
waitFor(-1)
129-
}
157+
private[os] def joinPumperThreadsHook(): Unit = {
130158
outputPumperThread.foreach(_.join())
131159
errorPumperThread.foreach(_.join())
132-
exitedCleanly
133160
}
134161
}
135162

@@ -222,6 +249,10 @@ object SubProcess {
222249
}
223250
}
224251

252+
@deprecatedInheritance(
253+
"this class will be made final: if you are using it be aware that `join` has a new overloading",
254+
"0.10.4"
255+
)
225256
class ProcessPipeline(
226257
val processes: Seq[SubProcess],
227258
pipefail: Boolean,
@@ -312,12 +343,12 @@ class ProcessPipeline(
312343
}
313344

314345
/**
315-
* Wait up to `millis` for the [[ProcessPipeline]] to terminate, by default waits
346+
* Wait up to `timeout` for the [[ProcessPipeline]] to terminate, by default waits
316347
* indefinitely. Returns `true` if the [[ProcessPipeline]] has terminated by the time
317348
* this method returns.
318349
*
319350
* Waits for each process one by one, while aggregating the total time waited. If
320-
* [[timeout]] has passed before all processes have terminated, returns `false`.
351+
* `timeout` has passed before all processes have terminated, returns `false`.
321352
*/
322353
override def waitFor(timeout: Long = -1): Boolean = {
323354
@tailrec
@@ -340,28 +371,28 @@ class ProcessPipeline(
340371
}
341372

342373
/**
343-
* Wait up to `millis` for the [[ProcessPipeline]] to terminate all the processes
374+
* Wait up to `timeout` for the [[ProcessPipeline]] to terminate all the processes
344375
* in pipeline. By default waits indefinitely; if a time limit is given, explicitly
345376
* destroys each process if it has not completed by the time the timeout has occurred.
377+
*
378+
* By default, the processes are destroyed by sending `SIGTERM` signals, which allows an opportunity
379+
* for them to clean up any resources it. If any process is unresponsive to this, a
380+
* `SIGKILL` signal is sent `timeoutGracePeriod` milliseconds later. If `timeoutGracePeriod` is
381+
* `0`, then there is no `SIGTERM`; if it is `-1`, there is no `SIGKILL` sent.
382+
*
383+
* @returns `true` when the processes did not require explicit termination by either `SIGTERM` or `SIGKILL` and `false` otherwise.
384+
* @note the issuing of `SIGTERM` instead of `SIGKILL` is implementation dependent on your JVM version. Pre-Java 9, no `SIGTERM` may be
385+
* issued. Check the documentation for your JDK's `Process.destroy`.
346386
*/
347-
override def join(timeout: Long = -1): Boolean = {
348-
@tailrec
349-
def joinRec(startedAt: Long, processesLeft: Seq[SubProcess], result: Boolean): Boolean =
350-
processesLeft match {
351-
case Nil => result
352-
case head :: tail =>
353-
val elapsed = System.currentTimeMillis() - startedAt
354-
val timeoutLeft = Math.max(0, timeout - elapsed)
355-
val exitedCleanly = head.join(timeoutLeft)
356-
joinRec(startedAt, tail, result && exitedCleanly)
357-
}
358-
387+
override def join(timeout: Long = -1, timeoutGracePeriod: Long = 100): Boolean = {
388+
// in this case, the grace period does not apply, so fine
359389
if (timeout == -1) {
360390
processes.forall(_.join())
361-
} else {
362-
val timeNow = System.currentTimeMillis()
363-
joinRec(timeNow, processes, true)
364-
}
391+
} else super.join(timeout, timeoutGracePeriod)
392+
}
393+
394+
private[os] def joinPumperThreadsHook(): Unit = {
395+
processes.foreach(_.joinPumperThreadsHook())
365396
}
366397
}
367398

0 commit comments

Comments
 (0)