Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"errors"
"math/big"
"math/rand"
"net/url"
"sort"
Expand Down Expand Up @@ -364,6 +365,11 @@ func reportLiveAICapacity(ch chan common.OrchestratorDescriptor, caps common.Cap

idleContainersByModelAndOrchestrator := make(map[string]map[string]int)
for _, od := range allOrchInfo {
pricePerUnit := od.RemoteInfo.PriceInfo.PricePerUnit
pixelsPerUnit := od.RemoteInfo.PriceInfo.PixelsPerUnit
pricePerPixel := big.NewRat(pricePerUnit, pixelsPerUnit)
monitor.LiveAIPricePerPixel(od.LocalInfo.URL.String(), pricePerPixel)

var models map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint
if od.RemoteInfo != nil {
models = getModelCaps(od.RemoteInfo.Capabilities)
Expand Down
18 changes: 18 additions & 0 deletions monitor/census.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ type (
mAIResultSaveFailed *stats.Int64Measure
mAIContainersInUse *stats.Int64Measure
mAIContainersIdle *stats.Int64Measure
mLiveAIPricePerPixel *stats.Float64Measure
aiContainersIdleByPipelineByOrchestrator map[string]map[string]int
mAIGPUsIdle *stats.Int64Measure
mAICurrentLivePipelines *stats.Int64Measure
Expand Down Expand Up @@ -391,6 +392,7 @@ func InitCensus(nodeType NodeType, version string) {
census.mAIResultSaveFailed = stats.Int64("ai_result_upload_failed_total", "AIResultUploadFailed", "tot")
census.mAIContainersInUse = stats.Int64("ai_container_in_use", "Number of containers currently used for AI processing", "tot")
census.mAIContainersIdle = stats.Int64("ai_container_idle", "Number of containers currently available for AI processing", "tot")
census.mLiveAIPricePerPixel = stats.Float64("live_ai_price_per_pixel", "Live AI price per pixel", "wei/pixel")
census.aiContainersIdleByPipelineByOrchestrator = make(map[string]map[string]int)
census.mAIGPUsIdle = stats.Int64("ai_gpus_idle", "Number of idle GPUs (with no configured container)", "tot")
census.mAICurrentLivePipelines = stats.Int64("ai_current_live_pipelines", "Number of live AI pipelines currently running", "tot")
Expand Down Expand Up @@ -1029,6 +1031,13 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: append([]tag.Key{census.kPipeline, census.kModelName, census.kOrchestratorURI}, baseTags...),
Aggregation: view.LastValue(),
},
{
Name: "live_ai_price_per_pixel",
Measure: census.mLiveAIPricePerPixel,
Description: "Live AI price per pixel",
TagKeys: append([]tag.Key{census.kOrchestratorURI}, baseTags...),
Aggregation: view.LastValue(),
},
{
Name: "ai_gpus_idle",
Measure: census.mAIGPUsIdle,
Expand Down Expand Up @@ -2067,6 +2076,15 @@ func AIContainersIdle(currentContainersIdle int, pipeline, modelID, uri string)
}
}

func LiveAIPricePerPixel(orchestratorURI string, pricePerPixel *big.Rat) {
floatPrice, _ := pricePerPixel.Float64()
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kOrchestratorURI, orchestratorURI)},
census.mLiveAIPricePerPixel.M(floatPrice)); err != nil {
glog.Errorf("Error recording metrics err=%q", err)
}
}

func AIGPUsIdle(currentGPUsIdle int, pipeline, modelID string) {
if err := stats.RecordWithTags(census.ctx,
[]tag.Mutator{tag.Insert(census.kPipeline, pipeline), tag.Insert(census.kModelName, modelID)},
Expand Down
2 changes: 2 additions & 0 deletions server/selection_algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ func filterByMaxPrice(ctx context.Context, addrs []ethcommon.Address, maxPrice *
price := prices[addr]
if price != nil && price.Cmp(maxPrice) <= 0 {
res = append(res, addr)
} else {
clog.Warningf(ctx, "Orchestrator %s is above max price %v, price=%v", addr, maxPrice, price)
}
}
return res
Expand Down
Loading