diff --git a/api/v1alpha1/llmserverpool_types.go b/api/v1alpha1/llmserverpool_types.go index a18cef58a..42bcea366 100644 --- a/api/v1alpha1/llmserverpool_types.go +++ b/api/v1alpha1/llmserverpool_types.go @@ -25,11 +25,19 @@ import ( // LLMServerPoolSpec defines the desired state of LLMServerPool type LLMServerPoolSpec struct { - // ModelServerSelector uses label selection to watch model server pods + // ModelServerSelector uses a map of labels to watch model server pods // that should be included in the LLMServerPool. ModelServers should not // be with any other Service or LLMServerPool, that behavior is not supported // and will result in sub-optimal utilization. - ModelServerSelector metav1.LabelSelector `json:"modelServerSelector,omitempty"` + // Because this selector is translated to a Service, a simple map is used instead + // of: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#LabelSelector + // This avoids footgun errors that would arise from using https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#LabelSelectorAsMap. + ModelServerSelector map[string]string `json:"modelServerSelector,omitempty"` + + // TargetPort is the port number that the model servers within the pool expect + // to receive traffic on. + // This maps to the TargetPort in: https://pkg.go.dev/k8s.io/api/core/v1#ServicePort + TargetPort int32 } // LLMServerPoolStatus defines the observed state of LLMServerPool diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b47b41f70..3db31ad1a 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -89,7 +89,13 @@ func (in *LLMServerPoolList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *LLMServerPoolSpec) DeepCopyInto(out *LLMServerPoolSpec) { *out = *in - in.ModelServerSelector.DeepCopyInto(&out.ModelServerSelector) + if in.ModelServerSelector != nil { + in, out := &in.ModelServerSelector, &out.ModelServerSelector + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLMServerPoolSpec. diff --git a/client-go/applyconfiguration/api/v1alpha1/llmserverpoolspec.go b/client-go/applyconfiguration/api/v1alpha1/llmserverpoolspec.go index 8a69cfde0..21ca637a6 100644 --- a/client-go/applyconfiguration/api/v1alpha1/llmserverpoolspec.go +++ b/client-go/applyconfiguration/api/v1alpha1/llmserverpoolspec.go @@ -17,14 +17,10 @@ limitations under the License. package v1alpha1 -import ( - v1 "k8s.io/client-go/applyconfigurations/meta/v1" -) - // LLMServerPoolSpecApplyConfiguration represents a declarative configuration of the LLMServerPoolSpec type for use // with apply. type LLMServerPoolSpecApplyConfiguration struct { - ModelServerSelector *v1.LabelSelectorApplyConfiguration `json:"modelServerSelector,omitempty"` + ModelServerSelector map[string]string `json:"modelServerSelector,omitempty"` } // LLMServerPoolSpecApplyConfiguration constructs a declarative configuration of the LLMServerPoolSpec type for use with @@ -33,10 +29,16 @@ func LLMServerPoolSpec() *LLMServerPoolSpecApplyConfiguration { return &LLMServerPoolSpecApplyConfiguration{} } -// WithModelServerSelector sets the ModelServerSelector field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the ModelServerSelector field is set to the value of the last call. 
-func (b *LLMServerPoolSpecApplyConfiguration) WithModelServerSelector(value *v1.LabelSelectorApplyConfiguration) *LLMServerPoolSpecApplyConfiguration { - b.ModelServerSelector = value +// WithModelServerSelector puts the entries into the ModelServerSelector field in the declarative configuration +// and returns the receiver, so that objects can be build by chaining "With" function invocations. +// If called multiple times, the entries provided by each call will be put on the ModelServerSelector field, +// overwriting an existing map entries in ModelServerSelector field with the same key. +func (b *LLMServerPoolSpecApplyConfiguration) WithModelServerSelector(entries map[string]string) *LLMServerPoolSpecApplyConfiguration { + if b.ModelServerSelector == nil && len(entries) > 0 { + b.ModelServerSelector = make(map[string]string, len(entries)) + } + for k, v := range entries { + b.ModelServerSelector[k] = v + } return b } diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index 114e605d8..cddae4980 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -11,7 +11,6 @@ import ( type K8sDatastore struct { LLMServerPool *v1alpha1.LLMServerPool Pods *sync.Map - Port string } func (ds *K8sDatastore) GetPodIPs() []string { diff --git a/pkg/ext-proc/backend/endpointslice_reconciler.go b/pkg/ext-proc/backend/endpointslice_reconciler.go index cba538646..06c4faad8 100644 --- a/pkg/ext-proc/backend/endpointslice_reconciler.go +++ b/pkg/ext-proc/backend/endpointslice_reconciler.go @@ -2,6 +2,7 @@ package backend import ( "context" + "fmt" discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" @@ -49,7 +50,7 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli for _, endpoint := range slice.Endpoints { klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint) if c.validPod(endpoint) { - pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] 
+ ":" + c.Datastore.Port} + pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.LLMServerPool.Spec.TargetPort)} podMap[pod] = true c.Datastore.Pods.Store(pod, true) } diff --git a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go index 730d314ff..f965d3e19 100644 --- a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go +++ b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go @@ -4,6 +4,7 @@ import ( "sync" "testing" + "inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1" v1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" ) @@ -25,7 +26,11 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { name: "Add new pod", datastore: K8sDatastore{ Pods: populateMap(basePod1, basePod2), - Port: "8000", + LLMServerPool: &v1alpha1.LLMServerPool{ + Spec: v1alpha1.LLMServerPoolSpec{ + TargetPort: int32(8000), + }, + }, }, incomingSlice: &discoveryv1.EndpointSlice{ Endpoints: []discoveryv1.Endpoint{ @@ -63,14 +68,17 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { }, want: K8sDatastore{ Pods: populateMap(basePod1, basePod2, basePod3), - Port: "8000", }, }, { name: "New pod, but its not ready yet. 
Do not add.", datastore: K8sDatastore{ Pods: populateMap(basePod1, basePod2), - Port: "8000", + LLMServerPool: &v1alpha1.LLMServerPool{ + Spec: v1alpha1.LLMServerPoolSpec{ + TargetPort: int32(8000), + }, + }, }, incomingSlice: &discoveryv1.EndpointSlice{ Endpoints: []discoveryv1.Endpoint{ @@ -108,14 +116,17 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { }, want: K8sDatastore{ Pods: populateMap(basePod1, basePod2), - Port: "8000", }, }, { name: "Existing pod not ready, new pod added, and is ready", datastore: K8sDatastore{ Pods: populateMap(basePod1, basePod2), - Port: "8000", + LLMServerPool: &v1alpha1.LLMServerPool{ + Spec: v1alpha1.LLMServerPoolSpec{ + TargetPort: int32(8000), + }, + }, }, incomingSlice: &discoveryv1.EndpointSlice{ Endpoints: []discoveryv1.Endpoint{ @@ -153,7 +164,6 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { }, want: K8sDatastore{ Pods: populateMap(basePod3, basePod2), - Port: "8000", }, }, } diff --git a/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go b/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go index ec4716cbd..2e3163417 100644 --- a/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go +++ b/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go @@ -20,9 +20,7 @@ func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) { datastore: K8sDatastore{ LLMServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "vllm"}, - }, + ModelServerSelector: map[string]string{"app": "vllm"}, }, ObjectMeta: metav1.ObjectMeta{ Name: "test-pool", @@ -32,9 +30,7 @@ func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) { }, incomingServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "not-vllm"}, - }, + ModelServerSelector: map[string]string{"app": "not-vllm"}, }, ObjectMeta: 
metav1.ObjectMeta{ Name: "test-pool", @@ -44,9 +40,7 @@ func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) { want: K8sDatastore{ LLMServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "not-vllm"}, - }, + ModelServerSelector: map[string]string{"app": "not-vllm"}, }, ObjectMeta: metav1.ObjectMeta{ Name: "test-pool", @@ -60,9 +54,7 @@ func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) { datastore: K8sDatastore{ LLMServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "vllm"}, - }, + ModelServerSelector: map[string]string{"app": "vllm"}, }, ObjectMeta: metav1.ObjectMeta{ Name: "test-pool", @@ -72,9 +64,7 @@ func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) { }, incomingServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"technically": "this-should-never-happen"}, - }, + ModelServerSelector: map[string]string{"technically": "this-should-never-happen"}, }, ObjectMeta: metav1.ObjectMeta{ Name: "test-pool", @@ -84,9 +74,7 @@ func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) { want: K8sDatastore{ LLMServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: metav1.LabelSelector{ - MatchLabels: map[string]string{"app": "vllm"}, - }, + ModelServerSelector: map[string]string{"app": "vllm"}, }, ObjectMeta: metav1.ObjectMeta{ Name: "test-pool", diff --git a/pkg/ext-proc/backend/llmserverpool_reconciler.go b/pkg/ext-proc/backend/llmserverpool_reconciler.go index 37b4ed336..d6ca67681 100644 --- a/pkg/ext-proc/backend/llmserverpool_reconciler.go +++ b/pkg/ext-proc/backend/llmserverpool_reconciler.go @@ -26,7 +26,6 @@ type LLMServerPoolReconciler struct { ServerPoolName string Namespace string 
Datastore *K8sDatastore - Port int Zone string } diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index d4c9df3e7..11495a00b 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -36,7 +36,6 @@ var ( serviceName = flag.String("serviceName", "", "Name of the service that will be used to read the endpointslices from") namespace = flag.String("namespace", "default", "The Namespace that the server pool should exist in.") zone = flag.String("zone", "", "The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ") - desiredPort = flag.String("desiredPort", "8000", "The port that the model server exposes") refreshPodsInterval = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods") refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics") scheme = runtime.NewScheme() @@ -71,7 +70,7 @@ func main() { klog.Fatalf("failed to listen: %v", err) } - datastore := &backend.K8sDatastore{LLMServerPool: &v1alpha1.LLMServerPool{}, Pods: &sync.Map{}, Port: *desiredPort} + datastore := &backend.K8sDatastore{LLMServerPool: &v1alpha1.LLMServerPool{}, Pods: &sync.Map{}} mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme,