Skip to content
This repository was archived by the owner on Nov 17, 2025. It is now read-only.

Commit b1166c8

Browse files
Merge pull request #26 from kerthcet/cleanup/remove-hostnetwork
Use podIP rather than hostNetwork
2 parents d924085 + 31e946d commit b1166c8

File tree

13 files changed

+53
-103
lines changed

13 files changed

+53
-103
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ metadata:
5050
name: torrent-sample
5151
spec:
5252
hub:
53+
name: Huggingface
5354
repoID: Qwen/Qwen2.5-0.5B-Instruct
5455
```
5556
@@ -62,6 +63,7 @@ metadata:
6263
name: torrent-sample
6364
spec:
6465
hub:
66+
name: Huggingface
6567
repoID: Qwen/Qwen2.5-0.5B-Instruct
6668
nodeSelector:
6769
zone: zone-a
@@ -78,6 +80,7 @@ metadata:
7880
name: torrent-sample
7981
spec:
8082
hub:
83+
name: Huggingface
8184
repoID: Qwen/Qwen2.5-0.5B-Instruct
8285
reclaimPolicy: Delete
8386
```

agent/config/base/clusterrole.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,11 @@ rules:
4747
- get
4848
- list
4949
- watch
50+
- apiGroups:
51+
- ""
52+
resources:
53+
- pods
54+
verbs:
55+
- get
56+
- list
57+
- watch

agent/config/manager/daemonset.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ metadata:
44
name: manta-agent
55
namespace: manta-system
66
labels:
7+
# This is required for manta when fetching peer IP.
78
app: manta-agent
89
spec:
910
selector:
@@ -14,7 +15,6 @@ spec:
1415
labels:
1516
app: manta-agent
1617
spec:
17-
hostNetwork: true
1818
serviceAccountName: manta-agent
1919
initContainers:
2020
- name: init-permissions

agent/pkg/controller/replication_controller.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"os"
2222

23+
corev1 "k8s.io/api/core/v1"
2324
apimeta "k8s.io/apimachinery/pkg/api/meta"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526
"k8s.io/apimachinery/pkg/runtime"
@@ -92,6 +93,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
9293
}
9394

9495
// This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles.
96+
// TODO: should we create a Job to handle this? See discussion: https://github.com/InftyAI/Manta/issues/25
9597
if err := handler.HandleReplication(ctx, r.Client, replication); err != nil {
9698
logger.Error(err, "error to handle replication", "Replication", klog.KObj(replication))
9799
return ctrl.Result{}, err
@@ -144,6 +146,13 @@ func (r *ReplicationReconciler) updateNodeTracker(ctx context.Context, replicati
144146

145147
// SetupWithManager sets up the controller with the Manager.
146148
func (r *ReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
149+
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string {
150+
pod := rawObj.(*corev1.Pod)
151+
return []string{pod.Spec.NodeName}
152+
}); err != nil {
153+
return err
154+
}
155+
147156
return ctrl.NewControllerManagedBy(mgr).
148157
For(&api.Replication{}).
149158
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).

agent/pkg/handler/chunk_handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ func SendChunk(w http.ResponseWriter, r *http.Request) {
7272
}
7373
}
7474

