Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cmd/sharddistributor-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
fx.Provide(zap.NewDevelopment),
fx.Provide(log.NewLogger),

// Start the YARPC dispatcher
fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
}),

// Include the canary module
canary.Module(canary.NamespacesNames{FixedNamespace: fixedNamespace, EphemeralNamespace: ephemeralNamespace, ExternalAssignmentNamespace: executors.ExternalAssignmentNamespace, SharddistributorServiceName: shardDistributorServiceName}),
)
Expand Down
17 changes: 0 additions & 17 deletions service/sharddistributor/canary/executors/executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,6 @@ func NewExecutorExternalAssignmentNamespace(params executorclient.Params[*proces
return ExecutorEphemeralResult{Executor: executor}, assigner, err
}

// ExecutorsParams bundles the fx-injected dependencies consumed by
// NewExecutorsModule: the application lifecycle plus the fixed and
// ephemeral executor value groups populated elsewhere in the module.
type ExecutorsParams struct {
	fx.In
	// Lc is the fx lifecycle hooks are appended to.
	Lc fx.Lifecycle
	// Executors provided under the "executor-fixed-proc" value group.
	ExecutorsFixed []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc"`
	// Executors provided under the "executor-ephemeral-proc" value group.
	// NOTE(review): name breaks MixedCaps convention — "ExecutorsEphemeral"
	// would be idiomatic; renaming touches all literal construction sites.
	Executorsephemeral []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
}

// NewExecutorsModule binds every injected executor — fixed and ephemeral —
// to the application lifecycle, registering one start/stop hook per
// executor. Invoked by fx; it returns nothing.
func NewExecutorsModule(params ExecutorsParams) {
	for i := range params.ExecutorsFixed {
		executor := params.ExecutorsFixed[i]
		params.Lc.Append(fx.StartStopHook(executor.Start, executor.Stop))
	}
	for i := range params.Executorsephemeral {
		executor := params.Executorsephemeral[i]
		params.Lc.Append(fx.StartStopHook(executor.Start, executor.Stop))
	}
}

func Module(fixedNamespace, ephemeralNamespace, externalAssignmentNamespace string) fx.Option {
return fx.Module(
"Executors",
Expand All @@ -104,6 +88,5 @@ func Module(fixedNamespace, ephemeralNamespace, externalAssignmentNamespace stri
lifecycle.Append(fx.StartStopHook(shardAssigner.Start, shardAssigner.Stop))
}),
),
fx.Invoke(NewExecutorsModule),
)
}
55 changes: 0 additions & 55 deletions service/sharddistributor/canary/executors/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/fx"
"go.uber.org/mock/gomock"

"github.com/uber/cadence/client/sharddistributor"
Expand All @@ -20,14 +19,6 @@ import (
)

// mockLifecycle is a simple mock implementation of fx.Lifecycle for testing
type mockLifecycle struct {
	hookCount int // number of hooks registered via Append
}

// Append satisfies fx.Lifecycle by tallying each registration; the hook
// itself is neither stored nor executed.
func (m *mockLifecycle) Append(hook fx.Hook) {
	m.hookCount = m.hookCount + 1
}

