diff --git a/internal/cron/cron.go b/internal/cron/cron.go index 09394d29..d143333e 100644 --- a/internal/cron/cron.go +++ b/internal/cron/cron.go @@ -16,7 +16,8 @@ package cron import ( "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" ) @@ -28,8 +29,8 @@ func AddCronGroup(svc *svc.ServiceContext) { logx.Errorf(err.Error()) return } - updater.UpdateTaskStatus(svc, list) - updater.UpdateAiTaskStatus(svc, list) + status.UpdateTaskStatus(svc, list) + status.UpdateAiTaskStatus(svc, list) }) svc.Cron.AddFunc("*/5 * * * * ?", func() { @@ -42,6 +43,6 @@ func AddCronGroup(svc *svc.ServiceContext) { logx.Errorf(err.Error()) return } - updater.UpdateClusterResources(svc, adapterList) + stat.UpdateClusterResources(svc, adapterList) }) } diff --git a/internal/logic/ai/getcenteroverviewlogic.go b/internal/logic/ai/getcenteroverviewlogic.go index b52c8718..6aeecbe9 100644 --- a/internal/logic/ai/getcenteroverviewlogic.go +++ b/internal/logic/ai/getcenteroverviewlogic.go @@ -3,7 +3,7 @@ package ai import ( "context" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/stat" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" ) @@ -37,7 +37,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview centerNum = int32(len(adapterList)) resp.CenterNum = centerNum - go updater.UpdateClusterResources(l.svcCtx, adapterList) + go stat.UpdateClusterResources(l.svcCtx, adapterList) for _, adapter := range adapterList { taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) diff --git a/internal/logic/ai/getcentertasklistlogic.go b/internal/logic/ai/getcentertasklistlogic.go index 0b55aa21..53828b3a 100644 --- a/internal/logic/ai/getcentertasklistlogic.go +++ b/internal/logic/ai/getcentertasklistlogic.go @@ -3,7 +3,7 @@ package ai import ( "context" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -32,7 +32,7 @@ func (l *GetCenterTaskListLogic) GetCenterTaskList() (resp *types.CenterTaskList return nil, err } - go updater.UpdateTrainingTaskStatus(l.svcCtx, adapterList) + go status.UpdateTrainingTaskStatus(l.svcCtx, adapterList) for _, adapter := range adapterList { taskList, err := l.svcCtx.Scheduler.AiStorages.GetAiTasksByAdapterId(adapter.Id) diff --git a/internal/logic/core/pagelisttasklogic.go b/internal/logic/core/pagelisttasklogic.go index f5a44bc1..811554d8 100644 --- a/internal/logic/core/pagelisttasklogic.go +++ b/internal/logic/core/pagelisttasklogic.go @@ -2,7 +2,7 @@ package core import ( "context" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/repository/result" @@ -52,8 +52,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } // 更新智算任务状态 - go updater.UpdateTaskStatus(l.svcCtx, list) - go updater.UpdateAiTaskStatus(l.svcCtx, list) + go status.UpdateTaskStatus(l.svcCtx, list) + go status.UpdateAiTaskStatus(l.svcCtx, list) for _, model := range list { if model.StartTime != "" && model.EndTime == "" { diff --git a/internal/logic/inference/deployinstancelistlogic.go b/internal/logic/inference/deployinstancelistlogic.go index 434dd72f..b6a4b280 100644 --- a/internal/logic/inference/deployinstancelistlogic.go +++ b/internal/logic/inference/deployinstancelistlogic.go @@ -5,7 +5,7 @@ import ( "errors" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -71,7 +71,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi list := common.ConcatMultipleSlices(slices) if len(list) != 0 { - go updater.UpdateDeployInstanceStatusBatch(l.svcCtx, list) + go status.UpdateDeployInstanceStatusBatch(l.svcCtx, list, true) ins := list[0] for i := range list { @@ -82,8 +82,8 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi } } - go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) - go updater.UpdateDeployTaskStatus(l.svcCtx) + go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true) + go status.UpdateDeployTaskStatus(l.svcCtx) } resp.List = &deployTasks diff --git a/internal/logic/inference/startallbydeploytaskidlogic.go b/internal/logic/inference/startallbydeploytaskidlogic.go index d78f98b9..e50683b4 100644 --- a/internal/logic/inference/startallbydeploytaskidlogic.go +++ b/internal/logic/inference/startallbydeploytaskidlogic.go @@ -5,8 +5,7 @@ import ( "errors" "fmt" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -84,7 +83,7 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta <-buf return } - if checkStopStatus(in) { + if status.CheckStopStatus(in) { success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StartInferDeployInstance(l.ctx, ins.InstanceId) if !success { e := struct { @@ -136,31 +135,3 @@ func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInsta return nil } - -func checkStopStatus(in *inference.DeployInstance) bool { - switch in.ClusterType { - case storeLink.TYPE_OCTOPUS: - switch in.Status { - case "stopped": - return true - default: - return false - } - case storeLink.TYPE_MODELARTS: - switch in.Status { - case "stopped": - return true - default: - return false - } - case storeLink.TYPE_SHUGUANGAI: - switch in.Status { - case "Terminated": - return true - default: - return false - } - default: - return false - } -} diff --git a/internal/logic/inference/startdeployinstancelistlogic.go b/internal/logic/inference/startdeployinstancelistlogic.go index 4b762e0f..db13d6cb 100644 --- a/internal/logic/inference/startdeployinstancelistlogic.go +++ b/internal/logic/inference/startdeployinstancelistlogic.go @@ -4,7 +4,7 @@ import ( "context" "errors" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "strconv" @@ -33,12 +33,16 @@ func (l *StartDeployInstanceListLogic) StartDeployInstanceList(req *types.StartD return nil, err } - success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, req.InstanceId) - if !success { - return nil, errors.New("start instance failed") + in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) + + if status.CheckStopStatus(in) { + success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, req.InstanceId) + if !success { + return nil, errors.New("start instance failed") + } } - go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) + go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true) return resp, nil } diff --git a/internal/logic/inference/stopallbydeploytaskidlogic.go b/internal/logic/inference/stopallbydeploytaskidlogic.go index c6eaad01..6616c2ef 100644 --- a/internal/logic/inference/stopallbydeploytaskidlogic.go +++ b/internal/logic/inference/stopallbydeploytaskidlogic.go @@ -4,8 +4,7 @@ import ( "context" "errors" "fmt" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" @@ -85,7 +84,7 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc <-buf return } - if checkStatus(in) { + if status.CheckRunningStatus(in) { success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId) if !success { e := struct { @@ -137,31 +136,3 @@ func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstanc return nil } - -func checkStatus(in *inference.DeployInstance) bool { - switch in.ClusterType { - case storeLink.TYPE_OCTOPUS: - switch in.Status { - case "running": - return true - default: - return false - } - case storeLink.TYPE_MODELARTS: - switch in.Status { - case "running": - return true - default: - return false - } - case storeLink.TYPE_SHUGUANGAI: - switch in.Status { - case "Running": - return true - default: - return false - } - default: - return false - } -} diff --git a/internal/logic/inference/stopdeployinstancelogic.go b/internal/logic/inference/stopdeployinstancelogic.go index 919e6aa4..6678d6c1 100644 --- a/internal/logic/inference/stopdeployinstancelogic.go +++ b/internal/logic/inference/stopdeployinstancelogic.go @@ -4,7 +4,7 @@ import ( "context" "errors" "github.com/zeromicro/go-zero/core/logx" - "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/updater" + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "strconv" @@ -33,12 +33,16 @@ func (l *StopDeployInstanceLogic) StopDeployInstance(req *types.StopDeployInstan return nil, err } - success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, req.InstanceId) - if !success { - return nil, errors.New("stop instance failed") + in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) + + if status.CheckRunningStatus(in) { + success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, req.InstanceId) + if !success { + return nil, errors.New("stop instance failed") + } } - go updater.UpdateDeployInstanceStatus(l.svcCtx, ins, true) + go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true) return resp, nil } diff --git a/internal/scheduler/service/updater/clusterResources.go b/internal/scheduler/service/utils/stat/clusterResources.go similarity index 99% rename from internal/scheduler/service/updater/clusterResources.go rename to internal/scheduler/service/utils/stat/clusterResources.go index b4c3389f..ca6938ab 100644 --- a/internal/scheduler/service/updater/clusterResources.go +++ b/internal/scheduler/service/utils/stat/clusterResources.go @@ -1,4 +1,4 @@ -package updater +package stat import ( "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" diff --git a/internal/scheduler/service/updater/deployInstance.go b/internal/scheduler/service/utils/status/deployInstance.go similarity index 64% rename from internal/scheduler/service/updater/deployInstance.go rename to internal/scheduler/service/utils/status/deployInstance.go index ecb48329..40b69d48 100644 --- a/internal/scheduler/service/updater/deployInstance.go +++ b/internal/scheduler/service/utils/status/deployInstance.go @@ -1,6 +1,7 @@ -package updater +package status import ( + "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" @@ -10,12 +11,15 @@ import ( "time" ) -func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance) { +func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.AiInferDeployInstance, needfilter bool) { list := make([]*models.AiInferDeployInstance, len(insList)) copy(list, insList) - for i := len(list) - 1; i >= 0; i-- { - if list[i].Status == constants.Running || list[i].Status == constants.Stopped { - list = append(list[:i], list[i+1:]...) + + if needfilter { + for i := len(list) - 1; i >= 0; i-- { + if list[i].Status == constants.Running || list[i].Status == constants.Stopped { + list = append(list[:i], list[i+1:]...) + } } } @@ -119,3 +123,72 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe return } } + +func UpdateAutoStoppedInstance(svc *svc.ServiceContext) { + list, err := svc.Scheduler.AiStorages.GetInferDeployInstanceList() + if err != nil { + return + } + + if len(list) == 0 { + return + } + + UpdateDeployInstanceStatusBatch(svc, list, false) +} + +func CheckStopStatus(in *inference.DeployInstance) bool { + switch in.ClusterType { + case storeLink.TYPE_OCTOPUS: + switch in.Status { + case "stopped": + return true + default: + return false + } + case storeLink.TYPE_MODELARTS: + switch in.Status { + case "stopped": + return true + default: + return false + } + case storeLink.TYPE_SHUGUANGAI: + switch in.Status { + case "Terminated": + return true + default: + return false + } + default: + return false + } +} + +func CheckRunningStatus(in *inference.DeployInstance) bool { + switch in.ClusterType { + case storeLink.TYPE_OCTOPUS: + switch in.Status { + case "running": + return true + default: + return false + } + case storeLink.TYPE_MODELARTS: + switch in.Status { + case "running": + return true + default: + return false + } + case storeLink.TYPE_SHUGUANGAI: + switch in.Status { + case "Running": + return true + default: + return false + } + default: + return false + } +} diff --git a/internal/scheduler/service/updater/taskStatusSync.go b/internal/scheduler/service/utils/status/taskStatusSync.go similarity index 99% rename from internal/scheduler/service/updater/taskStatusSync.go rename to internal/scheduler/service/utils/status/taskStatusSync.go index 649fb616..a691dbe4 100644 --- a/internal/scheduler/service/updater/taskStatusSync.go +++ b/internal/scheduler/service/utils/status/taskStatusSync.go @@ -1,4 +1,4 @@ -package updater +package status import ( "errors"