diff --git a/.github/workflows/sync.yml b/.github/workflows/sync.yml index 8944fda0..955dcb02 100644 --- a/.github/workflows/sync.yml +++ b/.github/workflows/sync.yml @@ -2,7 +2,7 @@ name: Sync Mirror Repository on: schedule: - - cron: '0 * * * *' # 每小时同步一次 + - cron: '0 */8 * * *' # 每小时同步一次 workflow_dispatch: # 允许手动触发 jobs: diff --git a/go.mod b/go.mod index 99eda213..4d92673b 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/prometheus/common v0.59.1 github.com/robfig/cron/v3 v3.0.1 github.com/zeromicro/go-zero v1.7.2 - gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 + gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437 gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240909072501-939c3144cd9e gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 diff --git a/go.sum b/go.sum index 4b6cd18b..f5302a5e 100644 --- a/go.sum +++ b/go.sum @@ -466,8 +466,8 @@ github.com/yuin/gopher-lua v1.1.1 h1:kYKnWBjvbNP4XLT3+bPEwAXJx262OhaHDWDVOPjL46M github.com/yuin/gopher-lua v1.1.1/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeromicro/go-zero v1.7.2 h1:a8lyVOG3KXG4LrAy6ZmtJTJtisX4Ostc4Pst4fE704I= github.com/zeromicro/go-zero v1.7.2/go.mod h1:WFXfF92Exw0O7WECifS6r99JSzv4KEN49x9RhAfgkMc= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1 h1:DicBXoQiC6mumMBeyqSPNrsjtqJIgk5Pv2hscu2xryw= -gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240725071305-f751eec4dde1/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437 h1:ta6h9+FU7AQ2fNyQiXrZnMdlNBjOKdyBx4e3RF7BE84= +gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20240918015229-59c579d1a437/go.mod h1:3eECiw9O2bIFkkePlloKyLNXiqBAhOxNrDoGaaGseGY= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240909072501-939c3144cd9e h1:6LYJggBoeAQxy/otzWjt40Pa7gnVvUR4c5YMi6A/NdU= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20240909072501-939c3144cd9e/go.mod h1:/eOmBFZKWGoabG3sRVkVvIbLwsd2631k4jkUBR6x1AA= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo= diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 09394d29..bd8624c1 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -16,7 +16,8 @@ package cron import ( "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" ) @@ -28,20 +29,24 @@ func AddCronGroup(svc *svc.ServiceContext) { logx.Errorf(err.Error()) return } - updater.UpdateTaskStatus(svc, list) - updater.UpdateAiTaskStatus(svc, list) + status.UpdateTaskStatus(svc, list) + status.UpdateAiTaskStatus(svc, list) }) svc.Cron.AddFunc("*/5 * * * * ?", func() { UpdateAiAdapterMaps(svc) }) - svc.Cron.AddFunc("*/59 * * * * ?", func() { + svc.Cron.AddFunc("30 * * * * ?", func() { adapterList, err := svc.Scheduler.AiStorages.GetAdaptersByType("1") if err != nil { logx.Errorf(err.Error()) return } - updater.UpdateClusterResources(svc, adapterList) + stat.UpdateClusterResources(svc, adapterList) + }) + + svc.Cron.AddFunc("@hourly", func() { + status.UpdateAutoStoppedInstance(svc) }) } diff --git a/internal/logic/ai/getcenteroverviewlogic.go b/internal/logic/ai/getcenteroverviewlogic.go index b52c8718..6aeecbe9 100644 --- a/internal/logic/ai/getcenteroverviewlogic.go +++ b/internal/logic/ai/getcenteroverviewlogic.go @@ -3,7 +3,7 @@ package ai import ( "context" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" ) @@ -37,7 +37,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview centerNum = int32(len(adapterList)) resp.CenterNum = centerNum - go updater.UpdateClusterResources(l.svcCtx, adapterList) + go stat.UpdateClusterResources(l.svcCtx, adapterList) for _, adapter := range adapterList { taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) diff --git a/internal/logic/ai/getcentertasklistlogic.go b/internal/logic/ai/getcentertasklistlogic.go index 0b55aa21..53828b3a 100644 --- a/internal/logic/ai/getcentertasklistlogic.go +++ b/internal/logic/ai/getcentertasklistlogic.go @@ -3,7 +3,7 @@ package ai import ( "context" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -32,7 +32,7 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList return nil, err } - go updater.UpdateTrainingTaskStatus(l.svcCtx, adapterList) + go status.UpdateTrainingTaskStatus(l.svcCtx, adapterList) for _, adapter := range adapterList { taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) diff --git a/internal/logic/core/pagelisttasklogic.go b/internal/logic/core/pagelisttasklogic.go index f5a44bc1..811554d8 100644 --- a/internal/logic/core/pagelisttasklogic.go +++ b/internal/logic/core/pagelisttasklogic.go @@ -2,7 +2,7 @@ package core import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" @@ -52,8 +52,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } // 更新智算任务状态 - go updater.UpdateTaskStatus(l.svcCtx, list) - go updater.UpdateAiTaskStatus(l.svcCtx, list) + go status.UpdateTaskStatus(l.svcCtx, list) + go status.UpdateAiTaskStatus(l.svcCtx, list) for _, model := range list { if model.StartTime != "" && model.EndTime == "" { diff --git a/internal/logic/inference/deployinstancelistlogic.go b/internal/logic/inference/deployinstancelistlogic.go index 434dd72f..794673cf 100644 --- a/internal/logic/inference/deployinstancelistlogic.go +++ b/internal/logic/inference/deployinstancelistlogic.go @@ -5,7 +5,7 @@ import ( "errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -71,7 +71,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi list := common.ConcatMultipleSlices(slices) if len(list) != 0 { - go updater.UpdateDeployInstanceStatusBatch(l.svcCtx, list) + go status.UpdateDeployInstanceStatusBatch(l.svcCtx, list, true) ins := list[0] for i := range list { @@ -82,8 +82,8 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi } } - go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) - go updater.UpdateDeployTaskStatus(l.svcCtx) + go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil) + go status.UpdateDeployTaskStatus(l.svcCtx) } resp.List = &deployTasks diff --git a/internal/logic/inference/startallbydeploytaskidlogic.go b/internal/logic/inference/startallbydeploytaskidlogic.go index d78f98b9..e50683b4 100644 --- a/internal/logic/inference/startallbydeploytaskidlogic.go +++ b/internal/logic/inference/startallbydeploytaskidlogic.go @@ -5,8 +5,7 @@ import ( "errors" "fmt" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -84,7 +83,7 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta <-buf return } - if checkStopStatus(in) { + if status.CheckStopStatus(in) { success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StartInferDeployInstance(l.ctx, ins.InstanceId) if !success { e := struct { @@ -136,31 +135,3 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta return nil } - -func checkStopStatus(in *inference.DeployInstance) bool { - switch in.ClusterType { - case storeLink.TYPE_OCTOPUS: - switch in.Status { - case "stopped": - return true - default: - return false - } - case storeLink.TYPE_MODELARTS: - switch in.Status { - case "stopped": - return true - default: - return false - } - case storeLink.TYPE_SHUGUANGAI: - switch in.Status { - case "Terminated": - return true - default: - return false - } - default: - return false - } -} diff --git a/internal/logic/inference/startdeployinstancelistlogic.go b/internal/logic/inference/startdeployinstancelistlogic.go index 4b762e0f..bcdb18a0 100644 --- a/internal/logic/inference/startdeployinstancelistlogic.go +++ b/internal/logic/inference/startdeployinstancelistlogic.go @@ -4,7 +4,7 @@ import ( "context" "errors" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "strconv" @@ -33,12 +33,19 @@ func (l *StartDeployInstanceListLogic) StartDeployInstanceList(req *types.StartD return nil, err } - success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, req.InstanceId) - if !success { - return nil, errors.New("start instance failed") + in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) + if err != nil { + return nil, err } - go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) + if status.CheckStopStatus(in) { + success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, in.InstanceId) + if !success { + return nil, errors.New("start instance failed") + } + } + + go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil) return resp, nil } diff --git a/internal/logic/inference/stopallbydeploytaskidlogic.go b/internal/logic/inference/stopallbydeploytaskidlogic.go index c6eaad01..6616c2ef 100644 --- a/internal/logic/inference/stopallbydeploytaskidlogic.go +++ b/internal/logic/inference/stopallbydeploytaskidlogic.go @@ -4,8 +4,7 @@ import ( "context" "errors" "fmt" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -85,7 +84,7 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc <-buf return } - if checkStatus(in) { + if status.CheckRunningStatus(in) { success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId) if !success { e := struct { @@ -137,31 +136,3 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc return nil } - -func checkStatus(in *inference.DeployInstance) bool { - switch in.ClusterType { - case storeLink.TYPE_OCTOPUS: - switch in.Status { - case "running": - return true - default: - return false - } - case storeLink.TYPE_MODELARTS: - switch in.Status { - case "running": - return true - default: - return false - } - case storeLink.TYPE_SHUGUANGAI: - switch in.Status { - case "Running": - return true - default: - return false - } - default: - return false - } -} diff --git a/internal/logic/inference/stopdeployinstancelogic.go b/internal/logic/inference/stopdeployinstancelogic.go index 919e6aa4..3255a7dc 100644 --- a/internal/logic/inference/stopdeployinstancelogic.go +++ b/internal/logic/inference/stopdeployinstancelogic.go @@ -4,7 +4,7 @@ import ( "context" "errors" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "strconv" @@ -33,12 +33,19 @@ func (l *StopDeployInstanceLogic) StopDeployInstance(req *types.StopDeployInstan return nil, err } - success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, req.InstanceId) - if !success { - return nil, errors.New("stop instance failed") + in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) + if err != nil { + return nil, err } - go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) + if status.CheckRunningStatus(in) { + success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, in.InstanceId) + if !success { + return nil, errors.New("stop instance failed") + } + } + + go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil) return resp, nil } diff --git a/internal/scheduler/service/inference/imageInference/imageInference.go b/internal/scheduler/service/inference/imageInference/imageInference.go index 700eb864..ba524db6 100644 --- a/internal/scheduler/service/inference/imageInference/imageInference.go +++ b/internal/scheduler/service/inference/imageInference/imageInference.go @@ -240,7 +240,7 @@ func (i *ImageInference) inferImages(cs []*FilteredCluster) ([]*types.ImageResul var wg sync.WaitGroup var ch = make(chan *types.ImageResult, len(i.files)) var results []*types.ImageResult - limit := make(chan bool, 7) + limit := make(chan bool, 5) var imageNumIdx int32 = 0 var imageNumIdxEnd int32 = 0 @@ -290,22 +290,22 @@ func (i *ImageInference) updateStatus(aiTaskList []*models.TaskAi, cs []*Filtere //change cluster status if len(i.clusters) != len(cs) { - var acs []*strategy.AssignedCluster - var rcs []*strategy.AssignedCluster + var failedclusters []*strategy.AssignedCluster + var runningclusters []*strategy.AssignedCluster for _, cluster := range i.clusters { if contains(cs, cluster.ClusterId) { var ac *strategy.AssignedCluster ac = cluster - rcs = append(rcs, ac) + runningclusters = append(runningclusters, ac) } else { var ac *strategy.AssignedCluster ac = cluster - acs = append(acs, ac) + failedclusters = append(failedclusters, ac) } } // update failed cluster status - for _, ac := range acs { + for _, ac := range failedclusters { for _, t := range aiTaskList { if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { t.Status = constants.Failed @@ -322,7 +322,7 @@ func (i *ImageInference) updateStatus(aiTaskList []*models.TaskAi, cs []*Filtere } // update running cluster status - for _, ac := range rcs { + for _, ac := range runningclusters { for _, t := range aiTaskList { if ac.ClusterId == strconv.Itoa(int(t.ClusterId)) { t.Status = constants.Running @@ -396,7 +396,6 @@ func (i *ImageInference) sendInferReq(images []*ImageFile, cluster *FilteredClus return } }(image, cluster) - <-limit } } diff --git a/internal/scheduler/service/updater/clusterResources.go b/internal/scheduler/service/utils/stat/clusterResources.go similarity index 99% rename from internal/scheduler/service/updater/clusterResources.go rename to internal/scheduler/service/utils/stat/clusterResources.go index b4c3389f..ca6938ab 100644 --- a/internal/scheduler/service/updater/clusterResources.go +++ b/internal/scheduler/service/utils/stat/clusterResources.go @@ -1,4 +1,4 @@ -package updater +package stat import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" diff --git a/internal/scheduler/service/updater/deployInstance.go b/internal/scheduler/service/utils/status/deployInstance.go similarity index 53% rename from internal/scheduler/service/updater/deployInstance.go rename to internal/scheduler/service/utils/status/deployInstance.go index ecb48329..f82e66dd 100644 --- a/internal/scheduler/service/updater/deployInstance.go +++ b/internal/scheduler/service/utils/status/deployInstance.go @@ -1,6 +1,7 @@ -package updater +package status import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -10,12 +11,15 @@ import ( "time" ) -func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance) { +func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance, needfilter bool) { list := make([]*models.AiInferDeployInstance, len(insList)) copy(list, insList) - for i := len(list) - 1; i >= 0; i-- { - if list[i].Status == constants.Running || list[i].Status == constants.Stopped { - list = append(list[:i], list[i+1:]...) + + if needfilter { + for i := len(list) - 1; i >= 0; i-- { + if list[i].Status == constants.Running || list[i].Status == constants.Stopped { + list = append(list[:i], list[i+1:]...) + } } } @@ -23,8 +27,10 @@ func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models. return } + buffer := make(chan bool, 3) for _, instance := range list { - go UpdateDeployInstanceStatus(svc, instance, false) + buffer <- true + go UpdateDeployInstanceStatus(svc, instance, false, buffer) } } @@ -47,23 +53,37 @@ func UpdateDeployTaskStatus(svc *svc.ServiceContext) { return } + buffer := make(chan bool, 2) for _, instance := range inslist { - go UpdateDeployInstanceStatus(svc, instance, false) + buffer <- true + go UpdateDeployInstanceStatus(svc, instance, false, buffer) } } -func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool) { +func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool, ch chan bool) { amap, found := svc.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(instance.AdapterId, 10)] if !found { + if ch != nil { + <-ch + return + } return } cmap, found := amap[strconv.FormatInt(instance.ClusterId, 10)] if !found { + if ch != nil { + <-ch + return + } return } h := http.Request{} ins, err := cmap.GetInferDeployInstance(h.Context(), instance.InstanceId) if err != nil { + if ch != nil { + <-ch + return + } return } switch instance.ClusterType { @@ -71,11 +91,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe switch ins.Status { case "running": if instance.Status == constants.Running { + if ch != nil { + <-ch + return + } return } instance.Status = constants.Running case "stopped": if instance.Status == constants.Stopped { + if ch != nil { + <-ch + return + } return } instance.Status = constants.Stopped @@ -86,11 +114,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe switch ins.Status { case "running": if instance.Status == constants.Running { + if ch != nil { + <-ch + return + } return } instance.Status = constants.Running case "stopped": if instance.Status == constants.Stopped { + if ch != nil { + <-ch + return + } return } instance.Status = constants.Stopped @@ -101,11 +137,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe switch ins.Status { case "Running": if instance.Status == constants.Running { + if ch != nil { + <-ch + return + } return } instance.Status = constants.Running case "Terminated": if instance.Status == constants.Stopped { + if ch != nil { + <-ch + return + } return } instance.Status = constants.Stopped @@ -116,6 +160,84 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe err = svc.Scheduler.AiStorages.UpdateInferDeployInstance(instance, updatetime) if err != nil { + if ch != nil { + <-ch + return + } + return + } + + if ch != nil { + <-ch return } } + +func UpdateAutoStoppedInstance(svc *svc.ServiceContext) { + list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceList() + if err != nil { + return + } + + if len(list) == 0 { + return + } + + UpdateDeployInstanceStatusBatch(svc, list, false) +} + +func CheckStopStatus(in *inference.DeployInstance) bool { + switch in.ClusterType { + case storeLink.TYPE_OCTOPUS: + switch in.Status { + case "stopped": + return true + default: + return false + } + case storeLink.TYPE_MODELARTS: + switch in.Status { + case "stopped": + return true + default: + return false + } + case storeLink.TYPE_SHUGUANGAI: + switch in.Status { + case "Terminated": + return true + default: + return false + } + default: + return false + } +} + +func CheckRunningStatus(in *inference.DeployInstance) bool { + switch in.ClusterType { + case storeLink.TYPE_OCTOPUS: + switch in.Status { + case "running": + return true + default: + return false + } + case storeLink.TYPE_MODELARTS: + switch in.Status { + case "running": + return true + default: + return false + } + case storeLink.TYPE_SHUGUANGAI: + switch in.Status { + case "Running": + return true + default: + return false + } + default: + return false + } +} diff --git a/internal/scheduler/service/updater/taskStatusSync.go b/internal/scheduler/service/utils/status/taskStatusSync.go similarity index 99% rename from internal/scheduler/service/updater/taskStatusSync.go rename to internal/scheduler/service/utils/status/taskStatusSync.go index 649fb616..a691dbe4 100644 --- a/internal/scheduler/service/updater/taskStatusSync.go +++ b/internal/scheduler/service/utils/status/taskStatusSync.go @@ -1,4 +1,4 @@ -package updater +package status import ( "errors" diff --git a/internal/storeLink/modelarts.go b/internal/storeLink/modelarts.go index acc624d5..36022ac5 100644 --- a/internal/storeLink/modelarts.go +++ b/internal/storeLink/modelarts.go @@ -791,11 +791,14 @@ func (m *ModelArtsLink) CreateInferDeployInstance(ctx context.Context, option *o } var configItems []*modelarts.ServiceConfig configItems = append(configItems, configParam) + now := time.Now() + timestampSec := now.Unix() + str := strconv.FormatInt(timestampSec, 10) req := &modelarts.CreateServiceReq{ Platform: m.platform, Config: configItems, InferType: "real-time", - ServiceName: option.ModelName + "_" + option.ModelType + "_" + Npu, + ServiceName: option.ModelName + "_" + option.ModelType + "_" + Npu + "_" + str, } ctx, cancel := context.WithTimeout(context.Background(), 150*time.Second) defer cancel() @@ -812,10 +815,37 @@ func (m *ModelArtsLink) CheckModelExistence(ctx context.Context, name string, mt ModelName: name, ModelType: mtype, } - err := m.GetModelId(ctx, ifoption) + err := m.CheckImageExist(ctx, ifoption) if err != nil { return false } return true } + +func (m *ModelArtsLink) CheckImageExist(ctx context.Context, option *option.InferOption) error { + req := &modelarts.ListImagesReq{ + Limit: m.pageSize, + Offset: m.pageIndex, + } + ListImageResp, err := m.modelArtsRpc.ListImages(ctx, req) + if err != nil { + return err + } + var modelName string + if ListImageResp.Code == 200 { + //return errors.New("failed to get ModelId") + for _, ListImage := range ListImageResp.Data { + if option.ModelName == "ChatGLM-6B" { + modelName = "chatglm-6b" + } else { + modelName = option.ModelName + } + + if ListImage.Name == modelName { + return nil + } + } + } + return errors.New("failed to find Image ") +} diff --git a/internal/storeLink/octopus.go b/internal/storeLink/octopus.go index 0cb68ac7..5fd26e5f 100644 --- a/internal/storeLink/octopus.go +++ b/internal/storeLink/octopus.go @@ -84,10 +84,10 @@ var ( CardModelNameCmdMap = map[string]map[string]string{ BIV100: {"blip-image-captioning-base": "pip install -U transformers; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code; python infer_biv100.py", "imagenet_resnet50": "pip install -U transformers; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code/infer; python infer_biv100.py", - "chatGLM_6B": "su root; pip install transformers==4.33.2; pip install fastapi uvicorn[standard]; cd /code; python infer_biv100.py"}, + "ChatGLM_6B": "su root; pip install transformers==4.33.2; pip install fastapi uvicorn[standard]; cd /code; python infer_biv100.py"}, MLU: {"blip-image-captioning-base": "", "imagenet_resnet50": "su root; . /torch/venv3/pytorch/bin/activate; pip install fastapi uvicorn[standard]; pip install python-multipart; cd /code/infer; python infer_mlu.py", - "chatGLM_6B": ""}, + "ChatGLM_6B": ""}, } ) diff --git a/internal/storeLink/shuguangai.go b/internal/storeLink/shuguangai.go index 38f37d14..429f772d 100644 --- a/internal/storeLink/shuguangai.go +++ b/internal/storeLink/shuguangai.go @@ -24,6 +24,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" + "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "mime/multipart" "strconv" @@ -800,8 +801,11 @@ func (s *ShuguangAi) GetInferDeployInstanceList(ctx context.Context) ([]*inferen var insList []*inference.DeployInstance params := &hpcAC.GetInstanceServiceListReqParam{ InstanceServiceName: DEPLOY_INSTANCE_PREFIEX, + Status: "", + TaskType: "", Start: 0, Limit: DEPLOY_INSTANCE_LIMIT, + Sort: "desc", } req := &hpcacclient.GetInstanceServiceListReq{ Param: params, @@ -867,6 +871,25 @@ func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*in if err != nil || resp.Code != "0" { return nil, err } + + var url string + if resp.Data.Status == constants.Running { + url = resp.Data.ContainerPortInfoList[0].AccessUrl + } + + var modelType string + var modelName string + var card string + + if resp.Data.Description != "" { + str := strings.Split(resp.Data.Description, FORWARD_SLASH) + if len(str) == 3 { + modelType = str[0] + modelName = str[1] + card = str[2] + } + } + ins.InstanceName = resp.Data.InstanceServiceName ins.InstanceId = resp.Data.Id ins.ClusterName = s.platform @@ -874,6 +897,10 @@ func (s *ShuguangAi) GetInferDeployInstance(ctx context.Context, id string) (*in ins.InferCard = DCU ins.CreatedTime = resp.Data.CreateTime ins.ClusterType = TYPE_SHUGUANGAI + ins.ModelType = modelType + ins.ModelName = modelName + ins.InferUrl = url + ins.InferCard = card return ins, nil }