Merge remote-tracking branch 'origin/master' into master-wq

Former-commit-id: 86383667420e1d060a95a517dcea17a3c8163b2b
This commit is contained in:
qiwang 2024-06-21 10:32:12 +08:00
commit daa41d1dbc
11 changed files with 459 additions and 1 deletions

View File

@ -0,0 +1,34 @@
syntax = "v1"
type (
/******************image inference*************************/
ImageInferenceReq {
TaskName string `form:"taskName"`
TaskDesc string `form:"taskDesc"`
ModelName string `form:"modelName"`
ModelType string `form:"modelType"`
AdapterId string `form:"adapterId"`
AiClusterIds []string `form:"aiClusterIds,optional"`
ResourceType string `form:"resourceType,optional"`
ComputeCard string `form:"card,optional"`
Strategy string `form:"strategy"`
StaticWeightMap map[string]int32 `form:"staticWeightMap,optional"`
Params []string `form:"params,optional"`
Envs []string `form:"envs,optional"`
Cmd string `form:"cmd,optional"`
Replica int32 `form:"replicas,optional"`
}
ImageInferenceResp {
InferResults []*ImageResult `json:"result"`
}
ImageResult {
ClusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`
ImageName string `json:"imageName"`
Card string `json:"card"`
ImageResult string `json:"imageResult"`
}
)

View File

@ -10,6 +10,7 @@ import (
"storelink/pcm-storelink.api" "storelink/pcm-storelink.api"
"schedule/pcm-schedule.api" "schedule/pcm-schedule.api"
"monitoring/pcm-monitoring.api" "monitoring/pcm-monitoring.api"
"inference/inference.api"
) )
info( info(
@ -901,6 +902,15 @@ service pcm {
get /schedule/getClusterBalanceById/:adapterId/:clusterId (GetClusterBalanceByIdReq) returns (GetClusterBalanceByIdResp) get /schedule/getClusterBalanceById/:adapterId/:clusterId (GetClusterBalanceByIdReq) returns (GetClusterBalanceByIdResp)
} }
@server(
prefix: pcm/v1
group: inference
)
service pcm {
@handler ImageInferenceHandler
post /inference/images (ImageInferenceReq) returns (ImageInferenceResp)
}
@server( @server(
prefix: pcm/v1 prefix: pcm/v1
group: dictionary group: dictionary

View File

@ -0,0 +1,24 @@
package inference
import (
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/inference"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
)
func ImageInferenceHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.ImageInferenceReq
if err := httpx.Parse(r, &req); err != nil {
result.ParamErrorResult(r, w, err)
return
}
l := inference.NewImageInferenceLogic(r.Context(), svcCtx)
resp, err := l.ImageInfer(r, &req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -10,6 +10,7 @@ import (
core "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/core" core "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/core"
dictionary "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/dictionary" dictionary "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/dictionary"
hpc "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/hpc" hpc "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/hpc"
inference "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/inference"
monitoring "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/monitoring" monitoring "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/monitoring"
schedule "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/schedule" schedule "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/schedule"
storage "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/storage" storage "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/storage"
@ -1135,6 +1136,17 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
rest.WithPrefix("/pcm/v1"), rest.WithPrefix("/pcm/v1"),
) )
server.AddRoutes(
[]rest.Route{
{
Method: http.MethodPost,
Path: "/inference/images",
Handler: inference.ImageInferenceHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),
)
server.AddRoutes( server.AddRoutes(
[]rest.Route{ []rest.Route{
{ {

View File

@ -0,0 +1,280 @@
package inference
import (
"context"
"errors"
"github.com/go-resty/resty/v2"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
"math/rand"
"mime/multipart"
"net/http"
"sync"
)
type ImageInferenceLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewImageInferenceLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ImageInferenceLogic {
return &ImageInferenceLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *ImageInferenceLogic) ImageInference(req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) {
return nil, nil
}
func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) {
resp = &types.ImageInferenceResp{}
opt := &option.InferOption{
TaskName: req.TaskName,
TaskDesc: req.TaskDesc,
AdapterId: req.AdapterId,
AiClusterIds: req.AiClusterIds,
ModelName: req.ModelName,
ModelType: req.ModelType,
Strategy: req.Strategy,
StaticWeightMap: req.StaticWeightMap,
}
var ts []struct {
imageResult *types.ImageResult
file multipart.File
}
uploadedFiles := r.MultipartForm.File
if len(uploadedFiles) == 0 {
return nil, errors.New("Images does not exist")
}
if len(uploadedFiles["images"]) == 0 {
return nil, errors.New("Images does not exist")
}
for _, header := range uploadedFiles["images"] {
file, err := header.Open()
if err != nil {
return nil, err
}
defer file.Close()
var ir types.ImageResult
ir.ImageName = header.Filename
t := struct {
imageResult *types.ImageResult
file multipart.File
}{
imageResult: &ir,
file: file,
}
ts = append(ts, t)
}
_, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId]
if !ok {
return nil, errors.New("AdapterId does not exist")
}
var strat strategy.Strategy
switch opt.Strategy {
case strategy.STATIC_WEIGHT:
//todo resources should match cluster StaticWeightMap
strat = strategy.NewStaticWeightStrategy(opt.StaticWeightMap, int32(len(ts)))
if err != nil {
return nil, err
}
default:
return nil, errors.New("no strategy has been chosen")
}
clusters, err := strat.Schedule()
if err != nil {
return nil, err
}
results, err := infer(opt, clusters, ts, l.svcCtx, l.ctx)
if err != nil {
return nil, err
}
resp.InferResults = results
return resp, nil
}
func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []struct {
imageResult *types.ImageResult
file multipart.File
}, svcCtx *svc.ServiceContext, ctx context.Context) ([]*types.ImageResult, error) {
if clusters == nil || len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
var wg sync.WaitGroup
var cluster_ch = make(chan struct {
urls []*collector.ImageInferUrl
clusterName string
imageNum int32
}, len(clusters))
var cs []struct {
urls []*collector.ImageInferUrl
clusterName string
imageNum int32
}
collectorMap := svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId]
for _, cluster := range clusters {
wg.Add(1)
c := cluster
go func() {
imageUrls, err := collectorMap[c.ClusterId].GetImageInferUrl(ctx, opt)
if err != nil {
return
}
clusterName, _ := svcCtx.Scheduler.AiStorages.GetClusterNameById(c.ClusterId)
s := struct {
urls []*collector.ImageInferUrl
clusterName string
imageNum int32
}{
urls: imageUrls,
clusterName: clusterName,
imageNum: c.Replicas,
}
cluster_ch <- s
wg.Done()
return
}()
}
wg.Wait()
close(cluster_ch)
for s := range cluster_ch {
cs = append(cs, s)
}
var result_ch = make(chan *types.ImageResult, len(ts))
var results []*types.ImageResult
wg.Add(len(ts))
var imageNumIdx int32 = 0
var imageNumIdxEnd int32 = 0
for _, c := range cs {
new_images := make([]struct {
imageResult *types.ImageResult
file multipart.File
}, len(ts))
copy(new_images, ts)
imageNumIdxEnd = imageNumIdxEnd + c.imageNum
new_images = new_images[imageNumIdx:imageNumIdxEnd]
imageNumIdx = imageNumIdx + c.imageNum
go sendInferReq(new_images, c, &wg, result_ch)
}
wg.Wait()
close(result_ch)
for s := range result_ch {
results = append(results, s)
}
return results, nil
}
func sendInferReq(images []struct {
imageResult *types.ImageResult
file multipart.File
}, cluster struct {
urls []*collector.ImageInferUrl
clusterName string
imageNum int32
}, wg *sync.WaitGroup, ch chan<- *types.ImageResult) {
for _, image := range images {
go func(t struct {
imageResult *types.ImageResult
file multipart.File
}, c struct {
urls []*collector.ImageInferUrl
clusterName string
imageNum int32
}) {
if len(c.urls) == 1 {
r, err := getInferResult(c.urls[0].Url, t.file, t.imageResult.ImageName)
if err != nil {
t.imageResult.ImageResult = err.Error()
ch <- t.imageResult
wg.Done()
return
}
t.imageResult.ImageResult = r
t.imageResult.ClusterName = c.clusterName
t.imageResult.Card = c.urls[0].Card
ch <- t.imageResult
wg.Done()
return
} else {
idx := rand.Intn(len(c.urls))
r, err := getInferResult(c.urls[idx].Url, t.file, t.imageResult.ImageName)
if err != nil {
t.imageResult.ImageResult = err.Error()
ch <- t.imageResult
wg.Done()
return
}
t.imageResult.ImageResult = r
t.imageResult.ClusterName = c.clusterName
t.imageResult.Card = c.urls[idx].Card
ch <- t.imageResult
wg.Done()
return
}
}(image, cluster)
}
}
func getInferResult(url string, file multipart.File, fileName string) (string, error) {
var res Res
req := GetACHttpRequest()
_, err := req.
SetFileReader("file", fileName, file).
SetResult(&res).
Post(url)
if err != nil {
return "", err
}
return res.Result, nil
}
func GetACHttpRequest() *resty.Request {
client := resty.New()
request := client.R()
return request
}
type Res struct {
Result string `json:"result"`
}

View File

@ -0,0 +1,18 @@
package option
type InferOption struct {
TaskName string `json:"taskName"`
TaskDesc string `json:"taskDesc"`
ModelName string `json:"modelName"`
ModelType string `json:"modelType"`
AdapterId string `json:"adapterId"`
AiClusterIds []string `json:"aiClusterIds"`
ResourceType string `json:"resourceType"`
ComputeCard string `json:"card"`
Strategy string `json:"strategy"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
Params []string `json:"params,optional"`
Envs []string `json:"envs,optional"`
Cmd string `json:"cmd,optional"`
Replica int32 `json:"replicas,optional"`
}

View File

@ -1,6 +1,9 @@
package collector package collector
import "context" import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
)
type AiCollector interface { type AiCollector interface {
GetResourceStats(ctx context.Context) (*ResourceStats, error) GetResourceStats(ctx context.Context) (*ResourceStats, error)
@ -12,6 +15,12 @@ type AiCollector interface {
UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error
GetComputeCards(ctx context.Context) ([]string, error) GetComputeCards(ctx context.Context) ([]string, error)
GetUserBalance(ctx context.Context) (float64, error) GetUserBalance(ctx context.Context) (float64, error)
GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*ImageInferUrl, error)
}
type ImageInferUrl struct {
Url string
Card string
} }
type ResourceStats struct { type ResourceStats struct {

View File

@ -376,3 +376,14 @@ func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option.
return errors.New("failed to get AlgorithmId") return errors.New("failed to get AlgorithmId")
} }
func (m *ModelArtsLink) GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.ImageInferUrl, error) {
var imageUrls []*collector.ImageInferUrl
imageUrl := &collector.ImageInferUrl{
Url: "http://0.0.0.0:8888/image",
Card: "npu",
}
imageUrls = append(imageUrls, imageUrl)
return imageUrls, nil
}

