Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ metadata:
name: torrent-sample
spec:
hub:
name: Huggingface
repoID: Qwen/Qwen2.5-0.5B-Instruct
```

Expand All @@ -62,6 +63,7 @@ metadata:
name: torrent-sample
spec:
hub:
name: Huggingface
repoID: Qwen/Qwen2.5-0.5B-Instruct
nodeSelector:
zone: zone-a
Expand All @@ -78,6 +80,7 @@ metadata:
name: torrent-sample
spec:
hub:
name: Huggingface
repoID: Qwen/Qwen2.5-0.5B-Instruct
reclaimPolicy: Delete
```
Expand Down
8 changes: 8 additions & 0 deletions agent/config/base/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,11 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- pods
verbs:
- get
- list
- watch
2 changes: 1 addition & 1 deletion agent/config/manager/daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ metadata:
name: manta-agent
namespace: manta-system
labels:
# This is required for manta when fetching peer IP.
app: manta-agent
spec:
selector:
Expand All @@ -14,7 +15,6 @@ spec:
labels:
app: manta-agent
spec:
hostNetwork: true
serviceAccountName: manta-agent
initContainers:
- name: init-permissions
Expand Down
9 changes: 9 additions & 0 deletions agent/pkg/controller/replication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"os"

corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -92,6 +93,7 @@ func (r *ReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}

