fix task updatedtime bugs

Former-commit-id: 226201747e9a5aabd47cb93a57d66448df3248b5
This commit is contained in:
tzwang 2024-05-19 22:52:41 +08:00
parent ed1929d356
commit 6c724b4ab9
3 changed files with 117 additions and 101 deletions

View File

@ -76,7 +76,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
select { select {
case _ = <-ch: case _ = <-ch:
return resp, nil return resp, nil
case <-time.After(2 * time.Second): case <-time.After(1 * time.Second):
return resp, nil return resp, nil
} }

View File

@ -101,7 +101,7 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<-
} }
for _, task := range taskList { for _, task := range taskList {
t := task t := task
if t.Status == constants.Completed || task.Status == constants.Failed { if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped {
continue continue
} }
wg.Add(1) wg.Add(1)

View File

@ -86,23 +86,37 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
} }
func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
for _, task := range tasks { for i := len(tasks) - 1; i >= 0; i-- {
if task.AdapterTypeDict != 1 { if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed {
continue tasks = append(tasks[:i], tasks[i+1:]...)
}
}
if len(tasks) == 0 {
ch <- struct{}{}
return
}
task := tasks[0]
for i, _ := range tasks {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime)
if latest.Before(earliest) {
task = tasks[i]
} }
if task.Status == constants.Succeeded || task.Status == constants.Failed {
continue
} }
var aiTask []*models.TaskAi var aiTask []*models.TaskAi
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
ch <- struct{}{}
return return
} }
if len(aiTask) == 0 { if len(aiTask) == 0 {
continue ch <- struct{}{}
return
} }
if len(aiTask) == 1 { if len(aiTask) == 1 {
@ -114,7 +128,8 @@ func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<-
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
return return
} }
continue ch <- struct{}{}
return
} }
for i := len(aiTask) - 1; i >= 0; i-- { for i := len(aiTask) - 1; i >= 0; i-- {
@ -175,15 +190,15 @@ func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<-
tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
ch <- struct{}{}
return return
} }
}
ch <- struct{}{} ch <- struct{}{}
} }
func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
for i := len(tasks) - 1; i >= 0; i-- { for i := len(tasks) - 1; i >= 0; i-- {
if tasks[i].AdapterTypeDict == 0 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed {
tasks = append(tasks[:i], tasks[i+1:]...) tasks = append(tasks[:i], tasks[i+1:]...)
} }
} }
@ -197,7 +212,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan
for i, _ := range tasks { for i, _ := range tasks {
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime)
if earliest.Before(latest) { if latest.Before(earliest) {
task = tasks[i] task = tasks[i]
} }
} }
@ -206,6 +221,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList) tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
if tx.Error != nil { if tx.Error != nil {
logx.Errorf(tx.Error.Error()) logx.Errorf(tx.Error.Error())
ch <- struct{}{}
return return
} }
@ -217,7 +233,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan
var wg sync.WaitGroup var wg sync.WaitGroup
for _, aitask := range aiTaskList { for _, aitask := range aiTaskList {
t := aitask t := aitask
if t.Status == constants.Completed { if t.Status == constants.Completed || t.Status == constants.Failed {
continue continue
} }
wg.Add(1) wg.Add(1)