From 0deaf16f7da5c2f4d796e0290c51f7e7643bb813 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 10 Jul 2024 18:38:16 +0800 Subject: [PATCH] fix imageinfer bugs Former-commit-id: c12a5a3dbac98fe9c2657af48f3b80c5482aac13 --- internal/cron/aiCronTask.go | 94 ++----------------- .../logic/inference/imageinferencelogic.go | 20 ++-- .../inference/texttotextinferencelogic.go | 12 +-- internal/scheduler/service/aiService.go | 65 ++++++++----- .../scheduler/service/collector/collector.go | 7 -- .../inference/{imageInfer.go => inference.go} | 31 +++--- .../service/updater/taskStatusSync.go | 2 +- internal/storeLink/modelarts.go | 7 +- internal/storeLink/octopus.go | 7 +- internal/storeLink/shuguangai.go | 7 +- 10 files changed, 104 insertions(+), 148 deletions(-) rename internal/scheduler/service/inference/{imageInfer.go => inference.go} (91%) diff --git a/internal/cron/aiCronTask.go b/internal/cron/aiCronTask.go index fd3edc62..4ec31db2 100644 --- a/internal/cron/aiCronTask.go +++ b/internal/cron/aiCronTask.go @@ -4,24 +4,9 @@ import ( "errors" "fmt" "github.com/zeromicro/go-zero/core/logx" - "github.com/zeromicro/go-zero/zrpc" - hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" - "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" - "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" - "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" - "strconv" -) - -const ( - OCTOPUS = "octopus" - MODELARTS = "modelarts" - SHUGUANGAI = "shuguangAi" ) func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) { @@ -73,84 +58,24 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) { continue } else { if isAdapterEmpty(svc, id) { - exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List) + exeClusterMap, colClusterMap, inferMap := service.InitAiClusterMap(&svc.Config, clusters.List) svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap + svc.Scheduler.AiService.InferenceAdapterMap[id] = inferMap } else { - UpdateClusterMaps(svc, id, clusters.List) + svc.Scheduler.AiService.UpdateClusterMaps(&svc.Config, id, clusters.List) } } } } -func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) { - for _, c := range clusters { - _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] - _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId][c.Id] - if !ok && !ok2 { - switch c.Name { - case OCTOPUS: - id, _ := strconv.ParseInt(c.Id, 10, 64) - octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf)) - octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) - svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus - svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = octopus - case MODELARTS: - id, _ := strconv.ParseInt(c.Id, 10, 64) - modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf)) - modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf)) - modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) - svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts - svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = modelarts - case SHUGUANGAI: - id, _ := strconv.ParseInt(c.Id, 10, 64) - aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf)) - sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) - svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai - svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId][c.Id] = sgai - } - } else { - continue - } - } - -} - -func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) { - executorMap := make(map[string]executor.AiExecutor) - collectorMap := make(map[string]collector.AiCollector) - for _, c := range clusters { - switch c.Name { - case OCTOPUS: - id, _ := strconv.ParseInt(c.Id, 10, 64) - octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf)) - octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) - collectorMap[c.Id] = octopus - executorMap[c.Id] = octopus - case MODELARTS: - id, _ := strconv.ParseInt(c.Id, 10, 64) - modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) - modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf)) - modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) - collectorMap[c.Id] = modelarts - executorMap[c.Id] = modelarts - case SHUGUANGAI: - id, _ := strconv.ParseInt(c.Id, 10, 64) - aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf)) - sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) - collectorMap[c.Id] = sgai - executorMap[c.Id] = sgai - } - } - - return executorMap, collectorMap -} - func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] - if ok && ok2 { - if len(emap) == clusterNum && len(cmap) == clusterNum { + imap, ok3 := svc.Scheduler.AiService.InferenceAdapterMap[id] + + if ok && ok2 && ok3 { + if len(emap) == clusterNum && len(cmap) == clusterNum && len(imap) == clusterNum { return true } } @@ -160,7 +85,8 @@ func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { func isAdapterEmpty(svc *svc.ServiceContext, id string) bool { _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] - if !ok && !ok2 { + _, ok3 := svc.Scheduler.AiService.InferenceAdapterMap[id] + if !ok && !ok2 && !ok3 { return true } return false diff --git a/internal/logic/inference/imageinferencelogic.go b/internal/logic/inference/imageinferencelogic.go index 1673aed1..9dbfc6f5 100644 --- a/internal/logic/inference/imageinferencelogic.go +++ b/internal/logic/inference/imageinferencelogic.go @@ -96,6 +96,12 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere return nil, errors.New("clusters is nil") } + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + //save task var synergystatus int64 if len(clusters) > 1 { @@ -117,12 +123,6 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中") - for i := len(clusters) - 1; i >= 0; i-- { - if clusters[i].Replicas == 0 { - clusters = append(clusters[:i], clusters[i+1:]...) - } - } - //save taskai for _, c := range clusters { clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(c.ClusterId) @@ -133,7 +133,13 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere } } - go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts) + go func() { + r := http.Request{} + _, err := inference.ImageInfer(opt, id, adapterName, clusters, ts, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, l.svcCtx.Scheduler.AiStorages, r.Context()) + if err != nil { + logx.Errorf(err.Error()) + } + }() return resp, nil } diff --git a/internal/logic/inference/texttotextinferencelogic.go b/internal/logic/inference/texttotextinferencelogic.go index 8bf9fcfd..83894dff 100644 --- a/internal/logic/inference/texttotextinferencelogic.go +++ b/internal/logic/inference/texttotextinferencelogic.go @@ -5,7 +5,7 @@ import ( "errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" @@ -61,23 +61,23 @@ func (l *TextToTextInferenceLogic) TextToTextInference(req *types.TextToTextInfe var wg sync.WaitGroup var cluster_ch = make(chan struct { - urls []*collector.InferUrl + urls []*inference.InferUrl clusterId string clusterName string }, len(opt.AiClusterIds)) var cs []struct { - urls []*collector.InferUrl + urls []*inference.InferUrl clusterId string clusterName string } - collectorMap := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId] + inferMap := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[opt.AdapterId] //save taskai for _, clusterId := range opt.AiClusterIds { wg.Add(1) go func(cId string) { - urls, err := collectorMap[cId].GetInferUrl(l.ctx, opt) + urls, err := inferMap[cId].GetInferUrl(l.ctx, opt) if err != nil { wg.Done() return @@ -85,7 +85,7 @@ func (l *TextToTextInferenceLogic) TextToTextInference(req *types.TextToTextInfe clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(cId) s := struct { - urls []*collector.InferUrl + urls []*inference.InferUrl clusterId string clusterName string }{ diff --git a/internal/scheduler/service/aiService.go b/internal/scheduler/service/aiService.go index d282ee04..4f9e6630 100644 --- a/internal/scheduler/service/aiService.go +++ b/internal/scheduler/service/aiService.go @@ -1,22 +1,18 @@ package service import ( - "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/zrpc" hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" - "net/http" "strconv" "sync" ) @@ -30,6 +26,7 @@ const ( type AiService struct { AiExecutorAdapterMap map[string]map[string]executor.AiExecutor AiCollectorAdapterMap map[string]map[string]collector.AiCollector + InferenceAdapterMap map[string]map[string]inference.Inference Storage *database.AiStorage mu sync.Mutex } @@ -43,6 +40,7 @@ func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService aiService := &AiService{ AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor), AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector), + InferenceAdapterMap: make(map[string]map[string]inference.Inference), Storage: storages, } for _, id := range adapterIds { @@ -53,17 +51,19 @@ func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService if len(clusters.List) == 0 { continue } - exeClusterMap, colClusterMap := InitAiClusterMap(conf, clusters.List) + exeClusterMap, colClusterMap, inferMap := InitAiClusterMap(conf, clusters.List) aiService.AiExecutorAdapterMap[id] = exeClusterMap aiService.AiCollectorAdapterMap[id] = colClusterMap + aiService.InferenceAdapterMap[id] = inferMap } return aiService, nil } -func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) { +func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector, map[string]inference.Inference) { executorMap := make(map[string]executor.AiExecutor) collectorMap := make(map[string]collector.AiCollector) + inferenceMap := make(map[string]inference.Inference) for _, c := range clusters { switch c.Name { case OCTOPUS: @@ -72,6 +72,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) collectorMap[c.Id] = octopus executorMap[c.Id] = octopus + inferenceMap[c.Id] = octopus case MODELARTS: id, _ := strconv.ParseInt(c.Id, 10, 64) modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) @@ -79,32 +80,52 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) collectorMap[c.Id] = modelarts executorMap[c.Id] = modelarts + inferenceMap[c.Id] = modelarts case SHUGUANGAI: id, _ := strconv.ParseInt(c.Id, 10, 64) aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf)) sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) collectorMap[c.Id] = sgai executorMap[c.Id] = sgai + inferenceMap[c.Id] = sgai } } - return executorMap, collectorMap + return executorMap, collectorMap, inferenceMap } -func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile) { - - r := http.Request{} - _, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, r.Context()) - if err != nil { - logx.Errorf(err.Error()) - return +func (as *AiService) UpdateClusterMaps(conf *config.Config, adapterId string, clusters []types.ClusterInfo) { + for _, c := range clusters { + _, ok := as.AiExecutorAdapterMap[adapterId][c.Id] + _, ok2 := as.AiCollectorAdapterMap[adapterId][c.Id] + _, ok3 := as.InferenceAdapterMap[adapterId][c.Id] + if !ok && !ok2 && !ok3 { + switch c.Name { + case OCTOPUS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf)) + octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) + as.AiExecutorAdapterMap[adapterId][c.Id] = octopus + as.AiCollectorAdapterMap[adapterId][c.Id] = octopus + as.InferenceAdapterMap[adapterId][c.Id] = octopus + case MODELARTS: + id, _ := strconv.ParseInt(c.Id, 10, 64) + modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) + modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf)) + modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) + as.AiExecutorAdapterMap[adapterId][c.Id] = modelarts + as.AiCollectorAdapterMap[adapterId][c.Id] = modelarts + as.InferenceAdapterMap[adapterId][c.Id] = modelarts + case SHUGUANGAI: + id, _ := strconv.ParseInt(c.Id, 10, 64) + aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf)) + sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) + as.AiExecutorAdapterMap[adapterId][c.Id] = sgai + as.AiCollectorAdapterMap[adapterId][c.Id] = sgai + as.InferenceAdapterMap[adapterId][c.Id] = sgai + } + } else { + continue + } } } - -//func (a *AiService) AddCluster() error { -// -//} -// -//func (a *AiService) AddAdapter() error { -// -//} diff --git a/internal/scheduler/service/collector/collector.go b/internal/scheduler/service/collector/collector.go index e1d1e6d8..c0bb35f1 100644 --- a/internal/scheduler/service/collector/collector.go +++ b/internal/scheduler/service/collector/collector.go @@ -2,7 +2,6 @@ package collector import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" ) type AiCollector interface { @@ -15,12 +14,6 @@ type AiCollector interface { UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error GetComputeCards(ctx context.Context) ([]string, error) GetUserBalance(ctx context.Context) (float64, error) - GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error) -} - -type InferUrl struct { - Url string - Card string } type ResourceStats struct { diff --git a/internal/scheduler/service/inference/imageInfer.go b/internal/scheduler/service/inference/inference.go similarity index 91% rename from internal/scheduler/service/inference/imageInfer.go rename to internal/scheduler/service/inference/inference.go index a1480549..1361c3c5 100644 --- a/internal/scheduler/service/inference/imageInfer.go +++ b/internal/scheduler/service/inference/inference.go @@ -8,9 +8,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -24,12 +22,21 @@ import ( "time" ) +type Inference interface { + GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error) +} + +type InferUrl struct { + Url string + Card string +} + type ImageFile struct { ImageResult *types.ImageResult File multipart.File } -func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, aiCollectorAdapterMap map[string]map[string]collector.AiCollector, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) { +func ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, inferAdapterMap map[string]map[string]Inference, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) { //for i := len(clusters) - 1; i >= 0; i-- { // if clusters[i].Replicas == 0 { @@ -39,19 +46,19 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st var wg sync.WaitGroup var cluster_ch = make(chan struct { - urls []*collector.InferUrl + urls []*InferUrl clusterId string clusterName string imageNum int32 }, len(clusters)) var cs []struct { - urls []*collector.InferUrl + urls []*InferUrl clusterId string clusterName string imageNum int32 } - collectorMap := aiCollectorAdapterMap[opt.AdapterId] + inferMap := inferAdapterMap[opt.AdapterId] ////save taskai //for _, c := range clusters { @@ -69,7 +76,7 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st wg.Add(1) c := cluster go func() { - imageUrls, err := collectorMap[c.ClusterId].GetInferUrl(ctx, opt) + imageUrls, err := inferMap[c.ClusterId].GetInferUrl(ctx, opt) if err != nil { mutex.Lock() errMap[c.ClusterId] = err.Error() @@ -78,12 +85,12 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st return } for i, _ := range imageUrls { - imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image" + imageUrls[i].Url = imageUrls[i].Url + "/" + "image" } clusterName, _ := storage.GetClusterNameById(c.ClusterId) s := struct { - urls []*collector.InferUrl + urls []*InferUrl clusterId string clusterName string imageNum int32 @@ -264,7 +271,7 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st } func sendInferReq(images []*ImageFile, cluster struct { - urls []*collector.InferUrl + urls []*InferUrl clusterId string clusterName string imageNum int32 @@ -272,7 +279,7 @@ func sendInferReq(images []*ImageFile, cluster struct { for _, image := range images { limit <- true go func(t *ImageFile, c struct { - urls []*collector.InferUrl + urls []*InferUrl clusterId string clusterName string imageNum int32 @@ -382,7 +389,7 @@ type Res struct { } func contains(cs []struct { - urls []*collector.InferUrl + urls []*InferUrl clusterId string clusterName string imageNum int32 diff --git a/internal/scheduler/service/updater/taskStatusSync.go b/internal/scheduler/service/updater/taskStatusSync.go index 3923e1eb..649fb616 100644 --- a/internal/scheduler/service/updater/taskStatusSync.go +++ b/internal/scheduler/service/updater/taskStatusSync.go @@ -149,7 +149,7 @@ func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) { } if len(aiTask) == 0 { - task.Status = constants.Failed + //task.Status = constants.Failed err = svc.Scheduler.AiStorages.UpdateTask(&task) if err != nil { return diff --git a/internal/storeLink/modelarts.go b/internal/storeLink/modelarts.go index caa8ec24..b907707b 100644 --- a/internal/storeLink/modelarts.go +++ b/internal/storeLink/modelarts.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" @@ -378,8 +379,8 @@ func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option. return errors.New("failed to get AlgorithmId") } -func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.InferUrl, error) { - var imageUrls []*collector.InferUrl +func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { + var imageUrls []*inference.InferUrl urlReq := &modelartsclient.ImageReasoningUrlReq{ ModelName: option.ModelName, Type: option.ModelType, @@ -389,7 +390,7 @@ func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOpt if err != nil { return nil, err } - imageUrl := &collector.InferUrl{ + imageUrl := &inference.InferUrl{ Url: urlResp.Url, Card: "npu", } diff --git a/internal/storeLink/octopus.go b/internal/storeLink/octopus.go index 2813d048..38408ca6 100644 --- a/internal/storeLink/octopus.go +++ b/internal/storeLink/octopus.go @@ -19,6 +19,7 @@ import ( "errors" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" @@ -871,7 +872,7 @@ func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpec return errors.New("set ResourceId error") } -func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.InferUrl, error) { +func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { req := &octopus.GetNotebookListReq{ Platform: o.platform, PageIndex: o.pageIndex, @@ -882,12 +883,12 @@ func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOptio return nil, err } - var imageUrls []*collector.InferUrl + var imageUrls []*inference.InferUrl for _, notebook := range list.Payload.GetNotebooks() { if strings.Contains(notebook.AlgorithmName, option.ModelName) && notebook.Status == "running" { url := strings.Replace(notebook.Tasks[0].Url, FORWARD_SLASH, "", -1) names := strings.Split(notebook.AlgorithmName, UNDERSCORE) - imageUrl := &collector.InferUrl{ + imageUrl := &inference.InferUrl{ Url: DOMAIN + url, Card: names[2], } diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index b44a0c1b..cb41ba11 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -22,6 +22,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "strconv" @@ -730,8 +731,8 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error { return errors.New("failed to set params") } -func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.InferUrl, error) { - var imageUrls []*collector.InferUrl +func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { + var imageUrls []*inference.InferUrl urlReq := &hpcAC.GetInferUrlReq{ ModelName: option.ModelName, @@ -743,7 +744,7 @@ func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption if err != nil { return nil, err } - imageUrl := &collector.InferUrl{ + imageUrl := &inference.InferUrl{ Url: urlResp.Url, Card: "dcu", }