diff --git a/go.mod b/go.mod index e58681a7..2c0518cb 100644 --- a/go.mod +++ b/go.mod @@ -20,7 +20,7 @@ require ( github.com/zeromicro/go-zero v1.6.6 gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c - gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240718073732-bc5d687f6330 + gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240815070729-eeab0822f4fe gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d diff --git a/go.sum b/go.sum index 18d10648..7e63e8c0 100644 --- a/go.sum +++ b/go.sum @@ -475,8 +475,8 @@ gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 h1:DicBXoQiC gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c h1:CodJeGgTYJwA6NDHFnw6B+4pBXUl79tvAcECq39tgZI= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA= -gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240718073732-bc5d687f6330 h1:WxPrFSO6LjDCr+k7nmNFlPst8CtoTHQ2iSjv+D2rNnM= -gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240718073732-bc5d687f6330/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ= +gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240815070729-eeab0822f4fe h1:4zBOROvGGzmS1p/cuCVKE0d2tIqhkHNzpSspizZ4G2Y= +gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240815070729-eeab0822f4fe/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203/go.mod h1:i2rrbMQ+Fve345BY9Heh4MUqVTAimZQElQhzzRee5B8= 
gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnzkJBfMRnya1NrhOzlroUtRa5ePiYbPKlHLoLV0= diff --git a/internal/scheduler/service/inference/imageInference/imageInference.go b/internal/scheduler/service/inference/imageInference/imageInference.go index 7ac6ec66..ea569056 100644 --- a/internal/scheduler/service/inference/imageInference/imageInference.go +++ b/internal/scheduler/service/inference/imageInference/imageInference.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -37,6 +38,7 @@ type FilteredCluster struct { urls []*inference.InferUrl clusterId string clusterName string + clusterType string imageNum int32 } @@ -156,7 +158,7 @@ func (i *ImageInference) filterClusters() ([]*FilteredCluster, error) { c := cluster go func() { r := http.Request{} - imageUrls, err := inferMap[c.ClusterId].GetInferUrl(r.Context(), i.opt) + clusterInferUrl, err := inferMap[c.ClusterId].GetClusterInferUrl(r.Context(), i.opt) if err != nil { mutex.Lock() i.errMap[c.ClusterId] = err.Error() @@ -165,14 +167,13 @@ func (i *ImageInference) filterClusters() ([]*FilteredCluster, error) { return } - i.inference.AppendRoute(imageUrls) - - clusterName, _ := i.storage.GetClusterNameById(c.ClusterId) + i.inference.AppendRoute(clusterInferUrl.InferUrls) var f FilteredCluster - f.urls = imageUrls + f.urls = clusterInferUrl.InferUrls f.clusterId = c.ClusterId - f.clusterName = clusterName + f.clusterName = clusterInferUrl.ClusterName + f.clusterType = clusterInferUrl.ClusterType f.imageNum = c.Replicas ch <- &f @@ -206,7 
+207,7 @@ func (i *ImageInference) inferImages(cs []*FilteredCluster) ([]*types.ImageResul imageNumIdx = imageNumIdx + c.imageNum wg.Add(len(new_images)) - go sendInferReq(new_images, c, &wg, ch, limit) + go i.sendInferReq(new_images, c, &wg, ch, limit) } wg.Wait() close(ch) @@ -300,12 +301,12 @@ func (i *ImageInference) updateStatus(aiTaskList []*models.TaskAi, cs []*Filtere return nil } -func sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) { +func (i *ImageInference) sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) { for _, image := range images { limit <- true go func(t *ImageFile, c *FilteredCluster) { if len(c.urls) == 1 { - r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterName) + r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterId, c.clusterType, i.inferAdapter, i.opt.AdapterId) if err != nil { t.ImageResult.ImageResult = err.Error() t.ImageResult.ClusterId = c.clusterId @@ -327,7 +328,7 @@ func sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGr return } else { idx := rand.Intn(len(c.urls)) - r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName) + r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterId, c.clusterType, i.inferAdapter, i.opt.AdapterId) if err != nil { t.ImageResult.ImageResult = err.Error() t.ImageResult.ClusterId = c.clusterId @@ -404,24 +405,35 @@ func (i *ImageInference) saveAiSubTasks(id int64, aiTaskList []*models.TaskAi, c return nil } -func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { - if clusterName == "鹏城云脑II-modelarts" { +func getInferResult(url string, file multipart.File, fileName string, clusterId string, clusterType string, inferAdapter 
map[string]map[string]inference.ICluster, adapterId string) (string, error) { + inferMap := inferAdapter[adapterId] + + switch clusterType { + case storeLink.TYPE_OCTOPUS: + r := http.Request{} + result, err := inferMap[clusterId].GetInferResult(r.Context(), url, file, fileName) + if err != nil { + return "", err + } + return result, nil + case storeLink.TYPE_MODELARTS: r, err := getInferResultModelarts(url, file, fileName) if err != nil { return "", err } return r, nil + default: + var res Res + req := GetRestyRequest(20) + _, err := req. + SetFileReader("file", fileName, file). + SetResult(&res). + Post(url) + if err != nil { + return "", err + } + return res.Result, nil } - var res Res - req := GetRestyRequest(20) - _, err := req. - SetFileReader("file", fileName, file). - SetResult(&res). - Post(url) - if err != nil { - return "", err - } - return res.Result, nil } func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { diff --git a/internal/scheduler/service/inference/inference.go b/internal/scheduler/service/inference/inference.go index e052230c..59d8273a 100644 --- a/internal/scheduler/service/inference/inference.go +++ b/internal/scheduler/service/inference/inference.go @@ -3,6 +3,7 @@ package inference import ( "context" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" + "mime/multipart" ) const ( @@ -10,7 +11,8 @@ const ( ) type ICluster interface { - GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error) + GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*ClusterInferUrl, error) + GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) GetInferDeployInstanceList(ctx context.Context) ([]*DeployInstance, error) StartInferDeployInstance(ctx context.Context, id string) bool StopInferDeployInstance(ctx context.Context, id string) bool @@ -26,6 +28,12 @@ type Inference struct { In IInference } 
+type ClusterInferUrl struct { + ClusterName string + ClusterType string + InferUrls []*InferUrl +} + type InferUrl struct { Url string Card string diff --git a/internal/scheduler/service/inference/textInference/textToText.go b/internal/scheduler/service/inference/textInference/textToText.go index 9d9beb71..b795e766 100644 --- a/internal/scheduler/service/inference/textInference/textToText.go +++ b/internal/scheduler/service/inference/textInference/textToText.go @@ -74,18 +74,18 @@ func filterClusters(opt *option.InferOption, storage *database.AiStorage, inferA wg.Add(1) go func(cId string) { r := http.Request{} - urls, err := inferMap[cId].GetInferUrl(r.Context(), opt) + clusterInferUrl, err := inferMap[cId].GetClusterInferUrl(r.Context(), opt) if err != nil { wg.Done() return } - for i, _ := range urls { - urls[i].Url = urls[i].Url + inference.FORWARD_SLASH + CHAT + for i, _ := range clusterInferUrl.InferUrls { + clusterInferUrl.InferUrls[i].Url = clusterInferUrl.InferUrls[i].Url + inference.FORWARD_SLASH + CHAT } clusterName, _ := storage.GetClusterNameById(cId) var f FilteredCluster - f.urls = urls + f.urls = clusterInferUrl.InferUrls f.clusterId = cId f.clusterName = clusterName diff --git a/internal/storeLink/modelarts.go b/internal/storeLink/modelarts.go index aae89de0..21266dc5 100644 --- a/internal/storeLink/modelarts.go +++ b/internal/storeLink/modelarts.go @@ -27,6 +27,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" modelartsclient "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" + "mime/multipart" "strconv" "strings" "time" @@ -427,7 +428,7 @@ func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option. 
return errors.New("failed to get AlgorithmId") } -func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { +func (m *ModelArtsLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) { var imageUrls []*inference.InferUrl urlReq := &modelartsclient.ImageReasoningUrlReq{ ServiceName: option.ModelName, @@ -444,7 +445,12 @@ func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOpt } imageUrls = append(imageUrls, imageUrl) - return imageUrls, nil + clusterWithUrl := &inference.ClusterInferUrl{ + ClusterName: m.platform, + ClusterType: TYPE_MODELARTS, + InferUrls: imageUrls, + } + return clusterWithUrl, nil } func (m *ModelArtsLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { @@ -529,3 +535,7 @@ func (m *ModelArtsLink) GetInferDeployInstance(ctx context.Context, id string) ( ins.ClusterType = TYPE_MODELARTS return ins, nil } + +func (m *ModelArtsLink) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) { + return "", nil +} diff --git a/internal/storeLink/octopus.go b/internal/storeLink/octopus.go index 82ae7efd..be23deea 100644 --- a/internal/storeLink/octopus.go +++ b/internal/storeLink/octopus.go @@ -24,7 +24,9 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" + "io" "math" + "mime/multipart" "strconv" "strings" "time" @@ -872,7 +874,7 @@ func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpec return errors.New("set ResourceId error") } -func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { +func (o *OctopusLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) { req := 
&octopus.GetNotebookListReq{ Platform: o.platform, PageIndex: o.pageIndex, @@ -902,7 +904,13 @@ func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOptio if len(imageUrls) == 0 { return nil, errors.New("no infer url available") } - return imageUrls, nil + + clusterWithUrl := &inference.ClusterInferUrl{ + ClusterName: o.platform, + ClusterType: TYPE_OCTOPUS, + InferUrls: imageUrls, + } + return clusterWithUrl, nil } func (o *OctopusLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { @@ -979,3 +987,27 @@ func (o *OctopusLink) GetInferDeployInstance(ctx context.Context, id string) (*i return ins, nil } + +// GetInferResult reads the whole file into memory, sends it with url and fileName to the Octopus RPC service, and returns the result; a read failure, an RPC failure, or an empty result is returned as an error. +func (o *OctopusLink) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) { + buf, err := io.ReadAll(file) + if err != nil { + return "", err + } + req := &octopus.InferResultReq{ + Platform: o.platform, + InferUrl: url, + FileName: fileName, + FileBytes: buf, + } + + result, err := o.octopusRpc.GetInferResult(ctx, req) + if err != nil { + return "", err + } + + if result.Result == "" { + return "", errors.New("get result failed") + } + return result.Result, nil +} diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index 49d9ec1b..d5c41d87 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -25,6 +25,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" + "mime/multipart" "strconv" "strings" "sync" @@ -768,7 +769,7 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error { return errors.New("failed to set params") } -func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { +func (s *ShuguangAi) GetClusterInferUrl(ctx context.Context, option *option.InferOption) 
(*inference.ClusterInferUrl, error) { var imageUrls []*inference.InferUrl urlReq := &hpcAC.GetInferUrlReq{ @@ -787,7 +788,12 @@ func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption } imageUrls = append(imageUrls, imageUrl) - return imageUrls, nil + clusterWithUrl := &inference.ClusterInferUrl{ + ClusterName: s.platform, + ClusterType: TYPE_SHUGUANGAI, + InferUrls: imageUrls, + } + return clusterWithUrl, nil } func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { @@ -871,3 +877,7 @@ func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*in return ins, nil } + +func (s *ShuguangAi) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) { + return "", nil +}