func TestNewExecutorsFixedNamespace(t *testing.T) {
ctrl := gomock.NewController(t)
tests := []struct {
Expand Down Expand Up @@ -163,52 +154,6 @@ func TestNewExecutor_InvalidConfig(t *testing.T) {
}
}

// TestNewExecutorsModule verifies that NewExecutorsModule appends one
// lifecycle hook per injected executor (fixed plus ephemeral combined).
func TestNewExecutorsModule(t *testing.T) {
	ctrl := gomock.NewController(t)
	// Create a mock lifecycle
	tests := []struct {
		name               string
		params             ExecutorsParams
		expectedInvocation int // hooks expected to be appended
	}{
		{
			name: "multiple executors",
			params: ExecutorsParams{
				ExecutorsFixed: []executorclient.Executor[*processor.ShardProcessor]{
					executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl),
					executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl),
				},
				Executorsephemeral: []executorclient.Executor[*processorephemeral.ShardProcessor]{
					executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl),
				},
			},
			// 2 fixed + 1 ephemeral executors => 3 hooks.
			expectedInvocation: 3,
		},
		{
			name: "no executors",
			params: ExecutorsParams{
				ExecutorsFixed:     []executorclient.Executor[*processor.ShardProcessor]{},
				Executorsephemeral: []executorclient.Executor[*processorephemeral.ShardProcessor]{},
			},
			expectedInvocation: 0,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			mockLifecycle := &mockLifecycle{}
			tt.params.Lc = mockLifecycle
			// Call NewExecutorsModule - it should not panic or error
			// The function doesn't return anything, so we just verify it executes successfully
			require.NotPanics(t, func() {
				NewExecutorsModule(tt.params)
			})
			// Verify that lifecycle hooks were registered for all executors
			assert.Equal(t, tt.expectedInvocation, mockLifecycle.hookCount)
		})
	}
}

// Helper functions to create mock parameters
func createMockParams[SP executorclient.ShardProcessor](
ctrl *gomock.Controller,
Expand Down
39 changes: 39 additions & 0 deletions service/sharddistributor/canary/module.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package canary

import (
"context"

"go.uber.org/fx"
"go.uber.org/yarpc"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/service/sharddistributor/canary/executors"
Expand Down Expand Up @@ -49,5 +52,41 @@ func opts(names NamespacesNames) fx.Option {
executors.Module(names.FixedNamespace, names.EphemeralNamespace, names.ExternalAssignmentNamespace),

processorephemeral.ShardCreatorModule([]string{names.EphemeralNamespace}),

fx.Invoke(registerExecutorLifecycle),
)
}

// lifecycleParams collects the fx-injected dependencies whose startup and
// shutdown order is coordinated by registerExecutorLifecycle.
type lifecycleParams struct {
	fx.In
	// Lifecycle receives the single combined start/stop hook.
	Lifecycle fx.Lifecycle
	// Dispatcher is started before, and stopped after, all executors.
	Dispatcher *yarpc.Dispatcher
	// Executors provided under the "executor-fixed-proc" value group.
	FixedExecutors []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc"`
	// Executors provided under the "executor-ephemeral-proc" value group.
	EphemeralExecutors []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
}

// registerExecutorLifecycle appends a single fx hook that brings the YARPC
// dispatcher up before any executor starts, and tears it down only after
// every executor has stopped (the reverse of the start order).
func registerExecutorLifecycle(params lifecycleParams) {
	onStart := func(ctx context.Context) error {
		// The dispatcher must be running before executors begin talking to
		// the shard distributor.
		if err := params.Dispatcher.Start(); err != nil {
			return err
		}
		for _, e := range params.FixedExecutors {
			e.Start(ctx)
		}
		for _, e := range params.EphemeralExecutors {
			e.Start(ctx)
		}
		return nil
	}
	onStop := func(ctx context.Context) error {
		// Stop executors while the dispatcher is still up (their shutdown
		// may need to send a final heartbeat), then stop the dispatcher.
		for _, e := range params.FixedExecutors {
			e.Stop()
		}
		for _, e := range params.EphemeralExecutors {
			e.Stop()
		}
		return params.Dispatcher.Stop()
	}
	params.Lifecycle.Append(fx.Hook{OnStart: onStart, OnStop: onStop})
}
Comment on lines +68 to +92
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is quite unexpected that the canary needs to explicitly .Start and .Stop executors and to care about their ordering.
Does that mean other clients (executors) should do the same?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

65 changes: 65 additions & 0 deletions service/sharddistributor/canary/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/uber-go/tally"
"go.uber.org/fx"
"go.uber.org/fx/fxtest"
ubergomock "go.uber.org/mock/gomock"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need both "github.com/golang/mock/gomock" and "go.uber.org/mock/gomock".
I suggest paying attention whenever an import alias appears; in many cases it is questionable.

"go.uber.org/yarpc"
"go.uber.org/yarpc/api/transport/transporttest"
"go.uber.org/yarpc/transport/grpc"
Expand All @@ -16,7 +18,10 @@ import (

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/service/sharddistributor/canary/processor"
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
)

func TestModule(t *testing.T) {
Expand Down Expand Up @@ -50,9 +55,69 @@ func TestModule(t *testing.T) {
fx.Annotate(clock.NewMockedTimeSource(), fx.As(new(clock.TimeSource))),
fx.Annotate(log.NewNoop(), fx.As(new(log.Logger))),
fx.Annotate(mockClientConfigProvider, fx.As(new(yarpc.ClientConfig))),
yarpc.Config{Name: "shard-distributor-canary-test"},
zaptest.NewLogger(t),
config,
),
fx.Provide(yarpc.NewDispatcher),
Module(NamespacesNames{FixedNamespace: "shard-distributor-canary", EphemeralNamespace: "shard-distributor-canary-ephemeral", ExternalAssignmentNamespace: "test-external-assignment", SharddistributorServiceName: "cadence-shard-distributor"}),
).RequireStart().RequireStop()
}

