Skip to content

[Feature] Unify agency access #1024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- (Feature) Add `ArangoLocalStorage` CRD auto-installer
- (Feature) Add `ArangoDeploymentReplication` CRD auto-installer
- (Bugfix) Allow missing `token` key in License secret
- (Feature) Unify agency access

## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07)
- (Bugfix) Fix arangosync members state inspection
Expand Down
178 changes: 89 additions & 89 deletions pkg/deployment/agency/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,85 @@ package agency

import (
"context"
"fmt"
"sync"
"time"

"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
)

type health struct {
leaderID string

agencySize int

names []string
commitIndexes map[string]uint64
leaders map[string]string
election map[string]int
}

func (h health) LeaderID() string {
return h.leaderID
}

// IsHealthy returns true if all agencies have the same commit index.
// Returns false when:
// - agencies' list is empty.
// - agencies have different commit indices.
// - agencies have commit indices == 0.
func (h health) IsHealthy() bool {
var globalCommitIndex uint64
first := true

for _, commitIndex := range h.commitIndexes {
if first {
globalCommitIndex = commitIndex
first = false
} else if commitIndex != globalCommitIndex {
return false
// Healthy returns nil if all agencies have the same commit index.
func (h health) Healthy() error {
if err := h.Serving(); err != nil {
return err
}

if h.election[h.leaderID] != h.agencySize {
return errors.Newf("Not all agents are in quorum")
}

index := h.commitIndexes[h.leaderID]
if index == 0 {
return errors.Newf("Agency CommitIndex is zero")
}

for k, v := range h.commitIndexes {
if v != index {
return errors.Newf("Agent %s is behind in CommitIndex", k)
}
}

return globalCommitIndex != 0
return nil
}

func (h health) Serving() error {
if h.agencySize == 0 {
return errors.Newf("Empty agents list")
}

if len(h.election) == 0 {
return errors.Newf("No Leader")
} else if len(h.election) > 1 {
return errors.Newf("Multiple leaders")
}

if len(h.leaders) <= h.agencySize/2 {
return errors.Newf("Quorum is not present")
}

return nil
}

// Health describes interface to check healthy of the environment.
type Health interface {
// IsHealthy return true when environment is considered as healthy.
IsHealthy() bool
// Healthy return nil when environment is considered as healthy.
Healthy() error

// Serving return nil when environment is considered as responsive, but not fully healthy.
Serving() error

// LeaderID returns a leader ID or empty string if a leader is not known.
LeaderID() string
}

type Cache interface {
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error)
Data() (State, bool)
CommitIndex() uint64
// Health returns true when healthy object is available.
Expand Down Expand Up @@ -107,7 +135,7 @@ func (c cacheSingle) Health() (Health, bool) {
return nil, false
}

func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) {
func (c cacheSingle) Reload(_ context.Context, _ int, _ []agency.Agency) (uint64, error) {
return 0, nil
}

Expand Down Expand Up @@ -153,15 +181,16 @@ func (c *cache) Health() (Health, bool) {
return nil, false
}

func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) {
func (c *cache) Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error) {
c.lock.Lock()
defer c.lock.Unlock()

leaderCli, leaderConfig, health, err := getLeader(ctx, clients)
leaderCli, leaderConfig, health, err := getLeader(ctx, size, clients)
if err != nil {
// Invalidate a leader ID and agency state.
// In the next iteration leaderID will be sat because `valid` will be false.
c.valid = false
c.health = nil

return 0, err
}
Expand All @@ -186,91 +215,62 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er

// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
// If there is no quorum for the leader then error is returned.
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
var mutex sync.Mutex
var anyError error
var wg sync.WaitGroup
func getLeader(ctx context.Context, size int, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
configs := make([]*Config, len(clients))
errs := make([]error, len(clients))

cliLen := len(clients)
if cliLen == 0 {
return nil, nil, nil, errors.New("empty list of agencies' clients")
}
configs := make([]*Config, cliLen)
leaders := make(map[string]int, cliLen)

var h health
var wg sync.WaitGroup

h.commitIndexes = make(map[string]uint64, cliLen)
// Fetch all configs from agencies.
wg.Add(cliLen)
for i, cli := range clients {
go func(iLocal int, cliLocal agency.Agency) {
// Fetch Agency config
for i := range clients {
wg.Add(1)
go func(id int) {
defer wg.Done()

ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
ctxLocal, cancel := globals.GetGlobals().Timeouts().Agency().WithTimeout(ctx)
defer cancel()
config, err := GetAgencyConfig(ctxLocal, cliLocal)

mutex.Lock()
defer mutex.Unlock()
config, err := GetAgencyConfig(ctxLocal, clients[id])

if err != nil {
anyError = err
return
} else if config == nil || config.LeaderId == "" {
anyError = fmt.Errorf("leader unknown for the agent %v", cliLocal.Connection().Endpoints())
errs[id] = err
return
}

// Write config on the same index where client is (It will be helpful later).
configs[iLocal] = config
// Count leaders.
leaders[config.LeaderId]++
h.commitIndexes[config.Configuration.ID] = config.CommitIndex
}(i, cli)
configs[id] = config
}(i)
}
wg.Wait()

if anyError != nil {
return nil, nil, nil, wrapError(anyError, "not all agencies are responsive")
}

if len(leaders) == 0 {
return nil, nil, nil, wrapError(anyError, "failed to get config from agencies")
}

// Find the leader ID which has the most votes from all agencies.
maxVotes := 0
var leaderID string
for id, votes := range leaders {
if votes > maxVotes {
maxVotes = votes
leaderID = id
var h health
h.agencySize = size
h.names = make([]string, len(clients))
h.commitIndexes = make(map[string]uint64, len(clients))
h.leaders = make(map[string]string, len(clients))
h.election = make(map[string]int, len(clients))

for id := range configs {
if config := configs[id]; config != nil {
name := config.Configuration.ID
h.names[id] = name
h.commitIndexes[name] = config.CommitIndex
if config.LeaderId != "" {
h.leaders[name] = config.LeaderId
h.election[config.LeaderId]++
h.leaderID = config.LeaderId
}
}
}

h.leaderID = leaderID

// Check if a leader has quorum from all possible agencies.
if maxVotes <= cliLen/2 {
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)
return nil, nil, nil, wrapError(anyError, message)
if err := h.Serving(); err != nil {
return nil, nil, nil, err
}

// From here on, a leader with quorum is known.
for i, config := range configs {
if config != nil && config.Configuration.ID == leaderID {
return clients[i], config, h, nil
for id := range clients {
if h.leaderID == h.names[id] {
return clients[id], configs[id], h, nil
}
}

return nil, nil, nil, wrapError(anyError, "the leader is not responsive")
}

func wrapError(err error, message string) error {
if err != nil {
return errors.WithMessage(err, message)
}

return errors.New(message)
return nil, nil, nil, errors.Newf("Unable to find agent")
}
2 changes: 1 addition & 1 deletion pkg/deployment/agency/current_collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ type StateCurrentDBCollections map[string]StateCurrentDBCollection
type StateCurrentDBCollection map[string]StateCurrentDBShard

type StateCurrentDBShard struct {
Servers ShardServers `json:"servers,omitempty"`
Servers Servers `json:"servers,omitempty"`
}
7 changes: 7 additions & 0 deletions pkg/deployment/agency/definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ const (

SupervisionKey = "Supervision"
SupervisionMaintenanceKey = "Maintenance"

TargetJobToDoKey = "ToDo"
TargetJobPendingKey = "Pending"
TargetJobFailedKey = "Failed"
TargetJobFinishedKey = "Finished"

TargetCleanedServersKey = "CleanedServers"
)

func GetAgencyKey(parts ...string) string {
Expand Down
Loading