Skip to content

Commit b3169c0

Browse files
committed
Fixes to runTerraformCmd for race conditions
- Use waitgroups for more readability - Improve handling errors from writeOutput - Finish reading from pipes before calling cmd.Wait - fixes a race condition that leads to an error :`read |0: file already closed` - Because now waiting for pipes to finish reading, need to update waitGroup to close buf.Read on context cancel. Otherwise buf.Read blocks until next line before stopping. Causes TestContext_sleepTimeoutExpired takes a little too long to cancel (~20s)
1 parent 594b411 commit b3169c0

File tree

3 files changed

+58
-31
lines changed

3 files changed

+58
-31
lines changed

tfexec/cmd.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,21 @@ func mergeWriters(writers ...io.Writer) io.Writer {
233233
return io.MultiWriter(compact...)
234234
}
235235

236-
func writeOutput(r io.ReadCloser, w io.Writer) error {
236+
func writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error {
237+
// ReadBytes will block until bytes are read, which can cause a delay in
238+
// returning even if the command's context has been canceled. Use a separate
239+
// goroutine to prompt ReadBytes to return on cancel
240+
closeCtx, closeCancel := context.WithCancel(ctx)
241+
defer closeCancel()
242+
go func() {
243+
select {
244+
case <-ctx.Done():
245+
r.Close()
246+
case <-closeCtx.Done():
247+
return
248+
}
249+
}()
250+
237251
buf := bufio.NewReader(r)
238252
for {
239253
line, err := buf.ReadBytes('\n')

tfexec/cmd_default.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"os/exec"
99
"strings"
10+
"sync"
1011
)
1112

1213
func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
@@ -46,15 +47,24 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
4647
return tf.wrapExitError(ctx, err, "")
4748
}
4849

49-
exitChLen := 2
50-
exitCh := make(chan error, exitChLen)
50+
var errStdout, errStderr error
51+
var wg sync.WaitGroup
52+
wg.Add(1)
5153
go func() {
52-
exitCh <- writeOutput(stdoutPipe, stdoutWriter)
54+
defer wg.Done()
55+
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
5356
}()
57+
58+
wg.Add(1)
5459
go func() {
55-
exitCh <- writeOutput(stderrPipe, stderrWriter)
60+
defer wg.Done()
61+
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
5662
}()
5763

64+
// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
65+
// can cause a race condition
66+
wg.Wait()
67+
5868
err = cmd.Wait()
5969
if err == nil && ctx.Err() != nil {
6070
err = ctx.Err()
@@ -63,16 +73,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
6373
return tf.wrapExitError(ctx, err, errBuf.String())
6474
}
6575

66-
// Wait for the logs to finish writing
67-
counter := 0
68-
for {
69-
counter++
70-
err := <-exitCh
71-
if err != nil && err != context.Canceled {
72-
return tf.wrapExitError(ctx, err, errBuf.String())
73-
}
74-
if counter >= exitChLen {
75-
return ctx.Err()
76-
}
76+
// Return error if there was an issue reading the std out/err
77+
if errStdout != nil && ctx.Err() != nil {
78+
return tf.wrapExitError(ctx, errStdout, errBuf.String())
7779
}
80+
if errStderr != nil && ctx.Err() != nil {
81+
return tf.wrapExitError(ctx, errStderr, errBuf.String())
82+
}
83+
84+
return nil
7885
}

tfexec/cmd_linux.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,24 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
5151
return tf.wrapExitError(ctx, err, "")
5252
}
5353

54-
exitChLen := 2
55-
exitCh := make(chan error, exitChLen)
54+
var errStdout, errStderr error
55+
var wg sync.WaitGroup
56+
wg.Add(1)
5657
go func() {
57-
exitCh <- writeOutput(stdoutPipe, stdoutWriter)
58+
defer wg.Done()
59+
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
5860
}()
61+
62+
wg.Add(1)
5963
go func() {
60-
exitCh <- writeOutput(stderrPipe, stderrWriter)
64+
defer wg.Done()
65+
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
6166
}()
6267

68+
// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
69+
// can cause a race condition
70+
wg.Wait()
71+
6372
err = cmd.Wait()
6473
if err == nil && ctx.Err() != nil {
6574
err = ctx.Err()
@@ -68,16 +77,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
6877
return tf.wrapExitError(ctx, err, errBuf.String())
6978
}
7079

71-
// Wait for the logs to finish writing
72-
counter := 0
73-
for {
74-
counter++
75-
err := <-exitCh
76-
if err != nil && err != context.Canceled {
77-
return tf.wrapExitError(ctx, err, errBuf.String())
78-
}
79-
if counter >= exitChLen {
80-
return ctx.Err()
81-
}
80+
// Return error if there was an issue reading the std out/err
81+
if errStdout != nil && ctx.Err() != nil {
82+
return tf.wrapExitError(ctx, errStdout, errBuf.String())
8283
}
84+
if errStderr != nil && ctx.Err() != nil {
85+
return tf.wrapExitError(ctx, errStderr, errBuf.String())
86+
}
87+
88+
return nil
8389
}

0 commit comments

Comments
 (0)