// mockLifecycle is a minimal fx.Lifecycle stand-in for tests; it only
// counts Append calls and never stores or runs the hooks.
type mockLifecycle struct {
	hookCount int // number of hooks registered via Append
}

// Append satisfies fx.Lifecycle by counting the registration; the hook
// itself is discarded.
func (m *mockLifecycle) Append(hook fx.Hook) {
	m.hookCount = m.hookCount + 1
}

func TestRegisterExecutorLifecycle(t *testing.T) {
ctrl := ubergomock.NewController(t)
defer ctrl.Finish()

tests := []struct {
name string
params lifecycleParams
expectedHookCount int
}{
{
name: "multiple executors",
params: lifecycleParams{
Lifecycle: &mockLifecycle{},
Dispatcher: yarpc.NewDispatcher(yarpc.Config{
Name: "test-dispatcher",
}),
FixedExecutors: []executorclient.Executor[*processor.ShardProcessor]{
executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl),
executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl),
},
EphemeralExecutors: []executorclient.Executor[*processorephemeral.ShardProcessor]{
executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl),
},
},
expectedHookCount: 1,
},
{
name: "no executors",
params: lifecycleParams{
Lifecycle: &mockLifecycle{},
Dispatcher: yarpc.NewDispatcher(yarpc.Config{
Name: "test-dispatcher",
}),
FixedExecutors: []executorclient.Executor[*processor.ShardProcessor]{},
EphemeralExecutors: []executorclient.Executor[*processorephemeral.ShardProcessor]{},
},
expectedHookCount: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockLifecycle := tt.params.Lifecycle.(*mockLifecycle)

registerExecutorLifecycle(tt.params)
assert.Equal(t, tt.expectedHookCount, mockLifecycle.hookCount)
Comment on lines +117 to +120
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend checking .Start and .Stop calls of executor.
Otherwise it's not even clear why we compare it with expectedHookCount = 1 both times.
btw, I wonder if fxtest would help with such testing since essentially you testing uberfx integration.

})
}
}
38 changes: 36 additions & 2 deletions service/sharddistributor/client/executorclient/clientimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ const (
)

const (
heartbeatJitterCoeff = 0.1 // 10% jitter
heartbeatJitterCoeff = 0.1 // 10% jitter
drainingHeartbeatTimeout = 5 * time.Second
)

