Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions pkg/epp/scheduling/framework/plugins/picker/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ limitations under the License.

package picker

import (
"math/rand/v2"
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const (
DefaultMaxNumOfEndpoints = 1 // common default to all pickers
)
Expand All @@ -24,3 +31,14 @@ const (
type pickerParameters struct {
MaxNumOfEndpoints int `json:"maxNumOfEndpoints"`
}

func shuffleScoredPods(scoredPods []*types.ScoredPod) {
// Rand package is not safe for concurrent use, so we create a new instance.
// Source: https://pkg.go.dev/math/rand/v2#pkg-overview
randomGenerator := rand.New(rand.NewPCG(uint64(time.Now().UnixNano()), 0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

have you compared using sync.Pool vs creating a new random source every time?
I don't expect much latency difference but a sync.Pool would reduce GC pressure as these are created and collected per request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func shuffleScoredPods(scoredPods []*types.ScoredPod) {
	// Rand package is not safe for concurrent use, so we create a new instance.
	// Source: https://pkg.go.dev/math/rand/v2#pkg-overview
	randomGenerator := rand.New(rand.NewPCG(uint64(time.Now().UnixNano()), 0))

	// Shuffle in-place
	randomGenerator.Shuffle(len(scoredPods), func(i, j int) {
		scoredPods[i], scoredPods[j] = scoredPods[j], scoredPods[i]
	})
}

var randPool = sync.Pool{
	New: func() interface{} {
		return rand.New(rand.NewPCG(uint64(time.Now().UnixNano()), 0))
	},
}

func shuffleScoredPodsWithPool(scoredPods []*types.ScoredPod) {
	randomGenerator := randPool.Get().(*rand.Rand)
	defer randPool.Put(randomGenerator) 

	// Shuffle in-place
	randomGenerator.Shuffle(len(scoredPods), func(i, j int) {
		scoredPods[i], scoredPods[j] = scoredPods[j], scoredPods[i]
	})
}
func createTestScoredPods(count int) []*types.ScoredPod {
	pods := make([]*types.ScoredPod, count)
	for i := 0; i < count; i++ {
		pods[i] = &types.ScoredPod{
			Pod: &types.PodMetrics{
				Pod: &backend.Pod{
					NamespacedName: k8stypes.NamespacedName{
						Name:      "pod-" + strconv.Itoa(i),
						Namespace: "namespace-" + strconv.Itoa(i),
					},
				},
			},
			Score: float64(i),
		}
	}
	return pods
}

func BenchmarkShuffleScoredPods(b *testing.B) {
	testPods := createTestScoredPods(100)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		podsCopy := make([]*types.ScoredPod, len(testPods))
		copy(podsCopy, testPods)

		shuffleScoredPods(podsCopy)
	}
}

func BenchmarkShuffleScoredPodsWithPool(b *testing.B) {
	testPods := createTestScoredPods(100)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		podsCopy := make([]*types.ScoredPod, len(testPods))
		copy(podsCopy, testPods)

		shuffleScoredPodsWithPool(podsCopy)
	}
}

func BenchmarkShuffleScoredPods_Small(b *testing.B) {
	testPods := createTestScoredPods(10)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		podsCopy := make([]*types.ScoredPod, len(testPods))
		copy(podsCopy, testPods)

		shuffleScoredPods(podsCopy)
	}
}

func BenchmarkShuffleScoredPodsWithPool_Small(b *testing.B) {
	testPods := createTestScoredPods(10)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		podsCopy := make([]*types.ScoredPod, len(testPods))
		copy(podsCopy, testPods)

		shuffleScoredPodsWithPool(podsCopy)
	}
}

func BenchmarkShuffleScoredPods_Large(b *testing.B) {
	testPods := createTestScoredPods(1000)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		podsCopy := make([]*types.ScoredPod, len(testPods))
		copy(podsCopy, testPods)

		shuffleScoredPods(podsCopy)
	}
}

