-
Notifications
You must be signed in to change notification settings - Fork 867
feat: [shard-distributor]Send "draining" heartbeat on executer shutdown #7498
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: [shard-distributor]Send "draining" heartbeat on executer shutdown #7498
Conversation
Signed-off-by: Gaziza Yestemirova <[email protected]>
Signed-off-by: Gaziza Yestemirova <[email protected]>
Signed-off-by: Gaziza Yestemirova <[email protected]>
| func registerExecutorLifecycle(params lifecycleParams) { | ||
| params.Lifecycle.Append(fx.Hook{ | ||
| OnStart: func(ctx context.Context) error { | ||
| if err := params.Dispatcher.Start(); err != nil { | ||
| return err | ||
| } | ||
| for _, executor := range params.FixedExecutors { | ||
| executor.Start(ctx) | ||
| } | ||
| for _, executor := range params.EphemeralExecutors { | ||
| executor.Start(ctx) | ||
| } | ||
| return nil | ||
| }, | ||
| OnStop: func(ctx context.Context) error { | ||
| for _, executor := range params.FixedExecutors { | ||
| executor.Stop() | ||
| } | ||
| for _, executor := range params.EphemeralExecutors { | ||
| executor.Stop() | ||
| } | ||
| return params.Dispatcher.Stop() | ||
| }, | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is quite unexpected to canary needs to explicitely .Start and .Stop executors + care about order.
Does that mean other clients (executors) should do that as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @jakobht
| "github.com/uber-go/tally" | ||
| "go.uber.org/fx" | ||
| "go.uber.org/fx/fxtest" | ||
| ubergomock "go.uber.org/mock/gomock" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think we need both "github.com/golang/mock/gomock" and "go.uber.org/mock/gomock".
I suggest paying attention every time there is an import alias, in lots of cases it is questionable.
| if e.heartBeatInterval > 0 && e.heartBeatInterval < drainingHeartbeatTimeout { | ||
| return e.heartBeatInterval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if e.heartBeatInterval > 0 { return min(e.heartBeatInterval, drainingHeartbeatTimeout) }
Btw. is that possible e.heartBeatInterval == 0? It means no heartbeat?
| assert.Equal(t, "test-namespace", req.Namespace) | ||
| assert.Equal(t, "test-executor-id", req.ExecutorID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Maybe to move part of to shard check? I believe having this in the DRAINING call is as important
| }, | ||
| MigrationMode: types.MigrationModeONBOARDED, | ||
| }, nil) | ||
| callCount := 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: typo in the function name "TestHeartBeartLoop"
| assert.Equal(t, types.ExecutorStatusDRAINING, req.Status) | ||
| assert.Empty(t, req.ShardStatusReports) | ||
| return &types.ExecutorHeartbeatResponse{}, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd recommend moving this repeating check to embedded function like checkDrainingCall
| assert.Equal(t, mockShardProcessor, processor) | ||
| } | ||
|
|
||
| func TestStopWithoutHeartbeatDoesNotSendDraining(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not all the functions use goleak. Let's be consistent.
| executor := &executorImpl[*MockShardProcessor]{ | ||
| logger: log.NewNoop(), | ||
| shardDistributorClient: mockShardDistributorClient, | ||
| namespace: "test-namespace", | ||
| executorID: "test-executor-id", | ||
| metrics: tally.NoopScope, | ||
| stopC: make(chan struct{}), | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if all this executor constructions could be moved to something like initTest() with one or two params?
| func registerExecutorLifecycle(params lifecycleParams) { | ||
| params.Lifecycle.Append(fx.Hook{ | ||
| OnStart: func(ctx context.Context) error { | ||
| if err := params.Dispatcher.Start(); err != nil { | ||
| return err | ||
| } | ||
| for _, executor := range params.FixedExecutors { | ||
| executor.Start(ctx) | ||
| } | ||
| for _, executor := range params.EphemeralExecutors { | ||
| executor.Start(ctx) | ||
| } | ||
| return nil | ||
| }, | ||
| OnStop: func(ctx context.Context) error { | ||
| for _, executor := range params.FixedExecutors { | ||
| executor.Stop() | ||
| } | ||
| for _, executor := range params.EphemeralExecutors { | ||
| executor.Stop() | ||
| } | ||
| return params.Dispatcher.Stop() | ||
| }, | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @jakobht
| mockLifecycle := tt.params.Lifecycle.(*mockLifecycle) | ||
|
|
||
| registerExecutorLifecycle(tt.params) | ||
| assert.Equal(t, tt.expectedHookCount, mockLifecycle.hookCount) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would recommend checking .Start and .Stop calls of executor.
Otherwise it's not even clear why we compare it with expectedHookCount = 1 both times.
btw, I wonder if fxtest would help with such testing since essentially you testing uberfx integration.
What changed?
Why?
Shard-distributor should be notified about the executors that are shutting down to reassign the shards and keep track of ownership.
How did you test it?

unit-tests and local testing with etcd:
Potential risks
Release notes
Documentation Changes