From e7eca1d553401ab5aa358356838672fa800638fa Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Wed, 20 Apr 2022 23:14:04 +0100 Subject: [PATCH 01/12] Prevent dangling cat-file calls If an `os/exec.Command` is passed non `*os.File` as an input/output, go will create `os.Pipe`s and wait for their closure in `cmd.Wait()`. If the code following this is responsible for closing `io.Pipe`s or other handlers then on process death from context cancellation the `Wait` can hang. There are two possible solutions: 1. use `os.Pipe` as the input/output as `cmd.Wait` does not wait for these. 2. create a goroutine waiting on the context cancellation that will close the inputs. This PR changes the inputs to the `CatFileBatch` calls to use `os.Pipe`s preventing dangling cat-file calls. Signed-off-by: Andrew Thornton --- modules/git/batch_reader.go | 74 +++++++++++++++++------- modules/git/utils_closererror.go | 99 ++++++++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 22 deletions(-) create mode 100644 modules/git/utils_closererror.go diff --git a/modules/git/batch_reader.go b/modules/git/batch_reader.go index 5a0a82b13a9c8..fa9f774c6f5c4 100644 --- a/modules/git/batch_reader.go +++ b/modules/git/batch_reader.go @@ -9,24 +9,15 @@ import ( "bytes" "context" "fmt" - "io" "math" + "os" "runtime" "strconv" "strings" "code.gitea.io/gitea/modules/log" - - "github.com/djherbis/buffer" - "github.com/djherbis/nio/v3" ) -// WriteCloserError wraps an io.WriteCloser with an additional CloseWithError function -type WriteCloserError interface { - io.WriteCloser - CloseWithError(err error) error -} - // EnsureValidGitRepository runs git rev-parse in the repository path - thus ensuring that the repository is a valid repository. // Run before opening git cat-file. // This is needed otherwise the git cat-file will hang for invalid repositories. @@ -44,10 +35,28 @@ func EnsureValidGitRepository(ctx context.Context, repoPath string) error { return nil } +func returnClosedReaderWriters(err error) (WriteCloserError, *bufio.Reader, func()) { + wr := &writeCloserError{} + rd := &readCloserError{} + + _ = wr.CloseWithError(err) + _ = rd.CloseWithError(err) + + return wr, bufio.NewReader(rd), func() {} +} + // CatFileBatchCheck opens git cat-file --batch-check in the provided repo and returns a stdin pipe, a stdout reader and cancel function func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, *bufio.Reader, func()) { - batchStdinReader, batchStdinWriter := io.Pipe() - batchStdoutReader, batchStdoutWriter := io.Pipe() + batchStdinReader, batchStdinWriter, err := os.Pipe() + if err != nil { + log.Critical("Unable to open pipe to write to: %v", err) + return returnClosedReaderWriters(err) + } + batchStdoutReader, batchStdoutWriter, err := os.Pipe() + if err != nil { + log.Critical("Unable to open pipe to write to: %v", err) + return returnClosedReaderWriters(err) + } ctx, ctxCancel := context.WithCancel(ctx) closed := make(chan struct{}) cancel := func() { @@ -60,6 +69,9 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, _, filename, line, _ := runtime.Caller(2) filename = strings.TrimPrefix(filename, callerPrefix) + wr := newWriteCloserError(batchStdinWriter) + rd := newReadCloserError(batchStdoutReader) + go func() { stderr := strings.Builder{} err := NewCommand(ctx, "cat-file", "--batch-check"). @@ -71,8 +83,11 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, Stderr: &stderr, }) if err != nil { - _ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) - _ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String())) + err := ConcatenateError(err, (&stderr).String()) + _ = wr.CloseWithError(err) + _ = rd.CloseWithError(err) + _ = batchStdoutWriter.Close() + _ = batchStdinReader.Close() } else { _ = batchStdoutWriter.Close() _ = batchStdinReader.Close() @@ -81,17 +96,29 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, }() // For simplicities sake we'll use a buffered reader to read from the cat-file --batch-check - batchReader := bufio.NewReader(batchStdoutReader) + batchReader := bufio.NewReader(rd) - return batchStdinWriter, batchReader, cancel + return wr, batchReader, cancel } // CatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufio.Reader, func()) { // We often want to feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. // so let's create a batch stdin and stdout - batchStdinReader, batchStdinWriter := io.Pipe() - batchStdoutReader, batchStdoutWriter := nio.Pipe(buffer.New(32 * 1024)) + batchStdinReader, batchStdinWriter, err := os.Pipe() + if err != nil { + log.Critical("Unable to open pipe to write to: %v", err) + return returnClosedReaderWriters(err) + } + batchStdoutReader, batchStdoutWriter, err := os.Pipe() + if err != nil { + log.Critical("Unable to open pipe to write to: %v", err) + return returnClosedReaderWriters(err) + } + + wr := newWriteCloserError(batchStdinWriter) + rd := newReadCloserError(batchStdoutReader) + ctx, ctxCancel := context.WithCancel(ctx) closed := make(chan struct{}) cancel := func() { @@ -115,8 +142,11 @@ func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufi Stderr: &stderr, }) if err != nil { - _ = batchStdoutWriter.CloseWithError(ConcatenateError(err, (&stderr).String())) - _ = batchStdinReader.CloseWithError(ConcatenateError(err, (&stderr).String())) + err := ConcatenateError(err, (&stderr).String()) + _ = wr.CloseWithError(err) + _ = rd.CloseWithError(err) + _ = batchStdoutWriter.Close() + _ = batchStdinReader.Close() } else { _ = batchStdoutWriter.Close() _ = batchStdinReader.Close() @@ -125,9 +155,9 @@ func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufi }() // For simplicities sake we'll us a buffered reader to read from the cat-file --batch - batchReader := bufio.NewReaderSize(batchStdoutReader, 32*1024) + batchReader := bufio.NewReaderSize(rd, 32*1024) - return batchStdinWriter, batchReader, cancel + return wr, batchReader, cancel } // ReadBatchLine reads the header line from cat-file --batch diff --git a/modules/git/utils_closererror.go b/modules/git/utils_closererror.go new file mode 100644 index 0000000000000..bcadfe31692ef --- /dev/null +++ b/modules/git/utils_closererror.go @@ -0,0 +1,99 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package git + +import ( + "io" + "sync" +) + +type closerError struct { + io.Closer + err error + lock sync.Mutex +} + +func (c *closerError) CloseWithError(err error) error { + c.lock.Lock() + defer c.lock.Unlock() + if c.err != nil { + return c.err + } + if c.Closer != nil { + _ = c.Closer.Close() + } + c.err = err + return nil +} + +func (c *closerError) Close() error { + c.lock.Lock() + defer c.lock.Unlock() + if c.err != nil { + return c.err + } + c.err = c.Closer.Close() + return c.err +} + +// WriteCloserError wraps an io.WriteCloser with an additional CloseWithError function +type WriteCloserError interface { + io.WriteCloser + CloseWithError(err error) error +} + +type writeCloserError struct { + io.Writer + closerError +} + +func (c *writeCloserError) Write(p []byte) (n int, err error) { + c.lock.Lock() + if c.err != nil { + return 0, c.err + } + c.lock.Unlock() // Unlock here to prevent hanging writes causing a deadlock in close + n, err = c.Writer.Write(p) + return +} + +func newWriteCloserError(w io.WriteCloser) WriteCloserError { + return &writeCloserError{ + Writer: w, + closerError: closerError{ + Closer: w, + }, + } +} + +// ReadCloserError wraps an io.ReadCloser with an additional CloseWithError function +type ReadCloserError interface { + io.ReadCloser + CloseWithError(err error) error +} + +type readCloserError struct { + io.Reader + closerError +} + +func (c *readCloserError) Read(p []byte) (n int, err error) { + c.lock.Lock() + if c.err != nil { + return 0, c.err + } + c.lock.Unlock() // Unlock here to prevent hanging reads causing a deadlock in close + n, err = c.Reader.Read(p) + return +} + +func newReadCloserError(r io.ReadCloser) ReadCloserError { + return &readCloserError{ + Reader: r, + closerError: closerError{ + Closer: r, + }, + } +} From 96d6ae2335ff8545170422d9cf808782228e8894 Mon Sep 17 00:00:00 2001 From: zeripath Date: Thu, 21 Apr 2022 00:02:44 +0100 Subject: [PATCH 02/12] Update modules/git/utils_closererror.go Co-authored-by: Gusted --- modules/git/utils_closererror.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/git/utils_closererror.go b/modules/git/utils_closererror.go index bcadfe31692ef..294823954befb 100644 --- a/modules/git/utils_closererror.go +++ b/modules/git/utils_closererror.go @@ -24,7 +24,10 @@ func (c *closerError) CloseWithError(err error) error { if c.Closer != nil { _ = c.Closer.Close() } - c.err = err + if err == nil { + err = io.ErrClosedPipe + } + c.err = err return nil } From 5e18cb551262c2ba04676e52175a8075368555fb Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Thu, 21 Apr 2022 01:31:33 +0200 Subject: [PATCH 03/12] gofmt --- modules/git/utils_closererror.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/git/utils_closererror.go b/modules/git/utils_closererror.go index 294823954befb..159581ab019d9 100644 --- a/modules/git/utils_closererror.go +++ b/modules/git/utils_closererror.go @@ -27,7 +27,7 @@ func (c *closerError) CloseWithError(err error) error { if err == nil { err = io.ErrClosedPipe } - c.err = err + c.err = err return nil } From 7983720eb69bf44ef7523ee7fd4d27f71ce27e40 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 21 Apr 2022 09:06:04 +0100 Subject: [PATCH 04/12] revert change to nil error in CloseWithError Signed-off-by: Andrew Thornton --- modules/git/utils_closererror.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/git/utils_closererror.go b/modules/git/utils_closererror.go index 159581ab019d9..bcadfe31692ef 100644 --- a/modules/git/utils_closererror.go +++ b/modules/git/utils_closererror.go @@ -24,9 +24,6 @@ func (c *closerError) CloseWithError(err error) error { if c.Closer != nil { _ = c.Closer.Close() } - if err == nil { - err = io.ErrClosedPipe - } c.err = err return nil } From 7f8174ba39e16869bb1d93785accadf714d5e550 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Thu, 21 Apr 2022 22:33:09 +0100 Subject: [PATCH 05/12] Fix other uses of io.Pipe and create a new Pipe similar to io.Pipe but backed by *os.File Signed-off-by: Andrew Thornton --- go.mod | 2 - go.sum | 5 - modules/git/batch_reader.go | 49 +++---- modules/git/blob.go | 20 +-- modules/git/command.go | 18 ++- modules/git/commit.go | 8 +- modules/git/log_name_status.go | 10 +- modules/git/pipeline/catfile.go | 9 +- modules/git/pipeline/lfs.go | 13 +- modules/git/pipeline/lfs_nogogit.go | 18 ++- modules/git/pipeline/namerev.go | 3 +- modules/git/pipeline/revlist.go | 8 +- modules/git/repo.go | 13 +- modules/git/repo_branch_nogogit.go | 5 +- modules/git/repo_commit.go | 5 +- modules/git/repo_ref_nogogit.go | 5 +- modules/git/repo_tag.go | 6 +- modules/git/utils_closererror.go | 89 +++--------- modules/git/utils_pipe.go | 171 +++++++++++++++++++++++ modules/lfs/pointer_scanner_nogogit.go | 61 +++++++- services/gitdiff/gitdiff.go | 8 +- services/migrations/gitea_uploader.go | 42 ++++-- services/pull/lfs.go | 47 ++++++- services/pull/review.go | 7 +- services/repository/archiver/archiver.go | 8 +- 25 files changed, 434 insertions(+), 196 deletions(-) create mode 100644 modules/git/utils_pipe.go diff --git a/go.mod b/go.mod index bfb87a1b37a8b..3fe86ec5816e9 100644 --- a/go.mod +++ b/go.mod @@ -18,8 +18,6 @@ require ( github.com/caddyserver/certmagic v0.15.4 github.com/chi-middleware/proxy v1.1.1 github.com/denisenkom/go-mssqldb v0.12.0 - github.com/djherbis/buffer v1.2.0 - github.com/djherbis/nio/v3 v3.0.1 github.com/duo-labs/webauthn v0.0.0-20220223184316-4d1cf2d34051 github.com/dustin/go-humanize v1.0.0 github.com/editorconfig/editorconfig-core-go/v2 v2.4.3 diff --git a/go.sum b/go.sum index d969c26bf533d..a5d6cea344595 100644 --- a/go.sum +++ b/go.sum @@ -369,11 +369,6 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8= -github.com/djherbis/buffer v1.1.0/go.mod h1:VwN8VdFkMY0DCALdY8o00d3IZ6Amz/UNVMWcSaJT44o= -github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ= -github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE= -github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4= -github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg= github.com/dlclark/regexp2 v1.4.0 h1:F1rxgk7p4uKjwIQxBs9oAXe5CqrXlCduYEJvrF4u93E= github.com/dlclark/regexp2 v1.4.0/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= diff --git a/modules/git/batch_reader.go b/modules/git/batch_reader.go index fa9f774c6f5c4..f3cfab11a8b1a 100644 --- a/modules/git/batch_reader.go +++ b/modules/git/batch_reader.go @@ -10,7 +10,6 @@ import ( "context" "fmt" "math" - "os" "runtime" "strconv" "strings" @@ -36,24 +35,21 @@ func EnsureValidGitRepository(ctx context.Context, repoPath string) error { } func returnClosedReaderWriters(err error) (WriteCloserError, *bufio.Reader, func()) { - wr := &writeCloserError{} - rd := &readCloserError{} - - _ = wr.CloseWithError(err) - _ = rd.CloseWithError(err) - - return wr, bufio.NewReader(rd), func() {} + wr := &ClosedReadWriteCloserError{err} + return wr, bufio.NewReader(wr), func() {} } // CatFileBatchCheck opens git cat-file --batch-check in the provided repo and returns a stdin pipe, a stdout reader and cancel function func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, *bufio.Reader, func()) { - batchStdinReader, batchStdinWriter, err := os.Pipe() + batchStdinReader, batchStdinWriter, err := Pipe() if err != nil { log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } - batchStdoutReader, batchStdoutWriter, err := os.Pipe() + batchStdoutReader, batchStdoutWriter, err := Pipe() if err != nil { + _ = batchStdinReader.Close() + _ = batchStdinWriter.Close() log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } @@ -69,9 +65,6 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, _, filename, line, _ := runtime.Caller(2) filename = strings.TrimPrefix(filename, callerPrefix) - wr := newWriteCloserError(batchStdinWriter) - rd := newReadCloserError(batchStdoutReader) - go func() { stderr := strings.Builder{} err := NewCommand(ctx, "cat-file", "--batch-check"). @@ -84,10 +77,8 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, }) if err != nil { err := ConcatenateError(err, (&stderr).String()) - _ = wr.CloseWithError(err) - _ = rd.CloseWithError(err) - _ = batchStdoutWriter.Close() - _ = batchStdinReader.Close() + _ = batchStdinReader.CloseWithError(err) + _ = batchStdoutWriter.CloseWithError(err) } else { _ = batchStdoutWriter.Close() _ = batchStdinReader.Close() @@ -96,29 +87,27 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, }() // For simplicities sake we'll use a buffered reader to read from the cat-file --batch-check - batchReader := bufio.NewReader(rd) - - return wr, batchReader, cancel + batchReader := bufio.NewReader(batchStdoutReader) + return batchStdinWriter, batchReader, cancel } // CatFileBatch opens git cat-file --batch in the provided repo and returns a stdin pipe, a stdout reader and cancel function func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufio.Reader, func()) { // We often want to feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. // so let's create a batch stdin and stdout - batchStdinReader, batchStdinWriter, err := os.Pipe() + batchStdinReader, batchStdinWriter, err := Pipe() if err != nil { log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } - batchStdoutReader, batchStdoutWriter, err := os.Pipe() + batchStdoutReader, batchStdoutWriter, err := Pipe() if err != nil { + _ = batchStdinReader.Close() + _ = batchStdinWriter.Close() log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } - wr := newWriteCloserError(batchStdinWriter) - rd := newReadCloserError(batchStdoutReader) - ctx, ctxCancel := context.WithCancel(ctx) closed := make(chan struct{}) cancel := func() { @@ -143,10 +132,8 @@ func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufi }) if err != nil { err := ConcatenateError(err, (&stderr).String()) - _ = wr.CloseWithError(err) - _ = rd.CloseWithError(err) - _ = batchStdoutWriter.Close() - _ = batchStdinReader.Close() + _ = batchStdinReader.CloseWithError(err) + _ = batchStdoutWriter.CloseWithError(err) } else { _ = batchStdoutWriter.Close() _ = batchStdinReader.Close() @@ -155,9 +142,9 @@ func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufi }() // For simplicities sake we'll us a buffered reader to read from the cat-file --batch - batchReader := bufio.NewReaderSize(rd, 32*1024) + batchReader := bufio.NewReaderSize(batchStdoutReader, 32*1024) - return wr, batchReader, cancel + return batchStdinWriter, batchReader, cancel } // ReadBatchLine reads the header line from cat-file --batch diff --git a/modules/git/blob.go b/modules/git/blob.go index 9567affd03745..34e5b533b94b1 100644 --- a/modules/git/blob.go +++ b/modules/git/blob.go @@ -9,6 +9,7 @@ import ( "bytes" "encoding/base64" "io" + "strings" "code.gitea.io/gitea/modules/typesniffer" "code.gitea.io/gitea/modules/util" @@ -69,25 +70,18 @@ func (b *Blob) GetBlobContentBase64() (string, error) { } defer dataRc.Close() - pr, pw := io.Pipe() - encoder := base64.NewEncoder(base64.StdEncoding, pw) + sb := &strings.Builder{} - go func() { - _, err := io.Copy(encoder, dataRc) - _ = encoder.Close() + encoder := base64.NewEncoder(base64.StdEncoding, sb) - if err != nil { - _ = pw.CloseWithError(err) - } else { - _ = pw.Close() - } - }() + _, err = io.Copy(encoder, dataRc) + _ = encoder.Close() - out, err := io.ReadAll(pr) if err != nil { return "", err } - return string(out), nil + + return sb.String(), nil } // GuessContentType guesses the content type of the blob. diff --git a/modules/git/command.go b/modules/git/command.go index 3dd12e421e409..7007216c6243d 100644 --- a/modules/git/command.go +++ b/modules/git/command.go @@ -158,9 +158,21 @@ func (c *Command) Run(opts *RunOpts) error { ) cmd.Dir = opts.Dir - cmd.Stdout = opts.Stdout - cmd.Stderr = opts.Stderr - cmd.Stdin = opts.Stdin + if pipeWriter, ok := opts.Stdout.(*PipeWriter); ok { + cmd.Stdout = pipeWriter.File() + } else { + cmd.Stdout = opts.Stdout + } + if pipeWriter, ok := opts.Stderr.(*PipeWriter); ok { + cmd.Stderr = pipeWriter.File() + } else { + cmd.Stderr = opts.Stderr + } + if pipeReader, ok := opts.Stdin.(*PipeReader); ok { + cmd.Stdin = pipeReader.File() + } else { + cmd.Stdin = opts.Stdin + } if err := cmd.Start(); err != nil { return err } diff --git a/modules/git/commit.go b/modules/git/commit.go index 8337e54fef27d..a8e1c5824a1b3 100644 --- a/modules/git/commit.go +++ b/modules/git/commit.go @@ -12,6 +12,7 @@ import ( "errors" "fmt" "io" + "os" "os/exec" "strconv" "strings" @@ -475,7 +476,10 @@ func parseCommitFileStatus(fileStatus *CommitFileStatus, stdout io.Reader) { // GetCommitFileStatus returns file status of commit in given repository. func GetCommitFileStatus(ctx context.Context, repoPath, commitID string) (*CommitFileStatus, error) { - stdout, w := io.Pipe() + stdout, w, err := os.Pipe() + if err != nil { + return nil, err + } done := make(chan struct{}) fileStatus := NewCommitFileStatus() go func() { @@ -486,7 +490,7 @@ func GetCommitFileStatus(ctx context.Context, repoPath, commitID string) (*Commi stderr := new(bytes.Buffer) args := []string{"log", "--name-status", "-c", "--pretty=format:", "--parents", "--no-renames", "-z", "-1", commitID} - err := NewCommand(ctx, args...).Run(&RunOpts{ + err = NewCommand(ctx, args...).Run(&RunOpts{ Dir: repoPath, Stdout: w, Stderr: stderr, diff --git a/modules/git/log_name_status.go b/modules/git/log_name_status.go index ffd0a0991bf43..8819353bc3c1e 100644 --- a/modules/git/log_name_status.go +++ b/modules/git/log_name_status.go @@ -13,15 +13,19 @@ import ( "sort" "strings" - "github.com/djherbis/buffer" - "github.com/djherbis/nio/v3" + "code.gitea.io/gitea/modules/log" ) // LogNameStatusRepo opens git log --raw in the provided repo and returns a stdin pipe, a stdout reader and cancel function func LogNameStatusRepo(ctx context.Context, repository, head, treepath string, paths ...string) (*bufio.Reader, func()) { // We often want to feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. // so let's create a batch stdin and stdout - stdoutReader, stdoutWriter := nio.Pipe(buffer.New(32 * 1024)) + stdoutReader, stdoutWriter, err := Pipe() + if err != nil { + log.Critical("Unable to open pipe to write to: %v", err) + rd := &ClosedReadWriteCloserError{err} + return bufio.NewReader(rd), func() {} + } // Lets also create a context so that we can absolutely ensure that the command should die when we're done ctx, ctxCancel := context.WithCancel(ctx) diff --git a/modules/git/pipeline/catfile.go b/modules/git/pipeline/catfile.go index 40dd2bca2936d..dea5fe18eb087 100644 --- a/modules/git/pipeline/catfile.go +++ b/modules/git/pipeline/catfile.go @@ -9,7 +9,6 @@ import ( "bytes" "context" "fmt" - "io" "strconv" "strings" "sync" @@ -19,7 +18,7 @@ import ( ) // CatFileBatchCheck runs cat-file with --batch-check -func CatFileBatchCheck(ctx context.Context, shasToCheckReader *io.PipeReader, catFileCheckWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath string) { +func CatFileBatchCheck(ctx context.Context, shasToCheckReader git.ReadCloserError, catFileCheckWriter git.WriteCloserError, wg *sync.WaitGroup, tmpBasePath string) { defer wg.Done() defer shasToCheckReader.Close() defer catFileCheckWriter.Close() @@ -38,7 +37,7 @@ func CatFileBatchCheck(ctx context.Context, shasToCheckReader *io.PipeReader, ca } // CatFileBatchCheckAllObjects runs cat-file with --batch-check --batch-all -func CatFileBatchCheckAllObjects(ctx context.Context, catFileCheckWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath string, errChan chan<- error) { +func CatFileBatchCheckAllObjects(ctx context.Context, catFileCheckWriter git.WriteCloserError, wg *sync.WaitGroup, tmpBasePath string, errChan chan<- error) { defer wg.Done() defer catFileCheckWriter.Close() @@ -58,7 +57,7 @@ func CatFileBatchCheckAllObjects(ctx context.Context, catFileCheckWriter *io.Pip } // CatFileBatch runs cat-file --batch -func CatFileBatch(ctx context.Context, shasToBatchReader *io.PipeReader, catFileBatchWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath string) { +func CatFileBatch(ctx context.Context, shasToBatchReader git.ReadCloserError, catFileBatchWriter git.WriteCloserError, wg *sync.WaitGroup, tmpBasePath string) { defer wg.Done() defer shasToBatchReader.Close() defer catFileBatchWriter.Close() @@ -76,7 +75,7 @@ func CatFileBatch(ctx context.Context, shasToBatchReader *io.PipeReader, catFile } // BlobsLessThan1024FromCatFileBatchCheck reads a pipeline from cat-file --batch-check and returns the blobs <1024 in size -func BlobsLessThan1024FromCatFileBatchCheck(catFileCheckReader *io.PipeReader, shasToBatchWriter *io.PipeWriter, wg *sync.WaitGroup) { +func BlobsLessThan1024FromCatFileBatchCheck(catFileCheckReader git.ReadCloserError, shasToBatchWriter git.WriteCloserError, wg *sync.WaitGroup) { defer wg.Done() defer catFileCheckReader.Close() scanner := bufio.NewScanner(catFileCheckReader) diff --git a/modules/git/pipeline/lfs.go b/modules/git/pipeline/lfs.go index 1b64b672e4582..5e223b6eb0318 100644 --- a/modules/git/pipeline/lfs.go +++ b/modules/git/pipeline/lfs.go @@ -99,8 +99,17 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { sort.Sort(lfsResultSlice(results)) // Should really use a go-git function here but name-rev is not completed and recapitulating it is not simple - shasToNameReader, shasToNameWriter := io.Pipe() - nameRevStdinReader, nameRevStdinWriter := io.Pipe() + shasToNameReader, shasToNameWriter, err := git.Pipe() + if err != nil { + return nil, err + } + nameRevStdinReader, nameRevStdinWriter, err := git.Pipe() + if err != nil { + _ = shasToNameReader.Close() + _ = shasToNameWriter.Close() + return nil, err + } + errChan := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(3) diff --git a/modules/git/pipeline/lfs_nogogit.go b/modules/git/pipeline/lfs_nogogit.go index 31c10c6002f6f..e0a20e535be86 100644 --- a/modules/git/pipeline/lfs_nogogit.go +++ b/modules/git/pipeline/lfs_nogogit.go @@ -45,7 +45,10 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { basePath := repo.Path // Use rev-list to provide us with all commits in order - revListReader, revListWriter := io.Pipe() + revListReader, revListWriter, err := git.Pipe() + if err != nil { + return nil, err + } defer func() { _ = revListWriter.Close() _ = revListReader.Close() @@ -195,8 +198,17 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { sort.Sort(lfsResultSlice(results)) // Should really use a go-git function here but name-rev is not completed and recapitulating it is not simple - shasToNameReader, shasToNameWriter := io.Pipe() - nameRevStdinReader, nameRevStdinWriter := io.Pipe() + shasToNameReader, shasToNameWriter, err := git.Pipe() + if err != nil { + return nil, err + } + nameRevStdinReader, nameRevStdinWriter, err := git.Pipe() + if err != nil { + _ = shasToNameReader.Close() + _ = shasToNameWriter.Close() + return nil, err + } + errChan := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(3) diff --git a/modules/git/pipeline/namerev.go b/modules/git/pipeline/namerev.go index 8356e70234459..421b80830de17 100644 --- a/modules/git/pipeline/namerev.go +++ b/modules/git/pipeline/namerev.go @@ -8,7 +8,6 @@ import ( "bytes" "context" "fmt" - "io" "strings" "sync" @@ -16,7 +15,7 @@ import ( ) // NameRevStdin runs name-rev --stdin -func NameRevStdin(ctx context.Context, shasToNameReader *io.PipeReader, nameRevStdinWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath string) { +func NameRevStdin(ctx context.Context, shasToNameReader git.ReadCloserError, nameRevStdinWriter git.WriteCloserError, wg *sync.WaitGroup, tmpBasePath string) { defer wg.Done() defer shasToNameReader.Close() defer nameRevStdinWriter.Close() diff --git a/modules/git/pipeline/revlist.go b/modules/git/pipeline/revlist.go index 02619cb58304f..c24cbb310f6af 100644 --- a/modules/git/pipeline/revlist.go +++ b/modules/git/pipeline/revlist.go @@ -9,7 +9,6 @@ import ( "bytes" "context" "fmt" - "io" "strings" "sync" @@ -18,7 +17,7 @@ import ( ) // RevListAllObjects runs rev-list --objects --all and writes to a pipewriter -func RevListAllObjects(ctx context.Context, revListWriter *io.PipeWriter, wg *sync.WaitGroup, basePath string, errChan chan<- error) { +func RevListAllObjects(ctx context.Context, revListWriter git.WriteCloserError, wg *sync.WaitGroup, basePath string, errChan chan<- error) { defer wg.Done() defer revListWriter.Close() @@ -38,7 +37,7 @@ func RevListAllObjects(ctx context.Context, revListWriter *io.PipeWriter, wg *sy } // RevListObjects run rev-list --objects from headSHA to baseSHA -func RevListObjects(ctx context.Context, revListWriter *io.PipeWriter, wg *sync.WaitGroup, tmpBasePath, headSHA, baseSHA string, errChan chan<- error) { +func RevListObjects(ctx context.Context, revListWriter git.WriteCloserError, wg *sync.WaitGroup, tmpBasePath, headSHA, baseSHA string, errChan chan<- error) { defer wg.Done() defer revListWriter.Close() stderr := new(bytes.Buffer) @@ -55,7 +54,8 @@ func RevListObjects(ctx context.Context, revListWriter *io.PipeWriter, wg *sync. } // BlobsFromRevListObjects reads a RevListAllObjects and only selects blobs -func BlobsFromRevListObjects(revListReader *io.PipeReader, shasToCheckWriter *io.PipeWriter, wg *sync.WaitGroup) { +// NOTE: Does not call git +func BlobsFromRevListObjects(revListReader git.ReadCloserError, shasToCheckWriter git.WriteCloserError, wg *sync.WaitGroup) { defer wg.Done() defer revListReader.Close() scanner := bufio.NewScanner(revListReader) diff --git a/modules/git/repo.go b/modules/git/repo.go index 3176e276959a0..59c02b4c0618b 100644 --- a/modules/git/repo.go +++ b/modules/git/repo.go @@ -329,18 +329,7 @@ func (repo *Repository) CreateBundle(ctx context.Context, commit string, out io. return err } - tmpFile := filepath.Join(tmp, "bundle") - _, _, err = NewCommand(ctx, "bundle", "create", tmpFile, "bundle", "HEAD").RunStdString(&RunOpts{Dir: tmp, Env: env}) - if err != nil { - return err - } - - fi, err := os.Open(tmpFile) - if err != nil { - return err - } - defer fi.Close() + _, _, err = NewCommand(ctx, "bundle", "create", "-", "bundle", "HEAD").RunStdString(&RunOpts{Dir: tmp, Env: env, Stdout: out}) - _, err = io.Copy(out, fi) return err } diff --git a/modules/git/repo_branch_nogogit.go b/modules/git/repo_branch_nogogit.go index 4393db10f9504..afc4f72682a97 100644 --- a/modules/git/repo_branch_nogogit.go +++ b/modules/git/repo_branch_nogogit.go @@ -100,7 +100,10 @@ func callShowRef(ctx context.Context, repoPath, prefix, arg string, skip, limit } func walkShowRef(ctx context.Context, repoPath, arg string, skip, limit int, walkfn func(sha1, refname string) error) (countAll int, err error) { - stdoutReader, stdoutWriter := io.Pipe() + stdoutReader, stdoutWriter, err := Pipe() + if err != nil { + return 0, err + } defer func() { _ = stdoutReader.Close() _ = stdoutWriter.Close() diff --git a/modules/git/repo_commit.go b/modules/git/repo_commit.go index e6fec4d1a32e2..c7153345fbc91 100644 --- a/modules/git/repo_commit.go +++ b/modules/git/repo_commit.go @@ -201,7 +201,10 @@ func (repo *Repository) FileCommitsCount(revision, file string) (int64, error) { func (repo *Repository) CommitsByFileAndRange(revision, file string, page int) ([]*Commit, error) { skip := (page - 1) * setting.Git.CommitsRangeSize - stdoutReader, stdoutWriter := io.Pipe() + stdoutReader, stdoutWriter, err := Pipe() + if err != nil { + return nil, err + } defer func() { _ = stdoutReader.Close() _ = stdoutWriter.Close() diff --git a/modules/git/repo_ref_nogogit.go b/modules/git/repo_ref_nogogit.go index 40e8a247c7488..58bed28c07b26 100644 --- a/modules/git/repo_ref_nogogit.go +++ b/modules/git/repo_ref_nogogit.go @@ -15,7 +15,10 @@ import ( // GetRefsFiltered returns all references of the repository that matches patterm exactly or starting with. func (repo *Repository) GetRefsFiltered(pattern string) ([]*Reference, error) { - stdoutReader, stdoutWriter := io.Pipe() + stdoutReader, stdoutWriter, err := Pipe() + if err != nil { + return nil, err + } defer func() { _ = stdoutReader.Close() _ = stdoutWriter.Close() diff --git a/modules/git/repo_tag.go b/modules/git/repo_tag.go index 8444e8d035a0b..a1bfea5e37f4a 100644 --- a/modules/git/repo_tag.go +++ b/modules/git/repo_tag.go @@ -8,7 +8,6 @@ package git import ( "context" "fmt" - "io" "strings" "code.gitea.io/gitea/modules/git/foreachref" @@ -115,7 +114,10 @@ func (repo *Repository) GetTagWithID(idStr, name string) (*Tag, error) { func (repo *Repository) GetTagInfos(page, pageSize int) ([]*Tag, int, error) { forEachRefFmt := foreachref.NewFormat("objecttype", "refname:short", "object", "objectname", "creator", "contents", "contents:signature") - stdoutReader, stdoutWriter := io.Pipe() + stdoutReader, stdoutWriter, err := Pipe() + if err != nil { + return nil, 0, err + } defer stdoutReader.Close() defer stdoutWriter.Close() stderr := strings.Builder{} diff --git a/modules/git/utils_closererror.go b/modules/git/utils_closererror.go index bcadfe31692ef..64ef217eda0c1 100644 --- a/modules/git/utils_closererror.go +++ b/modules/git/utils_closererror.go @@ -6,94 +6,43 @@ package git import ( "io" - "sync" ) -type closerError struct { - io.Closer - err error - lock sync.Mutex -} - -func (c *closerError) CloseWithError(err error) error { - c.lock.Lock() - defer c.lock.Unlock() - if c.err != nil { - return c.err - } - if c.Closer != nil { - _ = c.Closer.Close() - } - c.err = err - return nil -} - -func (c *closerError) Close() error { - c.lock.Lock() - defer c.lock.Unlock() - if c.err != nil { - return c.err - } - c.err = c.Closer.Close() - return c.err -} - // WriteCloserError wraps an io.WriteCloser with an additional CloseWithError function type WriteCloserError interface { io.WriteCloser CloseWithError(err error) error } -type writeCloserError struct { - io.Writer - closerError +// ReadCloserError wraps an io.ReadCloser with an additional CloseWithError function +type ReadCloserError interface { + io.ReadCloser + CloseWithError(err error) error } -func (c *writeCloserError) Write(p []byte) (n int, err error) { - c.lock.Lock() - if c.err != nil { - return 0, c.err - } - c.lock.Unlock() // Unlock here to prevent hanging writes causing a deadlock in close - n, err = c.Writer.Write(p) - return +// CloserError wraps an io.Closer with an additional CloseWithError function +type CloserError interface { + io.Closer + CloseWithError(err error) error } -func newWriteCloserError(w io.WriteCloser) WriteCloserError { - return &writeCloserError{ - Writer: w, - closerError: closerError{ - Closer: w, - }, - } +// ClosedReadWriteCloserError is a pre-closed ReadWriteCloserError +type ClosedReadWriteCloserError struct { + err error } -// ReadCloserError wraps an io.ReadCloser with an additional CloseWithError function -type ReadCloserError interface { - io.ReadCloser - CloseWithError(err error) error +func (c *ClosedReadWriteCloserError) Read(p []byte) (n int, err error) { + return 0, c.err } -type readCloserError struct { - io.Reader - closerError +func (c *ClosedReadWriteCloserError) Write(p []byte) (n int, err error) { + return 0, c.err } -func (c *readCloserError) Read(p []byte) (n int, err error) { - c.lock.Lock() - if c.err != nil { - return 0, c.err - } - c.lock.Unlock() // Unlock here to prevent hanging reads causing a deadlock in close - n, err = c.Reader.Read(p) - return +func (c *ClosedReadWriteCloserError) Close() error { + return c.err } -func newReadCloserError(r io.ReadCloser) ReadCloserError { - return &readCloserError{ - Reader: r, - closerError: closerError{ - Closer: r, - }, - } +func (c *ClosedReadWriteCloserError) CloseWithError(error) error { + return c.err } diff --git a/modules/git/utils_pipe.go b/modules/git/utils_pipe.go new file mode 100644 index 0000000000000..36960598339c3 --- /dev/null +++ b/modules/git/utils_pipe.go @@ -0,0 +1,171 @@ +// Copyright 2022 The Gitea Authors. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package git + +import ( + "io" + "os" + "sync" +) + +type pipe struct { + rd *os.File + wr *os.File + + lock sync.Mutex + wrErr error + rdErr error +} + +func (p *pipe) read(b []byte) (n int, err error) { + n, err = p.rd.Read(b) + if err != nil { + return n, p.readCloseError(err) + } + return +} + +func (p *pipe) closeRead(err error) error { + if err == nil { + err = io.ErrClosedPipe + } + p.lock.Lock() + defer p.lock.Unlock() + if p.rdErr != nil { + return nil + } + + p.rdErr = err + _ = p.rd.Close() + return nil +} + +// readCloseError returns the error returned on reading a closed read pipe +func (p *pipe) readCloseError(err error) error { + p.lock.Lock() + defer p.lock.Unlock() + rdErr := p.rdErr + + if wrErr := p.wrErr; rdErr == nil && wrErr != nil { + return wrErr + } + if err != io.EOF { + return err + } + return io.ErrClosedPipe +} + +func (p *pipe) write(b []byte) (n int, err error) { + n, err = p.wr.Write(b) + if err != nil { + return n, p.writeCloseError(err) + } + return +} + +func (p *pipe) closeWrite(err error) error { + if err == nil { + err = io.EOF + } + p.lock.Lock() + defer p.lock.Unlock() + if p.wrErr != nil { + return nil + } + p.wrErr = err + _ = p.wr.Close() + return nil +} + +// writeCloseError returns the error returned on writing to a closed write pipe +func (p *pipe) writeCloseError(err error) error { + p.lock.Lock() + defer p.lock.Unlock() + wrErr := p.wrErr + if rdErr := p.rdErr; wrErr == nil && rdErr != nil { + return rdErr + } + if err != io.EOF { + return err + } + return io.ErrClosedPipe +} + +// PipeReader is the read half of a pipe +type PipeReader struct { + p *pipe +} + +// Read implements the standard Read interface. +// If the write end is closed with an error, that error is +// returned as err; otherwise err is EOF. +func (r *PipeReader) Read(data []byte) (n int, err error) { + return r.p.read(data) +} + +// Close closes the reader; subsequent writes to the +// write half of the pipe will return the error ErrClosedPipe. +func (r *PipeReader) Close() error { + return r.CloseWithError(nil) +} + +// CloseWithError closes the reader; subsequent writes +// to the write half of the pipe will return the error err. +// +// CloseWithError never overwrites the previous error if it exists +// and always returns nil. +func (r *PipeReader) CloseWithError(err error) error { + return r.p.closeRead(err) +} + +// File returns the underlying os.File for this PipeReader +func (r *PipeReader) File() *os.File { + return r.p.rd +} + +// PipeWriter is the write half of a pipe. +type PipeWriter struct { + p *pipe +} + +// Write implements the standard Write interface: +// If the read end is closed with an error, that err is +// returned as err; otherwise err is ErrClosedPipe. +func (w *PipeWriter) Write(data []byte) (n int, err error) { + return w.p.write(data) +} + +// Close closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and EOF. +func (w *PipeWriter) Close() error { + return w.CloseWithError(nil) +} + +// CloseWithError closes the writer; subsequent reads from the +// read half of the pipe will return no bytes and the error err, +// or EOF if err is nil. +// +// CloseWithError never overwrites the previous error if it exists +// and always returns nil. +func (w *PipeWriter) CloseWithError(err error) error { + return w.p.closeWrite(err) +} + +// File returns the underlying os.File for this PipeWriter +func (w *PipeWriter) File() *os.File { + return w.p.wr +} + +func Pipe() (*PipeReader, *PipeWriter, error) { + p := &pipe{} + var err error + + p.rd, p.wr, err = os.Pipe() + if err != nil { + return nil, nil, err + } + + return &PipeReader{p}, &PipeWriter{p}, nil +} diff --git a/modules/lfs/pointer_scanner_nogogit.go b/modules/lfs/pointer_scanner_nogogit.go index cdf88c51b009e..227df070f73e5 100644 --- a/modules/lfs/pointer_scanner_nogogit.go +++ b/modules/lfs/pointer_scanner_nogogit.go @@ -23,9 +23,40 @@ import ( func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan chan<- PointerBlob, errChan chan<- error) { basePath := repo.Path - catFileCheckReader, catFileCheckWriter := io.Pipe() - shasToBatchReader, shasToBatchWriter := io.Pipe() - catFileBatchReader, catFileBatchWriter := io.Pipe() + closers := []git.CloserError{} + closeAll := func(err error) { + for _, closer := range closers { + _ = closer.CloseWithError(err) + } + } + defer closeAll(nil) + + fail := func(err error) { + errChan <- err + close(pointerChan) + close(errChan) + } + + catFileCheckReader, catFileCheckWriter, err := git.Pipe() + if err != nil { + fail(err) + return + } + closers = append(closers, catFileCheckReader, catFileCheckWriter) + + shasToBatchReader, shasToBatchWriter, err := git.Pipe() + if err != nil { + fail(err) + return + } + closers = append(closers, shasToBatchReader, shasToBatchWriter) + + catFileBatchReader, catFileBatchWriter, err := git.Pipe() + if err != nil { + fail(err) + return + } + closers = append(closers, catFileBatchReader, catFileBatchWriter) wg := sync.WaitGroup{} wg.Add(4) @@ -44,8 +75,25 @@ func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan c // 1. Run batch-check on all objects in the repository if git.CheckGitVersionAtLeast("2.6.0") != nil { - revListReader, revListWriter := io.Pipe() - shasToCheckReader, shasToCheckWriter := io.Pipe() + revListReader, revListWriter, err := git.Pipe() + if err != nil { + wg.Done() + closeAll(err) + wg.Wait() + fail(err) + return + } + closers = append(closers, revListReader, revListWriter) + shasToCheckReader, shasToCheckWriter, err := git.Pipe() + if err != nil { + wg.Done() + closeAll(err) + wg.Wait() + fail(err) + return + } + closers = append(closers, shasToCheckReader, shasToCheckWriter) + wg.Add(2) go pipeline.CatFileBatchCheck(ctx, shasToCheckReader, catFileCheckWriter, &wg, basePath) go pipeline.BlobsFromRevListObjects(revListReader, shasToCheckWriter, &wg) @@ -59,7 +107,8 @@ func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan c close(errChan) } -func createPointerResultsFromCatFileBatch(ctx context.Context, catFileBatchReader *io.PipeReader, wg *sync.WaitGroup, pointerChan chan<- PointerBlob) { +// createPointerResultsFromCatFileBatch does not call git +func createPointerResultsFromCatFileBatch(ctx context.Context, catFileBatchReader git.ReadCloserError, wg *sync.WaitGroup, pointerChan chan<- PointerBlob) { defer wg.Done() defer catFileBatchReader.Close() diff --git a/services/gitdiff/gitdiff.go b/services/gitdiff/gitdiff.go index 1df16e50167cf..b037db459867a 100644 --- a/services/gitdiff/gitdiff.go +++ b/services/gitdiff/gitdiff.go @@ -1369,13 +1369,17 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff diffArgs = append(diffArgs, files...) } - reader, writer := io.Pipe() + reader, writer, err := git.Pipe() + if err != nil { + return nil, err + } + defer func() { _ = reader.Close() _ = writer.Close() }() - go func(ctx context.Context, diffArgs []string, repoPath string, writer *io.PipeWriter) { + go func(ctx context.Context, diffArgs []string, repoPath string, writer git.WriteCloserError) { cmd := git.NewCommand(ctx, diffArgs...) cmd.SetDescription(fmt.Sprintf("GetDiffRange [repo_path: %s]", repoPath)) if err := cmd.Run(&git.RunOpts{ diff --git a/services/migrations/gitea_uploader.go b/services/migrations/gitea_uploader.go index 275b7026a0f6e..fd9522b934648 100644 --- a/services/migrations/gitea_uploader.go +++ b/services/migrations/gitea_uploader.go @@ -748,21 +748,10 @@ func (g *GiteaLocalUploader) CreateReviews(reviews ...*base.Review) error { continue } - var patch string - reader, writer := io.Pipe() - defer func() { - _ = reader.Close() - _ = writer.Close() - }() - go func(comment *base.ReviewComment) { - if err := git.GetRepoRawDiffForFile(g.gitRepo, pr.MergeBase, headCommitID, git.RawDiffNormal, comment.TreePath, writer); err != nil { - // We should ignore the error since the commit maybe removed when force push to the pull request - log.Warn("GetRepoRawDiffForFile failed when migrating [%s, %s, %s, %s]: %v", g.gitRepo.Path, pr.MergeBase, headCommitID, comment.TreePath, err) - } - _ = writer.Close() - }(comment) - - patch, _ = git.CutDiffAroundLine(reader, int64((&models.Comment{Line: int64(line + comment.Position - 1)}).UnsignedLine()), line < 0, setting.UI.CodeCommentLines) + patch, err := patchForComment(g, comment, line, pr, headCommitID) + if err != nil { + return err + } if comment.CreatedAt.IsZero() { comment.CreatedAt = review.CreatedAt @@ -796,6 +785,29 @@ func (g *GiteaLocalUploader) CreateReviews(reviews ...*base.Review) error { return models.InsertReviews(cms) } +func patchForComment(g *GiteaLocalUploader, comment *base.ReviewComment, line int, pr *models.PullRequest, headCommitID string) (string, error) { + reader, writer, err := git.Pipe() + if err != nil { + log.Error("GetRepoRawDiffForFile failed to create pipe when migrating [%s, %s, %s, %s]: %v", g.gitRepo.Path, pr.MergeBase, headCommitID, comment.TreePath, err) + return "", err + } + defer func() { + _ = reader.Close() + _ = writer.Close() + }() + go func(comment *base.ReviewComment) { + if err := git.GetRepoRawDiffForFile(g.gitRepo, pr.MergeBase, headCommitID, git.RawDiffNormal, comment.TreePath, writer); err != nil { + // We should ignore the error since the commit maybe removed when force push to the pull request + log.Warn("GetRepoRawDiffForFile failed when migrating [%s, %s, %s, %s]: %v", g.gitRepo.Path, pr.MergeBase, headCommitID, comment.TreePath, err) + } + _ = writer.Close() + }(comment) + + patch, _ := git.CutDiffAroundLine(reader, int64((&models.Comment{Line: int64(line + comment.Position - 1)}).UnsignedLine()), line < 0, setting.UI.CodeCommentLines) + + return patch, nil +} + // Rollback when migrating failed, this will rollback all the changes. func (g *GiteaLocalUploader) Rollback() error { if g.repo != nil && g.repo.ID > 0 { diff --git a/services/pull/lfs.go b/services/pull/lfs.go index fada9b6121fa5..42d31b1162c43 100644 --- a/services/pull/lfs.go +++ b/services/pull/lfs.go @@ -13,6 +13,7 @@ import ( "sync" "code.gitea.io/gitea/models" + "code.gitea.io/gitea/modules/git" "code.gitea.io/gitea/modules/git/pipeline" "code.gitea.io/gitea/modules/lfs" "code.gitea.io/gitea/modules/log" @@ -26,11 +27,45 @@ func LFSPush(ctx context.Context, tmpBasePath, mergeHeadSHA, mergeBaseSHA string // ensure only blobs and <=1k size then pass in to git cat-file --batch // to read each sha and check each as a pointer // Then if they are lfs -> add them to the baseRepo - revListReader, revListWriter := io.Pipe() - shasToCheckReader, shasToCheckWriter := io.Pipe() - catFileCheckReader, catFileCheckWriter := io.Pipe() - shasToBatchReader, shasToBatchWriter := io.Pipe() - catFileBatchReader, catFileBatchWriter := io.Pipe() + + closers := []git.CloserError{} + closeAll := func(err error) { + for _, closer := range closers { + _ = closer.CloseWithError(err) + } + } + defer closeAll(nil) + + revListReader, revListWriter, err := git.Pipe() + if err != nil { + return err + } + closers = append(closers, revListReader, revListWriter) + + shasToCheckReader, shasToCheckWriter, err := git.Pipe() + if err != nil { + return err + } + closers = append(closers, shasToCheckReader, shasToCheckWriter) + + catFileCheckReader, catFileCheckWriter, err := git.Pipe() + if err != nil { + return err + } + closers = append(closers, catFileCheckReader, catFileCheckWriter) + + shasToBatchReader, shasToBatchWriter, err := git.Pipe() + if err != nil { + return err + } + closers = append(closers, shasToBatchReader, shasToBatchWriter) + + catFileBatchReader, catFileBatchWriter, err := git.Pipe() + if err != nil { + return err + } + closers = append(closers, catFileBatchReader, catFileBatchWriter) + errChan := make(chan error, 1) wg := sync.WaitGroup{} wg.Add(6) @@ -67,7 +102,7 @@ func LFSPush(ctx context.Context, tmpBasePath, mergeHeadSHA, mergeBaseSHA string return nil } -func createLFSMetaObjectsFromCatFileBatch(catFileBatchReader *io.PipeReader, wg *sync.WaitGroup, pr *models.PullRequest) { +func createLFSMetaObjectsFromCatFileBatch(catFileBatchReader git.ReadCloserError, wg *sync.WaitGroup, pr *models.PullRequest) { defer wg.Done() defer catFileBatchReader.Close() diff --git a/services/pull/review.go b/services/pull/review.go index e7e6f3135ba91..41f99f38328d7 100644 --- a/services/pull/review.go +++ b/services/pull/review.go @@ -8,7 +8,6 @@ package pull import ( "context" "fmt" - "io" "regexp" "strings" @@ -183,7 +182,11 @@ func createCodeComment(ctx context.Context, doer *user_model.User, repo *repo_mo if len(commitID) == 0 { commitID = headCommitID } - reader, writer := io.Pipe() + reader, writer, err := git.Pipe() + if err != nil { + log.Critical("Unable to open pipe whilst generating patch: %v", err) + return nil, err + } defer func() { _ = reader.Close() _ = writer.Close() diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go index 7c2cf237d5152..2529e9ef3f046 100644 --- a/services/repository/archiver/archiver.go +++ b/services/repository/archiver/archiver.go @@ -8,7 +8,6 @@ import ( "context" "errors" "fmt" - "io" "os" "regexp" "strings" @@ -167,7 +166,10 @@ func doArchive(r *ArchiveRequest) (*repo_model.RepoArchiver, error) { return nil, fmt.Errorf("unable to stat archive: %v", err) } - rd, w := io.Pipe() + rd, w, err := git.Pipe() + if err != nil { + return nil, err + } defer func() { w.Close() rd.Close() @@ -184,7 +186,7 @@ func doArchive(r *ArchiveRequest) (*repo_model.RepoArchiver, error) { } defer gitRepo.Close() - go func(done chan error, w *io.PipeWriter, archiver *repo_model.RepoArchiver, gitRepo *git.Repository) { + go func(done chan error, w git.WriteCloserError, archiver *repo_model.RepoArchiver, gitRepo *git.Repository) { defer func() { if r := recover(); r != nil { done <- fmt.Errorf("%v", r) From 43612d184c61b88b4dfb0a62401b3b748e516a2c Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 22 Apr 2022 17:38:46 +0100 Subject: [PATCH 06/12] Remove unnecessary closing go-routine Signed-off-by: Andrew Thornton --- modules/git/batch_reader.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/modules/git/batch_reader.go b/modules/git/batch_reader.go index 98d82413a4bd6..f3cfab11a8b1a 100644 --- a/modules/git/batch_reader.go +++ b/modules/git/batch_reader.go @@ -62,12 +62,6 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, <-closed } - // Ensure cancel is called as soon as the provided context is cancelled - go func() { - <-ctx.Done() - cancel() - }() - _, filename, line, _ := runtime.Caller(2) filename = strings.TrimPrefix(filename, callerPrefix) @@ -123,12 +117,6 @@ func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufi <-closed } - // Ensure cancel is called as soon as the provided context is cancelled - go func() { - <-ctx.Done() - cancel() - }() - _, filename, line, _ := runtime.Caller(2) filename = strings.TrimPrefix(filename, callerPrefix) From eb4ab16f105a5071078de1b891e27924d45fd6ae Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 22 Apr 2022 20:32:21 +0100 Subject: [PATCH 07/12] as per review Signed-off-by: Andrew Thornton --- modules/git/batch_reader.go | 23 +++---- modules/git/command.go | 25 ++++++++ modules/git/pipeline/lfs_nogogit.go | 11 ++-- modules/git/utils_pipe.go | 84 +++++++++++++++++++++----- modules/lfs/pointer_scanner_nogogit.go | 47 +++++--------- services/pull/lfs.go | 42 ++++--------- 6 files changed, 132 insertions(+), 100 deletions(-) diff --git a/modules/git/batch_reader.go b/modules/git/batch_reader.go index f3cfab11a8b1a..ee827e23f24b5 100644 --- a/modules/git/batch_reader.go +++ b/modules/git/batch_reader.go @@ -41,18 +41,14 @@ func returnClosedReaderWriters(err error) (WriteCloserError, *bufio.Reader, func // CatFileBatchCheck opens git cat-file --batch-check in the provided repo and returns a stdin pipe, a stdout reader and cancel function func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, *bufio.Reader, func()) { - batchStdinReader, batchStdinWriter, err := Pipe() + pipes, err := NewPipePairs(2) if err != nil { log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } - batchStdoutReader, batchStdoutWriter, err := Pipe() - if err != nil { - _ = batchStdinReader.Close() - _ = batchStdinWriter.Close() - log.Critical("Unable to open pipe to write to: %v", err) - return returnClosedReaderWriters(err) - } + batchStdinReader, batchStdinWriter := pipes[0].Reader(), pipes[0].Writer() + batchStdoutReader, batchStdoutWriter := pipes[1].Reader(), pipes[1].Writer() + ctx, ctxCancel := context.WithCancel(ctx) closed := make(chan struct{}) cancel := func() { @@ -95,18 +91,13 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufio.Reader, func()) { // We often want to feed the commits in order into cat-file --batch, followed by their trees and sub trees as necessary. // so let's create a batch stdin and stdout - batchStdinReader, batchStdinWriter, err := Pipe() + pipes, err := NewPipePairs(2) if err != nil { log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } - batchStdoutReader, batchStdoutWriter, err := Pipe() - if err != nil { - _ = batchStdinReader.Close() - _ = batchStdinWriter.Close() - log.Critical("Unable to open pipe to write to: %v", err) - return returnClosedReaderWriters(err) - } + batchStdinReader, batchStdinWriter := pipes[0].Reader(), pipes[0].Writer() + batchStdoutReader, batchStdoutWriter := pipes[1].Reader(), pipes[1].Writer() ctx, ctxCancel := context.WithCancel(ctx) closed := make(chan struct{}) diff --git a/modules/git/command.go b/modules/git/command.go index 7007216c6243d..b076e93ff679d 100644 --- a/modules/git/command.go +++ b/modules/git/command.go @@ -177,6 +177,31 @@ func (c *Command) Run(opts *RunOpts) error { return err } + // Ensure that closers are closed + closers := make([]io.Closer, 0, 3) + for _, pipe := range []interface{}{cmd.Stdout, cmd.Stdin, cmd.Stderr} { + if pipe == nil { + continue + } + if _, ok := pipe.(*os.File); ok { + continue + } + + if closer, ok := pipe.(io.Closer); ok { + closers = append(closers, closer) + } + } + + if len(closers) > 0 { + go func() { + <-ctx.Done() + cancel() + for _, closer := range closers { + _ = closer.Close() + } + }() + } + if opts.PipelineFunc != nil { err := opts.PipelineFunc(ctx, cancel) if err != nil { diff --git a/modules/git/pipeline/lfs_nogogit.go b/modules/git/pipeline/lfs_nogogit.go index e0a20e535be86..9b7a54a512152 100644 --- a/modules/git/pipeline/lfs_nogogit.go +++ b/modules/git/pipeline/lfs_nogogit.go @@ -198,16 +198,13 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { sort.Sort(lfsResultSlice(results)) // Should really use a go-git function here but name-rev is not completed and recapitulating it is not simple - shasToNameReader, shasToNameWriter, err := git.Pipe() + pipes, err := git.NewPipePairs(2) if err != nil { return nil, err } - nameRevStdinReader, nameRevStdinWriter, err := git.Pipe() - if err != nil { - _ = shasToNameReader.Close() - _ = shasToNameWriter.Close() - return nil, err - } + + shasToNameReader, shasToNameWriter := pipes[0].ReaderWriter() + nameRevStdinReader, nameRevStdinWriter := pipes[1].ReaderWriter() errChan := make(chan error, 1) wg := sync.WaitGroup{} diff --git a/modules/git/utils_pipe.go b/modules/git/utils_pipe.go index 36960598339c3..6b6f7cd5cbbd7 100644 --- a/modules/git/utils_pipe.go +++ b/modules/git/utils_pipe.go @@ -10,7 +10,8 @@ import ( "sync" ) -type pipe struct { +// PipePair represents an os.Pipe() wrapped with a io.Pipe()-like PipeWriter and PipeReader +type PipePair struct { rd *os.File wr *os.File @@ -19,7 +20,17 @@ type pipe struct { rdErr error } -func (p *pipe) read(b []byte) (n int, err error) { +// ReaderWriter returns the Reader and Writer ends of the Pipe +func (p *PipePair) ReaderWriter() (*PipeReader, *PipeWriter) { + return p.Reader(), p.Writer() +} + +// Reader returns the Reader end of the Pipe +func (p *PipePair) Reader() *PipeReader { + return &PipeReader{p} +} + +func (p *PipePair) read(b []byte) (n int, err error) { n, err = p.rd.Read(b) if err != nil { return n, p.readCloseError(err) @@ -27,7 +38,7 @@ func (p *pipe) read(b []byte) (n int, err error) { return } -func (p *pipe) closeRead(err error) error { +func (p *PipePair) closeRead(err error) error { if err == nil { err = io.ErrClosedPipe } @@ -43,7 +54,7 @@ func (p *pipe) closeRead(err error) error { } // readCloseError returns the error returned on reading a closed read pipe -func (p *pipe) readCloseError(err error) error { +func (p *PipePair) readCloseError(err error) error { p.lock.Lock() defer p.lock.Unlock() rdErr := p.rdErr @@ -57,7 +68,12 @@ func (p *pipe) readCloseError(err error) error { return io.ErrClosedPipe } -func (p *pipe) write(b []byte) (n int, err error) { +// Writer returns the Writer end of the Pipe +func (p *PipePair) Writer() *PipeWriter { + return &PipeWriter{p} +} + +func (p *PipePair) write(b []byte) (n int, err error) { n, err = p.wr.Write(b) if err != nil { return n, p.writeCloseError(err) @@ -65,7 +81,7 @@ func (p *pipe) write(b []byte) (n int, err error) { return } -func (p *pipe) closeWrite(err error) error { +func (p *PipePair) closeWrite(err error) error { if err == nil { err = io.EOF } @@ -80,7 +96,7 @@ func (p *pipe) closeWrite(err error) error { } // writeCloseError returns the error returned on writing to a closed write pipe -func (p *pipe) writeCloseError(err error) error { +func (p *PipePair) writeCloseError(err error) error { p.lock.Lock() defer p.lock.Unlock() wrErr := p.wrErr @@ -93,9 +109,21 @@ func (p *pipe) writeCloseError(err error) error { return io.ErrClosedPipe } +// Close closes the pipe pair +func (p *PipePair) Close() error { + return p.CloseWithError(nil) +} + +// CloseWithError closes the pipe pair +func (p *PipePair) CloseWithError(err error) error { + _ = p.closeRead(err) + _ = p.closeWrite(err) + return nil +} + // PipeReader is the read half of a pipe type PipeReader struct { - p *pipe + p *PipePair } // Read implements the standard Read interface. @@ -127,7 +155,7 @@ func (r *PipeReader) File() *os.File { // PipeWriter is the write half of a pipe. type PipeWriter struct { - p *pipe + p *PipePair } // Write implements the standard Write interface: @@ -158,14 +186,42 @@ func (w *PipeWriter) File() *os.File { return w.p.wr } +// Pipe returns a connected pair of Files wrapped with CloserError similar to io.Pipe(). +// Reads from r return bytes written to w. It returns the files and an error, if any. func Pipe() (*PipeReader, *PipeWriter, error) { - p := &pipe{} - var err error - - p.rd, p.wr, err = os.Pipe() + rd, wr, err := os.Pipe() if err != nil { return nil, nil, err } - return &PipeReader{p}, &PipeWriter{p}, nil + pipe := &PipePair{rd: rd, wr: wr} + + return pipe.Reader(), pipe.Writer(), nil +} + +// NewPipePair returns a connected pair of Files wrapped in a PipePair +func NewPipePair() (*PipePair, error) { + rd, wr, err := os.Pipe() + if err != nil { + return nil, err + } + + return &PipePair{rd: rd, wr: wr}, nil +} + +// NewPipePairs will return a slice of n PipePairs or an error +func NewPipePairs(n int) ([]*PipePair, error) { + pipePairs := make([]*PipePair, 0, n) + for i := 0; i < n; i++ { + pipe, err := NewPipePair() + if err != nil { + for _, pipe := range pipePairs { + _ = pipe.Close() + } + return nil, err + } + + pipePairs = append(pipePairs, pipe) + } + return pipePairs, nil } diff --git a/modules/lfs/pointer_scanner_nogogit.go b/modules/lfs/pointer_scanner_nogogit.go index 227df070f73e5..32b4a7fd71ad3 100644 --- a/modules/lfs/pointer_scanner_nogogit.go +++ b/modules/lfs/pointer_scanner_nogogit.go @@ -23,40 +23,28 @@ import ( func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan chan<- PointerBlob, errChan chan<- error) { basePath := repo.Path - closers := []git.CloserError{} - closeAll := func(err error) { - for _, closer := range closers { - _ = closer.CloseWithError(err) - } - } - defer closeAll(nil) - fail := func(err error) { errChan <- err close(pointerChan) close(errChan) } - catFileCheckReader, catFileCheckWriter, err := git.Pipe() + pipes, err := git.NewPipePairs(3) if err != nil { fail(err) return } - closers = append(closers, catFileCheckReader, catFileCheckWriter) - shasToBatchReader, shasToBatchWriter, err := git.Pipe() - if err != nil { - fail(err) - return + closeAll := func(err error) { + for _, closer := range pipes { + _ = closer.CloseWithError(err) + } } - closers = append(closers, shasToBatchReader, shasToBatchWriter) + defer closeAll(nil) - catFileBatchReader, catFileBatchWriter, err := git.Pipe() - if err != nil { - fail(err) - return - } - closers = append(closers, catFileBatchReader, catFileBatchWriter) + catFileCheckReader, catFileCheckWriter := pipes[0].ReaderWriter() + shasToBatchReader, shasToBatchWriter := pipes[1].ReaderWriter() + catFileBatchReader, catFileBatchWriter := pipes[2].ReaderWriter() wg := sync.WaitGroup{} wg.Add(4) @@ -75,16 +63,8 @@ func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan c // 1. Run batch-check on all objects in the repository if git.CheckGitVersionAtLeast("2.6.0") != nil { - revListReader, revListWriter, err := git.Pipe() - if err != nil { - wg.Done() - closeAll(err) - wg.Wait() - fail(err) - return - } - closers = append(closers, revListReader, revListWriter) - shasToCheckReader, shasToCheckWriter, err := git.Pipe() + + morePipes, err := git.NewPipePairs(2) if err != nil { wg.Done() closeAll(err) @@ -92,7 +72,10 @@ func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan c fail(err) return } - closers = append(closers, shasToCheckReader, shasToCheckWriter) + + revListReader, revListWriter := morePipes[0].ReaderWriter() + shasToCheckReader, shasToCheckWriter := morePipes[1].ReaderWriter() + pipes = append(pipes, morePipes...) wg.Add(2) go pipeline.CatFileBatchCheck(ctx, shasToCheckReader, catFileCheckWriter, &wg, basePath) diff --git a/services/pull/lfs.go b/services/pull/lfs.go index 42d31b1162c43..ee7c3faae6716 100644 --- a/services/pull/lfs.go +++ b/services/pull/lfs.go @@ -28,43 +28,23 @@ func LFSPush(ctx context.Context, tmpBasePath, mergeHeadSHA, mergeBaseSHA string // to read each sha and check each as a pointer // Then if they are lfs -> add them to the baseRepo - closers := []git.CloserError{} - closeAll := func(err error) { - for _, closer := range closers { - _ = closer.CloseWithError(err) - } - } - defer closeAll(nil) - - revListReader, revListWriter, err := git.Pipe() + pipes, err := git.NewPipePairs(5) if err != nil { return err } - closers = append(closers, revListReader, revListWriter) - shasToCheckReader, shasToCheckWriter, err := git.Pipe() - if err != nil { - return err - } - closers = append(closers, shasToCheckReader, shasToCheckWriter) - - catFileCheckReader, catFileCheckWriter, err := git.Pipe() - if err != nil { - return err - } - closers = append(closers, catFileCheckReader, catFileCheckWriter) - - shasToBatchReader, shasToBatchWriter, err := git.Pipe() - if err != nil { - return err + closeAll := func(err error) { + for _, closer := range pipes { + _ = closer.CloseWithError(err) + } } - closers = append(closers, shasToBatchReader, shasToBatchWriter) + defer closeAll(nil) - catFileBatchReader, catFileBatchWriter, err := git.Pipe() - if err != nil { - return err - } - closers = append(closers, catFileBatchReader, catFileBatchWriter) + revListReader, revListWriter := pipes[0].ReaderWriter() + shasToCheckReader, shasToCheckWriter := pipes[1].ReaderWriter() + catFileCheckReader, catFileCheckWriter := pipes[2].ReaderWriter() + shasToBatchReader, shasToBatchWriter := pipes[3].ReaderWriter() + catFileBatchReader, catFileBatchWriter := pipes[4].ReaderWriter() errChan := make(chan error, 1) wg := sync.WaitGroup{} From 2600645da7ba36a73fc11e8ec595201ccbf4f226 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 22 Apr 2022 20:34:19 +0100 Subject: [PATCH 08/12] use ReaderWriter instead Signed-off-by: Andrew Thornton --- modules/git/batch_reader.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/git/batch_reader.go b/modules/git/batch_reader.go index ee827e23f24b5..e30fd1a3e1eb7 100644 --- a/modules/git/batch_reader.go +++ b/modules/git/batch_reader.go @@ -46,8 +46,8 @@ func CatFileBatchCheck(ctx context.Context, repoPath string) (WriteCloserError, log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } - batchStdinReader, batchStdinWriter := pipes[0].Reader(), pipes[0].Writer() - batchStdoutReader, batchStdoutWriter := pipes[1].Reader(), pipes[1].Writer() + batchStdinReader, batchStdinWriter := pipes[0].ReaderWriter() + batchStdoutReader, batchStdoutWriter := pipes[1].ReaderWriter() ctx, ctxCancel := context.WithCancel(ctx) closed := make(chan struct{}) @@ -96,8 +96,8 @@ func CatFileBatch(ctx context.Context, repoPath string) (WriteCloserError, *bufi log.Critical("Unable to open pipe to write to: %v", err) return returnClosedReaderWriters(err) } - batchStdinReader, batchStdinWriter := pipes[0].Reader(), pipes[0].Writer() - batchStdoutReader, batchStdoutWriter := pipes[1].Reader(), pipes[1].Writer() + batchStdinReader, batchStdinWriter := pipes[0].ReaderWriter() + batchStdoutReader, batchStdoutWriter := pipes[1].ReaderWriter() ctx, ctxCancel := context.WithCancel(ctx) closed := make(chan struct{}) From a56a5c8f193e73ddd03e218745f981f3fbf4bf43 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Fri, 22 Apr 2022 20:57:13 +0100 Subject: [PATCH 09/12] And ensure that the pipes are closed in lfs Signed-off-by: Andrew Thornton --- modules/git/pipeline/lfs.go | 17 ++++++++++------- modules/git/pipeline/lfs_nogogit.go | 7 +++++++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/modules/git/pipeline/lfs.go b/modules/git/pipeline/lfs.go index 5e223b6eb0318..60d24b49991bf 100644 --- a/modules/git/pipeline/lfs.go +++ b/modules/git/pipeline/lfs.go @@ -99,16 +99,19 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { sort.Sort(lfsResultSlice(results)) // Should really use a go-git function here but name-rev is not completed and recapitulating it is not simple - shasToNameReader, shasToNameWriter, err := git.Pipe() + pipes, err := git.NewPipePairs(2) if err != nil { return nil, err } - nameRevStdinReader, nameRevStdinWriter, err := git.Pipe() - if err != nil { - _ = shasToNameReader.Close() - _ = shasToNameWriter.Close() - return nil, err - } + + defer func() { + for _, pipe := range pipes { + _ = pipe.Close() + } + } () + + shasToNameReader, shasToNameWriter := pipes[0].ReaderWriter() + nameRevStdinReader, nameRevStdinWriter := pipes[1].ReaderWriter() errChan := make(chan error, 1) wg := sync.WaitGroup{} diff --git a/modules/git/pipeline/lfs_nogogit.go b/modules/git/pipeline/lfs_nogogit.go index 9b7a54a512152..f3069f01be360 100644 --- a/modules/git/pipeline/lfs_nogogit.go +++ b/modules/git/pipeline/lfs_nogogit.go @@ -203,6 +203,13 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { return nil, err } + defer func() { + for _, pipe := range pipes { + _ = pipe.Close() + } + } + + shasToNameReader, shasToNameWriter := pipes[0].ReaderWriter() nameRevStdinReader, nameRevStdinWriter := pipes[1].ReaderWriter() From 7002cd2cb24e7066ea575dd0ffb5e8f121474040 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 25 Apr 2022 20:22:32 +0100 Subject: [PATCH 10/12] oops Signed-off-by: Andrew Thornton --- modules/git/pipeline/lfs_nogogit.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/git/pipeline/lfs_nogogit.go b/modules/git/pipeline/lfs_nogogit.go index f3069f01be360..b4e9c886b3564 100644 --- a/modules/git/pipeline/lfs_nogogit.go +++ b/modules/git/pipeline/lfs_nogogit.go @@ -207,8 +207,7 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { for _, pipe := range pipes { _ = pipe.Close() } - } - + }() shasToNameReader, shasToNameWriter := pipes[0].ReaderWriter() nameRevStdinReader, nameRevStdinWriter := pipes[1].ReaderWriter() From 37ffc85be92176b85619e01aefdd755447eeca41 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 25 Apr 2022 20:59:07 +0100 Subject: [PATCH 11/12] as per review Signed-off-by: Andrew Thornton --- modules/git/pipeline/lfs.go | 7 +---- modules/git/pipeline/lfs_nogogit.go | 7 +---- modules/git/utils_pipe.go | 18 ++++++++++- modules/lfs/pointer_scanner_nogogit.go | 43 ++++++++++---------------- services/pull/lfs.go | 8 +---- 5 files changed, 36 insertions(+), 47 deletions(-) diff --git a/modules/git/pipeline/lfs.go b/modules/git/pipeline/lfs.go index 60d24b49991bf..77cb4365a454c 100644 --- a/modules/git/pipeline/lfs.go +++ b/modules/git/pipeline/lfs.go @@ -103,12 +103,7 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { if err != nil { return nil, err } - - defer func() { - for _, pipe := range pipes { - _ = pipe.Close() - } - } () + defer pipes.Close() shasToNameReader, shasToNameWriter := pipes[0].ReaderWriter() nameRevStdinReader, nameRevStdinWriter := pipes[1].ReaderWriter() diff --git a/modules/git/pipeline/lfs_nogogit.go b/modules/git/pipeline/lfs_nogogit.go index b4e9c886b3564..4fac5b5c246c8 100644 --- a/modules/git/pipeline/lfs_nogogit.go +++ b/modules/git/pipeline/lfs_nogogit.go @@ -202,12 +202,7 @@ func FindLFSFile(repo *git.Repository, hash git.SHA1) ([]*LFSResult, error) { if err != nil { return nil, err } - - defer func() { - for _, pipe := range pipes { - _ = pipe.Close() - } - }() + defer pipes.Close() shasToNameReader, shasToNameWriter := pipes[0].ReaderWriter() nameRevStdinReader, nameRevStdinWriter := pipes[1].ReaderWriter() diff --git a/modules/git/utils_pipe.go b/modules/git/utils_pipe.go index 6b6f7cd5cbbd7..891fbef8a8ce3 100644 --- a/modules/git/utils_pipe.go +++ b/modules/git/utils_pipe.go @@ -210,7 +210,7 @@ func NewPipePair() (*PipePair, error) { } // NewPipePairs will return a slice of n PipePairs or an error -func NewPipePairs(n int) ([]*PipePair, error) { +func NewPipePairs(n int) (PipePairs, error) { pipePairs := make([]*PipePair, 0, n) for i := 0; i < n; i++ { pipe, err := NewPipePair() @@ -225,3 +225,19 @@ func NewPipePairs(n int) ([]*PipePair, error) { } return pipePairs, nil } + +type PipePairs []*PipePair + +// Close closes the PipePairs +func (pairs PipePairs) Close() error { + return pairs.CloseWithError(nil) +} + +// CloseWithError closes the pipe pair +func (pairs PipePairs) CloseWithError(err error) error { + for _, p := range pairs { + _ = p.closeRead(err) + _ = p.closeWrite(err) + } + return nil +} diff --git a/modules/lfs/pointer_scanner_nogogit.go b/modules/lfs/pointer_scanner_nogogit.go index 32b4a7fd71ad3..2af726469e4c0 100644 --- a/modules/lfs/pointer_scanner_nogogit.go +++ b/modules/lfs/pointer_scanner_nogogit.go @@ -23,24 +23,24 @@ import ( func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan chan<- PointerBlob, errChan chan<- error) { basePath := repo.Path - fail := func(err error) { - errChan <- err - close(pointerChan) - close(errChan) + // We will need to run batch-check on all the objects in the repository + // in Git 2.6.0+ we can use `git cat-file --batch-check --batch-all-objects` + // However in earlier versions we'll need to use rev-list to get the objects. + gitHasBatchCheckAllObjects := git.CheckGitVersionAtLeast("2.6.0") == nil + + numPipesRequired := 3 + if !gitHasBatchCheckAllObjects { + numPipesRequired += 2 } - pipes, err := git.NewPipePairs(3) + pipes, err := git.NewPipePairs(numPipesRequired) if err != nil { - fail(err) + errChan <- err + close(pointerChan) + close(errChan) return } - - closeAll := func(err error) { - for _, closer := range pipes { - _ = closer.CloseWithError(err) - } - } - defer closeAll(nil) + defer pipes.Close() catFileCheckReader, catFileCheckWriter := pipes[0].ReaderWriter() shasToBatchReader, shasToBatchWriter := pipes[1].ReaderWriter() @@ -62,20 +62,9 @@ func SearchPointerBlobs(ctx context.Context, repo *git.Repository, pointerChan c go pipeline.BlobsLessThan1024FromCatFileBatchCheck(catFileCheckReader, shasToBatchWriter, &wg) // 1. Run batch-check on all objects in the repository - if git.CheckGitVersionAtLeast("2.6.0") != nil { - - morePipes, err := git.NewPipePairs(2) - if err != nil { - wg.Done() - closeAll(err) - wg.Wait() - fail(err) - return - } - - revListReader, revListWriter := morePipes[0].ReaderWriter() - shasToCheckReader, shasToCheckWriter := morePipes[1].ReaderWriter() - pipes = append(pipes, morePipes...) + if !gitHasBatchCheckAllObjects { + revListReader, revListWriter := pipes[3].ReaderWriter() + shasToCheckReader, shasToCheckWriter := pipes[4].ReaderWriter() wg.Add(2) go pipeline.CatFileBatchCheck(ctx, shasToCheckReader, catFileCheckWriter, &wg, basePath) diff --git a/services/pull/lfs.go b/services/pull/lfs.go index ee7c3faae6716..08741e963802d 100644 --- a/services/pull/lfs.go +++ b/services/pull/lfs.go @@ -32,13 +32,7 @@ func LFSPush(ctx context.Context, tmpBasePath, mergeHeadSHA, mergeBaseSHA string if err != nil { return err } - - closeAll := func(err error) { - for _, closer := range pipes { - _ = closer.CloseWithError(err) - } - } - defer closeAll(nil) + defer pipes.Close() revListReader, revListWriter := pipes[0].ReaderWriter() shasToCheckReader, shasToCheckWriter := pipes[1].ReaderWriter() From 051fe40f9f544fba82a29a8e49ccb66ede6a80a5 Mon Sep 17 00:00:00 2001 From: Andrew Thornton Date: Mon, 25 Apr 2022 21:03:26 +0100 Subject: [PATCH 12/12] adjust doc comment Signed-off-by: Andrew Thornton --- modules/git/utils_pipe.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/git/utils_pipe.go b/modules/git/utils_pipe.go index 891fbef8a8ce3..1e7e79569a3a9 100644 --- a/modules/git/utils_pipe.go +++ b/modules/git/utils_pipe.go @@ -228,12 +228,12 @@ func NewPipePairs(n int) (PipePairs, error) { type PipePairs []*PipePair -// Close closes the PipePairs +// Close closes the pipe pairs func (pairs PipePairs) Close() error { return pairs.CloseWithError(nil) } -// CloseWithError closes the pipe pair +// CloseWithError closes the pipe pairs func (pairs PipePairs) CloseWithError(err error) error { for _, p := range pairs { _ = p.closeRead(err)