75-
func recvChunk(blobPath, snapshotPath, ipAddr string) error {
76-
url := fmt.Sprintf("http://%s:%s/sync?path=%s", ipAddr, api.HttpPort, blobPath)
75+
func recvChunk(blobPath, snapshotPath, addr string) error {
76+
url := fmt.Sprintf("http://%s:%s/sync?path=%s", addr, api.HttpPort, blobPath)
7777

7878
resp, err := http.Get(url)
7979
if err != nil {

agent/pkg/handler/replication_handler.go renamed to agent/pkg/handler/handler.go

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ import (
2424
"strings"
2525

2626
corev1 "k8s.io/api/core/v1"
27-
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/apimachinery/pkg/fields"
28+
"k8s.io/apimachinery/pkg/labels"
2829
"k8s.io/klog/v2"
2930
"sigs.k8s.io/controller-runtime/pkg/client"
3031
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -104,15 +105,15 @@ func syncChunk(ctx context.Context, client client.Client, replication *api.Repli
104105
sourceSplits := strings.Split(*replication.Spec.Source.URI, "://")
105106
addresses := strings.Split(sourceSplits[1], "@")
106107
nodeName, blobPath := addresses[0], addresses[1]
107-
nodeIP, err := nodeIP(ctx, client, nodeName)
108+
addr, err := peerAddr(ctx, client, nodeName)
108109
if err != nil {
109110
return err
110111
}
111112

112113
// The destination URI looks like localhost://<path-to-your-file>
113114
destSplits := strings.Split(*replication.Spec.Destination.URI, "://")
114115

115-
if err := recvChunk(blobPath, destSplits[1], nodeIP); err != nil {
116+
if err := recvChunk(blobPath, destSplits[1], addr); err != nil {
116117
logger.Error(err, "failed to sync chunk")
117118
return err
118119
}
@@ -196,15 +197,22 @@ func parseURI(uri string) (host string, address string) {
196197
return splits[0], splits[1]
197198
}
198199

199-
func nodeIP(ctx context.Context, client client.Client, nodeName string) (string, error) {
200-
node := corev1.Node{}
201-
if err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
200+
func peerAddr(ctx context.Context, c client.Client, nodeName string) (string, error) {
201+
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", nodeName)
202+
listOptions := &client.ListOptions{
203+
FieldSelector: fieldSelector,
204+
// HACK: the label is hacked, we must set it explicitly in the daemonSet.
205+
LabelSelector: labels.SelectorFromSet(map[string]string{"app": "manta-agent"}),
206+
}
207+
208+
pods := corev1.PodList{}
209+
if err := c.List(ctx, &pods, listOptions); err != nil {
202210
return "", err
203211
}
204-
for _, address := range node.Status.Addresses {
205-
if address.Type == "InternalIP" {
206-
return address.Address, nil
207-
}
212+
213+
if len(pods.Items) != 1 {
214+
return "", fmt.Errorf("got more than one pod per node for daemonSet")
208215
}
209-
return "", fmt.Errorf("can't get node internal IP")
216+
217+
return pods.Items[0].Status.PodIP, nil
210218
}
File renamed without changes.

agent/pkg/task/task.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"path/filepath"
2424
"time"
2525

26-
"github.com/go-logr/logr"
2726
corev1 "k8s.io/api/core/v1"
2827
apierrors "k8s.io/apimachinery/pkg/api/errors"
2928
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -43,25 +42,20 @@ const (
4342
workspace = cons.DefaultWorkspace
4443
)
4544

46-
var (
47-
logger logr.Logger
48-
)
49-
5045
func BackgroundTasks(ctx context.Context, c client.Client) {
51-
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
52-
logger = ctrl.Log.WithName("Background")
53-
5446
// Sync the disk chunk infos to the nodeTracker.
5547
go syncChunks(ctx, c)
5648
}
5749

5850
func syncChunks(ctx context.Context, c client.Client) {
51+
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
52+
logger := ctrl.Log.WithName("Background tasks")
53+
5954
forFunc := func(ctx context.Context) error {
6055
attempts := 0
6156
for {
6257
attempts += 1
6358
if err := findOrCreateNodeTracker(ctx, c); err != nil {
64-
// fmt.Printf("failed to create nodeTracker: %v, retry.", err)
6559
logger.Error(err, "Failed to create nodeTracker, retry...")
6660

6761
if attempts > 10 {

cmd/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import (
3939
"github.com/inftyai/manta/pkg/dispatcher"
4040
"github.com/inftyai/manta/pkg/dispatcher/framework"
4141
"github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware"
42-
"github.com/inftyai/manta/pkg/dispatcher/plugins/gnumber"
4342
"github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector"
4443
"github.com/inftyai/manta/pkg/webhook"
4544
//+kubebuilder:scaffold:imports
@@ -133,7 +132,7 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {
133132
<-certsReady
134133
setupLog.Info("certs ready")
135134

136-
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New})
135+
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New})
137136
if err != nil {
138137
setupLog.Error(err, "unable to create dispatcher")
139138
os.Exit(1)

pkg/controller/nodetracker_controller.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
corev1 "k8s.io/api/core/v1"
2424
"k8s.io/apimachinery/pkg/runtime"
2525
"k8s.io/apimachinery/pkg/types"
26-
"k8s.io/klog/v2"
2726
ctrl "sigs.k8s.io/controller-runtime"
2827
"sigs.k8s.io/controller-runtime/pkg/builder"
2928
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -91,8 +90,6 @@ func (r *NodeTrackerReconciler) Create(e event.CreateEvent) bool {
9190
return false
9291
}
9392

94-
logger := log.FromContext(context.Background()).WithValues("NodeTracker", klog.KObj(nodeTracker))
95-
logger.Info("NodeTracker create event")
9693
r.dispatcher.AddNodeTracker(nodeTracker)
9794
return true
9895
}
@@ -110,7 +107,11 @@ func (r *NodeTrackerReconciler) Update(e event.UpdateEvent) bool {
110107
}
111108

112109
func (r *NodeTrackerReconciler) Delete(e event.DeleteEvent) bool {
113-
obj := e.Object.(*api.NodeTracker)
110+
obj, match := e.Object.(*api.NodeTracker)
111+
if !match {
112+
return false
113+
}
114+
114115
r.dispatcher.DeleteNodeTracker(obj)
115116
return true
116117
}

0 commit comments

Comments
 (0)