// This may take a long time, the concurrency is controlled by the MaxConcurrentReconciles.
// TODO: should we create a Job to handle this? See discussion: https://github.com/InftyAI/Manta/issues/25
if err := handler.HandleReplication(ctx, r.Client, replication); err != nil {
logger.Error(err, "error to handle replication", "Replication", klog.KObj(replication))
return ctrl.Result{}, err
Expand Down Expand Up @@ -144,6 +146,13 @@ func (r *ReplicationReconciler) updateNodeTracker(ctx context.Context, replicati

// SetupWithManager sets up the controller with the Manager.
func (r *ReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.TODO(), &corev1.Pod{}, "spec.nodeName", func(rawObj client.Object) []string {
pod := rawObj.(*corev1.Pod)
return []string{pod.Spec.NodeName}
}); err != nil {
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&api.Replication{}).
WithOptions(controller.Options{MaxConcurrentReconciles: 5}).
Expand Down
4 changes: 2 additions & 2 deletions agent/pkg/handler/chunk_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func SendChunk(w http.ResponseWriter, r *http.Request) {
}
}

func recvChunk(blobPath, snapshotPath, ipAddr string) error {
url := fmt.Sprintf("http://%s:%s/sync?path=%s", ipAddr, api.HttpPort, blobPath)
func recvChunk(blobPath, snapshotPath, addr string) error {
url := fmt.Sprintf("http://%s:%s/sync?path=%s", addr, api.HttpPort, blobPath)

resp, err := http.Get(url)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -104,15 +105,15 @@ func syncChunk(ctx context.Context, client client.Client, replication *api.Repli
sourceSplits := strings.Split(*replication.Spec.Source.URI, "://")
addresses := strings.Split(sourceSplits[1], "@")
nodeName, blobPath := addresses[0], addresses[1]
nodeIP, err := nodeIP(ctx, client, nodeName)
addr, err := peerAddr(ctx, client, nodeName)
if err != nil {
return err
}

// The destination URI looks like localhost://<path-to-your-file>
destSplits := strings.Split(*replication.Spec.Destination.URI, "://")

if err := recvChunk(blobPath, destSplits[1], nodeIP); err != nil {
if err := recvChunk(blobPath, destSplits[1], addr); err != nil {
logger.Error(err, "failed to sync chunk")
return err
}
Expand Down Expand Up @@ -196,15 +197,22 @@ func parseURI(uri string) (host string, address string) {
return splits[0], splits[1]
}

func nodeIP(ctx context.Context, client client.Client, nodeName string) (string, error) {
node := corev1.Node{}
if err := client.Get(ctx, types.NamespacedName{Name: nodeName}, &node); err != nil {
func peerAddr(ctx context.Context, c client.Client, nodeName string) (string, error) {
fieldSelector := fields.OneTermEqualSelector("spec.nodeName", nodeName)
listOptions := &client.ListOptions{
FieldSelector: fieldSelector,
// HACK: the label is hacked, we must set it explicitly in the daemonSet.
LabelSelector: labels.SelectorFromSet(map[string]string{"app": "manta-agent"}),
}

pods := corev1.PodList{}
if err := c.List(ctx, &pods, listOptions); err != nil {
return "", err
}
for _, address := range node.Status.Addresses {
if address.Type == "InternalIP" {
return address.Address, nil
}

if len(pods.Items) != 1 {
return "", fmt.Errorf("got more than one pod per node for daemonSet")
}
return "", fmt.Errorf("can't get node internal IP")

return pods.Items[0].Status.PodIP, nil
}
12 changes: 3 additions & 9 deletions agent/pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"path/filepath"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -43,25 +42,20 @@ const (
workspace = cons.DefaultWorkspace
)

var (
logger logr.Logger
)

func BackgroundTasks(ctx context.Context, c client.Client) {
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
logger = ctrl.Log.WithName("Background")

// Sync the disk chunk infos to the nodeTracker.
go syncChunks(ctx, c)
}

func syncChunks(ctx context.Context, c client.Client) {
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))
logger := ctrl.Log.WithName("Background tasks")

forFunc := func(ctx context.Context) error {
attempts := 0
for {
attempts += 1
if err := findOrCreateNodeTracker(ctx, c); err != nil {
// fmt.Printf("failed to create nodeTracker: %v, retry.", err)
logger.Error(err, "Failed to create nodeTracker, retry...")

if attempts > 10 {
Expand Down
3 changes: 1 addition & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/inftyai/manta/pkg/dispatcher"
"github.com/inftyai/manta/pkg/dispatcher/framework"
"github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware"
"github.com/inftyai/manta/pkg/dispatcher/plugins/gnumber"
"github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector"
"github.com/inftyai/manta/pkg/webhook"
//+kubebuilder:scaffold:imports
Expand Down Expand Up @@ -133,7 +132,7 @@ func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {
<-certsReady
setupLog.Info("certs ready")

dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New})
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New})
if err != nil {
setupLog.Error(err, "unable to create dispatcher")
os.Exit(1)
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/nodetracker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -91,8 +90,6 @@ func (r *NodeTrackerReconciler) Create(e event.CreateEvent) bool {
return false
}

logger := log.FromContext(context.Background()).WithValues("NodeTracker", klog.KObj(nodeTracker))
logger.Info("NodeTracker create event")
r.dispatcher.AddNodeTracker(nodeTracker)
return true
}
Expand All @@ -110,7 +107,11 @@ func (r *NodeTrackerReconciler) Update(e event.UpdateEvent) bool {
}

func (r *NodeTrackerReconciler) Delete(e event.DeleteEvent) bool {
obj := e.Object.(*api.NodeTracker)
obj, match := e.Object.(*api.NodeTracker)
if !match {
return false
}

r.dispatcher.DeleteNodeTracker(obj)
return true
}
Expand Down
47 changes: 0 additions & 47 deletions pkg/dispatcher/plugins/gnumber/goroutine_number.go

This file was deleted.

26 changes: 1 addition & 25 deletions test/e2e/suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/scheme"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand All @@ -35,7 +34,6 @@ import (

api "github.com/inftyai/manta/api/v1alpha1"
"github.com/inftyai/manta/test/util"
"github.com/inftyai/manta/test/util/wrapper"
)

var cfg *rest.Config
Expand Down Expand Up @@ -78,28 +76,6 @@ var _ = AfterSuite(func() {
})

func readyForTesting(client client.Client) {
By("waiting for webhooks to ready")

// To verify that webhooks are ready, let's create a simple Replication.
replication := wrapper.MakeReplication("sample-replication").
NodeName("unknown-node").
ChunkName("chunk1").
SizeBytes(1024).
SourceOfURI("localhost:///workspace/models/modelA").
Obj()

// Once the creation succeeds, that means the webhooks are ready
// and we can begin testing.
Eventually(func() error {
return client.Create(ctx, replication)
}, util.Timeout, util.Interval).Should(Succeed())

// Delete this replication before beginning tests.
Expect(client.Delete(ctx, replication)).To(Succeed())
Eventually(func() error {
return client.Get(ctx, types.NamespacedName{Name: replication.Name}, &api.Replication{})
}).ShouldNot(Succeed())

By("waiting for nodeTrackers to ready")
Eventually(func() error {
nodeTrackers := &api.NodeTrackerList{}
Expand All @@ -110,5 +86,5 @@ func readyForTesting(client client.Client) {
return fmt.Errorf("no nodeTrackers")
}
return nil
}, util.Timeout, util.Interval).Should(Succeed())
}, util.Timeout*3, util.Interval).Should(Succeed())
}
3 changes: 1 addition & 2 deletions test/integration/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/inftyai/manta/pkg/dispatcher"
"github.com/inftyai/manta/pkg/dispatcher/framework"
"github.com/inftyai/manta/pkg/dispatcher/plugins/diskaware"
"github.com/inftyai/manta/pkg/dispatcher/plugins/gnumber"
"github.com/inftyai/manta/pkg/dispatcher/plugins/nodeselector"
)

Expand Down Expand Up @@ -106,7 +105,7 @@ var _ = BeforeSuite(func() {
})
Expect(err).ToNot(HaveOccurred())

dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New, gnumber.New})
dispatcher, err := dispatcher.NewDispatcher([]framework.RegisterFunc{nodeselector.New, diskaware.New})
Expect(err).ToNot(HaveOccurred())

torrentController := controller.NewTorrentReconciler(mgr.GetClient(), mgr.GetScheme(), dispatcher)
Expand Down
Loading