# Conflicts:
#	go.mod
#	go.sum
#	internal/logic/adapters/createclusterlogic.go


Former-commit-id: e39ad860088a08c910b2456d843bf2a19e142630
This commit is contained in:
zhangwei 2024-09-23 14:49:25 +08:00
commit 5bc93f9e98
10 changed files with 209 additions and 54 deletions

2
go.mod
View File

@ -18,7 +18,7 @@ require (
github.com/prometheus/common v0.59.1
github.com/robfig/cron/v3 v3.0.1
github.com/zeromicro/go-zero v1.7.2
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240918011543-482dcd609877
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203

4
go.sum
View File

@ -466,8 +466,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M
github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeromicro/go-zero v1.7.2 h1:a8lyVOG3KXG4LrAy6ZmtJTJtisX4Ostc4Pst4fE704I=
github.com/zeromicro/go-zero v1.7.2/go.mod h1:WFXfF92Exw0O7WECifS6r99JSzv4KEN49x9RhAfgkMc=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437 h1:ta6h9+FU7AQ2fNyQiXrZnMdlNBjOKdyBx4e3RF7BE84=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185 h1:B+YBB5xHlIAS6ILuaCGQwbOpr/L6LOHAlj9PeFUCetM=
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240920093406-601f283f0185/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240918011543-482dcd609877 h1:a+1FpxqLPRojlAkJlAeRhKRbxajymXYgrM+s9bfQx0E=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240918011543-482dcd609877/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo=

View File

@ -3,8 +3,10 @@ package inference
import (
"context"
"errors"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
"strconv"
"sync"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
@ -40,19 +42,78 @@ func (l *CreateDeployTaskLogic) CreateDeployTask(req *types.CreateDeployTaskReq)
Cmd: "",
}
duplicated, err := l.svcCtx.Scheduler.AiStorages.IsDeployTaskNameDuplicated(req.TaskName)
if err != nil {
return nil, err
}
if duplicated {
return nil, errors.New("TaskName doesn't exist")
}
taskId, err := l.svcCtx.Scheduler.AiStorages.SaveInferDeployTask(req.TaskName, req.ModelName, req.ModelType, req.TaskDesc)
if err != nil {
return nil, err
}
var clusterlen int
for _, c := range req.AdapterClusterMap {
clusterlen += len(c)
}
var errCh = make(chan interface{}, clusterlen)
var errs []interface{}
buf := make(chan bool, 2)
var wg sync.WaitGroup
for aid, v := range req.AdapterClusterMap {
for _, cid := range v {
err = l.createDeployInstance(taskId, aid, cid, opt)
if err != nil {
return nil, err
}
for _, c := range v {
wg.Add(1)
cid := c
buf <- true
go func() {
err = l.createDeployInstance(taskId, aid, cid, opt)
if err != nil {
e := struct {
err error
clusterId string
}{
err: err,
clusterId: cid,
}
errCh <- e
wg.Done()
<-buf
return
}
wg.Done()
<-buf
}()
}
}
wg.Wait()
close(errCh)
for e := range errCh {
errs = append(errs, e)
}
if len(errs) != 0 {
var msg string
for _, err := range errs {
e := (err).(struct {
err error
clusterId string
})
clusterName, err := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(e.clusterId)
if err != nil {
clusterName = e.clusterId
}
msg += fmt.Sprintf("CreateInstance Failed # clusterName: %v, error: %v \n", clusterName, e.err.Error())
}
return nil, errors.New(msg)
}
return
}

View File

@ -103,11 +103,11 @@ func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeplo
return nil, errors.New(err.Error())
}
if len(list) == 0 {
err := l.svcCtx.Scheduler.AiStorages.DeleteDeployTaskById(t.Id)
if err != nil {
logx.Errorf("db DeleteByDeployTaskId error")
return nil, errors.New(err.Error())
}
//err := l.svcCtx.Scheduler.AiStorages.DeleteDeployTaskById(t.Id)
//if err != nil {
// logx.Errorf("db DeleteByDeployTaskId error")
// return nil, errors.New(err.Error())
//}
continue
}
deployTask := &DeployTask{

View File

@ -604,3 +604,16 @@ func (s *AiStorage) GetRunningDeployInstanceById(id int64, adapterId string) ([]
}
return list, nil
}
func (s *AiStorage) IsDeployTaskNameDuplicated(name string) (bool, error) {
var total int32
tx := s.DbEngin.Raw("select count(*) from ai_deploy_instance_task where `name` = ?", name).Scan(&total)
if tx.Error != nil {
return false, tx.Error
}
if total == 0 {
return false, nil
}
return true, nil
}

