From b4b93d2c09ea9d2e499677e328604fb3ddae2a06 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 19 Jun 2024 16:06:47 +0800 Subject: [PATCH 1/8] added imageinference api Former-commit-id: 8bf23cca01d6b82ea5fa1f1078d608e83b206362 --- api/desc/inference/inference.api | 40 +++++++++++++++++++ api/desc/pcm.api | 10 +++++ .../inference/imageinferencehandler.go | 28 +++++++++++++ api/internal/handler/routes.go | 12 ++++++ .../logic/inference/imageinferencelogic.go | 30 ++++++++++++++ api/internal/types/types.go | 12 ++++++ 6 files changed, 132 insertions(+) create mode 100644 api/desc/inference/inference.api create mode 100644 api/internal/handler/inference/imageinferencehandler.go create mode 100644 api/internal/logic/inference/imageinferencelogic.go diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api new file mode 100644 index 00000000..8e5b5340 --- /dev/null +++ b/api/desc/inference/inference.api @@ -0,0 +1,40 @@ +syntax = "v1" + +type ( + InferOption { + TaskName string `json:"taskName"` + Images []string `form:"images"` + // AdapterId string `json:"adapterId"` + // AiClusterIds []string `json:"aiClusterIds"` + // ResourceType string `json:"resourceType"` + // ComputeCard string `json:"card"` + // Tops float64 `json:"Tops,optional"` + // TaskType string `json:"taskType"` + // Datasets string `json:"datasets"` + // Algorithm string `json:"algorithm"` + // Strategy string `json:"strategy"` + // StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + // Params []string `json:"params,optional"` + // Envs []string `json:"envs,optional"` + // Cmd string `json:"cmd,optional"` + // Replica int32 `json:"replicas"` + } + /******************image inference*************************/ + + ImageInferenceReq { + InferOption *InferOption `json:"inferOption,optional"` + } + + ImageInferenceResp { + } + +// InferResult { +// ClusterId string `json:"clusterId"` +// TaskId string `json:"taskId"` +// Card string `json:"card"` +// Strategy string `json:"strategy"` +// JobId string `json:"jobId"` +// Replica int32 `json:"replica"` +// Msg string `json:"msg"` +// } +) diff --git a/api/desc/pcm.api b/api/desc/pcm.api index be94f6eb..982b3d86 100644 --- a/api/desc/pcm.api +++ b/api/desc/pcm.api @@ -10,6 +10,7 @@ import ( "storelink/pcm-storelink.api" "schedule/pcm-schedule.api" "monitoring/pcm-monitoring.api" + "inference/inference.api" ) info( @@ -901,6 +902,15 @@ service pcm { get /schedule/getClusterBalanceById/:adapterId/:clusterId (GetClusterBalanceByIdReq) returns (GetClusterBalanceByIdResp) } +@server( + prefix: pcm/v1 + group: inference +) +service pcm { + @handler ImageInferenceHandler + post /inference/images (ImageInferenceReq) returns (ImageInferenceResp) +} + @server( prefix: pcm/v1 group: dictionary diff --git a/api/internal/handler/inference/imageinferencehandler.go b/api/internal/handler/inference/imageinferencehandler.go new file mode 100644 index 00000000..d617c790 --- /dev/null +++ b/api/internal/handler/inference/imageinferencehandler.go @@ -0,0 +1,28 @@ +package inference + +import ( + "net/http" + + "github.com/zeromicro/go-zero/rest/httpx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/inference" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" +) + +func ImageInferenceHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.ImageInferenceReq + if err := httpx.Parse(r, &req); err != nil { + httpx.ErrorCtx(r.Context(), w, err) + return + } + + l := inference.NewImageInferenceLogic(r.Context(), svcCtx) + resp, err := l.ImageInference(&req) + if err != nil { + httpx.ErrorCtx(r.Context(), w, err) + } else { + httpx.OkJsonCtx(r.Context(), w, resp) + } + } +} diff --git a/api/internal/handler/routes.go b/api/internal/handler/routes.go index c8aecb33..94a504ca 100644 --- a/api/internal/handler/routes.go +++ b/api/internal/handler/routes.go @@ -10,6 +10,7 @@ import ( core "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/core" dictionary "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/dictionary" hpc "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/hpc" + inference "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/inference" monitoring "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/monitoring" schedule "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/schedule" storage "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/handler/storage" @@ -1135,6 +1136,17 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { rest.WithPrefix("/pcm/v1"), ) + server.AddRoutes( + []rest.Route{ + { + Method: http.MethodPost, + Path: "/inference/images", + Handler: inference.ImageInferenceHandler(serverCtx), + }, + }, + rest.WithPrefix("/pcm/v1"), + ) + server.AddRoutes( []rest.Route{ { diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go new file mode 100644 index 00000000..0a1f9d3e --- /dev/null +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -0,0 +1,30 @@ +package inference + +import ( + "context" + + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type ImageInferenceLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewImageInferenceLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ImageInferenceLogic { + return &ImageInferenceLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *ImageInferenceLogic) ImageInference(req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) { + // todo: add your logic here and delete this line + + return +} diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 8c9b254a..56bfa68f 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5878,3 +5878,15 @@ type Link struct { type Category struct { Name string `json:"name"` } + +type InferOption struct { + TaskName string `json:"taskName"` + Images []string `form:"images"` +} + +type ImageInferenceReq struct { + InferOption *InferOption `json:"inferOption,optional"` +} + +type ImageInferenceResp struct { +} From cfecebc3b8aeb61132264f572ac7e108e6abab0d Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 19 Jun 2024 16:24:51 +0800 Subject: [PATCH 2/8] added imageinference api Former-commit-id: 88cfe68d5d660c181f200719ea65c03eddce33b4 --- api/desc/inference/inference.api | 2 +- api/internal/types/types.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api index 8e5b5340..f227a39f 100644 --- a/api/desc/inference/inference.api +++ b/api/desc/inference/inference.api @@ -3,7 +3,6 @@ syntax = "v1" type ( InferOption { TaskName string `json:"taskName"` - Images []string `form:"images"` // AdapterId string `json:"adapterId"` // AiClusterIds []string `json:"aiClusterIds"` // ResourceType string `json:"resourceType"` @@ -23,6 +22,7 @@ type ( ImageInferenceReq { InferOption *InferOption `json:"inferOption,optional"` + Images []string `form:"images"` } ImageInferenceResp { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 56bfa68f..c7048880 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5880,12 +5880,12 @@ type Category struct { } type InferOption struct { - TaskName string `json:"taskName"` - Images []string `form:"images"` + TaskName string `json:"taskName"` } type ImageInferenceReq struct { InferOption *InferOption `json:"inferOption,optional"` + Images []string `form:"images"` } type ImageInferenceResp struct { From 5a0d3a942824d3cc3474f83b5e2199bc9e5ad9a7 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 19 Jun 2024 19:22:46 +0800 Subject: [PATCH 3/8] updated imageinference api Former-commit-id: 37f69eb747ca83ea4f303a71d77c9621efd05472 --- api/desc/inference/inference.api | 43 ++++++++++++++------------------ api/internal/types/types.go | 23 +++++++++++++++-- 2 files changed, 40 insertions(+), 26 deletions(-) diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api index f227a39f..c47fb20e 100644 --- a/api/desc/inference/inference.api +++ b/api/desc/inference/inference.api @@ -3,38 +3,33 @@ syntax = "v1" type ( InferOption { TaskName string `json:"taskName"` - // AdapterId string `json:"adapterId"` - // AiClusterIds []string `json:"aiClusterIds"` - // ResourceType string `json:"resourceType"` - // ComputeCard string `json:"card"` - // Tops float64 `json:"Tops,optional"` - // TaskType string `json:"taskType"` - // Datasets string `json:"datasets"` - // Algorithm string `json:"algorithm"` - // Strategy string `json:"strategy"` - // StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` - // Params []string `json:"params,optional"` - // Envs []string `json:"envs,optional"` - // Cmd string `json:"cmd,optional"` - // Replica int32 `json:"replicas"` + TaskType string `json:"taskType"` + AdapterId string `json:"adapterId"` + AiClusterIds []string `json:"aiClusterIds"` + ResourceType string `json:"resourceType"` + ComputeCard string `json:"card"` + Strategy string `json:"strategy"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + Params []string `json:"params,optional"` + Envs []string `json:"envs,optional"` + Cmd string `json:"cmd,optional"` + Replica int32 `json:"replicas,optional"` } /******************image inference*************************/ ImageInferenceReq { InferOption *InferOption `json:"inferOption,optional"` - Images []string `form:"images"` } ImageInferenceResp { + InferResults []*ImageResult `json:"result"` } -// InferResult { -// ClusterId string `json:"clusterId"` -// TaskId string `json:"taskId"` -// Card string `json:"card"` -// Strategy string `json:"strategy"` -// JobId string `json:"jobId"` -// Replica int32 `json:"replica"` -// Msg string `json:"msg"` -// } + ImageResult { + ClusterId string `json:"clusterId"` + ClusterName string `json:"clusterName"` + ImageName string `json:"imageName"` + Card string `json:"card"` + ImageResult string `json:"imageResult"` + } ) diff --git a/api/internal/types/types.go b/api/internal/types/types.go index c7048880..e45a6d7e 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5880,13 +5880,32 @@ type Category struct { } type InferOption struct { - TaskName string `json:"taskName"` + TaskName string `json:"taskName"` + TaskType string `json:"taskType"` + AdapterId string `json:"adapterId"` + AiClusterIds []string `json:"aiClusterIds"` + ResourceType string `json:"resourceType"` + ComputeCard string `json:"card"` + Strategy string `json:"strategy"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + Params []string `json:"params,optional"` + Envs []string `json:"envs,optional"` + Cmd string `json:"cmd,optional"` + Replica int32 `json:"replicas,optional"` } type ImageInferenceReq struct { InferOption *InferOption `json:"inferOption,optional"` - Images []string `form:"images"` } type ImageInferenceResp struct { + InferResults []*ImageResult `json:"result"` +} + +type ImageResult struct { + ClusterId string `json:"clusterId"` + ClusterName string `json:"clusterName"` + ImageName string `json:"imageName"` + Card string `json:"card"` + ImageResult string `json:"imageResult"` } From eb4ca38875497b2e284a966c987116c827b5ff9d Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 19 Jun 2024 19:30:09 +0800 Subject: [PATCH 4/8] updated imageinference api Former-commit-id: cb39280dcc921074722e34f0387b613fbef4e2c6 --- api/desc/inference/inference.api | 1 + api/internal/types/types.go | 1 + 2 files changed, 2 insertions(+) diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api index c47fb20e..51492e01 100644 --- a/api/desc/inference/inference.api +++ b/api/desc/inference/inference.api @@ -3,6 +3,7 @@ syntax = "v1" type ( InferOption { TaskName string `json:"taskName"` + ModelName string `json:"modelName"` TaskType string `json:"taskType"` AdapterId string `json:"adapterId"` AiClusterIds []string `json:"aiClusterIds"` diff --git a/api/internal/types/types.go b/api/internal/types/types.go index e45a6d7e..d339507c 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5881,6 +5881,7 @@ type Category struct { type InferOption struct { TaskName string `json:"taskName"` + ModelName string `json:"modelName"` TaskType string `json:"taskType"` AdapterId string `json:"adapterId"` AiClusterIds []string `json:"aiClusterIds"` From 0a0182a1bb487ff23c2fab3471534494b7824c72 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 19 Jun 2024 19:35:01 +0800 Subject: [PATCH 5/8] updated imageinference api Former-commit-id: 87a8f48882340c5df98f7cd1724bf3fd74836dc6 --- api/desc/inference/inference.api | 3 ++- api/internal/types/types.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api index 51492e01..994532b3 100644 --- a/api/desc/inference/inference.api +++ b/api/desc/inference/inference.api @@ -3,8 +3,9 @@ syntax = "v1" type ( InferOption { TaskName string `json:"taskName"` + TaskDesc string `json:"taskDesc"` ModelName string `json:"modelName"` - TaskType string `json:"taskType"` + ModelType string `json:"modelType"` AdapterId string `json:"adapterId"` AiClusterIds []string `json:"aiClusterIds"` ResourceType string `json:"resourceType"` diff --git a/api/internal/types/types.go b/api/internal/types/types.go index d339507c..e2b0fc9e 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5881,8 +5881,9 @@ type Category struct { type InferOption struct { TaskName string `json:"taskName"` + TaskDesc string `json:"taskDesc"` ModelName string `json:"modelName"` - TaskType string `json:"taskType"` + ModelType string `json:"modelType"` AdapterId string `json:"adapterId"` AiClusterIds []string `json:"aiClusterIds"` ResourceType string `json:"resourceType"` From 99d96327acd4d0e54a03c38088f23b4baf957666 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 20 Jun 2024 11:31:29 +0800 Subject: [PATCH 6/8] updated imageinference api Former-commit-id: f02b83693b1989f34e6b67c019ed3599ae64305f --- api/desc/inference/inference.api | 6 +++--- api/internal/types/types.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api index 994532b3..7d8616fa 100644 --- a/api/desc/inference/inference.api +++ b/api/desc/inference/inference.api @@ -7,9 +7,9 @@ type ( ModelName string `json:"modelName"` ModelType string `json:"modelType"` AdapterId string `json:"adapterId"` - AiClusterIds []string `json:"aiClusterIds"` - ResourceType string `json:"resourceType"` - ComputeCard string `json:"card"` + AiClusterIds []string `json:"aiClusterIds,optional"` + ResourceType string `json:"resourceType,optional"` + ComputeCard string `json:"card,optional"` Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` Params []string `json:"params,optional"` diff --git a/api/internal/types/types.go b/api/internal/types/types.go index e2b0fc9e..4b0eb9c5 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5885,9 +5885,9 @@ type InferOption struct { ModelName string `json:"modelName"` ModelType string `json:"modelType"` AdapterId string `json:"adapterId"` - AiClusterIds []string `json:"aiClusterIds"` - ResourceType string `json:"resourceType"` - ComputeCard string `json:"card"` + AiClusterIds []string `json:"aiClusterIds,optional"` + ResourceType string `json:"resourceType,optional"` + ComputeCard string `json:"card,optional"` Strategy string `json:"strategy"` StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` Params []string `json:"params,optional"` From bee590f9697d67c377f07b370d10a912d2f5201b Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 20 Jun 2024 17:16:06 +0800 Subject: [PATCH 7/8] updated imageinference api Former-commit-id: f8b560cea1c26f05353800af2e2c0f16a885cc85 --- api/desc/inference/inference.api | 31 ++++++++++++++----------------- api/internal/types/types.go | 32 ++++++++++++++------------------ 2 files changed, 28 insertions(+), 35 deletions(-) diff --git a/api/desc/inference/inference.api b/api/desc/inference/inference.api index 7d8616fa..c8b6b8da 100644 --- a/api/desc/inference/inference.api +++ b/api/desc/inference/inference.api @@ -1,26 +1,23 @@ syntax = "v1" type ( - InferOption { - TaskName string `json:"taskName"` - TaskDesc string `json:"taskDesc"` - ModelName string `json:"modelName"` - ModelType string `json:"modelType"` - AdapterId string `json:"adapterId"` - AiClusterIds []string `json:"aiClusterIds,optional"` - ResourceType string `json:"resourceType,optional"` - ComputeCard string `json:"card,optional"` - Strategy string `json:"strategy"` - StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` - Params []string `json:"params,optional"` - Envs []string `json:"envs,optional"` - Cmd string `json:"cmd,optional"` - Replica int32 `json:"replicas,optional"` - } /******************image inference*************************/ ImageInferenceReq { - InferOption *InferOption `json:"inferOption,optional"` + TaskName string `form:"taskName"` + TaskDesc string `form:"taskDesc"` + ModelName string `form:"modelName"` + ModelType string `form:"modelType"` + AdapterId string `form:"adapterId"` + AiClusterIds []string `form:"aiClusterIds,optional"` + ResourceType string `form:"resourceType,optional"` + ComputeCard string `form:"card,optional"` + Strategy string `form:"strategy"` + StaticWeightMap map[string]int32 `form:"staticWeightMap,optional"` + Params []string `form:"params,optional"` + Envs []string `form:"envs,optional"` + Cmd string `form:"cmd,optional"` + Replica int32 `form:"replicas,optional"` } ImageInferenceResp { diff --git a/api/internal/types/types.go b/api/internal/types/types.go index 4b0eb9c5..b92be857 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -5879,25 +5879,21 @@ type Category struct { Name string `json:"name"` } -type InferOption struct { - TaskName string `json:"taskName"` - TaskDesc string `json:"taskDesc"` - ModelName string `json:"modelName"` - ModelType string `json:"modelType"` - AdapterId string `json:"adapterId"` - AiClusterIds []string `json:"aiClusterIds,optional"` - ResourceType string `json:"resourceType,optional"` - ComputeCard string `json:"card,optional"` - Strategy string `json:"strategy"` - StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` - Params []string `json:"params,optional"` - Envs []string `json:"envs,optional"` - Cmd string `json:"cmd,optional"` - Replica int32 `json:"replicas,optional"` -} - type ImageInferenceReq struct { - InferOption *InferOption `json:"inferOption,optional"` + TaskName string `form:"taskName"` + TaskDesc string `form:"taskDesc"` + ModelName string `form:"modelName"` + ModelType string `form:"modelType"` + AdapterId string `form:"adapterId"` + AiClusterIds []string `form:"aiClusterIds,optional"` + ResourceType string `form:"resourceType,optional"` + ComputeCard string `form:"card,optional"` + Strategy string `form:"strategy"` + StaticWeightMap map[string]int32 `form:"staticWeightMap,optional"` + Params []string `form:"params,optional"` + Envs []string `form:"envs,optional"` + Cmd string `form:"cmd,optional"` + Replica int32 `form:"replicas,optional"` } type ImageInferenceResp struct { From 734280ea00f794209d87c96de03f7b0b4166dfa9 Mon Sep 17 00:00:00 2001 From: tzwang Date: Thu, 20 Jun 2024 20:39:10 +0800 Subject: [PATCH 8/8] added imageinfer api Former-commit-id: 96ae7d54f30190581c662c5f8519efc5d92b555b --- .../inference/imageinferencehandler.go | 14 +- .../logic/inference/imageinferencelogic.go | 262 +++++++++++++++++- .../schedulers/option/inferOption.go | 18 ++ .../scheduler/service/collector/collector.go | 11 +- api/internal/storeLink/modelarts.go | 11 + api/internal/storeLink/octopus.go | 20 ++ api/internal/storeLink/shuguangai.go | 11 + 7 files changed, 331 insertions(+), 16 deletions(-) create mode 100644 api/internal/scheduler/schedulers/option/inferOption.go diff --git a/api/internal/handler/inference/imageinferencehandler.go b/api/internal/handler/inference/imageinferencehandler.go index d617c790..04ae9f78 100644 --- a/api/internal/handler/inference/imageinferencehandler.go +++ b/api/internal/handler/inference/imageinferencehandler.go @@ -1,28 +1,24 @@ package inference import ( - "net/http" - "github.com/zeromicro/go-zero/rest/httpx" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/logic/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" + "net/http" ) func ImageInferenceHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var req types.ImageInferenceReq if err := httpx.Parse(r, &req); err != nil { - httpx.ErrorCtx(r.Context(), w, err) + result.ParamErrorResult(r, w, err) return } l := inference.NewImageInferenceLogic(r.Context(), svcCtx) - resp, err := l.ImageInference(&req) - if err != nil { - httpx.ErrorCtx(r.Context(), w, err) - } else { - httpx.OkJsonCtx(r.Context(), w, resp) - } + resp, err := l.ImageInfer(r, &req) + result.HttpResult(r, w, resp, err) } } diff --git a/api/internal/logic/inference/imageinferencelogic.go b/api/internal/logic/inference/imageinferencelogic.go index 0a1f9d3e..5230c0be 100644 --- a/api/internal/logic/inference/imageinferencelogic.go +++ b/api/internal/logic/inference/imageinferencelogic.go @@ -2,11 +2,18 @@ package inference import ( "context" - + "errors" + "github.com/go-resty/resty/v2" + "github.com/zeromicro/go-zero/core/logx" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/service/collector" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/strategy" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types" - - "github.com/zeromicro/go-zero/core/logx" + "math/rand" + "mime/multipart" + "net/http" + "sync" ) type ImageInferenceLogic struct { @@ -24,7 +31,250 @@ func NewImageInferenceLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Im } func (l *ImageInferenceLogic) ImageInference(req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) { - // todo: add your logic here and delete this line - - return + return nil, nil +} + +func (l *ImageInferenceLogic) ImageInfer(r *http.Request, req *types.ImageInferenceReq) (resp *types.ImageInferenceResp, err error) { + resp = &types.ImageInferenceResp{} + opt := &option.InferOption{ + TaskName: req.TaskName, + TaskDesc: req.TaskDesc, + AdapterId: req.AdapterId, + AiClusterIds: req.AiClusterIds, + ModelName: req.ModelName, + ModelType: req.ModelType, + Strategy: req.Strategy, + StaticWeightMap: req.StaticWeightMap, + } + + var ts []struct { + imageResult *types.ImageResult + file multipart.File + } + + uploadedFiles := r.MultipartForm.File + + if len(uploadedFiles) == 0 { + return nil, errors.New("Images does not exist") + } + + if len(uploadedFiles["images"]) == 0 { + return nil, errors.New("Images does not exist") + } + + for _, header := range uploadedFiles["images"] { + file, err := header.Open() + if err != nil { + return nil, err + } + defer file.Close() + var ir types.ImageResult + ir.ImageName = header.Filename + t := struct { + imageResult *types.ImageResult + file multipart.File + }{ + imageResult: &ir, + file: file, + } + ts = append(ts, t) + } + + _, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId] + if !ok { + return nil, errors.New("AdapterId does not exist") + } + + var strat strategy.Strategy + switch opt.Strategy { + case strategy.STATIC_WEIGHT: + //todo resources should match cluster StaticWeightMap + strat = strategy.NewStaticWeightStrategy(opt.StaticWeightMap, int32(len(ts))) + if err != nil { + return nil, err + } + default: + return nil, errors.New("no strategy has been chosen") + } + clusters, err := strat.Schedule() + if err != nil { + return nil, err + } + + results, err := infer(opt, clusters, ts, l.svcCtx, l.ctx) + if err != nil { + return nil, err + } + resp.InferResults = results + + return resp, nil +} + +func infer(opt *option.InferOption, clusters []*strategy.AssignedCluster, ts []struct { + imageResult *types.ImageResult + file multipart.File +}, svcCtx *svc.ServiceContext, ctx context.Context) ([]*types.ImageResult, error) { + + if clusters == nil || len(clusters) == 0 { + return nil, errors.New("clusters is nil") + } + + for i := len(clusters) - 1; i >= 0; i-- { + if clusters[i].Replicas == 0 { + clusters = append(clusters[:i], clusters[i+1:]...) + } + } + + var wg sync.WaitGroup + var cluster_ch = make(chan struct { + urls []*collector.ImageInferUrl + clusterName string + imageNum int32 + }, len(clusters)) + + var cs []struct { + urls []*collector.ImageInferUrl + clusterName string + imageNum int32 + } + collectorMap := svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId] + + for _, cluster := range clusters { + wg.Add(1) + c := cluster + go func() { + imageUrls, err := collectorMap[c.ClusterId].GetImageInferUrl(ctx, opt) + if err != nil { + return + } + clusterName, _ := svcCtx.Scheduler.AiStorages.GetClusterNameById(c.ClusterId) + + s := struct { + urls []*collector.ImageInferUrl + clusterName string + imageNum int32 + }{ + urls: imageUrls, + clusterName: clusterName, + imageNum: c.Replicas, + } + + cluster_ch <- s + wg.Done() + return + }() + } + wg.Wait() + close(cluster_ch) + + for s := range cluster_ch { + cs = append(cs, s) + } + + var result_ch = make(chan *types.ImageResult, len(ts)) + var results []*types.ImageResult + + wg.Add(len(ts)) + + var imageNumIdx int32 = 0 + var imageNumIdxEnd int32 = 0 + for _, c := range cs { + new_images := make([]struct { + imageResult *types.ImageResult + file multipart.File + }, len(ts)) + copy(new_images, ts) + + imageNumIdxEnd = imageNumIdxEnd + c.imageNum + new_images = new_images[imageNumIdx:imageNumIdxEnd] + imageNumIdx = imageNumIdx + c.imageNum + + go sendInferReq(new_images, c, &wg, result_ch) + } + wg.Wait() + + close(result_ch) + + for s := range result_ch { + results = append(results, s) + } + + return results, nil +} + +func sendInferReq(images []struct { + imageResult *types.ImageResult + file multipart.File +}, cluster struct { + urls []*collector.ImageInferUrl + clusterName string + imageNum int32 +}, wg *sync.WaitGroup, ch chan<- *types.ImageResult) { + for _, image := range images { + go func(t struct { + imageResult *types.ImageResult + file multipart.File + }, c struct { + urls []*collector.ImageInferUrl + clusterName string + imageNum int32 + }) { + if len(c.urls) == 1 { + r, err := getInferResult(c.urls[0].Url, t.file, t.imageResult.ImageName) + if err != nil { + t.imageResult.ImageResult = err.Error() + ch <- t.imageResult + wg.Done() + return + } + t.imageResult.ImageResult = r + t.imageResult.ClusterName = c.clusterName + t.imageResult.Card = c.urls[0].Card + + ch <- t.imageResult + wg.Done() + return + } else { + idx := rand.Intn(len(c.urls)) + r, err := getInferResult(c.urls[idx].Url, t.file, t.imageResult.ImageName) + if err != nil { + t.imageResult.ImageResult = err.Error() + ch <- t.imageResult + wg.Done() + return + } + t.imageResult.ImageResult = r + t.imageResult.ClusterName = c.clusterName + t.imageResult.Card = c.urls[idx].Card + + ch <- t.imageResult + wg.Done() + return + } + }(image, cluster) + } +} + +func getInferResult(url string, file multipart.File, fileName string) (string, error) { + var res Res + req := GetACHttpRequest() + _, err := req. + SetFileReader("file", fileName, file). + SetResult(&res). + Post(url) + + if err != nil { + return "", err + } + return res.Result, nil +} + +func GetACHttpRequest() *resty.Request { + client := resty.New() + request := client.R() + return request +} + +type Res struct { + Result string `json:"result"` } diff --git a/api/internal/scheduler/schedulers/option/inferOption.go b/api/internal/scheduler/schedulers/option/inferOption.go new file mode 100644 index 00000000..b576eb0f --- /dev/null +++ b/api/internal/scheduler/schedulers/option/inferOption.go @@ -0,0 +1,18 @@ +package option + +type InferOption struct { + TaskName string `json:"taskName"` + TaskDesc string `json:"taskDesc"` + ModelName string `json:"modelName"` + ModelType string `json:"modelType"` + AdapterId string `json:"adapterId"` + AiClusterIds []string `json:"aiClusterIds"` + ResourceType string `json:"resourceType"` + ComputeCard string `json:"card"` + Strategy string `json:"strategy"` + StaticWeightMap map[string]int32 `json:"staticWeightMap,optional"` + Params []string `json:"params,optional"` + Envs []string `json:"envs,optional"` + Cmd string `json:"cmd,optional"` + Replica int32 `json:"replicas,optional"` +} diff --git a/api/internal/scheduler/service/collector/collector.go b/api/internal/scheduler/service/collector/collector.go index 2c8d51a8..1fa777ed 100644 --- a/api/internal/scheduler/service/collector/collector.go +++ b/api/internal/scheduler/service/collector/collector.go @@ -1,6 +1,9 @@ package collector -import "context" +import ( + "context" + "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option" +) type AiCollector interface { GetResourceStats(ctx context.Context) (*ResourceStats, error) @@ -12,6 +15,12 @@ type AiCollector interface { UploadAlgorithmCode(ctx context.Context, resourceType string, card string, taskType string, dataset string, algorithm string, code string) error GetComputeCards(ctx context.Context) ([]string, error) GetUserBalance(ctx context.Context) (float64, error) + GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*ImageInferUrl, error) +} + +type ImageInferUrl struct { + Url string + Card string } type ResourceStats struct { diff --git a/api/internal/storeLink/modelarts.go b/api/internal/storeLink/modelarts.go index 0652c8a5..80b0a193 100644 --- a/api/internal/storeLink/modelarts.go +++ b/api/internal/storeLink/modelarts.go @@ -376,3 +376,14 @@ func (m *ModelArtsLink) generateAlgorithmId(ctx context.Context, option *option. return errors.New("failed to get AlgorithmId") } + +func (m *ModelArtsLink) GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.ImageInferUrl, error) { + var imageUrls []*collector.ImageInferUrl + imageUrl := &collector.ImageInferUrl{ + Url: "http://0.0.0.0:8888/image", + Card: "npu", + } + imageUrls = append(imageUrls, imageUrl) + + return imageUrls, nil +} diff --git a/api/internal/storeLink/octopus.go b/api/internal/storeLink/octopus.go index 53c3652c..e84e54fd 100644 --- a/api/internal/storeLink/octopus.go +++ b/api/internal/storeLink/octopus.go @@ -870,3 +870,23 @@ func setResourceIdByCard(option *option.AiOption, specs *octopus.GetResourceSpec } return errors.New("set ResourceId error") } + +func (o *OctopusLink) GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.ImageInferUrl, error) { + var imageUrls []*collector.ImageInferUrl + imageUrl := &collector.ImageInferUrl{ + Url: "http://0.0.0.0:8888/image", + Card: "mlu", + } + imageUrl1 := &collector.ImageInferUrl{ + Url: "http://0.0.0.0:8888/image", + Card: "gcu", + } + imageUrl2 := &collector.ImageInferUrl{ + Url: "http://0.0.0.0:8888/image", + Card: "biv100", + } + imageUrls = append(imageUrls, imageUrl) + imageUrls = append(imageUrls, imageUrl1) + imageUrls = append(imageUrls, imageUrl2) + return imageUrls, nil +} diff --git a/api/internal/storeLink/shuguangai.go b/api/internal/storeLink/shuguangai.go index 03cb8928..d29427d2 100644 --- a/api/internal/storeLink/shuguangai.go +++ b/api/internal/storeLink/shuguangai.go @@ -729,3 +729,14 @@ func (s *ShuguangAi) generateParams(option *option.AiOption) error { return errors.New("failed to set params") } + +func (s *ShuguangAi) GetImageInferUrl(ctx context.Context, option *option.InferOption) ([]*collector.ImageInferUrl, error) { + var imageUrls []*collector.ImageInferUrl + imageUrl := &collector.ImageInferUrl{ + Url: "http://0.0.0.0:8888/image", + Card: "dcu", + } + imageUrls = append(imageUrls, imageUrl) + + return imageUrls, nil +}