fix imageinfer bugs

Former-commit-id: c12a5a3dbac98fe9c2657af48f3b80c5482aac13
This commit is contained in:
tzwang 2024-07-10 18:38:16 +08:00
parent 1f19eda9fb
commit 0deaf16f7d
10 changed files with 104 additions and 148 deletions

View File

@ -4,24 +4,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"strconv"
)
const (
OCTOPUS = "octopus"
MODELARTS = "modelarts"
SHUGUANGAI = "shuguangAi"
) )
func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) { func GetTaskList(svc *svc.ServiceContext) ([]*types.TaskModel, error) {
@ -73,84 +58,24 @@ func UpdateAiAdapterMaps(svc *svc.ServiceContext) {
continue continue
} else { } else {
if isAdapterEmpty(svc, id) { if isAdapterEmpty(svc, id) {
exeClusterMap, colClusterMap := InitAiClusterMap(&svc.Config, clusters.List) exeClusterMap, colClusterMap, inferMap := service.InitAiClusterMap(&svc.Config, clusters.List)
svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap svc.Scheduler.AiService.AiExecutorAdapterMap[id] = exeClusterMap
svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap svc.Scheduler.AiService.AiCollectorAdapterMap[id] = colClusterMap
svc.Scheduler.AiService.InferenceAdapterMap[id] = inferMap
} else { } else {
UpdateClusterMaps(svc, id, clusters.List) svc.Scheduler.AiService.UpdateClusterMaps(&svc.Config, id, clusters.List)
} }
} }
} }
} }
// UpdateClusterMaps registers the clusters of an already-known adapter that
// are missing from the AI service lookup tables.
//
// For every entry in clusters whose c.Id is present in neither
// AiExecutorAdapterMap[adapterId] nor AiCollectorAdapterMap[adapterId], a
// link is built according to c.Name (OCTOPUS, MODELARTS or SHUGUANGAI) and
// stored in BOTH maps under c.Id. Clusters with any other name, and clusters
// already present in either map, are left untouched.
//
// The per-adapter inner maps must already exist: writing into a missing
// (nil) inner map panics, exactly as it did before this change.
func UpdateClusterMaps(svc *svc.ServiceContext, adapterId string, clusters []types.ClusterInfo) {
	executors := svc.Scheduler.AiService.AiExecutorAdapterMap[adapterId]
	collectors := svc.Scheduler.AiService.AiCollectorAdapterMap[adapterId]

	for _, c := range clusters {
		_, inExecutors := executors[c.Id]
		_, inCollectors := collectors[c.Id]
		if inExecutors || inCollectors {
			// Already registered on at least one side; nothing to add.
			continue
		}

		switch c.Name {
		case OCTOPUS:
			// NOTE(review): the parse error is dropped, so a non-numeric
			// c.Id yields id 0. Kept as-is to match InitAiClusterMap.
			id, _ := strconv.ParseInt(c.Id, 10, 64)
			octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(svc.Config.OctopusRpcConf))
			octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
			executors[c.Id] = octopus
			// Fix: the second assignment used to hit the executor map again,
			// so the collector map never learned about the new cluster.
			collectors[c.Id] = octopus
		case MODELARTS:
			id, _ := strconv.ParseInt(c.Id, 10, 64)
			modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(svc.Config.ModelArtsRpcConf))
			modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(svc.Config.ModelArtsImgRpcConf))
			modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
			executors[c.Id] = modelarts
			collectors[c.Id] = modelarts
		case SHUGUANGAI:
			id, _ := strconv.ParseInt(c.Id, 10, 64)
			aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(svc.Config.ACRpcConf))
			sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
			executors[c.Id] = sgai
			collectors[c.Id] = sgai
		}
	}
}
// InitAiClusterMap builds the executor and collector lookup tables for the
// given clusters, keyed by cluster id.
//
// Each cluster is dispatched on its Name (OCTOPUS, MODELARTS, SHUGUANGAI);
// the link created for it is stored under c.Id in both returned maps.
// Clusters with an unrecognised name are skipped, so the returned maps may
// hold fewer entries than len(clusters).
func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) {
	executors := make(map[string]executor.AiExecutor)
	collectors := make(map[string]collector.AiCollector)

	for idx := range clusters {
		cluster := clusters[idx]
		// The parse error is intentionally discarded, as before; the numeric
		// id is only consumed by the link constructors below.
		numericID, _ := strconv.ParseInt(cluster.Id, 10, 64)

		switch cluster.Name {
		case OCTOPUS:
			rpcClient := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
			link := storeLink.NewOctopusLink(rpcClient, cluster.Nickname, numericID)
			executors[cluster.Id] = link
			collectors[cluster.Id] = link
		case MODELARTS:
			serviceClient := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
			imageClient := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
			link := storeLink.NewModelArtsLink(serviceClient, imageClient, cluster.Name, numericID, cluster.Nickname)
			executors[cluster.Id] = link
			collectors[cluster.Id] = link
		case SHUGUANGAI:
			rpcClient := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
			link := storeLink.NewShuguangAi(rpcClient, cluster.Nickname, numericID)
			executors[cluster.Id] = link
			collectors[cluster.Id] = link
		}
	}

	return executors, collectors
}
func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool { func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] emap, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] cmap, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
if ok && ok2 { imap, ok3 := svc.Scheduler.AiService.InferenceAdapterMap[id]
if len(emap) == clusterNum && len(cmap) == clusterNum {
if ok && ok2 && ok3 {
if len(emap) == clusterNum && len(cmap) == clusterNum && len(imap) == clusterNum {
return true return true
} }
} }
@ -160,7 +85,8 @@ func isAdapterExist(svc *svc.ServiceContext, id string, clusterNum int) bool {
func isAdapterEmpty(svc *svc.ServiceContext, id string) bool { func isAdapterEmpty(svc *svc.ServiceContext, id string) bool {
_, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id] _, ok := svc.Scheduler.AiService.AiExecutorAdapterMap[id]
_, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id] _, ok2 := svc.Scheduler.AiService.AiCollectorAdapterMap[id]
if !ok && !ok2 { _, ok3 := svc.Scheduler.AiService.InferenceAdapterMap[id]
if !ok && !ok2 && !ok3 {
return true return true
} }
return false return false

View File

@ -96,6 +96,12 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
return nil, errors.New("clusters is nil") return nil, errors.New("clusters is nil")
} }
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
//save task //save task
var synergystatus int64 var synergystatus int64
if len(clusters) > 1 { if len(clusters) > 1 {
@ -117,12 +123,6 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中") l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(opt.AdapterId, adapterName, "", "", opt.TaskName, "create", "任务创建中")
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
//save taskai //save taskai
for _, c := range clusters { for _, c := range clusters {
clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(c.ClusterId) clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(c.ClusterId)
@ -133,7 +133,13 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
} }
} }
go l.svcCtx.Scheduler.AiService.ImageInfer(opt, id, adapterName, clusters, ts) go func() {
r := http.Request{}
_, err := inference.ImageInfer(opt, id, adapterName, clusters, ts, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, l.svcCtx.Scheduler.AiStorages, r.Context())
if err != nil {
logx.Errorf(err.Error())
}
}()
return resp, nil return resp, nil
} }

