updated octopus getInferResultlogic

Former-commit-id: db824e8f2e316d551761d8861fd5afde89902937
This commit is contained in:
tzwang 2024-08-15 15:37:31 +08:00
parent 31d0096029
commit 94b806c34f
8 changed files with 108 additions and 36 deletions

2
go.mod
View File

@ -20,7 +20,7 @@ require (
github.com/zeromicro/go-zero v1.6.6 github.com/zeromicro/go-zero v1.6.6
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240718073732-bc5d687f6330 gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240815070729-eeab0822f4fe
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203
gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5
gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d gitlink.org.cn/jcce-pcm/pcm-participant-ceph v0.0.0-20230904090036-24fc730ec87d

4
go.sum
View File

@ -475,8 +475,8 @@ gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 h1:DicBXoQiC
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c h1:CodJeGgTYJwA6NDHFnw6B+4pBXUl79tvAcECq39tgZI= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c h1:CodJeGgTYJwA6NDHFnw6B+4pBXUl79tvAcECq39tgZI=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240724095608-1727d09f030c/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240718073732-bc5d687f6330 h1:WxPrFSO6LjDCr+k7nmNFlPst8CtoTHQ2iSjv+D2rNnM= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240815070729-eeab0822f4fe h1:4zBOROvGGzmS1p/cuCVKE0d2tIqhkHNzpSspizZ4G2Y=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240718073732-bc5d687f6330/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240815070729-eeab0822f4fe/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ=
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 h1:s6PsZ1+bev294IWdZRlV7mnOwI1+UzFcldVW/BqhQzI=
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203/go.mod h1:i2rrbMQ+Fve345BY9Heh4MUqVTAimZQElQhzzRee5B8= gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203/go.mod h1:i2rrbMQ+Fve345BY9Heh4MUqVTAimZQElQhzzRee5B8=
gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnzkJBfMRnya1NrhOzlroUtRa5ePiYbPKlHLoLV0= gitlink.org.cn/JointCloud/pcm-slurm v0.0.0-20240301080743-8b94bbaf57f5 h1:+/5vnzkJBfMRnya1NrhOzlroUtRa5ePiYbPKlHLoLV0=

View File

@ -9,6 +9,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
@ -37,6 +38,7 @@ type FilteredCluster struct {
urls []*inference.InferUrl urls []*inference.InferUrl
clusterId string clusterId string
clusterName string clusterName string
clusterType string
imageNum int32 imageNum int32
} }
@ -156,7 +158,7 @@ func (i *ImageInference) filterClusters() ([]*FilteredCluster, error) {
c := cluster c := cluster
go func() { go func() {
r := http.Request{} r := http.Request{}
imageUrls, err := inferMap[c.ClusterId].GetInferUrl(r.Context(), i.opt) clusterInferUrl, err := inferMap[c.ClusterId].GetClusterInferUrl(r.Context(), i.opt)
if err != nil { if err != nil {
mutex.Lock() mutex.Lock()
i.errMap[c.ClusterId] = err.Error() i.errMap[c.ClusterId] = err.Error()
@ -165,14 +167,13 @@ func (i *ImageInference) filterClusters() ([]*FilteredCluster, error) {
return return
} }
i.inference.AppendRoute(imageUrls) i.inference.AppendRoute(clusterInferUrl.InferUrls)
clusterName, _ := i.storage.GetClusterNameById(c.ClusterId)
var f FilteredCluster var f FilteredCluster
f.urls = imageUrls f.urls = clusterInferUrl.InferUrls
f.clusterId = c.ClusterId f.clusterId = c.ClusterId
f.clusterName = clusterName f.clusterName = clusterInferUrl.ClusterName
f.clusterType = clusterInferUrl.ClusterType
f.imageNum = c.Replicas f.imageNum = c.Replicas
ch <- &f ch <- &f
@ -206,7 +207,7 @@ func (i *ImageInference) inferImages(cs []*FilteredCluster) ([]*types.ImageResul
imageNumIdx = imageNumIdx + c.imageNum imageNumIdx = imageNumIdx + c.imageNum
wg.Add(len(new_images)) wg.Add(len(new_images))
go sendInferReq(new_images, c, &wg, ch, limit) go i.sendInferReq(new_images, c, &wg, ch, limit)
} }
wg.Wait() wg.Wait()
close(ch) close(ch)
@ -300,12 +301,12 @@ func (i *ImageInference) updateStatus(aiTaskList []*models.TaskAi, cs []*Filtere
return nil return nil
} }
func sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) { func (i *ImageInference) sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGroup, ch chan<- *types.ImageResult, limit chan bool) {
for _, image := range images { for _, image := range images {
limit <- true limit <- true
go func(t *ImageFile, c *FilteredCluster) { go func(t *ImageFile, c *FilteredCluster) {
if len(c.urls) == 1 { if len(c.urls) == 1 {
r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterName) r, err := getInferResult(c.urls[0].Url, t.File, t.ImageResult.ImageName, c.clusterId, c.clusterType, i.inferAdapter, i.opt.AdapterId)
if err != nil { if err != nil {
t.ImageResult.ImageResult = err.Error() t.ImageResult.ImageResult = err.Error()
t.ImageResult.ClusterId = c.clusterId t.ImageResult.ClusterId = c.clusterId
@ -327,7 +328,7 @@ func sendInferReq(images []*ImageFile, cluster *FilteredCluster, wg *sync.WaitGr
return return
} else { } else {
idx := rand.Intn(len(c.urls)) idx := rand.Intn(len(c.urls))
r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterName) r, err := getInferResult(c.urls[idx].Url, t.File, t.ImageResult.ImageName, c.clusterId, c.clusterType, i.inferAdapter, i.opt.AdapterId)
if err != nil { if err != nil {
t.ImageResult.ImageResult = err.Error() t.ImageResult.ImageResult = err.Error()
t.ImageResult.ClusterId = c.clusterId t.ImageResult.ClusterId = c.clusterId
@ -404,24 +405,35 @@ func (i *ImageInference) saveAiSubTasks(id int64, aiTaskList []*models.TaskAi, c
return nil return nil
} }
func getInferResult(url string, file multipart.File, fileName string, clusterName string) (string, error) { func getInferResult(url string, file multipart.File, fileName string, clusterId string, clusterType string, inferAdapter map[string]map[string]inference.ICluster, adapterId string) (string, error) {
if clusterName == "鹏城云脑II-modelarts" { inferMap := inferAdapter[adapterId]
switch clusterType {
case storeLink.TYPE_OCTOPUS:
r := http.Request{}
result, err := inferMap[clusterId].GetInferResult(r.Context(), url, file, fileName)
if err != nil {
return "", err
}
return result, nil
case storeLink.TYPE_MODELARTS:
r, err := getInferResultModelarts(url, file, fileName) r, err := getInferResultModelarts(url, file, fileName)
if err != nil { if err != nil {
return "", err return "", err
} }
return r, nil return r, nil
default:
var res Res
req := GetRestyRequest(20)
_, err := req.
SetFileReader("file", fileName, file).
SetResult(&res).
Post(url)
if err != nil {
return "", err
}
return res.Result, nil
} }
var res Res
req := GetRestyRequest(20)
_, err := req.
SetFileReader("file", fileName, file).
SetResult(&res).
Post(url)
if err != nil {
return "", err
}
return res.Result, nil
} }
func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) { func getInferResultModelarts(url string, file multipart.File, fileName string) (string, error) {

View File

@ -3,6 +3,7 @@ package inference
import ( import (
"context" "context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"mime/multipart"
) )
const ( const (
@ -10,7 +11,8 @@ const (
) )
type ICluster interface { type ICluster interface {
GetInferUrl(ctx context.Context, option *option.InferOption) ([]*InferUrl, error) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*ClusterInferUrl, error)
GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error)
GetInferDeployInstanceList(ctx context.Context) ([]*DeployInstance, error) GetInferDeployInstanceList(ctx context.Context) ([]*DeployInstance, error)
StartInferDeployInstance(ctx context.Context, id string) bool StartInferDeployInstance(ctx context.Context, id string) bool
StopInferDeployInstance(ctx context.Context, id string) bool StopInferDeployInstance(ctx context.Context, id string) bool
@ -26,6 +28,12 @@ type Inference struct {
In IInference In IInference
} }
type ClusterInferUrl struct {
ClusterName string
ClusterType string
InferUrls []*InferUrl
}
type InferUrl struct { type InferUrl struct {
Url string Url string
Card string Card string

View File

@ -74,18 +74,18 @@ func filterClusters(opt *option.InferOption, storage *database.AiStorage, inferA
wg.Add(1) wg.Add(1)
go func(cId string) { go func(cId string) {
r := http.Request{} r := http.Request{}
urls, err := inferMap[cId].GetInferUrl(r.Context(), opt) clusterInferUrl, err := inferMap[cId].GetClusterInferUrl(r.Context(), opt)
if err != nil { if err != nil {
wg.Done() wg.Done()
return return
} }
for i, _ := range urls { for i, _ := range clusterInferUrl.InferUrls {
urls[i].Url = urls[i].Url + inference.FORWARD_SLASH + CHAT clusterInferUrl.InferUrls[i].Url = clusterInferUrl.InferUrls[i].Url + inference.FORWARD_SLASH + CHAT
} }
clusterName, _ := storage.GetClusterNameById(cId) clusterName, _ := storage.GetClusterNameById(cId)
var f FilteredCluster var f FilteredCluster
f.urls = urls f.urls = clusterInferUrl.InferUrls
f.clusterId = cId f.clusterId = cId
f.clusterName = clusterName f.clusterName = clusterName

View File

@ -27,6 +27,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice" "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
"gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
modelartsclient "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts" modelartsclient "gitlink.org.cn/JointCloud/pcm-modelarts/modelarts"
"mime/multipart"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -427,7 +428,7 @@ func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.
return errors.New("failed to get AlgorithmId") return errors.New("failed to get AlgorithmId")
} }
func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { func (m *ModelArtsLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
var imageUrls []*inference.InferUrl var imageUrls []*inference.InferUrl
urlReq := &modelartsclient.ImageReasoningUrlReq{ urlReq := &modelartsclient.ImageReasoningUrlReq{
ServiceName: option.ModelName, ServiceName: option.ModelName,
@ -444,7 +445,12 @@ func (m *ModelArtsLink) GetInferUrl(ctx context.Context, option *option.InferOpt
} }
imageUrls = append(imageUrls, imageUrl) imageUrls = append(imageUrls, imageUrl)
return imageUrls, nil clusterWithUrl := &inference.ClusterInferUrl{
ClusterName: m.platform,
ClusterType: TYPE_MODELARTS,
InferUrls: imageUrls,
}
return clusterWithUrl, nil
} }
func (m *ModelArtsLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { func (m *ModelArtsLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
@ -529,3 +535,7 @@ func (m *ModelArtsLink) GetInferDeployInstance(ctx context.Context, id string) (
ins.ClusterType = TYPE_MODELARTS ins.ClusterType = TYPE_MODELARTS
return ins, nil return ins, nil
} }
func (m *ModelArtsLink) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
return "", nil
}

View File

@ -24,7 +24,10 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"gitlink.org.cn/JointCloud/pcm-octopus/octopus" "gitlink.org.cn/JointCloud/pcm-octopus/octopus"
"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient" "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
"io/ioutil"
"log"
"math" "math"
"mime/multipart"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -872,7 +875,7 @@ func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpec
return errors.New("set ResourceId error") return errors.New("set ResourceId error")
} }
func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { func (o *OctopusLink) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
req := &octopus.GetNotebookListReq{ req := &octopus.GetNotebookListReq{
Platform: o.platform, Platform: o.platform,
PageIndex: o.pageIndex, PageIndex: o.pageIndex,
@ -902,7 +905,13 @@ func (o *OctopusLink) GetInferUrl(ctx context.Context, option *option.InferOptio
if len(imageUrls) == 0 { if len(imageUrls) == 0 {
return nil, errors.New("no infer url available") return nil, errors.New("no infer url available")
} }
return imageUrls, nil
clusterWithUrl := &inference.ClusterInferUrl{
ClusterName: o.platform,
ClusterType: TYPE_OCTOPUS,
InferUrls: imageUrls,
}
return clusterWithUrl, nil
} }
func (o *OctopusLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { func (o *OctopusLink) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
@ -979,3 +988,26 @@ func (o *OctopusLink) GetInferDeployInstance(ctx context.Context, id string) (*i
return ins, nil return ins, nil
} }
func (o *OctopusLink) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
buf, err := ioutil.ReadAll(file)
if err != nil {
log.Fatal(err)
}
req := &octopus.InferResultReq{
Platform: o.platform,
InferUrl: url,
FileName: fileName,
FileBytes: buf,
}
result, err := o.octopusRpc.GetInferResult(ctx, req)
if err != nil {
return "", err
}
if result.Result == "" {
return "", errors.New("get result failed")
}
return result.Result, nil
}

View File

@ -25,6 +25,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"mime/multipart"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -768,7 +769,7 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error {
return errors.New("failed to set params") return errors.New("failed to set params")
} }
func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption) ([]*inference.InferUrl, error) { func (s *ShuguangAi) GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*inference.ClusterInferUrl, error) {
var imageUrls []*inference.InferUrl var imageUrls []*inference.InferUrl
urlReq := &hpcAC.GetInferUrlReq{ urlReq := &hpcAC.GetInferUrlReq{
@ -787,7 +788,12 @@ func (s *ShuguangAi) GetInferUrl(ctx context.Context, option *option.InferOption
} }
imageUrls = append(imageUrls, imageUrl) imageUrls = append(imageUrls, imageUrl)
return imageUrls, nil clusterWithUrl := &inference.ClusterInferUrl{
ClusterName: s.platform,
ClusterType: TYPE_SHUGUANGAI,
InferUrls: imageUrls,
}
return clusterWithUrl, nil
} }
func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) { func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inference.DeployInstance, error) {
@ -871,3 +877,7 @@ func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*in
return ins, nil return ins, nil
} }
func (s *ShuguangAi) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
return "", nil
}