Skip to content

Commit cc30945

Browse files
committed
Implementing LLMServerPool controller and Pod controller
1 parent 9270ff6 commit cc30945

File tree

9 files changed

+265
-46
lines changed

9 files changed

+265
-46
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Dockerfile.cross
1515

1616
# Go workspace file
1717
go.work
18+
go.work.sum
1819

1920
# Kubernetes Generated files - skip generated files, except for vendored files
2021
!vendor/**/zz_generated.*

pkg/ext-proc/Dockerfile renamed to Dockerfile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
## Multistage build
2-
FROM golang:1.22.5-alpine as build
2+
FROM golang:1.23-alpine as build
33
ENV CGO_ENABLED=0
44
ENV GOOS=linux
55
ENV GOARCH=amd64
66

77
WORKDIR /src
88
COPY . .
9+
WORKDIR /src/pkg/ext-proc
910
RUN go mod download
1011
RUN go build -o /ext-proc
1112
FROM alpine:latest
@@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
1617
WORKDIR /
1718
COPY --from=build /ext-proc /ext-proc
1819

19-
ENTRYPOINT ["/ext-proc"]
20+
ENTRYPOINT ["/ext-proc"]

examples/poc/ext-proc/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module ext-proc
1+
module ext-proc-poc
22

33
go 1.21
44

pkg/ext-proc/backend/datastore.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package backend
2+
3+
import (
4+
"sync"
5+
6+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
7+
corev1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/labels"
10+
"k8s.io/klog/v2"
11+
)
12+
13+
// The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api)
14+
type K8sDatastore struct {
15+
LLMServerPool *v1alpha1.LLMServerPool
16+
Pods sync.Map
17+
Port string
18+
}
19+
20+
func (ds *K8sDatastore) GetPodIPs() []string {
21+
var ips []string
22+
ds.Pods.Range(func(name, pod any) bool {
23+
24+
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
25+
return true
26+
})
27+
return ips
28+
}
29+
30+
func (ds *K8sDatastore) LabelsMatch(podLabels map[string]string) bool {
31+
selector, err := metav1.LabelSelectorAsSelector(&ds.LLMServerPool.Spec.ModelServerSelector)
32+
if err != nil {
33+
klog.Error(err.Error())
34+
return false
35+
}
36+
if selector == nil {
37+
return false
38+
}
39+
set := labels.Set(podLabels)
40+
return selector.Matches(set)
41+
42+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
"sigs.k8s.io/controller-runtime/pkg/client"
7+
8+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
9+
"k8s.io/apimachinery/pkg/runtime"
10+
"k8s.io/client-go/tools/record"
11+
"k8s.io/klog/v2"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
)
14+
15+
// controllerNamePrefix is not referenced by the code visible in this file;
// presumably it prefixes controller names elsewhere — confirm before removing.
const controllerNamePrefix = "instance-gateway-"
18+
19+
// LLMServerPoolController is the controller implementation for Instance Gateway resources
20+
type LLMServerPoolController struct {
21+
client.Client
22+
Scheme *runtime.Scheme
23+
Record record.EventRecorder
24+
ServerPoolName string
25+
Namespace string
26+
Datastore *K8sDatastore
27+
}
28+
29+
func (c *LLMServerPoolController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
30+
if req.NamespacedName.Name != c.ServerPoolName && req.NamespacedName.Namespace != c.Namespace {
31+
return ctrl.Result{}, nil
32+
}
33+
klog.V(1).Info("reconciling LLMServerPool", req.NamespacedName)
34+
35+
serverPool := &v1alpha1.LLMServerPool{}
36+
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
37+
klog.Error(err, "unable to get LLMServerPool")
38+
return ctrl.Result{}, err
39+
}
40+
41+
c.updateDatastore(serverPool)
42+
43+
return ctrl.Result{}, nil
44+
}
45+
46+
func (c *LLMServerPoolController) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
47+
if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
48+
c.Datastore.LLMServerPool = serverPool
49+
}
50+
}
51+
52+
func (c *LLMServerPoolController) SetupWithManager(mgr ctrl.Manager) error {
53+
return ctrl.NewControllerManagedBy(mgr).
54+
For(&v1alpha1.LLMServerPool{}).
55+
Complete(c)
56+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
v1 "k8s.io/api/core/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
ctrl "sigs.k8s.io/controller-runtime"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
11+
"k8s.io/client-go/tools/record"
12+
"k8s.io/klog/v2"
13+
)
14+
15+
type PodController struct {
16+
client.Client
17+
Scheme *runtime.Scheme
18+
Record record.EventRecorder
19+
Datastore *K8sDatastore
20+
}
21+
22+
func (c *PodController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
23+
klog.V(3).Info("reconciling Pod", req.NamespacedName)
24+
25+
pod := &v1.Pod{}
26+
if err := c.Get(ctx, req.NamespacedName, pod); err != nil {
27+
klog.Error(err, "unable to get Pod")
28+
return ctrl.Result{}, err
29+
}
30+
31+
c.updateDatastore(pod)
32+
33+
return ctrl.Result{}, nil
34+
}
35+
36+
func (c *PodController) updateDatastore(pod *v1.Pod) {
37+
// if labels don't match or pod is scheduled to be deleted, remove in case the pod was part of this pool, otherwise, add.
38+
if !c.Datastore.LabelsMatch(pod.ObjectMeta.Labels) || pod.ObjectMeta.DeletionTimestamp != nil {
39+
c.Datastore.Pods.Delete(pod.Name)
40+
} else if c.Datastore.LabelsMatch(pod.ObjectMeta.Labels) {
41+
c.Datastore.Pods.Store(pod.Name, pod)
42+
}
43+
}
44+
45+
func (c *PodController) SetupWithManager(mgr ctrl.Manager) error {
46+
return ctrl.NewControllerManagedBy(mgr).
47+
For(&v1.Pod{}).
48+
Complete(c)
49+
}

