added updating aitask status to taskList

Former-commit-id: 8908fee9e07f5d334a6a7fd440d1d62246efa972
This commit is contained in:
tzwang 2024-05-10 16:17:33 +08:00
parent 004da905ae
commit 275e14a758
1 changed files with 66 additions and 1 deletions

View File

@ -17,6 +17,7 @@ package core
import (
"context"
"fmt"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"strconv"
"time"
@ -55,6 +56,11 @@ func (l *TaskListLogic) TaskList(req *types.TaskListReq) (resp *types.TaskListRe
if len(tasks) == 0 {
return nil, nil
}
// 更新智算任务状态
var ch = make(chan struct{})
go l.updateAitaskStatus(tasks, ch)
// 查询任务总数
l.svcCtx.DbEngin.Model(&models.Task{}).Count(&resp.TotalCount)
@ -106,5 +112,64 @@ func (l *TaskListLogic) TaskList(req *types.TaskListReq) (resp *types.TaskListRe
}
return
select {
case _ = <-ch:
return resp, nil
case <-time.After(1 * time.Second):
return resp, nil
}
}
func (l *TaskListLogic) updateAitaskStatus(tasks []models.Task, ch chan<- struct{}) {
for _, task := range tasks {
if task.AdapterTypeDict != 1 {
continue
}
if task.Status == constants.Succeeded {
continue
}
var aiTask []*models.TaskAi
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
if tx.Error != nil {
logx.Errorf(tx.Error.Error())
return
}
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
var status = constants.Succeeded
for _, a := range aiTask {
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
if s.Before(start) {
start = s
}
if e.After(end) {
end = e
}
if a.Status == constants.Failed {
status = a.Status
break
}
if a.Status == constants.Running {
status = a.Status
continue
}
}
task.Status = status
task.StartTime = &start
task.EndTime = &end
tx = l.svcCtx.DbEngin.Updates(task)
if tx.Error != nil {
return
}
}
ch <- struct{}{}
}