From bcb664c704074d168090182746eb98b1b965975e Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 2 Sep 2024 17:20:45 +0800 Subject: [PATCH 1/5] updated createdeploytask logic Former-commit-id: b90f79d81f32bc7f78b9978717a076e807e42d17 --- .../logic/inference/createdeploytasklogic.go | 2 +- .../logic/inference/deployinstancelistlogic.go | 17 ++++++++++++++--- internal/scheduler/database/aiStorage.go | 14 ++++++++++++-- internal/storeLink/octopus.go | 6 ++++++ 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/internal/logic/inference/createdeploytasklogic.go b/internal/logic/inference/createdeploytasklogic.go index 318950ec..27f561bc 100644 --- a/internal/logic/inference/createdeploytasklogic.go +++ b/internal/logic/inference/createdeploytasklogic.go @@ -95,7 +95,7 @@ func (l *CreateDeployTaskLogic) createDeployInstance(taskId int64, adapterId str return err } - _, err = l.svcCtx.Scheduler.AiStorages.SaveInferDeployInstance(taskId, ins.InstanceId, ins.InstanceName, aid, adapterName, cid, clusterName, ins.ModelName, ins.ModelType, ins.InferCard) + _, err = l.svcCtx.Scheduler.AiStorages.SaveInferDeployInstance(taskId, ins.InstanceId, ins.InstanceName, aid, adapterName, cid, clusterName, ins.ModelName, ins.ModelType, ins.InferCard, ins.ClusterType) if err != nil { return err } diff --git a/internal/logic/inference/deployinstancelistlogic.go b/internal/logic/inference/deployinstancelistlogic.go index 053d2fe7..af0c7abc 100644 --- a/internal/logic/inference/deployinstancelistlogic.go +++ b/internal/logic/inference/deployinstancelistlogic.go @@ -52,7 +52,10 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi return nil, errors.New(err.Error()) } - deployTasks := l.GenerateDeployTasks(tasklist) + deployTasks, err := l.GenerateDeployTasks(tasklist) + if err != nil { + return nil, errors.New(err.Error()) + } slices := make([][]*models.AiInferDeployInstance, len(deployTasks)) for i := 0; i < len(deployTasks); i++ { slices[i] = deployTasks[i].Instances @@ -83,12 +86,20 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi return } -func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeployInstanceTask) []*DeployTask { +func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeployInstanceTask) ([]*DeployTask, error) { var tasks []*DeployTask for _, t := range tasklist { list, err := l.svcCtx.Scheduler.AiStorages.GetInstanceListByDeployTaskId(t.Id) if err != nil { logx.Errorf("db GetInstanceListByDeployTaskId error") + return nil, errors.New(err.Error()) + } + if len(list) == 0 { + err := l.svcCtx.Scheduler.AiStorages.DeleteDeployTaskById(t.Id) + if err != nil { + logx.Errorf("db DeleteByDeployTaskId error") + return nil, errors.New(err.Error()) + } continue } deployTask := &DeployTask{ @@ -99,7 +110,7 @@ func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeplo } tasks = append(tasks, deployTask) } - return tasks + return tasks, nil } type DeployTask struct { diff --git a/internal/scheduler/database/aiStorage.go b/internal/scheduler/database/aiStorage.go index 603f5719..3cc70c7b 100644 --- a/internal/scheduler/database/aiStorage.go +++ b/internal/scheduler/database/aiStorage.go @@ -374,7 +374,7 @@ func (s *AiStorage) AddNoticeInfo(adapterId string, adapterName string, clusterI } func (s *AiStorage) SaveInferDeployInstance(taskId int64, instanceId string, instanceName string, adapterId int64, - adapterName string, clusterId int64, clusterName string, modelName string, modelType string, inferCard string) (int64, error) { + adapterName string, clusterId int64, clusterName string, modelName string, modelType string, inferCard string, clusterType string) (int64, error) { startTime := time.Now().Format(time.RFC3339) // 构建主任务结构体 insModel := models.AiInferDeployInstance{ @@ -388,7 +388,8 @@ func (s *AiStorage) SaveInferDeployInstance(taskId int64, instanceId string, ins ModelName: modelName, ModelType: modelType, InferCard: inferCard, - Status: constants.Stopped, + ClusterType: clusterType, + Status: constants.Deploying, CreateTime: startTime, UpdateTime: startTime, } @@ -464,6 +465,15 @@ func (s *AiStorage) UpdateDeployTask(task *models.AiDeployInstanceTask, needUpda return nil } +func (s *AiStorage) DeleteDeployTaskById(id int64) error { + tx := s.DbEngin.Delete(&models.AiDeployInstanceTask{}, id) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return tx.Error + } + return nil +} + func (s *AiStorage) UpdateDeployTaskById(id int64) error { task, err := s.GetDeployTaskById(id) if err != nil { diff --git a/internal/storeLink/octopus.go b/internal/storeLink/octopus.go index 2317ad7a..0cb68ac7 100644 --- a/internal/storeLink/octopus.go +++ b/internal/storeLink/octopus.go @@ -1158,10 +1158,14 @@ func (o *OctopusLink) GetInferDeployInstance(ctx context.Context, id string) (*i url := strings.Replace(resp.Payload.Notebook.Tasks[0].Url, FORWARD_SLASH, "", -1) inferUrl := DOMAIN + url + var modelType string + var modelName string var card string if resp.Payload.Notebook.Desc != "" { str := strings.Split(resp.Payload.Notebook.Desc, FORWARD_SLASH) if len(str) == 3 { + modelType = str[0] + modelName = str[1] card = str[2] } } @@ -1171,6 +1175,8 @@ func (o *OctopusLink) GetInferDeployInstance(ctx context.Context, id string) (*i ins.ClusterName = o.platform ins.Status = resp.Payload.Notebook.Status ins.ClusterType = TYPE_OCTOPUS + ins.ModelType = modelType + ins.ModelName = modelName ins.InferUrl = inferUrl ins.InferCard = card From 91bfc6e6b3f29ceb5c878cc7e73637d34341c32e Mon Sep 17 00:00:00 2001 From: tzwang Date: Mon, 2 Sep 2024 17:25:31 +0800 Subject: [PATCH 2/5] updated getadapterbymodel api Former-commit-id: 8a6175e6d646a05c9298b7d9747a671b37f43dd7 --- desc/inference/inference.api | 2 +- internal/types/types.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/desc/inference/inference.api b/desc/inference/inference.api index 07d8ee01..f6ac8568 100644 --- a/desc/inference/inference.api +++ b/desc/inference/inference.api @@ -200,7 +200,7 @@ type ( AdapterAvail { AdapterId string `json:"adapterId"` - AdapterName string `json:"taskName"` + AdapterName string `json:"adapterName"` Clusters []*ClusterAvail `json:"clusters"` } diff --git a/internal/types/types.go b/internal/types/types.go index 7d5da4ef..284d0383 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -6096,7 +6096,7 @@ type GetAdaptersByModelResp struct { type AdapterAvail struct { AdapterId string `json:"adapterId"` - AdapterName string `json:"taskName"` + AdapterName string `json:"adapterName"` Clusters []*ClusterAvail `json:"clusters"` } From 73bd1a7f08d7e7fa7154d332c0918227623f13b6 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 4 Sep 2024 10:47:22 +0800 Subject: [PATCH 3/5] updated deployinstancelist logic Former-commit-id: 2be82315a1c11d3aded5202ca97487720ed97c60 --- internal/logic/inference/deployinstancelistlogic.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/logic/inference/deployinstancelistlogic.go b/internal/logic/inference/deployinstancelistlogic.go index af0c7abc..a6794bef 100644 --- a/internal/logic/inference/deployinstancelistlogic.go +++ b/internal/logic/inference/deployinstancelistlogic.go @@ -114,8 +114,8 @@ func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeplo } type DeployTask struct { - Id int64 `json:"id,string"` - Name string `json:"name,string"` - Desc string `json:"desc,string"` + Id int64 `json:"id,string"` + Name string + Desc string Instances []*models.AiInferDeployInstance `json:"instances,string"` } From f7e3e91887312b141545a2e7c908505515f7d161 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 4 Sep 2024 15:44:04 +0800 Subject: [PATCH 4/5] updated texttotextinference api Former-commit-id: 1c9205ceaebf909eadfb52724189258c6326782d --- desc/inference/inference.api | 4 +--- internal/types/types.go | 10 ++++------ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/desc/inference/inference.api b/desc/inference/inference.api index f6ac8568..cb67b4d8 100644 --- a/desc/inference/inference.api +++ b/desc/inference/inference.api @@ -79,10 +79,8 @@ type ( TextToTextInferenceReq{ TaskName string `form:"taskName"` TaskDesc string `form:"taskDesc"` - ModelName string `form:"modelName"` ModelType string `form:"modelType"` - AdapterId string `form:"adapterId"` - AiClusterIds []string `form:"aiClusterIds"` + InstanceId int64 `form:"instanceId"` } TextToTextInferenceResp{ diff --git a/internal/types/types.go b/internal/types/types.go index 284d0383..4cbdd876 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -5976,12 +5976,10 @@ type InferenceResult struct { } type TextToTextInferenceReq struct { - TaskName string `form:"taskName"` - TaskDesc string `form:"taskDesc"` - ModelName string `form:"modelName"` - ModelType string `form:"modelType"` - AdapterId string `form:"adapterId"` - AiClusterIds []string `form:"aiClusterIds"` + TaskName string `form:"taskName"` + TaskDesc string `form:"taskDesc"` + ModelType string `form:"modelType"` + InstanceId int64 `form:"instanceId"` } type TextToTextInferenceResp struct { From ec64ec20072e7d9c19bd0221acdb9e01c678fdb0 Mon Sep 17 00:00:00 2001 From: tzwang Date: Wed, 4 Sep 2024 16:37:31 +0800 Subject: [PATCH 5/5] updated texttotextinference logic Former-commit-id: 74abd736b1765b4fbe4400b1a59d18577043630e --- .../inference/deployinstancelistlogic.go | 6 ++-- .../inference/texttotextinferencelogic.go | 29 ++++++++------- .../inference/textInference/textInference.go | 24 ++++++------- .../inference/textInference/textToText.go | 35 +++++++++++++++++-- 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/internal/logic/inference/deployinstancelistlogic.go b/internal/logic/inference/deployinstancelistlogic.go index a6794bef..6f17c7cd 100644 --- a/internal/logic/inference/deployinstancelistlogic.go +++ b/internal/logic/inference/deployinstancelistlogic.go @@ -114,8 +114,8 @@ func (l *DeployInstanceListLogic) GenerateDeployTasks(tasklist []*models.AiDeplo } type DeployTask struct { - Id int64 `json:"id,string"` - Name string - Desc string + Id int64 `json:"id,string"` + Name string `json:"name"` + Desc string `json:"desc"` Instances []*models.AiInferDeployInstance `json:"instances,string"` } diff --git a/internal/logic/inference/texttotextinferencelogic.go b/internal/logic/inference/texttotextinferencelogic.go index 2507f2c6..0d73ec1d 100644 --- a/internal/logic/inference/texttotextinferencelogic.go +++ b/internal/logic/inference/texttotextinferencelogic.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference/textInference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "strconv" ) type TextToTextInferenceLogic struct { @@ -27,27 +28,31 @@ func NewTextToTextInferenceLogic(ctx context.Context, svcCtx *svc.ServiceContext func (l *TextToTextInferenceLogic) TextToTextInference(req *types.TextToTextInferenceReq) (resp *types.TextToTextInferenceResp, err error) { resp = &types.TextToTextInferenceResp{} - opt := &option.InferOption{ - TaskName: req.TaskName, - TaskDesc: req.TaskDesc, - AdapterId: req.AdapterId, - AiClusterIds: req.AiClusterIds, - ModelName: req.ModelName, - ModelType: req.ModelType, + + instance, err := l.svcCtx.Scheduler.AiStorages.GetInferDeployInstanceById(req.InstanceId) + if err != nil { + return nil, err + } + if instance == nil { + return nil, errors.New("instance is empty ") } - _, ok := l.svcCtx.Scheduler.AiService.AiCollectorAdapterMap[opt.AdapterId] - if !ok { - return nil, errors.New("AdapterId does not exist") + adapterId := strconv.FormatInt(instance.AdapterId, 10) + + opt := &option.InferOption{ + TaskName: req.TaskName, + TaskDesc: req.TaskDesc, + ModelType: req.ModelType, + AdapterId: adapterId, } adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(opt.AdapterId) - inType, err := textInference.NewTextToText(opt, l.svcCtx.Scheduler.AiStorages, l.svcCtx.Scheduler.AiService.InferenceAdapterMap) + infer, err := textInference.NewTextToText(opt, l.svcCtx.Scheduler.AiStorages, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, instance) if err != nil { return nil, err } - textInfer, err := textInference.New(inType, opt, l.svcCtx.Scheduler.AiStorages, l.svcCtx.Scheduler.AiService.InferenceAdapterMap, adapterName) + textInfer, err := textInference.New(infer, opt, l.svcCtx.Scheduler.AiStorages, adapterName) if err != nil { return nil, err } diff --git a/internal/scheduler/service/inference/textInference/textInference.go b/internal/scheduler/service/inference/textInference/textInference.go index c0709c21..239754fa 100644 --- a/internal/scheduler/service/inference/textInference/textInference.go +++ b/internal/scheduler/service/inference/textInference/textInference.go @@ -17,30 +17,28 @@ type FilteredCluster struct { urls []*inference.InferUrl clusterId string clusterName string + clusterType string } type TextInference struct { - inference ITextInference - opt *option.InferOption - storage *database.AiStorage - inferAdapter map[string]map[string]inference.ICluster - errMap map[string]string - adapterName string + inference ITextInference + opt *option.InferOption + storage *database.AiStorage + errMap map[string]string + adapterName string } func New( inference ITextInference, opt *option.InferOption, storage *database.AiStorage, - inferAdapter map[string]map[string]inference.ICluster, adapterName string) (*TextInference, error) { return &TextInference{ - inference: inference, - opt: opt, - storage: storage, - inferAdapter: inferAdapter, - adapterName: adapterName, - errMap: make(map[string]string), + inference: inference, + opt: opt, + storage: storage, + adapterName: adapterName, + errMap: make(map[string]string), }, nil } diff --git a/internal/scheduler/service/inference/textInference/textToText.go b/internal/scheduler/service/inference/textInference/textToText.go index 3851a35e..e631049e 100644 --- a/internal/scheduler/service/inference/textInference/textToText.go +++ b/internal/scheduler/service/inference/textInference/textToText.go @@ -22,11 +22,12 @@ type TextToText struct { opt *option.InferOption storage *database.AiStorage inferAdapter map[string]map[string]inference.ICluster + instance *models.AiInferDeployInstance cs []*FilteredCluster } -func NewTextToText(opt *option.InferOption, storage *database.AiStorage, inferAdapter map[string]map[string]inference.ICluster) (*TextToText, error) { - cs, err := filterClusters(opt, storage, inferAdapter) +func NewTextToText(opt *option.InferOption, storage *database.AiStorage, inferAdapter map[string]map[string]inference.ICluster, instance *models.AiInferDeployInstance) (*TextToText, error) { + cs, err := filterClusters(inferAdapter, instance) if err != nil { return nil, err } @@ -64,7 +65,35 @@ func (tt *TextToText) SaveAiTask(id int64, adapterName string) error { return nil } -func filterClusters(opt *option.InferOption, storage *database.AiStorage, inferAdapter map[string]map[string]inference.ICluster) ([]*FilteredCluster, error) { +func filterClusters(inferAdapter map[string]map[string]inference.ICluster, instance *models.AiInferDeployInstance) ([]*FilteredCluster, error) { + var cs []*FilteredCluster + var inferurls []*inference.InferUrl + clusterId := strconv.FormatInt(instance.ClusterId, 10) + adapterId := strconv.FormatInt(instance.AdapterId, 10) + r := http.Request{} + deployInstance, err := inferAdapter[adapterId][clusterId].GetInferDeployInstance(r.Context(), instance.InstanceId) + if err != nil { + return nil, err + } + var url inference.InferUrl + url.Url = deployInstance.InferUrl + inference.FORWARD_SLASH + CHAT + url.Card = deployInstance.InferCard + inferurls = append(inferurls, &url) + + clusterType := deployInstance.ClusterType + clusterName := deployInstance.ClusterName + + var f FilteredCluster + f.urls = inferurls + f.clusterId = clusterId + f.clusterName = clusterName + f.clusterType = clusterType + cs = append(cs, &f) + + return cs, nil +} + +func filterClustersTemp(opt *option.InferOption, storage *database.AiStorage, inferAdapter map[string]map[string]inference.ICluster) ([]*FilteredCluster, error) { var wg sync.WaitGroup var ch = make(chan *FilteredCluster, len(opt.AiClusterIds)) var cs []*FilteredCluster