View File

@ -870,3 +870,23 @@ func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpec
} }
return errors.New("set ResourceId error") return errors.New("set ResourceId error")
} }
func (o *OctopusLink) GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.ImageInferUrl, error) {
var imageUrls []*collector.ImageInferUrl
imageUrl := &collector.ImageInferUrl{
Url: "http://0.0.0.0:8888/image",
Card: "mlu",
}
imageUrl1 := &collector.ImageInferUrl{
Url: "http://0.0.0.0:8888/image",
Card: "gcu",
}
imageUrl2 := &collector.ImageInferUrl{
Url: "http://0.0.0.0:8888/image",
Card: "biv100",
}
imageUrls = append(imageUrls, imageUrl)
imageUrls = append(imageUrls, imageUrl1)
imageUrls = append(imageUrls, imageUrl2)
return imageUrls, nil
}

View File

@ -729,3 +729,14 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error {
return errors.New("failed to set params") return errors.New("failed to set params")
} }
func (s *ShuguangAi) GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.ImageInferUrl, error) {
var imageUrls []*collector.ImageInferUrl
imageUrl := &collector.ImageInferUrl{
Url: "http://0.0.0.0:8888/image",
Card: "dcu",
}
imageUrls = append(imageUrls, imageUrl)
return imageUrls, nil
}

View File

@ -5878,3 +5878,32 @@ type Link struct {
type Category struct { type Category struct {
Name string `json:"name"` Name string `json:"name"`
} }
type ImageInferenceReq struct {
TaskName string `form:"taskName"`
TaskDesc string `form:"taskDesc"`
ModelName string `form:"modelName"`
ModelType string `form:"modelType"`
AdapterId string `form:"adapterId"`
AiClusterIds []string `form:"aiClusterIds,optional"`
ResourceType string `form:"resourceType,optional"`
ComputeCard string `form:"card,optional"`
Strategy string `form:"strategy"`
StaticWeightMap map[string]int32 `form:"staticWeightMap,optional"`
Params []string `form:"params,optional"`
Envs []string `form:"envs,optional"`
Cmd string `form:"cmd,optional"`
Replica int32 `form:"replicas,optional"`
}
type ImageInferenceResp struct {
InferResults []*ImageResult `json:"result"`
}
type ImageResult struct {
ClusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`
ImageName string `json:"imageName"`
Card string `json:"card"`
ImageResult string `json:"imageResult"`
}