Merge pull request 'updated imageinference logics' (#289) from tzwang/pcm-coordinator:master into master

Former-commit-id: 7f31f9565187af3718adf0d4ca6b2ca2b832f737
This commit is contained in:
tzwang 2024-08-27 17:47:18 +08:00
commit b9ed8c5fa7
14 changed files with 263 additions and 168 deletions

View File

@ -1,6 +1,20 @@
syntax = "v1"
type (
/******************image inference*************************/
// DeployInstance describes one deploy instance as exchanged over JSON.
// Every field is a string; the JSON key of each field is given by its tag.
DeployInstance {
InstanceId string `json:"instanceId"`
InstanceName string `json:"instanceName"`
AdapterId string `json:"adapterId"`
AdapterName string `json:"adapterName"`
ClusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`
ModelName string `json:"modelName"`
ModelType string `json:"modelType"`
InferCard string `json:"inferCard"`
Status string `json:"status"`
}
/******************image inference*************************/
ModelTypesResp {
ModelTypes []string `json:"types"`
@ -16,20 +30,13 @@ type (
/******************image inference*************************/
ImageInferenceReq {
TaskName string `form:"taskName"`
TaskDesc string `form:"taskDesc"`
ModelName string `form:"modelName"`
ModelType string `form:"modelType"`
AdapterIds []string `form:"adapterIds"`
AiClusterIds []string `form:"aiClusterIds,optional"`
ResourceType string `form:"resourceType,optional"`
ComputeCard string `form:"card,optional"`
Strategy string `form:"strategy"`
StaticWeightMap map[string]int32 `form:"staticWeightMap,optional"`
Params []string `form:"params,optional"`
Envs []string `form:"envs,optional"`
Cmd string `form:"cmd,optional"`
Replica int32 `form:"replicas,optional"`
TaskName string `json:"taskName"`
TaskDesc string `json:"taskDesc"`
ModelType string `json:"modelType"`
Instances []DeployInstance `json:"instances"`
Strategy string `json:"strategy,,optional"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
Replica int32 `json:"replicas,optional"`
}
ImageInferenceResp {
@ -85,7 +92,7 @@ type (
AiClusterIds []string `form:"aiClusterIds"`
}
TextToImageInferenceResp{
Result []byte
Result []byte `json:"result"`
}
/******************Deploy instance*************************/
@ -149,20 +156,19 @@ type (
}
// GetDeployTasksReq is the request type of the GetDeployTasks handler; it
// consists solely of the embedded PageInfo.
GetDeployTasksReq {
PageInfo
}
// GetDeployTasksResp is the response type of the GetDeployTasks handler; it
// consists solely of the embedded PageResult.
GetDeployTasksResp {
PageResult
}
GetRunningInstanceReq {
AdapterIds []string `form:"adapterIds"`
ModelType string `path:"modelType"`
ModelName string `path:"modelName"`
Id string `form:"deployTaskId"`
AdapterId string `form:"adapterId"`
}
GetRunningInstanceResp {
List interface{} `json:"list,omitempty"`
List interface{} `json:"list"`
}
// GetDeployTasksByTypeReq carries the model type, read from the "modelType"
// form parameter.
GetDeployTasksByTypeReq {
ModelType string `form:"modelType"`
}
// GetDeployTasksByTypeResp wraps an untyped result list that is serialized
// under the JSON key "list".
GetDeployTasksByTypeResp {
List interface{} `json:"list"`
}
)

View File

@ -966,11 +966,11 @@ service pcm {
@handler StopAllByDeployTaskId
post /inference/stopAll (StopAllByDeployTaskIdReq) returns (StopAllByDeployTaskIdResp)
@handler GetDeployTasks
get /inference/getDeployTasks (GetDeployTasksReq) returns (GetDeployTasksResp)
@handler GetRunningInstanceById
get /inference/getRunningInstanceById (GetRunningInstanceReq) returns (GetRunningInstanceResp)
@handler GetRunningInstanceByModel
get /inference/getInstanceByModel (GetRunningInstanceReq) returns (GetRunningInstanceResp)
@handler GetDeployTasksByType
get /inference/getDeployTasksByType (GetDeployTasksByTypeReq) returns (GetDeployTasksByTypeResp)
}
@server(

View File

@ -1,6 +1,7 @@
package inference
import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
@ -9,20 +10,16 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)
func GetDeployTasksHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
func GetDeployTasksByTypeHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.GetDeployTasksReq
var req types.GetDeployTasksByTypeReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
result.ParamErrorResult(r, w, err)
return
}
l := inference.NewGetDeployTasksLogic(r.Context(), svcCtx)
resp, err := l.GetDeployTasks(&req)
if err != nil {
httpx.ErrorCtx(r.Context(), w, err)
} else {
httpx.OkJsonCtx(r.Context(), w, resp)
}
l := inference.NewGetDeployTasksByTypeLogic(r.Context(), svcCtx)
resp, err := l.GetDeployTasksByType(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -10,7 +10,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)
func GetRunningInstanceByModelHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
func GetRunningInstanceByIdHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.GetRunningInstanceReq
if err := httpx.Parse(r, &req); err != nil {
@ -18,8 +18,9 @@ func GetRunningInstanceByModelHandler(svcCtx *svc.ServiceContext) http.HandlerFu
return
}
l := inference.NewGetRunningInstanceByModelLogic(r.Context(), svcCtx)
resp, err := l.GetRunningInstanceByModel(&req)
l := inference.NewGetRunningInstanceByIdLogic(r.Context(), svcCtx)
resp, err := l.GetRunningInstanceById(&req)
result.HttpResult(r, w, resp, err)
}
}