View File

@ -463,7 +463,7 @@ func getInferResult(url string, file multipart.File, fileName string, clusterId
switch clusterType {
case storeLink.TYPE_OCTOPUS:
r := http.Request{}
result, err := iCluster.GetInferResult(r.Context(), url, file, fileName)
result, err := iCluster.GetImageInferResult(r.Context(), url, file, fileName)
if err != nil {
return "", err
}

View File

@ -12,13 +12,17 @@ const (
type ICluster interface {
GetClusterInferUrl(ctx context.Context, option *option.InferOption) (*ClusterInferUrl, error)
GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error)
GetInferDeployInstanceList(ctx context.Context) ([]*DeployInstance, error)
StartInferDeployInstance(ctx context.Context, id string) bool
StopInferDeployInstance(ctx context.Context, id string) bool
GetInferDeployInstance(ctx context.Context, id string) (*DeployInstance, error)
CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error)
CheckModelExistence(ctx context.Context, modelName string, modelType string) bool
InferResult
}
type InferResult interface {
GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error)
}
type IInference interface {

View File

@ -767,7 +767,7 @@ func (m *ModelArtsLink) GetInferDeployInstance(ctx context.Context, id string) (
return ins, nil
}
func (m *ModelArtsLink) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
func (m *ModelArtsLink) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
return "", nil
}

View File