type managedProcessor[SP ShardProcessor] struct {
Expand Down Expand Up @@ -101,6 +102,7 @@ type executorImpl[SP ShardProcessor] struct {
metrics tally.Scope
migrationMode atomic.Int32
metadata syncExecutorMetadata
hasSuccessfulHeartbeat atomic.Bool
}

func (e *executorImpl[SP]) setMigrationMode(mode types.MigrationMode) {
Expand All @@ -124,6 +126,17 @@ func (e *executorImpl[SP]) Stop() {
e.logger.Info("stopping shard distributor executor", tag.ShardNamespace(e.namespace))
close(e.stopC)
e.processLoopWG.Wait()

if !e.shouldSendFinalHeartbeat() {
return
}

ctx, cancel := context.WithTimeout(context.Background(), e.finalHeartbeatTimeout())
defer cancel()

if err := e.sendDrainingHeartbeat(ctx); err != nil {
e.logger.Error("failed to send draining heartbeat", tag.Error(err))
}
}

func (e *executorImpl[SP]) GetShardProcess(ctx context.Context, shardID string) (SP, error) {
Expand Down Expand Up @@ -269,6 +282,10 @@ func (e *executorImpl[SP]) updateShardAssignmentMetered(ctx context.Context, sha
}

// heartbeat reports ACTIVE status to the shard distributor and returns the
// refreshed shard assignments together with the current migration mode.
func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[string]*types.ShardAssignment, migrationMode types.MigrationMode, err error) {
	shardAssignments, migrationMode, err = e.sendHeartbeat(ctx, types.ExecutorStatusACTIVE)
	return shardAssignments, migrationMode, err
}

func (e *executorImpl[SP]) sendHeartbeat(ctx context.Context, status types.ExecutorStatus) (map[string]*types.ShardAssignment, types.MigrationMode, error) {
// Fill in the shard status reports
shardStatusReports := make(map[string]*types.ShardStatusReport)
e.managedProcessors.Range(func(shardID string, managedProcessor *managedProcessor[SP]) bool {
Expand All @@ -289,7 +306,7 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
request := &types.ExecutorHeartbeatRequest{
Namespace: e.namespace,
ExecutorID: e.executorID,
Status: types.ExecutorStatusACTIVE,
Status: status,
ShardStatusReports: shardStatusReports,
Metadata: e.metadata.Get(),
}
Expand All @@ -299,6 +316,7 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
if err != nil {
return nil, types.MigrationModeINVALID, fmt.Errorf("send heartbeat: %w", err)
}
e.hasSuccessfulHeartbeat.Store(true)

previousMode := e.getMigrationMode()
currentMode := response.MigrationMode
Expand All @@ -314,6 +332,22 @@ func (e *executorImpl[SP]) heartbeat(ctx context.Context) (shardAssignments map[
return response.ShardAssignments, response.MigrationMode, nil
}

// sendDrainingHeartbeat tells the shard distributor this executor is
// draining; the assignments in the response are deliberately ignored since
// the executor is shutting down.
func (e *executorImpl[SP]) sendDrainingHeartbeat(ctx context.Context) error {
	if _, _, err := e.sendHeartbeat(ctx, types.ExecutorStatusDRAINING); err != nil {
		return err
	}
	return nil
}

// shouldSendFinalHeartbeat reports whether Stop should attempt a draining
// heartbeat: a client must exist and at least one earlier heartbeat must
// have succeeded (otherwise the server has never seen this executor).
func (e *executorImpl[SP]) shouldSendFinalHeartbeat() bool {
	if e.shardDistributorClient == nil {
		return false
	}
	return e.hasSuccessfulHeartbeat.Load()
}

func (e *executorImpl[SP]) finalHeartbeatTimeout() time.Duration {
if e.heartBeatInterval > 0 && e.heartBeatInterval < drainingHeartbeatTimeout {
return e.heartBeatInterval
Comment on lines +345 to +346
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if e.heartBeatInterval > 0 { return min(e.heartBeatInterval, drainingHeartbeatTimeout) }

Btw. is that possible e.heartBeatInterval == 0? It means no heartbeat?

}
return drainingHeartbeatTimeout
}

func (e *executorImpl[SP]) updateShardAssignment(ctx context.Context, shardAssignments map[string]*types.ShardAssignment) {
wg := sync.WaitGroup{}

Expand Down
Loading
Loading