func BenchmarkShuffleScoredPodsWithPool_Large(b *testing.B) {
	testPods := createTestScoredPods(1000)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		podsCopy := make([]*types.ScoredPod, len(testPods))
		copy(podsCopy, testPods)

		shuffleScoredPodsWithPool(podsCopy)
	}
}

I have tested it locally and the test result is as follows. If there is any problem, please point it out. If the test results are accurate, do you think it needs to be optimized?

host@hostdeMacBook-Pro picker % go test -bench=. -benchmem
goos: darwin
goarch: amd64
pkg: sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker
cpu: Intel(R) Core(TM) i5-1038NG7 CPU @ 2.00GHz
BenchmarkShuffleScoredPods-8                      911576              1245 ns/op             912 B/op          2 allocs/op
BenchmarkShuffleScoredPodsWithPool-8             1000000              1130 ns/op             896 B/op          1 allocs/op
BenchmarkShuffleScoredPods_Small-8               4351080               247.4 ns/op            96 B/op          2 allocs/op
BenchmarkShuffleScoredPodsWithPool_Small-8       7934428               139.8 ns/op            80 B/op          1 allocs/op
BenchmarkShuffleScoredPods_Large-8                115184             10405 ns/op            8208 B/op          2 allocs/op
BenchmarkShuffleScoredPodsWithPool_Large-8        115098             10628 ns/op            8195 B/op          1 allocs/op
PASS
ok      sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework/plugins/picker 7.652s


// Shuffle in-place
randomGenerator.Shuffle(len(scoredPods), func(i, j int) {
scoredPods[i], scoredPods[j] = scoredPods[j], scoredPods[i]
})
}
12 changes: 1 addition & 11 deletions pkg/epp/scheduling/framework/plugins/picker/max_score_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"slices"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please keep import groups separated and not combine controller-runtime (3rd-party/ external) with gateway-api-inference-extension (local/internal)

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
Expand Down Expand Up @@ -85,15 +82,8 @@ func (p *MaxScorePicker) Pick(ctx context.Context, cycleState *types.CycleState,
log.FromContext(ctx).V(logutil.DEBUG).Info("Selecting pods from candidates sorted by max score", "max-num-of-endpoints", p.maxNumOfEndpoints,
"num-of-candidates", len(scoredPods), "scored-pods", scoredPods)

// TODO: merge this with the logic in RandomPicker
// Rand package is not safe for concurrent use, so we create a new instance.
// Source: https://pkg.go.dev/math/rand#pkg-overview
randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))

// Shuffle in-place - needed for random tie break when scores are equal
randomGenerator.Shuffle(len(scoredPods), func(i, j int) {
scoredPods[i], scoredPods[j] = scoredPods[j], scoredPods[i]
})
shuffleScoredPods(scoredPods)

slices.SortStableFunc(scoredPods, func(i, j *types.ScoredPod) int { // highest score first
if i.Score > j.Score {
Expand Down
12 changes: 1 addition & 11 deletions pkg/epp/scheduling/framework/plugins/picker/random_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"time"

"sigs.k8s.io/controller-runtime/pkg/log"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
Expand Down Expand Up @@ -84,15 +81,8 @@ func (p *RandomPicker) Pick(ctx context.Context, _ *types.CycleState, scoredPods
log.FromContext(ctx).V(logutil.DEBUG).Info("Selecting pods from candidates randomly", "max-num-of-endpoints", p.maxNumOfEndpoints,
"num-of-candidates", len(scoredPods), "scored-pods", scoredPods)

// TODO: merge this with the logic in MaxScorePicker
// Rand package is not safe for concurrent use, so we create a new instance.
// Source: https://pkg.go.dev/math/rand#pkg-overview
randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))

// Shuffle in-place
randomGenerator.Shuffle(len(scoredPods), func(i, j int) {
scoredPods[i], scoredPods[j] = scoredPods[j], scoredPods[i]
})
shuffleScoredPods(scoredPods)

// if we have enough pods to return keep only the relevant subset
if p.maxNumOfEndpoints < len(scoredPods) {
Expand Down