Merge remote-tracking branch 'upstream/master' into upmaster

Former-commit-id: 5cb5ec7b204c264c1a422f207ec44a8390c8dd9e
This commit is contained in:
jagger 2024-06-27 15:54:46 +08:00
commit 5cc569b431
4 changed files with 26 additions and 9 deletions

View File

@@ -115,7 +115,7 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中") l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中")
go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts, l.ctx) go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts)
return resp, nil return resp, nil
} }

View File

@@ -11,6 +11,7 @@ type Weight struct {
} }
func DistributeReplicas(weights []*Weight, replicas int32) { func DistributeReplicas(weights []*Weight, replicas int32) {
var weightSum int32 var weightSum int32
weightSum = 0 weightSum = 0
for _, w := range weights { for _, w := range weights {
@@ -59,6 +60,10 @@ func DistributeReplicas(weights []*Weight, replicas int32) {
weightRatio[maxIdx]-- weightRatio[maxIdx]--
rest-- rest--
} else { } else {
if weights[minIdx].Replica == 0 {
weightRatio[minIdx]++
continue
}
weights[minIdx].Replica-- weights[minIdx].Replica--
weightRatio[minIdx]++ weightRatio[minIdx]++
rest++ rest++

View File

@@ -1,8 +1,7 @@
package service package service
import ( import (
"context" "github.com/zeromicro/go-zero/core/logx"
"fmt"
"github.com/zeromicro/go-zero/zrpc" "github.com/zeromicro/go-zero/zrpc"
hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/config"
@@ -17,6 +16,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"net/http"
"strconv" "strconv"
"sync" "sync"
) )
@@ -91,13 +91,14 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
return executorMap, collectorMap return executorMap, collectorMap
} }
func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile, ctx context.Context) { func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile) {
res, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, ctx) r := http.Request{}
_, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, r.Context())
if err != nil { if err != nil {
logx.Errorf(err.Error())
return return
} }
fmt.Println(res)
} }
//func (a *AiService) AddCluster() error { //func (a *AiService) AddCluster() error {

View File

@@ -63,18 +63,23 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
} }
} }
var mutex sync.Mutex
errMap := make(map[string]string)
for _, cluster := range clusters { for _, cluster := range clusters {
wg.Add(1) wg.Add(1)
c := cluster c := cluster
go func() { go func() {
imageUrls, err := collectorMap[c.ClusterId].GetInferUrl(ctx, opt) imageUrls, err := collectorMap[c.ClusterId].GetInferUrl(ctx, opt)
for i, _ := range imageUrls {
imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image"
}
if err != nil { if err != nil {
mutex.Lock()
errMap[c.ClusterId] = err.Error()
mutex.Unlock()
wg.Done() wg.Done()
return return
} }
for i, _ := range imageUrls {
imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image"
}
clusterName, _ := storage.GetClusterNameById(c.ClusterId) clusterName, _ := storage.GetClusterNameById(c.ClusterId)
s := struct { s := struct {
@@ -111,6 +116,9 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
for _, t := range aiTaskList { for _, t := range aiTaskList {
t.Status = constants.Failed t.Status = constants.Failed
t.EndTime = time.Now().Format(time.RFC3339) t.EndTime = time.Now().Format(time.RFC3339)
if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok {
t.Msg = errMap[strconv.Itoa(int(t.ClusterId))]
}
err := storage.UpdateAiTask(t) err := storage.UpdateAiTask(t)
if err != nil { if err != nil {
logx.Errorf(err.Error()) logx.Errorf(err.Error())
@@ -142,6 +150,9 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) {
t.Status = constants.Failed t.Status = constants.Failed
t.EndTime = time.Now().Format(time.RFC3339) t.EndTime = time.Now().Format(time.RFC3339)
if _, ok := errMap[strconv.Itoa(int(t.ClusterId))]; ok {
t.Msg = errMap[strconv.Itoa(int(t.ClusterId))]
}
err := storage.UpdateAiTask(t) err := storage.UpdateAiTask(t)
if err != nil { if err != nil {
logx.Errorf(err.Error()) logx.Errorf(err.Error())