pkg/ext-proc/backend/provider.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,19 @@ import (
77
"time"
88

99
"go.uber.org/multierr"
10+
corev1 "k8s.io/api/core/v1"
1011
klog "k8s.io/klog/v2"
1112
)
1213

1314
const (
1415
fetchMetricsTimeout = 5 * time.Second
1516
)
1617

17-
func NewProvider(pmc PodMetricsClient, pl PodLister) *Provider {
18+
func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
1819
p := &Provider{
1920
podMetrics: sync.Map{},
2021
pmc: pmc,
21-
pl: pl,
22+
datastore: datastore,
2223
}
2324
return p
2425
}
@@ -28,17 +29,13 @@ type Provider struct {
2829
// key: Pod, value: *PodMetrics
2930
podMetrics sync.Map
3031
pmc PodMetricsClient
31-
pl PodLister
32+
datastore *K8sDatastore
3233
}
3334

3435
type PodMetricsClient interface {
3536
FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
3637
}
3738

38-
type PodLister interface {
39-
List() (PodSet, error)
40-
}
41-
4239
func (p *Provider) AllPodMetrics() []*PodMetrics {
4340
res := []*PodMetrics{}
4441
fn := func(k, v any) bool {
@@ -108,13 +105,11 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
108105
// refreshPodsOnce lists pods and updates keys in the podMetrics map.
109106
// Note this function doesn't update the PodMetrics value, it's done separately.
110107
func (p *Provider) refreshPodsOnce() error {
111-
pods, err := p.pl.List()
112-
if err != nil {
113-
return err
114-
}
115108
// merge new pods with cached ones.
116109
// add new pod to the map
117-
for pod := range pods {
110+
addNewPods := func(k, v any) bool {
111+
k8sPod := v.(*corev1.Pod)
112+
pod := Pod{Name: k8sPod.Name, Namespace: k8sPod.Namespace, Address: k8sPod.Status.PodIP + ":" + p.datastore.Port}
118113
if _, ok := p.podMetrics.Load(pod); !ok {
119114
new := &PodMetrics{
120115
Pod: pod,
@@ -124,16 +119,18 @@ func (p *Provider) refreshPodsOnce() error {
124119
}
125120
p.podMetrics.Store(pod, new)
126121
}
122+
return true
127123
}
128124
// remove pods that don't exist any more.
129125
mergeFn := func(k, v any) bool {
130126
pod := k.(Pod)
131-
if _, ok := pods[pod]; !ok {
127+
if _, ok := p.datastore.Pods.Load(pod.Name); !ok {
132128
p.podMetrics.Delete(pod)
133129
}
134130
return true
135131
}
136132
p.podMetrics.Range(mergeFn)
133+
p.datastore.Pods.Range(addNewPods)
137134
return nil
138135
}
139136

pkg/ext-proc/go.mod

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module ext-proc
22

3-
go 1.21
3+
go 1.22.0
44

55
require (
66
github.com/bojand/ghz v0.120.0
@@ -9,12 +9,14 @@ require (
99
github.com/jhump/protoreflect v1.15.1
1010
github.com/prometheus/client_model v0.6.1
1111
github.com/prometheus/common v0.55.0
12-
go.uber.org/multierr v1.9.0
12+
go.uber.org/multierr v1.11.0
1313
google.golang.org/grpc v1.67.0
1414
google.golang.org/protobuf v1.34.2
1515
k8s.io/klog/v2 v2.130.1
1616
)
1717

18+
require sigs.k8s.io/controller-runtime v0.19.0 // indirect
19+
1820
require (
1921
cel.dev/expr v0.16.0 // indirect
2022
cloud.google.com/go/compute/metadata v0.5.0 // indirect
@@ -27,31 +29,60 @@ require (
2729
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
2830
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2931
github.com/cncf/xds/go v0.0.0-20240723142845-024c85f92f20 // indirect
32+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3033
github.com/dustin/go-humanize v1.0.1 // indirect
34+
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
3135
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
32-
github.com/go-logr/logr v1.4.1 // indirect
36+
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
37+
github.com/go-logr/logr v1.4.2 // indirect
38+
github.com/go-openapi/jsonpointer v0.19.6 // indirect
39+
github.com/go-openapi/jsonreference v0.20.2 // indirect
40+
github.com/go-openapi/swag v0.22.4 // indirect
3341
github.com/gogo/protobuf v1.3.2 // indirect
3442
github.com/golang/protobuf v1.5.4 // indirect
43+
github.com/google/gnostic-models v0.6.8 // indirect
44+
github.com/google/go-cmp v0.6.0 // indirect
45+
github.com/google/gofuzz v1.2.0 // indirect
3546
github.com/google/uuid v1.6.0 // indirect
3647
github.com/huandu/xstrings v1.3.3 // indirect
3748
github.com/imdario/mergo v0.3.11 // indirect
3849
github.com/jinzhu/configor v1.2.1 // indirect
39-
github.com/kr/text v0.2.0 // indirect
50+
github.com/josharian/intern v1.0.0 // indirect
51+
github.com/json-iterator/go v1.1.12 // indirect
52+
github.com/mailru/easyjson v0.7.7 // indirect
4053
github.com/mitchellh/copystructure v1.0.0 // indirect
4154
github.com/mitchellh/reflectwalk v1.0.1 // indirect
55+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
56+
github.com/modern-go/reflect2 v1.0.2 // indirect
4257
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
4358
github.com/pkg/errors v0.9.1 // indirect
4459
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
4560
github.com/shopspring/decimal v1.2.0 // indirect
4661
github.com/spf13/cast v1.4.1 // indirect
47-
go.uber.org/atomic v1.7.0 // indirect
62+
github.com/x448/float16 v0.8.4 // indirect
4863
golang.org/x/crypto v0.26.0 // indirect
4964
golang.org/x/net v0.28.0 // indirect
5065
golang.org/x/oauth2 v0.22.0 // indirect
5166
golang.org/x/sync v0.8.0 // indirect
5267
golang.org/x/sys v0.24.0 // indirect
68+
golang.org/x/term v0.23.0 // indirect
5369
golang.org/x/text v0.17.0 // indirect
70+
golang.org/x/time v0.3.0 // indirect
5471
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
5572
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
73+
gopkg.in/inf.v0 v0.9.1 // indirect
5674
gopkg.in/yaml.v2 v2.4.0 // indirect
75+
gopkg.in/yaml.v3 v3.0.1 // indirect
76+
inference.networking.x-k8s.io/llm-instance-gateway v0.0.0
77+
k8s.io/api v0.31.1 // indirect
78+
k8s.io/apimachinery v0.31.1
79+
k8s.io/client-go v0.31.1
80+
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
81+
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
82+
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
83+
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
84+
sigs.k8s.io/yaml v1.4.0 // indirect
5785
)
86+
87+
//replace inference.networking.x-k8s.io/llm-instance-gateway v0.0.0 => /usr/local/google/home/kfswain/repos/kubernetes/llm-instance-gateway
88+
replace inference.networking.x-k8s.io/llm-instance-gateway v0.0.0 => /src

0 commit comments

Comments
 (0)