diff --git a/CHANGELOG.md b/CHANGELOG.md index 290e6ff6..331a22eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ Version 0.24.0 has a breaking change in `HookWorkEnd.WorkEnd` in that a new `JobRow` parameter has been added to the function's signature. Any intergration defining a custom `HookWorkEnd` hook should update its implementation so the hook continues to be called correctly. + ⚠️ Internal APIs used for communication between River and River Pro have changed. If using River Pro, make sure to update River and River Pro to latest at the same time to get compatible versions. River v0.24.0 is compatible with River Pro v0.16.0. ### Added @@ -19,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Bring all driver tests into separate package so they don't leak dependencies. This removes dependencies from the top level `river` package that most River installations won't need, thereby reducing the transitive dependency load of most River installations. [PR #955](https://github.com/riverqueue/river/pull/955). - The reindexer maintenance service now reindexes all `river_job` indexes, including its primary key. This is expected to help in situations where the jobs table has in the past expanded to a very large size (which makes most indexes larger), is now a much more modest size, but has left the indexes in their expanded state. [PR #963](https://github.com/riverqueue/river/pull/963). - The River CLI now accepts a `--target-version` of 0 with `river migrate-down` to run all down migrations and remove all River tables (previously, -1 was used for this; -1 still works, but now 0 also works). [PR #966](https://github.com/riverqueue/river/pull/966). +- **Breaking change:** The `HookWorkEnd` interface's `WorkEnd` function now receives a `JobRow` parameter in addition to the `error` it received before. Having a `JobRow` to work with is fairly crucial to most functionality that a hook would implement, and its previous omission was entirely an error. [PR #970](https://github.com/riverqueue/river/pull/970). ### Fixed diff --git a/client_test.go b/client_test.go index 287b7b6d..31942875 100644 --- a/client_test.go +++ b/client_test.go @@ -813,7 +813,7 @@ func Test_Client(t *testing.T) { workEndHookCalled := false bundle.config.Hooks = []rivertype.Hook{ - HookWorkEndFunc(func(ctx context.Context, err error) error { + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { workEndHookCalled = true return err }), @@ -1433,7 +1433,7 @@ func (metadataHookWorkBegin) WorkBegin(ctx context.Context, job *rivertype.JobRo type metadataHookWorkEnd struct{ rivertype.Hook } -func (metadataHookWorkEnd) WorkEnd(ctx context.Context, err error) error { +func (metadataHookWorkEnd) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error { metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) if !hasMetadataUpdates { panic("expected to be called from within job executor") diff --git a/hook_defaults_funcs.go b/hook_defaults_funcs.go index 1b183ac4..32ab300c 100644 --- a/hook_defaults_funcs.go +++ b/hook_defaults_funcs.go @@ -35,10 +35,10 @@ func (f HookWorkBeginFunc) IsHook() bool { return true } // HookWorkEndFunc is a convenience helper for implementing // rivertype.HookworkEnd using a simple function instead of a struct. -type HookWorkEndFunc func(ctx context.Context, err error) error +type HookWorkEndFunc func(ctx context.Context, job *rivertype.JobRow, err error) error -func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error { - return f(ctx, err) +func (f HookWorkEndFunc) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error { + return f(ctx, job, err) } func (f HookWorkEndFunc) IsHook() bool { return true } diff --git a/internal/hooklookup/hook_lookup_test.go b/internal/hooklookup/hook_lookup_test.go index 71f4622a..2b931135 100644 --- a/internal/hooklookup/hook_lookup_test.go +++ b/internal/hooklookup/hook_lookup_test.go @@ -267,6 +267,6 @@ var _ rivertype.HookWorkEnd = &testHookWorkEnd{} type testHookWorkEnd struct{ rivertype.Hook } -func (t *testHookWorkEnd) WorkEnd(ctx context.Context, err error) error { +func (t *testHookWorkEnd) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error { return nil } diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go index 640d6ed7..af9438ec 100644 --- a/internal/jobexecutor/job_executor.go +++ b/internal/jobexecutor/job_executor.go @@ -215,7 +215,7 @@ func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { e.HookLookupGlobal.ByHookKind(hooklookup.HookKindWorkEnd), e.WorkUnit.HookLookup(e.HookLookupByJob).ByHookKind(hooklookup.HookKindWorkEnd)..., ) { - err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) //nolint:forcetypeassert + err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, e.JobRow, err) //nolint:forcetypeassert } } diff --git a/internal/jobexecutor/job_executor_test.go b/internal/jobexecutor/job_executor_test.go index 281618e5..d4f52306 100644 --- a/internal/jobexecutor/job_executor_test.go +++ b/internal/jobexecutor/job_executor_test.go @@ -819,7 +819,7 @@ func TestJobExecutor_Execute(t *testing.T) { workBeginCalled = true return nil }), - HookWorkEndFunc(func(ctx context.Context, err error) error { + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { workEndCalled = true return err }), @@ -845,12 +845,12 @@ func TestJobExecutor_Execute(t *testing.T) { workEnd2Called bool ) executor.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ - HookWorkEndFunc(func(ctx context.Context, err error) error { + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { workEnd1Called = true require.EqualError(t, err, "job error") return err }), - HookWorkEndFunc(func(ctx context.Context, err error) error { + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { workEnd2Called = true require.EqualError(t, err, "job error") return err @@ -879,12 +879,12 @@ func TestJobExecutor_Execute(t *testing.T) { workEnd2Called bool ) executor.HookLookupGlobal = hooklookup.NewHookLookup([]rivertype.Hook{ - HookWorkEndFunc(func(ctx context.Context, err error) error { + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { workEnd1Called = true require.EqualError(t, err, "job error") return err }), - HookWorkEndFunc(func(ctx context.Context, err error) error { + HookWorkEndFunc(func(ctx context.Context, job *rivertype.JobRow, err error) error { workEnd2Called = true require.EqualError(t, err, "job error") return nil // second hook suppresses the error @@ -917,10 +917,10 @@ func (f HookWorkBeginFunc) WorkBegin(ctx context.Context, job *rivertype.JobRow) func (f HookWorkBeginFunc) IsHook() bool { return true } -type HookWorkEndFunc func(ctx context.Context, err error) error +type HookWorkEndFunc func(ctx context.Context, job *rivertype.JobRow, err error) error -func (f HookWorkEndFunc) WorkEnd(ctx context.Context, err error) error { - return f(ctx, err) +func (f HookWorkEndFunc) WorkEnd(ctx context.Context, job *rivertype.JobRow, err error) error { + return f(ctx, job, err) } func (f HookWorkEndFunc) IsHook() bool { return true } diff --git a/rivertype/river_type.go b/rivertype/river_type.go index c12b1c01..0519ac65 100644 --- a/rivertype/river_type.go +++ b/rivertype/river_type.go @@ -300,6 +300,7 @@ type JobInsertParams struct { // List of hook interfaces that may be implemented: // - HookInsertBegin // - HookWorkBegin +// - HookWorkEnd // // More operation-specific interfaces may be added in future versions. type Hook interface { @@ -314,6 +315,7 @@ type Hook interface { type HookInsertBegin interface { Hook + // InsertBegin is invoked just before a job is inserted to the database. InsertBegin(ctx context.Context, params *JobInsertParams) error } @@ -322,6 +324,16 @@ type HookInsertBegin interface { type HookWorkBegin interface { Hook + // WorkBegin is invoked after a job has been locked and assigned to a + // particular executor for work and just before the job is actually worked. + // + // Returning an error from any HookWorkBegin hook will abort the job early + // such that it has an error set and doesn't work, with a retry scheduled + // according to its retry policy. + // + // This function doesn't return a context so any context set in WorkBegin is + // discarded after the function returns. If persistent context needs to be + // set, middleware should be used instead. WorkBegin(ctx context.Context, job *JobRow) error } @@ -338,7 +350,7 @@ type HookWorkEnd interface { // // err := e.WorkUnit.Work(ctx) // for _, hook := range hooks { - // err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, err) + // err = hook.(rivertype.HookWorkEnd).WorkEnd(ctx, e.JobRow, err) // } // return err // @@ -346,10 +358,14 @@ type HookWorkEnd interface { // return whatever error value it received as its argument whether that // error is nil or not. // + // The JobRow received by WorkEnd is the same one passed to HookWorkBegin's + // WorkBegin. Its state, errors, next scheduled at time, etc. have not yet + // been updated based on the latest work result. + // // Will not receive a common context related to HookWorkBegin because // WorkBegin doesn't return a context. Middleware should be used for this // sort of shared context instead. - WorkEnd(ctx context.Context, err error) error + WorkEnd(ctx context.Context, job *JobRow, err error) error } // Middleware is an arbitrary interface for a struct which will execute some