From c794edb67eb5457847940d0c57d8c98b26d5cc84 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 28 Jun 2023 14:45:36 +0800 Subject: [PATCH 1/7] fix: set LogInStorage when stopTasks --- services/actions/clear_tasks.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index 0616a5fc0dcb2..d2893e4f23e02 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -56,12 +56,20 @@ func stopTasks(ctx context.Context, opts actions_model.FindTaskOptions) error { return nil }); err != nil { log.Warn("Cannot stop task %v: %v", task.ID, err) - // go on - } else if remove, err := actions.TransferLogs(ctx, task.LogFilename); err != nil { + continue + } + + remove, err := actions.TransferLogs(ctx, task.LogFilename) + if err != nil { log.Warn("Cannot transfer logs of task %v: %v", task.ID, err) - } else { - remove() + continue + } + task.LogInStorage = true + if err := actions_model.UpdateTask(ctx, task, "log_in_storage"); err != nil { + log.Warn("Cannot update task %v: %v", task.ID, err) + continue } + remove() } CreateCommitStatus(ctx, jobs...) From b43250a652038c9b47f601671c70200b621e234d Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 28 Jun 2023 16:57:05 +0800 Subject: [PATCH 2/7] fix: don't update log for done task --- routers/api/actions/runner/runner.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index 17801cb32202f..ecfda5500fa5c 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -210,8 +210,15 @@ func (s *Service) UpdateLog( task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId) if err != nil { + if errors.Is(err, util.ErrNotExist) { + return nil, status.Errorf(codes.NotFound, "task %d not found", req.Msg.TaskId) + } return nil, status.Errorf(codes.Internal, "get task: %v", err) } + if task.Status.IsDone() { + return nil, status.Errorf(codes.FailedPrecondition, "task %d is done", req.Msg.TaskId) + } + ack := task.LogLength if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack { @@ -219,10 +226,6 @@ func (s *Service) UpdateLog( return res, nil } - if task.LogInStorage { - return nil, status.Errorf(codes.AlreadyExists, "log file has been archived") - } - rows := req.Msg.Rows[ack-req.Msg.Index:] ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows) if err != nil { From dc708bc5d91b415fe9523571e632120e42e48319 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 28 Jun 2023 17:15:50 +0800 Subject: [PATCH 3/7] fix: update updated of task --- models/actions/task.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/models/actions/task.go b/models/actions/task.go index 79b1d46dd0d5f..719fd193657f3 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -344,6 +344,9 @@ func UpdateTask(ctx context.Context, task *ActionTask, cols ...string) error { return err } +// UpdateTaskByState updates the task by the state. +// It will always update the task if the state is not final, even there is no change. +// So it will update ActionTask.Updated to avoid the task being judged as a zombie task. func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionTask, error) { stepStates := map[int64]*runnerv1.StepState{} for _, v := range state.Steps { @@ -384,6 +387,12 @@ func UpdateTaskByState(ctx context.Context, state *runnerv1.TaskState) (*ActionT }, nil); err != nil { return nil, err } + } else { + // Force update ActionTask.Updated to avoid the task being judged as a zombie task + task.Updated = timeutil.TimeStampNow() + if err := UpdateTask(ctx, task, "updated"); err != nil { + return nil, err + } } if err := task.LoadAttributes(ctx); err != nil { From 3fc1121e75e86c1b1180b1a2f7bb997f9ba6f817 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 28 Jun 2023 17:28:58 +0800 Subject: [PATCH 4/7] fix: refuse to create file with holes --- modules/actions/log.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/modules/actions/log.go b/modules/actions/log.go index 3868101992a95..d00b36374cb04 100644 --- a/modules/actions/log.go +++ b/modules/actions/log.go @@ -29,8 +29,13 @@ const ( ) func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { + flag := os.O_WRONLY + if offset == 0 { + // Create file only if offset is 0, or it could result in a file with content holes if the file doesn't exist. + flag |= os.O_CREATE + } name := DBFSPrefix + filename - f, err := dbfs.OpenFile(ctx, name, os.O_WRONLY|os.O_CREATE) + f, err := dbfs.OpenFile(ctx, name, flag) if err != nil { return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) } From be3dff36b2b003d9c0ed211e1dd526f14a69b2d6 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 28 Jun 2023 18:00:28 +0800 Subject: [PATCH 5/7] feat: support Stat for dbfs --- models/dbfs/dbfile.go | 18 ++++++++++++++++++ models/dbfs/dbfs.go | 29 +++++++++++++++++++++++++++++ models/dbfs/dbfs_test.go | 13 +++++++++++++ 3 files changed, 60 insertions(+) diff --git a/models/dbfs/dbfile.go b/models/dbfs/dbfile.go index bac1cb9eb608a..3650ce057e4c5 100644 --- a/models/dbfs/dbfile.go +++ b/models/dbfs/dbfile.go @@ -7,6 +7,7 @@ import ( "context" "errors" "io" + "io/fs" "os" "path/filepath" "strconv" @@ -21,6 +22,7 @@ var defaultFileBlockSize int64 = 32 * 1024 type File interface { io.ReadWriteCloser io.Seeker + fs.File } type file struct { @@ -193,10 +195,26 @@ func (f *file) Close() error { return nil } +func (f *file) Stat() (os.FileInfo, error) { + if f.metaID == 0 { + return nil, os.ErrInvalid + } + + fileMeta, err := findFileMetaByID(f.ctx, f.metaID) + if err != nil { + return nil, err + } + return fileMeta, nil +} + func timeToFileTimestamp(t time.Time) int64 { return t.UnixMicro() } +func fileTimestampToTime(timestamp int64) time.Time { + return time.UnixMicro(timestamp) +} + func (f *file) loadMetaByPath() (*dbfsMeta, error) { var fileMeta dbfsMeta if ok, err := db.GetEngine(f.ctx).Where("full_path = ?", f.fullPath).Get(&fileMeta); err != nil { diff --git a/models/dbfs/dbfs.go b/models/dbfs/dbfs.go index 6b5b3beeb2745..f68b4a2b70b48 100644 --- a/models/dbfs/dbfs.go +++ b/models/dbfs/dbfs.go @@ -5,7 +5,10 @@ package dbfs import ( "context" + "io/fs" "os" + "path" + "time" "code.gitea.io/gitea/models/db" ) @@ -100,3 +103,29 @@ func Remove(ctx context.Context, name string) error { defer f.Close() return f.delete() } + +var _ fs.FileInfo = (*dbfsMeta)(nil) + +func (m *dbfsMeta) Name() string { + return path.Base(m.FullPath) +} + +func (m *dbfsMeta) Size() int64 { + return m.FileSize +} + +func (m *dbfsMeta) Mode() fs.FileMode { + return os.ModePerm +} + +func (m *dbfsMeta) ModTime() time.Time { + return fileTimestampToTime(m.ModifyTimestamp) +} + +func (m *dbfsMeta) IsDir() bool { + return false +} + +func (m *dbfsMeta) Sys() any { + return nil +} diff --git a/models/dbfs/dbfs_test.go b/models/dbfs/dbfs_test.go index 300758c623ad7..96cb1014c71f8 100644 --- a/models/dbfs/dbfs_test.go +++ b/models/dbfs/dbfs_test.go @@ -111,6 +111,19 @@ func TestDbfsBasic(t *testing.T) { _, err = OpenFile(db.DefaultContext, "test2.txt", os.O_RDONLY) assert.Error(t, err) + + // test stat + f, err = OpenFile(db.DefaultContext, "test/test.txt", os.O_RDWR|os.O_CREATE) + assert.NoError(t, err) + stat, err := f.Stat() + assert.NoError(t, err) + assert.EqualValues(t, "test.txt", stat.Name()) + assert.EqualValues(t, 0, stat.Size()) + _, err = f.Write([]byte("0123456789")) + assert.NoError(t, err) + stat, err = f.Stat() + assert.NoError(t, err) + assert.EqualValues(t, 10, stat.Size()) } func TestDbfsReadWrite(t *testing.T) { From eb4c6b561241e791f549fd7f6aeb890dacd74b5e Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 28 Jun 2023 18:04:11 +0800 Subject: [PATCH 6/7] fix: refuse to write with holes --- modules/actions/log.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/modules/actions/log.go b/modules/actions/log.go index d00b36374cb04..36bed931fadf1 100644 --- a/modules/actions/log.go +++ b/modules/actions/log.go @@ -31,7 +31,7 @@ const ( func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runnerv1.LogRow) ([]int, error) { flag := os.O_WRONLY if offset == 0 { - // Create file only if offset is 0, or it could result in a file with content holes if the file doesn't exist. + // Create file only if offset is 0, or it could result in content holes if the file doesn't exist. flag |= os.O_CREATE } name := DBFSPrefix + filename @@ -40,6 +40,17 @@ func WriteLogs(ctx context.Context, filename string, offset int64, rows []*runne return nil, fmt.Errorf("dbfs OpenFile %q: %w", name, err) } defer f.Close() + + stat, err := f.Stat() + if err != nil { + return nil, fmt.Errorf("dbfs Stat %q: %w", name, err) + } + if stat.Size() < offset { + // If the size is less than offset, refuse to write, or it could result in content holes. + // However, if the size is greater than offset, we can still write to overwrite the content. + return nil, fmt.Errorf("size of %q is less than offset", name) + } + if _, err := f.Seek(offset, io.SeekStart); err != nil { return nil, fmt.Errorf("dbfs Seek %q: %w", name, err) } From 7dc2e89ef012b6b0863c417c47e925a7e4f29284 Mon Sep 17 00:00:00 2001 From: Jason Song Date: Wed, 28 Jun 2023 18:14:29 +0800 Subject: [PATCH 7/7] Revert "fix: don't update log for done task" This reverts commit b43250a652038c9b47f601671c70200b621e234d. --- routers/api/actions/runner/runner.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/routers/api/actions/runner/runner.go b/routers/api/actions/runner/runner.go index ecfda5500fa5c..17801cb32202f 100644 --- a/routers/api/actions/runner/runner.go +++ b/routers/api/actions/runner/runner.go @@ -210,15 +210,8 @@ func (s *Service) UpdateLog( task, err := actions_model.GetTaskByID(ctx, req.Msg.TaskId) if err != nil { - if errors.Is(err, util.ErrNotExist) { - return nil, status.Errorf(codes.NotFound, "task %d not found", req.Msg.TaskId) - } return nil, status.Errorf(codes.Internal, "get task: %v", err) } - if task.Status.IsDone() { - return nil, status.Errorf(codes.FailedPrecondition, "task %d is done", req.Msg.TaskId) - } - ack := task.LogLength if len(req.Msg.Rows) == 0 || req.Msg.Index > ack || int64(len(req.Msg.Rows))+req.Msg.Index <= ack { @@ -226,6 +219,10 @@ func (s *Service) UpdateLog( return res, nil } + if task.LogInStorage { + return nil, status.Errorf(codes.AlreadyExists, "log file has been archived") + } + rows := req.Msg.Rows[ack-req.Msg.Index:] ns, err := actions.WriteLogs(ctx, task.LogFilename, task.LogSize, rows) if err != nil {