View File

@ -5,7 +5,7 @@ import (
"errors" "errors"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@ -61,23 +61,23 @@ func (l *TextToTextInferenceLogic) TextToTextInference(req *types.TextToTextInfe
var wg sync.WaitGroup var wg sync.WaitGroup
var cluster_ch = make(chan struct { var cluster_ch = make(chan struct {
urls []*collector.InferUrl urls []*inference.InferUrl
clusterId string clusterId string
clusterName string clusterName string
}, len(opt.AiClusterIds)) }, len(opt.AiClusterIds))
var cs []struct { var cs []struct {
urls []*collector.InferUrl urls []*inference.InferUrl
clusterId string clusterId string
clusterName string clusterName string
} }
collectorMap := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId] inferMap := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[opt.AdapterId]
//save taskai //save taskai
for _, clusterId := range opt.AiClusterIds { for _, clusterId := range opt.AiClusterIds {
wg.Add(1) wg.Add(1)
go func(cId string) { go func(cId string) {
urls, err := collectorMap[cId].GetInferUrl(l.ctx, opt) urls, err := inferMap[cId].GetInferUrl(l.ctx, opt)
if err != nil { if err != nil {
wg.Done() wg.Done()
return return
@ -85,7 +85,7 @@ func (l *TextToTextInferenceLogic) TextToTextInference(req *types.TextToTextInfe
clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(cId) clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(cId)
s := struct { s := struct {
urls []*collector.InferUrl urls []*inference.InferUrl
clusterId string clusterId string
clusterName string clusterName string
}{ }{

View File

@ -1,22 +1,18 @@
package service package service
import ( import (
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/zrpc" "github.com/zeromicro/go-zero/zrpc"
hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient" hpcacclient "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/config" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"net/http"
"strconv" "strconv"
"sync" "sync"
) )
@ -30,6 +26,7 @@ const (
type AiService struct { type AiService struct {
AiExecutorAdapterMap map[string]map[string]executor.AiExecutor AiExecutorAdapterMap map[string]map[string]executor.AiExecutor
AiCollectorAdapterMap map[string]map[string]collector.AiCollector AiCollectorAdapterMap map[string]map[string]collector.AiCollector
InferenceAdapterMap map[string]map[string]inference.Inference
Storage *database.AiStorage Storage *database.AiStorage
mu sync.Mutex mu sync.Mutex
} }
@ -43,6 +40,7 @@ func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService
aiService := &AiService{ aiService := &AiService{
AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor), AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor),
AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector), AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector),
InferenceAdapterMap: make(map[string]map[string]inference.Inference),
Storage: storages, Storage: storages,
} }
for _, id := range adapterIds { for _, id := range adapterIds {
@ -53,17 +51,19 @@ func NewAiService(conf *config.Config, storages *database.AiStorage) (*AiService
if len(clusters.List) == 0 { if len(clusters.List) == 0 {
continue continue
} }
exeClusterMap, colClusterMap := InitAiClusterMap(conf, clusters.List) exeClusterMap, colClusterMap, inferMap := InitAiClusterMap(conf, clusters.List)
aiService.AiExecutorAdapterMap[id] = exeClusterMap aiService.AiExecutorAdapterMap[id] = exeClusterMap
aiService.AiCollectorAdapterMap[id] = colClusterMap aiService.AiCollectorAdapterMap[id] = colClusterMap
aiService.InferenceAdapterMap[id] = inferMap
} }
return aiService, nil return aiService, nil
} }
func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector) { func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector, map[string]inference.Inference) {
executorMap := make(map[string]executor.AiExecutor) executorMap := make(map[string]executor.AiExecutor)
collectorMap := make(map[string]collector.AiCollector) collectorMap := make(map[string]collector.AiCollector)
inferenceMap := make(map[string]inference.Inference)
for _, c := range clusters { for _, c := range clusters {
switch c.Name { switch c.Name {
case OCTOPUS: case OCTOPUS:
@ -72,6 +72,7 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id) octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
collectorMap[c.Id] = octopus collectorMap[c.Id] = octopus
executorMap[c.Id] = octopus executorMap[c.Id] = octopus
inferenceMap[c.Id] = octopus
case MODELARTS: case MODELARTS:
id, _ := strconv.ParseInt(c.Id, 10, 64) id, _ := strconv.ParseInt(c.Id, 10, 64)
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf)) modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
@ -79,32 +80,52 @@ func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[st
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname) modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
collectorMap[c.Id] = modelarts collectorMap[c.Id] = modelarts
executorMap[c.Id] = modelarts executorMap[c.Id] = modelarts
inferenceMap[c.Id] = modelarts
case SHUGUANGAI: case SHUGUANGAI:
id, _ := strconv.ParseInt(c.Id, 10, 64) id, _ := strconv.ParseInt(c.Id, 10, 64)
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf)) aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id) sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
collectorMap[c.Id] = sgai collectorMap[c.Id] = sgai
executorMap[c.Id] = sgai executorMap[c.Id] = sgai
inferenceMap[c.Id] = sgai
} }
} }
return executorMap, collectorMap return executorMap, collectorMap, inferenceMap
} }
func (as *AiService) ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*inference.ImageFile) { func (as *AiService) UpdateClusterMaps(conf *config.Config, adapterId string, clusters []types.ClusterInfo) {
for _, c := range clusters {
r := http.Request{} _, ok := as.AiExecutorAdapterMap[adapterId][c.Id]
_, err := inference.Infer(opt, id, adapterName, clusters, ts, as.AiCollectorAdapterMap, as.Storage, r.Context()) _, ok2 := as.AiCollectorAdapterMap[adapterId][c.Id]
if err != nil { _, ok3 := as.InferenceAdapterMap[adapterId][c.Id]
logx.Errorf(err.Error()) if !ok && !ok2 && !ok3 {
return switch c.Name {
case OCTOPUS:
id, _ := strconv.ParseInt(c.Id, 10, 64)
octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
as.AiExecutorAdapterMap[adapterId][c.Id] = octopus
as.AiCollectorAdapterMap[adapterId][c.Id] = octopus
as.InferenceAdapterMap[adapterId][c.Id] = octopus
case MODELARTS:
id, _ := strconv.ParseInt(c.Id, 10, 64)
modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
as.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
as.AiCollectorAdapterMap[adapterId][c.Id] = modelarts
as.InferenceAdapterMap[adapterId][c.Id] = modelarts
case SHUGUANGAI:
id, _ := strconv.ParseInt(c.Id, 10, 64)
aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
as.AiExecutorAdapterMap[adapterId][c.Id] = sgai
as.AiCollectorAdapterMap[adapterId][c.Id] = sgai
as.InferenceAdapterMap[adapterId][c.Id] = sgai
}
} else {
continue
}
} }
} }
//func (a *AiService) AddCluster() error {
//
//}
//
//func (a *AiService) AddAdapter() error {
//
//}

