diff --git a/cmd/sharddistributor-canary/main.go b/cmd/sharddistributor-canary/main.go
index 350cb3e4dbc..c436e698e39 100644
--- a/cmd/sharddistributor-canary/main.go
+++ b/cmd/sharddistributor-canary/main.go
@@ -73,11 +73,6 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
 		fx.Provide(zap.NewDevelopment),
 		fx.Provide(log.NewLogger),
 
-		// Start the YARPC dispatcher
-		fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
-			lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
-		}),
-
 		// Include the canary module
 		canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}),
 	)
diff --git a/service/sharddistributor/canary/executors/executors.go b/service/sharddistributor/canary/executors/executors.go
index 2216089944f..dc75231882c 100644
--- a/service/sharddistributor/canary/executors/executors.go
+++ b/service/sharddistributor/canary/executors/executors.go
@@ -66,22 +66,6 @@ func NewExecutorExternalAssignmentNamespace(params executorclient.Params[*proces
 	return ExecutorEphemeralResult{Executor: executor}, assigner, err
 }
 
-type ExecutorsParams struct {
-	fx.In
-	Lc                 fx.Lifecycle
-	ExecutorsFixed     []executorclient.Executor[*processor.ShardProcessor]          `group:"executor-fixed-proc"`
-	Executorsephemeral []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
-}
-
-func NewExecutorsModule(params ExecutorsParams) {
-	for _, e := range params.ExecutorsFixed {
-		params.Lc.Append(fx.StartStopHook(e.Start, e.Stop))
-	}
-	for _, e := range params.Executorsephemeral {
-		params.Lc.Append(fx.StartStopHook(e.Start, e.Stop))
-	}
-}
-
 func Module(fixedNamespace, ephemeralNamespace, externalAssignmentNamespace string) fx.Option {
 	return fx.Module(
 		"Executors",
@@ -104,6 +88,5 @@ func Module(fixedNamespace, ephemeralNamespace, externalAssignmentNamespace stri
 			lifecycle.Append(fx.StartStopHook(shardAssigner.Start, shardAssigner.Stop))
 		}),
 		),
-		fx.Invoke(NewExecutorsModule),
 	)
 }
diff --git a/service/sharddistributor/canary/executors/executors_test.go b/service/sharddistributor/canary/executors/executors_test.go
index db4b8600270..9a1dd902426 100755
--- a/service/sharddistributor/canary/executors/executors_test.go
+++ b/service/sharddistributor/canary/executors/executors_test.go
@@ -7,7 +7,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/uber-go/tally"
-	"go.uber.org/fx"
 	"go.uber.org/mock/gomock"
 
 	"github.com/uber/cadence/client/sharddistributor"
@@ -20,14 +19,5 @@ import (
 )
 
-// mockLifecycle is a simple mock implementation of fx.Lifecycle for testing
-type mockLifecycle struct {
-	hookCount int
-}
-
-func (m *mockLifecycle) Append(hook fx.Hook) {
-	m.hookCount++
-}
-
 func TestNewExecutorsFixedNamespace(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	tests := []struct {
@@ -163,52 +153,6 @@ func TestNewExecutor_InvalidConfig(t *testing.T) {
 	}
 }
 
-func TestNewExecutorsModule(t *testing.T) {
-	ctrl := gomock.NewController(t)
-	// Create a mock lifecycle
-	tests := []struct {
-		name               string
-		params             ExecutorsParams
-		expectedInvocation int
-	}{
-		{
-			name: "multiple executors",
-			params: ExecutorsParams{
-				ExecutorsFixed: []executorclient.Executor[*processor.ShardProcessor]{
-					executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl),
-					executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl),
-				},
-				Executorsephemeral: []executorclient.Executor[*processorephemeral.ShardProcessor]{
-					executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl),
-				},
-			},
-			expectedInvocation: 3,
-		},
-		{
-			name: "no executors",
-			params: ExecutorsParams{
-				ExecutorsFixed:     []executorclient.Executor[*processor.ShardProcessor]{},
-				Executorsephemeral: []executorclient.Executor[*processorephemeral.ShardProcessor]{},
-			},
-			expectedInvocation: 0,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			mockLifecycle := &mockLifecycle{}
-			tt.params.Lc = mockLifecycle
-			// Call NewExecutorsModule - it should not panic or error
-			// The function doesn't return anything, so we just verify it executes successfully
-			require.NotPanics(t, func() {
-				NewExecutorsModule(tt.params)
-			})
-			// Verify that lifecycle hooks were registered for all executors
-			assert.Equal(t, tt.expectedInvocation, mockLifecycle.hookCount)
-		})
-	}
-}
-
 // Helper functions to create mock parameters
 func createMockParams[SP executorclient.ShardProcessor](
 	ctrl *gomock.Controller,
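The executor collection itself is unchanged by these removals: providers still publish into the `executor-fixed-proc` and `executor-ephemeral-proc` value groups, and only the consumer moves into the canary module (see module.go below). For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of fx value groups under stand-in names — not the canary's real types — showing how same-tagged providers are collected into one slice:

```go
package main

import (
	"fmt"

	"go.uber.org/fx"
)

// Executor stands in for executorclient.Executor in this sketch.
type Executor interface{ Name() string }

type namedExecutor struct{ name string }

func (e namedExecutor) Name() string { return e.name }

// Two independent constructors, each contributing one value to the group.
func newFixed() Executor     { return namedExecutor{name: "fixed"} }
func newEphemeral() Executor { return namedExecutor{name: "ephemeral"} }

// The consumer declares a slice with the matching group tag; fx fills it
// with every value published into the group, in no guaranteed order.
type consumerParams struct {
	fx.In
	Executors []Executor `group:"executors"`
}

func main() {
	app := fx.New(
		fx.Provide(
			fx.Annotate(newFixed, fx.ResultTags(`group:"executors"`)),
			fx.Annotate(newEphemeral, fx.ResultTags(`group:"executors"`)),
		),
		// Invoke functions run during fx.New, so this prints immediately.
		fx.Invoke(func(p consumerParams) {
			fmt.Println("collected", len(p.Executors), "executors")
		}),
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
}
```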
diff --git a/service/sharddistributor/canary/module.go b/service/sharddistributor/canary/module.go
index fc82740110f..287ec3729ad 100644
--- a/service/sharddistributor/canary/module.go
+++ b/service/sharddistributor/canary/module.go
@@ -1,7 +1,10 @@
 package canary
 
 import (
+	"context"
+
 	"go.uber.org/fx"
+	"go.uber.org/yarpc"
 
 	sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
 	"github.com/uber/cadence/service/sharddistributor/canary/executors"
@@ -49,5 +52,41 @@ func opts(names NamespacesNames) fx.Option {
 		executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace),
 
 		processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}),
+
+		fx.Invoke(registerExecutorLifecycle),
 	)
 }
+
+type lifecycleParams struct {
+	fx.In
+	Lifecycle          fx.Lifecycle
+	Dispatcher         *yarpc.Dispatcher
+	FixedExecutors     []executorclient.Executor[*processor.ShardProcessor]          `group:"executor-fixed-proc"`
+	EphemeralExecutors []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
+}
+
+func registerExecutorLifecycle(params lifecycleParams) {
+	params.Lifecycle.Append(fx.Hook{
+		OnStart: func(ctx context.Context) error {
+			if err := params.Dispatcher.Start(); err != nil {
+				return err
+			}
+			for _, executor := range params.FixedExecutors {
+				executor.Start(ctx)
+			}
+			for _, executor := range params.EphemeralExecutors {
+				executor.Start(ctx)
+			}
+			return nil
+		},
+		OnStop: func(ctx context.Context) error {
+			for _, executor := range params.FixedExecutors {
+				executor.Stop()
+			}
+			for _, executor := range params.EphemeralExecutors {
+				executor.Stop()
+			}
+			return params.Dispatcher.Stop()
+		},
+	})
+}
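The point of folding everything into one fx.Hook is ordering: the dispatcher must be running before any executor sends its first heartbeat, and must still be running when Stop sends the final draining heartbeat (see clientimpl.go below). A self-contained sketch of that start/stop discipline, using stand-in types rather than the real dispatcher and executors:

```go
package main

import (
	"context"
	"fmt"
)

// component stands in for the dispatcher and the executors alike.
type component struct{ name string }

func (c component) Start(ctx context.Context) error { fmt.Println("start", c.name); return nil }
func (c component) Stop(ctx context.Context) error  { fmt.Println("stop", c.name); return nil }

func main() {
	ctx := context.Background()
	transport := component{name: "dispatcher"}
	workers := []component{{name: "executor-fixed"}, {name: "executor-ephemeral"}}

	// OnStart: transport first, then everything that issues RPCs over it.
	if err := transport.Start(ctx); err != nil {
		panic(err)
	}
	for _, w := range workers {
		_ = w.Start(ctx)
	}

	// OnStop: the reverse, so a worker's final RPC (e.g. a draining
	// heartbeat) still has a live transport underneath it.
	for _, w := range workers {
		_ = w.Stop(ctx)
	}
	_ = transport.Stop(ctx)
}
```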
"github.com/uber/cadence/common/log" + "github.com/uber/cadence/service/sharddistributor/canary/processor" + "github.com/uber/cadence/service/sharddistributor/canary/processorephemeral" "github.com/uber/cadence/service/sharddistributor/client/clientcommon" + "github.com/uber/cadence/service/sharddistributor/client/executorclient" ) func TestModule(t *testing.T) { @@ -50,9 +55,69 @@ func TestModule(t *testing.T) { fx.Annotate(clock.NewMockedTimeSource(), fx.As(new(clock.TimeSource))), fx.Annotate(log.NewNoop(), fx.As(new(log.Logger))), fx.Annotate(mockClientConfigProvider, fx.As(new(yarpc.ClientConfig))), + yarpc.Config{Name: "shard-distributor-canary-test"}, zaptest.NewLogger(t), config, ), + fx.Provide(yarpc.NewDispatcher), Module(NamespacesNames{FixedNamespace: "shard-distributor-canary", EphemeralNamespace: "shard-distributor-canary-ephemeral", ExternalAssignmentNamespace: "test-external-assignment", SharddistributorServiceName: "cadence-shard-distributor"}), ).RequireStart().RequireStop() } + +type mockLifecycle struct { + hookCount int +} + +func (m *mockLifecycle) Append(hook fx.Hook) { + m.hookCount++ +} + +func TestRegisterExecutorLifecycle(t *testing.T) { + ctrl := ubergomock.NewController(t) + defer ctrl.Finish() + + tests := []struct { + name string + params lifecycleParams + expectedHookCount int + }{ + { + name: "multiple executors", + params: lifecycleParams{ + Lifecycle: &mockLifecycle{}, + Dispatcher: yarpc.NewDispatcher(yarpc.Config{ + Name: "test-dispatcher", + }), + FixedExecutors: []executorclient.Executor[*processor.ShardProcessor]{ + executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl), + executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl), + }, + EphemeralExecutors: []executorclient.Executor[*processorephemeral.ShardProcessor]{ + executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl), + }, + }, + expectedHookCount: 1, + }, + { + name: "no executors", + params: lifecycleParams{ + Lifecycle: &mockLifecycle{}, + Dispatcher: yarpc.NewDispatcher(yarpc.Config{ + Name: "test-dispatcher", + }), + FixedExecutors: []executorclient.Executor[*processor.ShardProcessor]{}, + EphemeralExecutors: []executorclient.Executor[*processorephemeral.ShardProcessor]{}, + }, + expectedHookCount: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockLifecycle := tt.params.Lifecycle.(*mockLifecycle) + + registerExecutorLifecycle(tt.params) + assert.Equal(t, tt.expectedHookCount, mockLifecycle.hookCount) + }) + } +} diff --git a/service/sharddistributor/client/executorclient/clientimpl.go b/service/sharddistributor/client/executorclient/clientimpl.go index 65638f2b026..666cb621e15 100644 --- a/service/sharddistributor/client/executorclient/clientimpl.go +++ b/service/sharddistributor/client/executorclient/clientimpl.go @@ -34,7 +34,8 @@ const ( ) const ( - heartbeatJitterCoeff = 0.1 // 10% jitter + heartbeatJitterCoeff = 0.1 // 10% jitter + drainingHeartbeatTimeout = 5 * time.Second ) type managedProcessor[SP ShardProcessor] struct { @@ -101,6 +102,7 @@ type executorImpl[SP ShardProcessor] struct { metrics tally.Scope migrationMode atomic.Int32 metadata syncExecutorMetadata + hasSuccessfulHeartbeat atomic.Bool } func (e *executorImpl[SP]) setMigrationMode(mode types.MigrationMode) { @@ -124,6 +126,17 @@ func (e *executorImpl[SP]) Stop() { e.logger.Info("stopping shard distributor executor", tag.ShardNamespace(e.namespace)) close(e.stopC) e.processLoopWG.Wait() + + if !e.shouldSendFinalHeartbeat() { 
diff --git a/service/sharddistributor/client/executorclient/clientimpl.go b/service/sharddistributor/client/executorclient/clientimpl.go
index 65638f2b026..666cb621e15 100644
--- a/service/sharddistributor/client/executorclient/clientimpl.go
+++ b/service/sharddistributor/client/executorclient/clientimpl.go
@@ -34,7 +34,8 @@ const (
 )
 
 const (
-	heartbeatJitterCoeff = 0.1 // 10% jitter
+	heartbeatJitterCoeff     = 0.1 // 10% jitter
+	drainingHeartbeatTimeout = 5 * time.Second
 )
 
 type managedProcessor[SP ShardProcessor] struct {
@@ -101,6 +102,7 @@ type executorImpl[SP ShardProcessor] struct {
 	metrics                tally.Scope
 	migrationMode          atomic.Int32
 	metadata               syncExecutorMetadata
+	hasSuccessfulHeartbeat atomic.Bool
 }
 
 func (e *executorImpl[SP]) setMigrationMode(mode types.MigrationMode) {
@@ -124,6 +126,17 @@ func (e *executorImpl[SP]) Stop() {
 	e.logger.Info("stopping shard distributor executor", tag.ShardNamespace(e.namespace))
 	close(e.stopC)
 	e.processLoopWG.Wait()
+
+	if !e.shouldSendFinalHeartbeat() {
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), e.finalHeartbeatTimeout())
+	defer cancel()
+
+	if err := e.sendDrainingHeartbeat(ctx); err != nil {
+		e.logger.Error("failed to send draining heartbeat", tag.Error(err))
+	}
 }
 
 func (e *executorImpl[SP]) GetShardProcess(ctx context.Context, shardID string) (SP, error) {
@@ -269,6 +282,10 @@ func (e *executorImpl[SP]) updateShardAssignmentMetered(ctx context.Context, sha
 }
 
 func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[string]*types.ShardAssignment, migrationMode types.MigrationMode, err error) {
+	return e.sendHeartbeat(ctx, types.ExecutorStatusACTIVE)
+}
+
+func (e *executorImpl[SP]) sendHeartbeat(ctx context.Context, status types.ExecutorStatus) (map[string]*types.ShardAssignment, types.MigrationMode, error) {
 	// Fill in the shard status reports
 	shardStatusReports := make(map[string]*types.ShardStatusReport)
 	e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
@@ -289,7 +306,7 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
 	request := &types.ExecutorHeartbeatRequest{
 		Namespace:          e.namespace,
 		ExecutorID:         e.executorID,
-		Status:             types.ExecutorStatusACTIVE,
+		Status:             status,
 		ShardStatusReports: shardStatusReports,
 		Metadata:           e.metadata.Get(),
 	}
@@ -299,6 +316,7 @@
 	if err != nil {
 		return nil, types.MigrationModeINVALID, fmt.Errorf("send heartbeat: %w", err)
 	}
+	e.hasSuccessfulHeartbeat.Store(true)
 
 	previousMode := e.getMigrationMode()
 	currentMode := response.MigrationMode
@@ -314,6 +332,22 @@
 	return response.ShardAssignments, response.MigrationMode, nil
 }
 
+func (e *executorImpl[SP]) sendDrainingHeartbeat(ctx context.Context) error {
+	_, _, err := e.sendHeartbeat(ctx, types.ExecutorStatusDRAINING)
+	return err
+}
+
+func (e *executorImpl[SP]) shouldSendFinalHeartbeat() bool {
+	return e.shardDistributorClient != nil && e.hasSuccessfulHeartbeat.Load()
+}
+
+func (e *executorImpl[SP]) finalHeartbeatTimeout() time.Duration {
+	if e.heartBeatInterval > 0 && e.heartBeatInterval < drainingHeartbeatTimeout {
+		return e.heartBeatInterval
+	}
+	return drainingHeartbeatTimeout
+}
+
 func (e *executorImpl[SP]) updateShardAssignment(ctx context.Context, shardAssignments map[string]*types.ShardAssignment) {
 	wg := sync.WaitGroup{}
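`finalHeartbeatTimeout` caps the shutdown wait at `drainingHeartbeatTimeout` but never waits longer than one heartbeat interval when that interval is shorter, so a fast-heartbeating executor also shuts down fast. A standalone sketch of the clamp with sample inputs (the function body mirrors the diff; the wrapper `main` is illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

const drainingHeartbeatTimeout = 5 * time.Second

// finalHeartbeatTimeout mirrors the method above, lifted to a free function.
func finalHeartbeatTimeout(heartBeatInterval time.Duration) time.Duration {
	if heartBeatInterval > 0 && heartBeatInterval < drainingHeartbeatTimeout {
		return heartBeatInterval
	}
	return drainingHeartbeatTimeout
}

func main() {
	fmt.Println(finalHeartbeatTimeout(2 * time.Second))  // 2s: shorter interval wins
	fmt.Println(finalHeartbeatTimeout(30 * time.Second)) // 5s: ceiling wins
	fmt.Println(finalHeartbeatTimeout(0))                // 5s: unset interval falls back
}
```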
diff --git a/service/sharddistributor/client/executorclient/clientimpl_test.go b/service/sharddistributor/client/executorclient/clientimpl_test.go
index 6c260961449..ac937280d83 100755
--- a/service/sharddistributor/client/executorclient/clientimpl_test.go
+++ b/service/sharddistributor/client/executorclient/clientimpl_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/uber-go/tally"
 	"go.uber.org/goleak"
 	"go.uber.org/mock/gomock"
+	"go.uber.org/yarpc"
 
 	"github.com/uber/cadence/client/sharddistributorexecutor"
 	"github.com/uber/cadence/common/clock"
@@ -29,21 +30,33 @@ func TestHeartBeartLoop(t *testing.T) {
 	mockShardDistributorClient := sharddistributorexecutor.NewMockClient(ctrl)
 
 	// We expect nothing is assigned to the executor, and we assign two shards to it
-	mockShardDistributorClient.EXPECT().Heartbeat(gomock.Any(),
-		&types.ExecutorHeartbeatRequest{
-			Namespace:          "test-namespace",
-			ExecutorID:         "test-executor-id",
-			Status:             types.ExecutorStatusACTIVE,
-			ShardStatusReports: make(map[string]*types.ShardStatusReport),
-			Metadata:           make(map[string]string),
-		}, gomock.Any()).
-		Return(&types.ExecutorHeartbeatResponse{
-			ShardAssignments: map[string]*types.ShardAssignment{
-				"test-shard-id1": {Status: types.AssignmentStatusREADY},
-				"test-shard-id2": {Status: types.AssignmentStatusREADY},
-			},
-			MigrationMode: types.MigrationModeONBOARDED,
-		}, nil)
+	callCount := 0
+	mockShardDistributorClient.EXPECT().Heartbeat(gomock.Any(), gomock.Any(), gomock.Any()).
+		DoAndReturn(func(_ context.Context, req *types.ExecutorHeartbeatRequest, _ ...yarpc.CallOption) (*types.ExecutorHeartbeatResponse, error) {
+			callCount++
+			switch callCount {
+			case 1:
+				assert.Equal(t, "test-namespace", req.Namespace)
+				assert.Equal(t, "test-executor-id", req.ExecutorID)
+				assert.Equal(t, types.ExecutorStatusACTIVE, req.Status)
+				assert.Empty(t, req.ShardStatusReports)
+				assert.Empty(t, req.Metadata)
+				return &types.ExecutorHeartbeatResponse{
+					ShardAssignments: map[string]*types.ShardAssignment{
+						"test-shard-id1": {Status: types.AssignmentStatusREADY},
+						"test-shard-id2": {Status: types.AssignmentStatusREADY},
+					},
+					MigrationMode: types.MigrationModeONBOARDED,
+				}, nil
+			case 2:
+				assert.Equal(t, types.ExecutorStatusDRAINING, req.Status)
+				assert.Empty(t, req.ShardStatusReports)
+				return &types.ExecutorHeartbeatResponse{}, nil
+			default:
+				t.Fatalf("unexpected heartbeat call: %d", callCount)
+				return nil, nil
+			}
+		}).Times(2)
 
 	// The two shards are assigned to the executor, so we expect them to be created, started and stopped
 	mockShardProcessor1 := NewMockShardProcessor(ctrl)
@@ -477,13 +490,28 @@ func TestHeartbeatLoop_LocalPassthroughShadow_SkipsAssignment(t *testing.T) {
 	mockShardDistributorClient := sharddistributorexecutor.NewMockClient(ctrl)
 
 	// Heartbeat should be called but assignment should not be applied
+	callCount := 0
 	mockShardDistributorClient.EXPECT().Heartbeat(gomock.Any(), gomock.Any(), gomock.Any()).
-		Return(&types.ExecutorHeartbeatResponse{
-			ShardAssignments: map[string]*types.ShardAssignment{
-				"test-shard-id1": {Status: types.AssignmentStatusREADY},
-			},
-			MigrationMode: types.MigrationModeLOCALPASSTHROUGHSHADOW,
-		}, nil)
+		DoAndReturn(func(_ context.Context, req *types.ExecutorHeartbeatRequest, _ ...yarpc.CallOption) (*types.ExecutorHeartbeatResponse, error) {
+			callCount++
+			switch callCount {
+			case 1:
+				assert.Equal(t, types.ExecutorStatusACTIVE, req.Status)
+				return &types.ExecutorHeartbeatResponse{
+					ShardAssignments: map[string]*types.ShardAssignment{
+						"test-shard-id1": {Status: types.AssignmentStatusREADY},
+					},
+					MigrationMode: types.MigrationModeLOCALPASSTHROUGHSHADOW,
+				}, nil
+			case 2:
+				assert.Equal(t, types.ExecutorStatusDRAINING, req.Status)
+				assert.Empty(t, req.ShardStatusReports)
+				return &types.ExecutorHeartbeatResponse{}, nil
+			default:
+				t.Fatalf("unexpected heartbeat call count: %d", callCount)
+				return nil, nil
+			}
+		}).Times(2)
 
 	mockShardProcessorFactory := NewMockShardProcessorFactory[*MockShardProcessor](ctrl)
 	// No shard processor should be created
@@ -524,13 +552,28 @@ func TestHeartbeatLoop_DistributedPassthrough_AppliesAssignment(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	mockShardDistributorClient := sharddistributorexecutor.NewMockClient(ctrl)
 
+	callCount := 0
 	mockShardDistributorClient.EXPECT().Heartbeat(gomock.Any(), gomock.Any(), gomock.Any()).
-		Return(&types.ExecutorHeartbeatResponse{
-			ShardAssignments: map[string]*types.ShardAssignment{
-				"test-shard-id1": {Status: types.AssignmentStatusREADY},
-			},
-			MigrationMode: types.MigrationModeDISTRIBUTEDPASSTHROUGH,
-		}, nil)
+		DoAndReturn(func(_ context.Context, req *types.ExecutorHeartbeatRequest, _ ...yarpc.CallOption) (*types.ExecutorHeartbeatResponse, error) {
+			callCount++
+			switch callCount {
+			case 1:
+				assert.Equal(t, types.ExecutorStatusACTIVE, req.Status)
+				return &types.ExecutorHeartbeatResponse{
+					ShardAssignments: map[string]*types.ShardAssignment{
+						"test-shard-id1": {Status: types.AssignmentStatusREADY},
+					},
+					MigrationMode: types.MigrationModeDISTRIBUTEDPASSTHROUGH,
+				}, nil
+			case 2:
+				assert.Equal(t, types.ExecutorStatusDRAINING, req.Status)
+				assert.Empty(t, req.ShardStatusReports)
+				return &types.ExecutorHeartbeatResponse{}, nil
+			default:
+				t.Fatalf("unexpected heartbeat call count: %d", callCount)
+				return nil, nil
+			}
+		}).Times(2)
 
 	mockShardProcessor := NewMockShardProcessor(ctrl)
 	mockShardProcessor.EXPECT().Start(gomock.Any())
@@ -569,6 +612,24 @@ func TestHeartbeatLoop_DistributedPassthrough_AppliesAssignment(t *testing.T) {
 	assert.Equal(t, mockShardProcessor, processor)
 }
 
+func TestStopWithoutHeartbeatDoesNotSendDraining(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	mockShardDistributorClient := sharddistributorexecutor.NewMockClient(ctrl)
+
+	mockShardDistributorClient.EXPECT().Heartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Times(0)
+
+	executor := &executorImpl[*MockShardProcessor]{
+		logger:                 log.NewNoop(),
+		shardDistributorClient: mockShardDistributorClient,
+		namespace:              "test-namespace",
+		executorID:             "test-executor-id",
+		metrics:                tally.NoopScope,
+		stopC:                  make(chan struct{}),
+	}
+
+	executor.Stop()
+}
+
 func TestCompareAssignments_Converged(t *testing.T) {
 	ctrl := gomock.NewController(t)
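All three rewritten tests share one shape: a single `Heartbeat` expectation whose `DoAndReturn` branches on a call counter, asserting ACTIVE on the first call and DRAINING on the last. A dependency-free distillation of that pattern — the types below are simplified stand-ins, not the generated gomock client:

```go
package main

import (
	"context"
	"fmt"
)

type heartbeatRequest struct{ Status string }
type heartbeatResponse struct{}

// fakeClient plays the role of the mock: it records each call and
// verifies the expected status for that position in the sequence.
type fakeClient struct{ calls int }

func (f *fakeClient) Heartbeat(_ context.Context, req *heartbeatRequest) (*heartbeatResponse, error) {
	f.calls++
	want := map[int]string{1: "ACTIVE", 2: "DRAINING"}[f.calls]
	if want == "" {
		return nil, fmt.Errorf("unexpected heartbeat call %d", f.calls)
	}
	if req.Status != want {
		return nil, fmt.Errorf("call %d: want %s, got %s", f.calls, want, req.Status)
	}
	return &heartbeatResponse{}, nil
}

func main() {
	c := &fakeClient{}
	ctx := context.Background()

	// Simulate the executor's lifetime: an ACTIVE heartbeat while running,
	// then the final DRAINING heartbeat sent from Stop().
	for _, status := range []string{"ACTIVE", "DRAINING"} {
		if _, err := c.Heartbeat(ctx, &heartbeatRequest{Status: status}); err != nil {
			panic(err)
		}
	}
	fmt.Println("observed", c.calls, "heartbeats in the expected order")
}
```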