@ -1183,7 +1183,7 @@ func (o *OctopusLink) GetInferDeployInstance(ctx context.Context, id string) (*i
return ins, nil
}
func (o *OctopusLink) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
func (o *OctopusLink) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
stream, err := o.octopusRpc.GetInferResult(ctx)
if err != nil {
return "", err

View File

@ -47,49 +47,60 @@ const (
PythonCodePath = "/work/home/acgnnmfbwo/111111/py/test.py"
DATASETS_DIR = "/work/home/acgnnmfbwo/pcmv1/dataset"
ALGORITHM_DIR = "/work/home/acgnnmfbwo/pcmv1/algorithm"
KUNSHAN_DIR = "/public/home/acgnnmfbwo/pcmv1"
TRAIN_FILE = "train.py"
CPUCOREPRICEPERHOUR = 0.09
DCUPRICEPERHOUR = 2.0
KB = 1024
TIMEOUT = 20
DEPLOY_INSTANCE_LIMIT = 100
ProtocolType = "HTTP"
ContainerPort = 8881
JUPYTER = "jupyter"
)
var RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": {
CPU: 1,
GPU: 1,
RAM: 2 * RAM_SIZE_1G,
},
"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": {
CPU: 1,
GPU: 2,
RAM: 2 * RAM_SIZE_1G,
},
"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": {
CPU: 2,
GPU: 3,
RAM: 4 * RAM_SIZE_1G,
},
"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": {
CPU: 4,
GPU: 4,
RAM: 8 * RAM_SIZE_1G,
},
"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": {
CPU: 5,
GPU: 5,
RAM: 10 * RAM_SIZE_1G,
},
}
var (
RESOURCESGAIMAP = map[string]ResourceSpecSGAI{
"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": {
CPU: 1,
GPU: 1,
RAM: 2 * RAM_SIZE_1G,
},
"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": {
CPU: 1,
GPU: 2,
RAM: 2 * RAM_SIZE_1G,
},
"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": {
CPU: 2,
GPU: 3,
RAM: 4 * RAM_SIZE_1G,
},
"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": {
CPU: 4,
GPU: 4,
RAM: 8 * RAM_SIZE_1G,
},
"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": {
CPU: 5,
GPU: 5,
RAM: 10 * RAM_SIZE_1G,
},
}
var RESOURCESPECSAI = map[string]string{
"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": "CPU:1, DCU:1, RAM:2G",
"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": "CPU:1, DCU:2, RAM:2G",
"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": "CPU:2, DCU:3, RAM:4G",
"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": "CPU:4, DCU:4, RAM:8G",
"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": "CPU:5, DCU:5, RAM:10G",
}
RESOURCESPECSAI = map[string]string{
"WodTB2rJ8SobMgQ1nrtR245jxOrsovFi": "CPU:1, DCU:1, RAM:2G",
"6d41v1XV53MQPmQOJ5kNatIck9yl8nWZ": "CPU:1, DCU:2, RAM:2G",
"OBtVaaXAv9n9FbLR7pWAoa3yR13jXwNc": "CPU:2, DCU:3, RAM:4G",
"sBWfpkntUzsWYly11kdwEHZOYYIsFmve": "CPU:4, DCU:4, RAM:8G",
"jeYBVPwyIALjVYNzHvysh2o5CsBpBLp2": "CPU:5, DCU:5, RAM:10G",
}
ModelNameCmdMap = map[string]string{
"blip-image-captioning-base": "pip install transformers python-multipart fastapi uvicorn[standard]; python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/blip_image_captioning_base/infer.py",
"imagenet_resnet50": "pip install fastapi uvicorn[standard] python-multipart; python /public/home/acgnnmfbwo/pcmv1/inference/pytorch/imagenet_resnet50/infer.py",
}
)
type ResourceSpecSGAI struct {
CPU int64
@ -905,15 +916,81 @@ func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*in
return ins, nil
}
func (s *ShuguangAi) GetInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
func (s *ShuguangAi) GetImageInferResult(ctx context.Context, url string, file multipart.File, fileName string) (string, error) {
return "", nil
}
func (s *ShuguangAi) CreateInferDeployInstance(ctx context.Context, option *option.InferOption) (string, error) {
containerPortInfoList := []*hpcAC.ContainerPortInfoList{
{
ProtocolType: ProtocolType,
ContainerPort: ContainerPort,
},
}
return "", nil
desc := option.ModelType + FORWARD_SLASH + option.ModelName + FORWARD_SLASH + strings.ToLower(DCU)
instanceServiceName := "infer_instance" + UNDERSCORE + utils.RandomString(15)
resourceGroup := "kshdtest"
script, ok := ModelNameCmdMap[option.ModelName]
if !ok {
return "", errors.New("failed to set cmd, ModelName not exist")
}
param := &hpcAC.CreateParams{
AcceleratorType: strings.ToLower(DCU),
ContainerPortInfoList: containerPortInfoList,
CpuNumber: 8,
Description: desc,
//env
GpuNumber: 1,
ImagePath: "11.11.100.6:5000/dcu/admin/base/jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6",
InstanceServiceName: instanceServiceName,
MountInfoList: make([]*hpcAC.MountInfoList, 0),
//originalVersion
RamSize: 10 * RAM_SIZE_1G,
//rdma
ResourceGroup: resourceGroup,
StartScriptActionScope: "all",
StartScriptContent: script,
//startServiceCommand
//taskClassification: "interactive"
TaskNumber: 1,
TaskType: JUPYTER,
TimeoutLimit: "01:00:00",
UseStartScript: true,
//useStartServiceCommand: false
Version: "jupyterlab-pytorch:1.13.1-py3.7-dtk23.04-centos7.6",
}
req := &hpcacclient.CreateInstanceServiceReq{
Data: param,
}
resp, err := s.aCRpc.CreateInstanceService(ctx, req)
if err != nil {
return "", err
}
if resp.Code != "0" {
return "", errors.New(resp.Msg)
}
return resp.Data, nil
}
func (s *ShuguangAi) CheckModelExistence(ctx context.Context, name string, mtype string) bool {
return false
modelPath := "model" + FORWARD_SLASH + name
req := &hpcAC.IsExistFileReq{
Path: KUNSHAN_DIR + FORWARD_SLASH + modelPath,
}
resp, err := s.aCRpc.IsExistFile(ctx, req)
if err != nil {
return false
}
if resp.Code != "0" || resp.Data == nil {
return false
}
return resp.Data.Exist
}