View File

@ -2,7 +2,6 @@ package collector
import ( import (
"context" "context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
) )
type AiCollector interface { type AiCollector interface {
@ -15,12 +14,6 @@ type AiCollector interface {
UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error
GetComputeCards(ctx context.Context) ([]string, error) GetComputeCards(ctx context.Context) ([]string, error)
GetUserBalance(ctx context.Context) (float64, error) GetUserBalance(ctx context.Context) (float64, error)
GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error)
}
type InferUrl struct {
Url string
Card string
} }
type ResourceStats struct { type ResourceStats struct {

View File

@ -8,9 +8,7 @@ import (
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -24,12 +22,21 @@ import (
"time" "time"
) )
// Inference is the capability a cluster link must provide to take part in
// inference tasks: resolving the endpoint(s) a request should be sent to.
// Values of this type are stored per adapter and per cluster in
// AiService.InferenceAdapterMap and looked up by cluster id in ImageInfer.
type Inference interface {
// GetInferUrl returns the inference endpoints available on the cluster
// for the model described by option, or an error if they cannot be
// resolved.
GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error)
}
// InferUrl describes one inference endpoint returned by Inference.GetInferUrl.
type InferUrl struct {
// Url is the endpoint address; ImageInfer appends "/image" to it before use.
Url string
// Card labels the accelerator behind the endpoint (the implementations in
// this change set it to values such as "npu" or "dcu").
Card string
}
type ImageFile struct { type ImageFile struct {
ImageResult *types.ImageResult ImageResult *types.ImageResult
File multipart.File File multipart.File
} }
func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, aiCollectorAdapterMap map[string]map[string]collector.AiCollector, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) { func ImageInfer(opt *option.InferOption, id int64, adapterName string, clusters []*strategy.AssignedCluster, ts []*ImageFile, inferAdapterMap map[string]map[string]Inference, storage *database.AiStorage, ctx context.Context) ([]*types.ImageResult, error) {
//for i := len(clusters) - 1; i >= 0; i-- { //for i := len(clusters) - 1; i >= 0; i-- {
// if clusters[i].Replicas == 0 { // if clusters[i].Replicas == 0 {
@ -39,19 +46,19 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
var wg sync.WaitGroup var wg sync.WaitGroup
var cluster_ch = make(chan struct { var cluster_ch = make(chan struct {
urls []*collector.InferUrl urls []*InferUrl
clusterId string clusterId string
clusterName string clusterName string
imageNum int32 imageNum int32
}, len(clusters)) }, len(clusters))
var cs []struct { var cs []struct {
urls []*collector.InferUrl urls []*InferUrl
clusterId string clusterId string
clusterName string clusterName string
imageNum int32 imageNum int32
} }
collectorMap := aiCollectorAdapterMap[opt.AdapterId] inferMap := inferAdapterMap[opt.AdapterId]
////save taskai ////save taskai
//for _, c := range clusters { //for _, c := range clusters {
@ -69,7 +76,7 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
wg.Add(1) wg.Add(1)
c := cluster c := cluster
go func() { go func() {
imageUrls, err := collectorMap[c.ClusterId].GetInferUrl(ctx, opt) imageUrls, err := inferMap[c.ClusterId].GetInferUrl(ctx, opt)
if err != nil { if err != nil {
mutex.Lock() mutex.Lock()
errMap[c.ClusterId] = err.Error() errMap[c.ClusterId] = err.Error()
@ -78,12 +85,12 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
return return
} }
for i, _ := range imageUrls { for i, _ := range imageUrls {
imageUrls[i].Url = imageUrls[i].Url + storeLink.FORWARD_SLASH + "image" imageUrls[i].Url = imageUrls[i].Url + "/" + "image"
} }
clusterName, _ := storage.GetClusterNameById(c.ClusterId) clusterName, _ := storage.GetClusterNameById(c.ClusterId)
s := struct { s := struct {
urls []*collector.InferUrl urls []*InferUrl
clusterId string clusterId string
clusterName string clusterName string
imageNum int32 imageNum int32
@ -264,7 +271,7 @@ func Infer(opt *option.InferOption, id int64, adapterName string, clusters []*st
} }
func sendInferReq(images []*ImageFile, cluster struct { func sendInferReq(images []*ImageFile, cluster struct {
urls []*collector.InferUrl urls []*InferUrl
clusterId string clusterId string
clusterName string clusterName string
imageNum int32 imageNum int32
@ -272,7 +279,7 @@ func sendInferReq(images []*ImageFile, cluster struct {
for _, image := range images { for _, image := range images {
limit <- true limit <- true
go func(t *ImageFile, c struct { go func(t *ImageFile, c struct {
urls []*collector.InferUrl urls []*InferUrl
clusterId string clusterId string
clusterName string clusterName string
imageNum int32 imageNum int32
@ -382,7 +389,7 @@ type Res struct {
} }
func contains(cs []struct { func contains(cs []struct {
urls []*collector.InferUrl urls []*InferUrl
clusterId string clusterId string
clusterName string clusterName string
imageNum int32 imageNum int32

View File

@ -149,7 +149,7 @@ func updateInferTaskStatus(svc *svc.ServiceContext, task types.TaskModel) {
} }
if len(aiTask) == 0 { if len(aiTask) == 0 {
task.Status = constants.Failed //task.Status = constants.Failed
err = svc.Scheduler.AiStorages.UpdateTask(&task) err = svc.Scheduler.AiStorages.UpdateTask(&task)
if err != nil { if err != nil {
return return

View File

@ -19,6 +19,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/timeutils"
@ -378,8 +379,8 @@ func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.
return errors.New("failed to get AlgorithmId") return errors.New("failed to get AlgorithmId")
} }
func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.InferUrl, error) { func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) {
var imageUrls []*collector.InferUrl var imageUrls []*inference.InferUrl
urlReq := &modelartsclient.ImageReasoningUrlReq{ urlReq := &modelartsclient.ImageReasoningUrlReq{
ModelName: option.ModelName, ModelName: option.ModelName,
Type: option.ModelType, Type: option.ModelType,
@ -389,7 +390,7 @@ func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOpt
if err != nil { if err != nil {
return nil, err return nil, err
} }
imageUrl := &collector.InferUrl{ imageUrl := &inference.InferUrl{
Url: urlResp.Url, Url: urlResp.Url,
Card: "npu", Card: "npu",
} }

View File

@ -19,6 +19,7 @@ import (
"errors" "errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
@ -871,7 +872,7 @@ func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpec
return errors.New("set ResourceId error") return errors.New("set ResourceId error")
} }
func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.InferUrl, error) { func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) {
req := &octopus.GetNotebookListReq{ req := &octopus.GetNotebookListReq{
Platform: o.platform, Platform: o.platform,
PageIndex: o.pageIndex, PageIndex: o.pageIndex,
@ -882,12 +883,12 @@ func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOptio
return nil, err return nil, err
} }
var imageUrls []*collector.InferUrl var imageUrls []*inference.InferUrl
for _, notebook := range list.Payload.GetNotebooks() { for _, notebook := range list.Payload.GetNotebooks() {
if strings.Contains(notebook.AlgorithmName, option.ModelName) && notebook.Status == "running" { if strings.Contains(notebook.AlgorithmName, option.ModelName) && notebook.Status == "running" {
url := strings.Replace(notebook.Tasks[0].Url, FORWARD_SLASH, "", -1) url := strings.Replace(notebook.Tasks[0].Url, FORWARD_SLASH, "", -1)
names := strings.Split(notebook.AlgorithmName, UNDERSCORE) names := strings.Split(notebook.AlgorithmName, UNDERSCORE)
imageUrl := &collector.InferUrl{ imageUrl := &inference.InferUrl{
Url: DOMAIN + url, Url: DOMAIN + url,
Card: names[2], Card: names[2],
} }

View File

@ -22,6 +22,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"strconv" "strconv"
@ -730,8 +731,8 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error {
return errors.New("failed to set params") return errors.New("failed to set params")
} }
func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.InferUrl, error) { func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) {
var imageUrls []*collector.InferUrl var imageUrls []*inference.InferUrl
urlReq := &hpcAC.GetInferUrlReq{ urlReq := &hpcAC.GetInferUrlReq{
ModelName: option.ModelName, ModelName: option.ModelName,
@ -743,7 +744,7 @@ func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption
if err != nil { if err != nil {
return nil, err return nil, err
} }
imageUrl := &collector.InferUrl{ imageUrl := &inference.InferUrl{
Url: urlResp.Url, Url: urlResp.Url,
Card: "dcu", Card: "dcu",
} }