From 6e872593c2329da8e83d2c8763a37aa568083911 Mon Sep 17 00:00:00 2001
From: tzwang
Date: Mon, 9 Sep 2024 15:27:38 +0800
Subject: [PATCH] fix stopall startall logic

Former-commit-id: 80ee208c157e57a4ea9b88a552b6892e23c0ee0a
---
 .../inference/startallbydeploytaskidlogic.go  | 70 +++++++++++++++----
 .../inference/stopallbydeploytaskidlogic.go   | 70 +++++++++++++++----
 2 files changed, 118 insertions(+), 22 deletions(-)

diff --git a/internal/logic/inference/startallbydeploytaskidlogic.go b/internal/logic/inference/startallbydeploytaskidlogic.go
index db87a515..5074d1f5 100644
--- a/internal/logic/inference/startallbydeploytaskidlogic.go
+++ b/internal/logic/inference/startallbydeploytaskidlogic.go
@@ -3,12 +3,15 @@ package inference
 import (
 	"context"
 	"errors"
+	"fmt"
 	"github.com/zeromicro/go-zero/core/logx"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
 	"strconv"
+	"sync"
 )
 
 type StartAllByDeployTaskIdLogic struct {
@@ -35,17 +38,8 @@ func (l *StartAllByDeployTaskIdLogic) StartAllByDeployTaskId(req *types.StartAll
 		return nil, err
 	}
 
-	for _, ins := range list {
-		in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)
-		if err != nil {
-			return nil, err
-		}
-		if checkStopStatus(in) {
-			success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StartInferDeployInstance(l.ctx, ins.InstanceId)
-			if !success {
-				return nil, errors.New(ins.InstanceName + " start failed")
-			}
-		}
+	if len(list) == 0 {
+		return nil, errors.New("instances are empty")
 	}
 
 	err = l.svcCtx.Scheduler.AiStorages.UpdateDeployTaskById(id)
@@ -53,9 +47,63 @@ func (l *StartAllByDeployTaskIdLogic) StartAllByDeployTaskId(req *types.StartAll
 		return nil, err
 	}
 
+	err = l.startAll(list)
+	if err != nil {
+		return nil, err
+	}
+
 	return resp, nil
 }
 
+// startAll starts every instance in list that passes checkStopStatus. At most
+// two instances are processed concurrently; all failures are collected and
+// returned together as a single error, one line per failed instance.
+func (l *StartAllByDeployTaskIdLogic) startAll(list []*models.AiInferDeployInstance) error {
+	var wg sync.WaitGroup
+	// Buffered to len(list) so a worker never blocks when reporting a failure.
+	errCh := make(chan error, len(list))
+	// Counting semaphore bounding the number of in-flight adapter calls.
+	sem := make(chan struct{}, 2)
+
+	for _, instance := range list {
+		ins := instance
+		wg.Add(1)
+		sem <- struct{}{}
+		go func() {
+			defer wg.Done()
+			defer func() { <-sem }()
+
+			cluster := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)]
+			in, err := cluster.GetInferDeployInstance(l.ctx, ins.InstanceId)
+			if err != nil {
+				errCh <- fmt.Errorf("GetInstance Failed # clusterName: %v , instanceName: %v , error: %v", ins.ClusterName, ins.InstanceName, err)
+				return
+			}
+			if !checkStopStatus(in) {
+				return
+			}
+			if !cluster.StartInferDeployInstance(l.ctx, ins.InstanceId) {
+				errCh <- fmt.Errorf("StartInstance Failed # clusterName: %v , instanceName: %v , error: %v", ins.ClusterName, ins.InstanceName, "start request was not accepted")
+			}
+		}()
+	}
+
+	// Every worker has finished once Wait returns, so closing errCh is safe and
+	// lets the range below terminate.
+	wg.Wait()
+	close(errCh)
+
+	var msg string
+	for err := range errCh {
+		msg += err.Error() + " \n"
+	}
+	if msg != "" {
+		return errors.New(msg)
+	}
+
+	return nil
+}
+
 func checkStopStatus(in *inference.DeployInstance) bool {
 	switch in.ClusterType {
 	case storeLink.TYPE_OCTOPUS:
diff --git a/internal/logic/inference/stopallbydeploytaskidlogic.go b/internal/logic/inference/stopallbydeploytaskidlogic.go
index d0ebc23a..d4cc2b36 100644
--- a/internal/logic/inference/stopallbydeploytaskidlogic.go
+++ b/internal/logic/inference/stopallbydeploytaskidlogic.go
@@ -3,11 +3,14 @@ package inference
 import (
 	"context"
 	"errors"
+	"fmt"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
 	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
+	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
 	"strconv"
+	"sync"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )
@@ -36,17 +39,8 @@ func (l *StopAllByDeployTaskIdLogic) StopAllByDeployTaskId(req *types.StopAllByD
 		return nil, err
 	}
 
-	for _, ins := range list {
-		in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)
-		if err != nil {
-			return nil, err
-		}
-		if checkStatus(in) {
-			success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId)
-			if !success {
-				return nil, errors.New(ins.InstanceName + " stop failed")
-			}
-		}
+	if len(list) == 0 {
+		return nil, errors.New("instances are empty")
 	}
 
 	err = l.svcCtx.Scheduler.AiStorages.UpdateDeployTaskById(id)
@@ -54,9 +48,63 @@ func (l *StopAllByDeployTaskIdLogic) StopAllByDeployTaskId(req *types.StopAllByD
 		return nil, err
 	}
 
+	err = l.stopAll(list)
+	if err != nil {
+		return nil, err
+	}
+
 	return resp, nil
 }
 
+// stopAll stops every instance in list that passes checkStatus. At most two
+// instances are processed concurrently; all failures are collected and
+// returned together as a single error, one line per failed instance.
+func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstance) error {
+	var wg sync.WaitGroup
+	// Buffered to len(list) so a worker never blocks when reporting a failure.
+	errCh := make(chan error, len(list))
+	// Counting semaphore bounding the number of in-flight adapter calls.
+	sem := make(chan struct{}, 2)
+
+	for _, instance := range list {
+		ins := instance
+		wg.Add(1)
+		sem <- struct{}{}
+		go func() {
+			defer wg.Done()
+			defer func() { <-sem }()
+
+			cluster := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)]
+			in, err := cluster.GetInferDeployInstance(l.ctx, ins.InstanceId)
+			if err != nil {
+				errCh <- fmt.Errorf("GetInstance Failed # clusterName: %v , instanceName: %v , error: %v", ins.ClusterName, ins.InstanceName, err)
+				return
+			}
+			if !checkStatus(in) {
+				return
+			}
+			if !cluster.StopInferDeployInstance(l.ctx, ins.InstanceId) {
+				errCh <- fmt.Errorf("StopInstance Failed # clusterName: %v , instanceName: %v , error: %v", ins.ClusterName, ins.InstanceName, "stop request was not accepted")
+			}
+		}()
+	}
+
+	// Every worker has finished once Wait returns, so closing errCh is safe and
+	// lets the range below terminate.
+	wg.Wait()
+	close(errCh)
+
+	var msg string
+	for err := range errCh {
+		msg += err.Error() + " \n"
+	}
+	if msg != "" {
+		return errors.New(msg)
+	}
+
+	return nil
+}
+
 func checkStatus(in *inference.DeployInstance) bool {
 	switch in.ClusterType {
 	case storeLink.TYPE_OCTOPUS: