updated deployinstance logic

This commit is contained in:
tzwang 2024-09-14 15:58:57 +08:00
parent c37684548e
commit 0b8261f6f8
5 changed files with 67 additions and 8 deletions

View File

@ -45,4 +45,8 @@ func AddCronGroup(svc *svc.ServiceContext) {
} }
stat.UpdateClusterResources(svc, adapterList) stat.UpdateClusterResources(svc, adapterList)
}) })
svc.Cron.AddFunc("*/30 * * * * ?", func() {
status.UpdateAutoStoppedInstance(svc)
})
} }

View File

@ -82,7 +82,7 @@ func (l *DeployInstanceListLogic) DeployInstanceList(req *types.DeployInstanceLi
} }
} }
go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true) go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil)
go status.UpdateDeployTaskStatus(l.svcCtx) go status.UpdateDeployTaskStatus(l.svcCtx)
} }

View File

@ -34,15 +34,18 @@ func (l *StartDeployInstanceListLogic) StartDeployInstanceList(req *types.StartD
} }
in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)
if err != nil {
return nil, err
}
if status.CheckStopStatus(in) { if status.CheckStopStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, req.InstanceId) success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StartInferDeployInstance(l.ctx, in.InstanceId)
if !success { if !success {
return nil, errors.New("start instance failed") return nil, errors.New("start instance failed")
} }
} }
go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true) go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil)
return resp, nil return resp, nil
} }

View File

@ -34,15 +34,18 @@ func (l *StopDeployInstanceLogic) StopDeployInstance(req *types.StopDeployInstan
} }
in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId) in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)
if err != nil {
return nil, err
}
if status.CheckRunningStatus(in) { if status.CheckRunningStatus(in) {
success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, req.InstanceId) success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[req.AdapterId][req.ClusterId].StopInferDeployInstance(l.ctx, in.InstanceId)
if !success { if !success {
return nil, errors.New("stop instance failed") return nil, errors.New("stop instance failed")
} }
} }
go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true) go status.UpdateDeployInstanceStatus(l.svcCtx, ins, true, nil)
return resp, nil return resp, nil
} }

View File

@ -27,8 +27,10 @@ func UpdateDeployInstanceStatusBatch(svc *svc.ServiceContext, insList []*models.
return return
} }
buffer := make(chan bool, 3)
for _, instance := range list { for _, instance := range list {
go UpdateDeployInstanceStatus(svc, instance, false) buffer <- true
go UpdateDeployInstanceStatus(svc, instance, false, buffer)
} }
} }
@ -51,23 +53,37 @@ func UpdateDeployTaskStatus(svc *svc.ServiceContext) {
return return
} }
buffer := make(chan bool, 2)
for _, instance := range inslist { for _, instance := range inslist {
go UpdateDeployInstanceStatus(svc, instance, false) buffer <- true
go UpdateDeployInstanceStatus(svc, instance, false, buffer)
} }
} }
func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool) { func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInferDeployInstance, updatetime bool, ch chan bool) {
amap, found := svc.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(instance.AdapterId, 10)] amap, found := svc.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(instance.AdapterId, 10)]
if !found { if !found {
if ch != nil {
<-ch
return
}
return return
} }
cmap, found := amap[strconv.FormatInt(instance.ClusterId, 10)] cmap, found := amap[strconv.FormatInt(instance.ClusterId, 10)]
if !found { if !found {
if ch != nil {
<-ch
return
}
return return
} }
h := http.Request{} h := http.Request{}
ins, err := cmap.GetInferDeployInstance(h.Context(), instance.InstanceId) ins, err := cmap.GetInferDeployInstance(h.Context(), instance.InstanceId)
if err != nil { if err != nil {
if ch != nil {
<-ch
return
}
return return
} }
switch instance.ClusterType { switch instance.ClusterType {
@ -75,11 +91,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe
switch ins.Status { switch ins.Status {
case "running": case "running":
if instance.Status == constants.Running { if instance.Status == constants.Running {
if ch != nil {
<-ch
return
}
return return
} }
instance.Status = constants.Running instance.Status = constants.Running
case "stopped": case "stopped":
if instance.Status == constants.Stopped { if instance.Status == constants.Stopped {
if ch != nil {
<-ch
return
}
return return
} }
instance.Status = constants.Stopped instance.Status = constants.Stopped
@ -90,11 +114,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe
switch ins.Status { switch ins.Status {
case "running": case "running":
if instance.Status == constants.Running { if instance.Status == constants.Running {
if ch != nil {
<-ch
return
}
return return
} }
instance.Status = constants.Running instance.Status = constants.Running
case "stopped": case "stopped":
if instance.Status == constants.Stopped { if instance.Status == constants.Stopped {
if ch != nil {
<-ch
return
}
return return
} }
instance.Status = constants.Stopped instance.Status = constants.Stopped
@ -105,11 +137,19 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe
switch ins.Status { switch ins.Status {
case "Running": case "Running":
if instance.Status == constants.Running { if instance.Status == constants.Running {
if ch != nil {
<-ch
return
}
return return
} }
instance.Status = constants.Running instance.Status = constants.Running
case "Terminated": case "Terminated":
if instance.Status == constants.Stopped { if instance.Status == constants.Stopped {
if ch != nil {
<-ch
return
}
return return
} }
instance.Status = constants.Stopped instance.Status = constants.Stopped
@ -120,6 +160,15 @@ func UpdateDeployInstanceStatus(svc *svc.ServiceContext, instance *models.AiInfe
err = svc.Scheduler.AiStorages.UpdateInferDeployInstance(instance, updatetime) err = svc.Scheduler.AiStorages.UpdateInferDeployInstance(instance, updatetime)
if err != nil { if err != nil {
if ch != nil {
<-ch
return
}
return
}
if ch != nil {
<-ch
return return
} }
} }