View File

@ -1225,13 +1225,13 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
},
{
Method: http.MethodGet,
Path: "/inference/getDeployTasks",
Handler: inference.GetDeployTasksHandler(serverCtx),
Path: "/inference/getRunningInstanceById",
Handler: inference.GetRunningInstanceByIdHandler(serverCtx),
},
{
Method: http.MethodGet,
Path: "/inference/getInstanceByModel",
Handler: inference.GetRunningInstanceByModelHandler(serverCtx),
Path: "/inference/getDeployTasksByType",
Handler: inference.GetDeployTasksByTypeHandler(serverCtx),
},
},
rest.WithPrefix("/pcm/v1"),

View File

@ -0,0 +1,41 @@
package inference
import (
"context"
"github.com/pkg/errors"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// GetDeployTasksByTypeLogic embeds a logx.Logger and holds the context and
// service context that its methods operate on.
type GetDeployTasksByTypeLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewGetDeployTasksByTypeLogic returns a GetDeployTasksByTypeLogic whose
// logger is derived from ctx and which keeps ctx and svcCtx for later calls.
func NewGetDeployTasksByTypeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetDeployTasksByTypeLogic {
	logic := new(GetDeployTasksByTypeLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// GetDeployTasksByType looks up the deploy tasks stored for req.ModelType.
//
// The storage error is returned unchanged when the lookup fails, and an
// error is returned when no task matches; resp is nil in both cases. On
// success resp.List holds the tasks returned by storage.
func (l *GetDeployTasksByTypeLogic) GetDeployTasksByType(req *types.GetDeployTasksByTypeReq) (resp *types.GetDeployTasksByTypeResp, err error) {
	list, err := l.svcCtx.Scheduler.AiStorages.GetDeployTaskListByType(req.ModelType)
	if err != nil {
		return nil, err
	}
	if len(list) == 0 {
		// "实例不存在" means "instance does not exist"; the previous literal
		// misspelled 实例 (instance) as 实列.
		return nil, errors.New("实例不存在")
	}
	// Build the response only once there is something to return.
	return &types.GetDeployTasksByTypeResp{List: list}, nil
}

View File

@ -1,30 +0,0 @@
package inference
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// GetDeployTasksLogic embeds a logx.Logger and holds the context and service
// context that its methods operate on.
type GetDeployTasksLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewGetDeployTasksLogic returns a GetDeployTasksLogic whose logger is
// derived from ctx and which keeps ctx and svcCtx for later calls.
func NewGetDeployTasksLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetDeployTasksLogic {
	logic := new(GetDeployTasksLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// GetDeployTasks is an unimplemented placeholder: it does not read req and
// yields a nil response together with a nil error.
func (l *GetDeployTasksLogic) GetDeployTasks(req *types.GetDeployTasksReq) (resp *types.GetDeployTasksResp, err error) {
	// todo: add your logic here and delete this line
	return nil, nil
}

View File

@ -0,0 +1,38 @@
package inference
import (
"context"
"strconv"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// GetRunningInstanceByIdLogic embeds a logx.Logger and holds the context and
// service context that its methods operate on.
type GetRunningInstanceByIdLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewGetRunningInstanceByIdLogic returns a GetRunningInstanceByIdLogic whose
// logger is derived from ctx and which keeps ctx and svcCtx for later calls.
func NewGetRunningInstanceByIdLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetRunningInstanceByIdLogic {
	logic := new(GetRunningInstanceByIdLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// GetRunningInstanceById parses req.Id as a base-10 int64 and asks storage
// for the deploy instances matching that id and req.AdapterId.
//
// A parse failure or a storage failure is returned unchanged with a nil
// resp. On success resp.List holds the instances returned by storage.
func (l *GetRunningInstanceByIdLogic) GetRunningInstanceById(req *types.GetRunningInstanceReq) (resp *types.GetRunningInstanceResp, err error) {
	id, err := strconv.ParseInt(req.Id, 10, 64)
	if err != nil {
		return nil, err
	}

	list, err := l.svcCtx.Scheduler.AiStorages.GetRunningDeployInstanceById(id, req.AdapterId)
	if err != nil {
		// Do not hand back a populated response alongside an error.
		return nil, err
	}
	return &types.GetRunningInstanceResp{List: list}, nil
}

View File

@ -1,30 +0,0 @@
package inference
import (
"context"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
// GetRunningInstanceByModelLogic embeds a logx.Logger and holds the context
// and service context that its methods operate on.
type GetRunningInstanceByModelLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
// NewGetRunningInstanceByModelLogic returns a GetRunningInstanceByModelLogic
// whose logger is derived from ctx and which keeps ctx and svcCtx for later
// calls.
func NewGetRunningInstanceByModelLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetRunningInstanceByModelLogic {
	logic := new(GetRunningInstanceByModelLogic)
	logic.Logger = logx.WithContext(ctx)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	return logic
}
// GetRunningInstanceByModel does not read req; it always yields an empty,
// non-nil GetRunningInstanceResp and a nil error.
func (l *GetRunningInstanceByModelLogic) GetRunningInstanceByModel(req *types.GetRunningInstanceReq) (resp *types.GetRunningInstanceResp, err error) {
	return &types.GetRunningInstanceResp{}, nil
}

View File

@ -34,12 +34,16 @@ func NewImageInferenceLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Im
func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) {
resp = &types.ImageInferenceResp{}
if len(req.Instances) == 0 {
return nil, errors.New("instances are empty")
}
opt := &option.InferOption{
TaskName: req.TaskName,
TaskDesc: req.TaskDesc,
AdapterId: req.AdapterId,
AiClusterIds: req.AiClusterIds,
ModelName: req.ModelName,
TaskName: req.TaskName,
TaskDesc: req.TaskDesc,
//AdapterId: req.AdapterId,
//AiClusterIds: req.AiClusterIds,
//ModelName: req.ModelName,
ModelType: req.ModelType,
Strategy: req.Strategy,
StaticWeightMap: req.StaticWeightMap,
@ -72,42 +76,54 @@ func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInfere
ts = append(ts, &t)
}
_, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId]
if !ok {
return nil, errors.New("AdapterId does not exist")
}
//_, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId]
//if !ok {
// return nil, errors.New("AdapterId does not exist")
//}
//
var strat strategy.Strategy
switch opt.Strategy {
case strategy.STATIC_WEIGHT:
strat = strategy.NewStaticWeightStrategy(opt.StaticWeightMap, int32(len(ts)))
var cs []*strategy.AssignedCluster
var adapterName string
if opt.Strategy != "" {
var strat strategy.Strategy
switch opt.Strategy {
case strategy.STATIC_WEIGHT:
strat = strategy.NewStaticWeightStrategy(opt.StaticWeightMap, int32(len(ts)))
if err != nil {
return nil, err
}
default:
return nil, errors.New("no strategy has been chosen")
}
clusters, err := strat.Schedule()
if err != nil {
return nil, err
}
default:
return nil, errors.New("no strategy has been chosen")
}
clusters, err := strat.Schedule()
if err != nil {
return nil, err
}
if clusters == nil || len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
if clusters == nil || len(clusters) == 0 {
return nil, errors.New("clusters is nil")
}
for i := len(clusters) - 1; i >= 0; i-- {
if clusters[i].Replicas == 0 {
clusters = append(clusters[:i], clusters[i+1:]...)
}
}
name, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(opt.AdapterId)
if err != nil {
return nil, err
}
adapterName = name
}
adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(opt.AdapterId)
if err != nil {
return nil, err
}
//else {
// for i, instance := range req.Instances {
//
// }
//}
imageInfer, err := imageInference.New(imageInference.NewImageClassification(), ts, clusters, opt, l.svcCtx.Scheduler.AiStorages, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, adapterName)
imageInfer, err := imageInference.New(imageInference.NewImageClassification(), ts, cs, req.Instances, opt, l.svcCtx.Scheduler.AiStorages, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, adapterName)
if err != nil {
return nil, err
}

View File

@ -431,6 +431,16 @@ func (s *AiStorage) GetDeployTaskById(id int64) (*models.AiDeployInstanceTask, e
return &task, nil
}
// GetDeployTaskListByType selects every row of ai_deploy_instance_task whose
// model_type column equals modelType and scans the rows into a slice.
//
// When the query reports an error, the error is logged and returned with a
// nil slice.
func (s *AiStorage) GetDeployTaskListByType(modelType string) ([]*models.AiDeployInstanceTask, error) {
	var tasks []*models.AiDeployInstanceTask
	tx := s.DbEngin.Raw("select * from ai_deploy_instance_task where `model_type` = ?", modelType).Scan(&tasks)
	if tx.Error != nil {
		// Pass the message as an argument rather than as the format string,
		// so a '%' inside the error text is logged verbatim.
		logx.Errorf("%s", tx.Error.Error())
		return nil, tx.Error
	}
	return tasks, nil
}
func (s *AiStorage) GetAllDeployTasks() ([]*models.AiDeployInstanceTask, error) {
var tasks []*models.AiDeployInstanceTask
tx := s.DbEngin.Raw("select * from ai_deploy_instance_task").Scan(&tasks)
@ -574,9 +584,9 @@ func (s *AiStorage) SaveInferDeployTask(taskName string, modelName string, model
return taskModel.Id, nil
}
func (s *AiStorage) GetRunningDeployInstanceByModelNameAndAdapterId(modelType string, modelName string, adapterId string) ([]*models.AiInferDeployInstance, error) {
func (s *AiStorage) GetRunningDeployInstanceById(id int64, adapterId string) ([]*models.AiInferDeployInstance, error) {
var list []*models.AiInferDeployInstance
tx := s.DbEngin.Raw("select * from ai_infer_deploy_instance where `model_type` = ? and `model_name` = ? and `adapter_id` = ? and `status` = 'Running'", modelType, modelName, adapterId).Scan(&list)
tx := s.DbEngin.Raw("select * from ai_infer_deploy_instance where `deploy_instance_task_id` = ? and `adapter_id` = ? and `status` = 'Running'", id, adapterId).Scan(&list)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return nil, tx.Error

View File

@ -46,6 +46,7 @@ type ImageInference struct {
inference IImageInference
files []*ImageFile
clusters []*strategy.AssignedCluster
instances []types.DeployInstance
opt *option.InferOption
storage *database.AiStorage
inferAdapter map[string]map[string]inference.ICluster
@ -57,6 +58,7 @@ func New(
inference IImageInference,
files []*ImageFile,
clusters []*strategy.AssignedCluster,
instances []types.DeployInstance,
opt *option.InferOption,
storage *database.AiStorage,
inferAdapter map[string]map[string]inference.ICluster,
@ -66,6 +68,7 @@ func New(
inference: inference,
files: files,
clusters: clusters,
instances: instances,
opt: opt,
storage: storage,
inferAdapter: inferAdapter,
@ -145,7 +148,7 @@ func (i *ImageInference) saveAiTask(id int64) error {
return nil
}
func (i *ImageInference) filterClusters() ([]*FilteredCluster, error) {
func (i *ImageInference) filterClustersTemp() ([]*FilteredCluster, error) {
var wg sync.WaitGroup
var ch = make(chan *FilteredCluster, len(i.clusters))
var cs []*FilteredCluster
@ -190,6 +193,30 @@ func (i *ImageInference) filterClusters() ([]*FilteredCluster, error) {
return cs, nil
}
// filterClusters produces one FilteredCluster per entry of i.clusters, in
// order. For every instance in i.instances whose ClusterId equals the
// cluster's ClusterId, it fetches the deploy instance through the matching
// inference adapter and records its InferUrl and InferCard as an InferUrl.
// A cluster with no matching instance still yields a FilteredCluster, with
// nil urls. The first lookup error aborts the whole call.
func (i *ImageInference) filterClusters() ([]*FilteredCluster, error) {
	// A zero-valued request serves only as the source of the context that is
	// passed to the adapter.
	req := http.Request{}

	var filtered []*FilteredCluster
	for _, assigned := range i.clusters {
		var urls []*inference.InferUrl
		for _, inst := range i.instances {
			if inst.ClusterId != assigned.ClusterId {
				continue
			}
			// NOTE(review): there is no check that an entry exists for this
			// adapter/cluster pair; a missing entry would make the call below
			// run on a nil value. Confirm callers guarantee registration.
			executor := i.inferAdapter[inst.AdapterId][inst.ClusterId]
			deployed, err := executor.GetInferDeployInstance(req.Context(), inst.InstanceId)
			if err != nil {
				return nil, err
			}
			urls = append(urls, &inference.InferUrl{
				Url:  deployed.InferUrl,
				Card: deployed.InferCard,
			})
		}
		filtered = append(filtered, &FilteredCluster{urls: urls})
	}
	return filtered, nil
}
func (i *ImageInference) inferImages(cs []*FilteredCluster) ([]*types.ImageResult, error) {
var wg sync.WaitGroup
var ch = make(chan *types.ImageResult, len(i.files))

View File

@ -46,6 +46,7 @@ type DeployInstance struct {
ModelName string
ModelType string
InferCard string
InferUrl string
ClusterName string
ClusterType string
Status string

View File

@ -5904,6 +5904,19 @@ type Category struct {
Name string `json:"name"`
}
// DeployInstance describes one deploy instance as exchanged over JSON.
// Every field is a string; the JSON key of each field is given by its tag.
type DeployInstance struct {
InstanceId string `json:"instanceId"`
InstanceName string `json:"instanceName"`
AdapterId string `json:"adapterId"`
AdapterName string `json:"adapterName"`
ClusterId string `json:"clusterId"`
ClusterName string `json:"clusterName"`
ModelName string `json:"modelName"`
ModelType string `json:"modelType"`
InferCard string `json:"inferCard"`
Status string `json:"status"`
}
// ModelTypesResp carries a list of model type names, serialized under the
// JSON key "types".
type ModelTypesResp struct {
ModelTypes []string `json:"types"`
}
@ -5917,20 +5930,13 @@ type ModelNamesResp struct {
}
type ImageInferenceReq struct {
TaskName string `form:"taskName"`
TaskDesc string `form:"taskDesc"`
ModelName string `form:"modelName"`
ModelType string `form:"modelType"`
AdapterId string `form:"adapterId"`
AiClusterIds []string `form:"aiClusterIds,optional"`
ResourceType string `form:"resourceType,optional"`
ComputeCard string `form:"card,optional"`
Strategy string `form:"strategy"`
StaticWeightMap map[string]int32 `form:"staticWeightMap,optional"`
Params []string `form:"params,optional"`
Envs []string `form:"envs,optional"`
Cmd string `form:"cmd,optional"`
Replica int32 `form:"replicas,optional"`
TaskName string `json:"taskName"`
TaskDesc string `json:"taskDesc"`
ModelType string `json:"modelType"`
Instances []DeployInstance `json:"instances"`
Strategy string `json:"strategy,,optional"`
StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"`
Replica int32 `json:"replicas,optional"`
}
type ImageInferenceResp struct {
@ -5976,6 +5982,18 @@ type TextToTextInferenceReq struct {
// TextToTextInferenceResp is an empty response type; it declares no fields.
type TextToTextInferenceResp struct {
}
// TextToImageInferenceReq is populated from form parameters: a task name
// and description, a model name and type, and a list of AI cluster ids.
// None of the tags marks a field as optional.
type TextToImageInferenceReq struct {
TaskName string `form:"taskName"`
TaskDesc string `form:"taskDesc"`
ModelName string `form:"modelName"`
ModelType string `form:"modelType"`
AiClusterIds []string `form:"aiClusterIds"`
}
// TextToImageInferenceResp carries the raw result bytes under the JSON key
// "result".
type TextToImageInferenceResp struct {
Result []byte `json:"result"`
}
// DeployInstanceListReq consists solely of the embedded PageInfo.
type DeployInstanceListReq struct {
PageInfo
}
@ -6034,19 +6052,19 @@ type StopAllByDeployTaskIdReq struct {
// StopAllByDeployTaskIdResp is an empty response type; it declares no fields.
type StopAllByDeployTaskIdResp struct {
}
// GetDeployTasksReq consists solely of the embedded PageInfo.
type GetDeployTasksReq struct {
PageInfo
}
// GetDeployTasksResp consists solely of the embedded PageResult.
type GetDeployTasksResp struct {
PageResult
}
type GetRunningInstanceReq struct {
ModelType string `path:"modelType"`
ModelName string `path:"modelName"`
Id string `form:"deployTaskId"`
AdapterId string `form:"adapterId"`
}
type GetRunningInstanceResp struct {
List interface{} `json:"list,omitempty"`
List interface{} `json:"list"`
}
// GetDeployTasksByTypeReq carries the model type, read from the "modelType"
// form parameter.
type GetDeployTasksByTypeReq struct {
ModelType string `form:"modelType"`
}
// GetDeployTasksByTypeResp wraps an untyped result list that is serialized
// under the JSON key "list".
type GetDeployTasksByTypeResp struct {
List interface{} `json:"list"`
}