diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index 4c42c45b..9d9a5a20 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -409,7 +409,7 @@ type ( NsID string `json:"nsId,omitempty" db:"ns_id"` TenantId string `json:"tenantId,omitempty" db:"tenant_id"` CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time" gorm:"autoUpdateTime"` + UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"` AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值 } ) diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index d2669709..a4ecf111 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -76,7 +76,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview select { case _ = <-ch: return resp, nil - case <-time.After(2 * time.Second): + case <-time.After(1 * time.Second): return resp, nil } diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 28393c71..bbae384b 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -101,7 +101,7 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- } for _, task := range taskList { t := task - if t.Status == constants.Completed || task.Status == constants.Failed { + if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped { continue } wg.Add(1) diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index abcbde00..ba0c4382 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -80,94 +80,14 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa for _, ch := range chs { select { case <-ch: - case <-time.After(2 * time.Second): - return } } return } func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { - for _, task := range tasks { - if task.AdapterTypeDict != 1 { - continue - } - if task.Status == constants.Succeeded || task.Status == constants.Failed { - continue - } - - var aiTask []*models.TaskAi - tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - - if len(aiTask) == 0 { - continue - } - - start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) - end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) - - var status string - var count int - for _, a := range aiTask { - s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) - e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) - - if s.Before(start) { - start = s - } - - if e.After(end) { - end = e - } - - if a.Status == constants.Failed { - status = a.Status - break - } - - if a.Status == constants.Pending { - status = a.Status - continue - } - - if a.Status == constants.Running { - status = a.Status - continue - } - - if a.Status == constants.Completed { - count++ - continue - } - } - - if count == len(aiTask) { - status = constants.Succeeded - } - - if status != "" { - task.Status = status - task.StartTime = start.Format(constants.Layout) - task.EndTime = end.Format(constants.Layout) - } - - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = l.svcCtx.DbEngin.Table("task").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - } - ch <- struct{}{} -} - -func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { for i := len(tasks) - 1; i >= 0; i-- { - if tasks[i].AdapterTypeDict == 0 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { + if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { tasks = append(tasks[:i], tasks[i+1:]...) } } @@ -181,7 +101,118 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan for i, _ := range tasks { earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) - if earliest.Before(latest) { + if latest.Before(earliest) { + task = tasks[i] + } + } + + var aiTask []*models.TaskAi + tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + ch <- struct{}{} + return + } + + if len(aiTask) == 0 { + ch <- struct{}{} + return + } + + if len(aiTask) == 1 { + task.Status = aiTask[0].Status + task.StartTime = aiTask[0].StartTime + task.EndTime = aiTask[0].EndTime + tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + ch <- struct{}{} + return + } + + for i := len(aiTask) - 1; i >= 0; i-- { + if aiTask[i].StartTime == "" { + aiTask = append(aiTask[:i], aiTask[i+1:]...) + } + } + + start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) + end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) + + var status string + var count int + for _, a := range aiTask { + s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) + e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) + + if s.Before(start) { + start = s + } + + if e.After(end) { + end = e + } + + if a.Status == constants.Failed { + status = a.Status + break + } + + if a.Status == constants.Pending { + status = a.Status + continue + } + + if a.Status == constants.Running { + status = a.Status + continue + } + + if a.Status == constants.Completed { + count++ + continue + } + } + + if count == len(aiTask) { + status = constants.Succeeded + } + + if status != "" { + task.Status = status + task.StartTime = start.Format(constants.Layout) + task.EndTime = end.Format(constants.Layout) + } + + task.UpdatedTime = time.Now().Format(constants.Layout) + tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + ch <- struct{}{} + return + } + ch <- struct{}{} +} + +func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { + for i := len(tasks) - 1; i >= 0; i-- { + if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { + tasks = append(tasks[:i], tasks[i+1:]...) + } + } + + if len(tasks) == 0 { + ch <- struct{}{} + return + } + + task := tasks[0] + for i, _ := range tasks { + earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) + latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) + if latest.Before(earliest) { task = tasks[i] } } @@ -190,6 +221,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList) if tx.Error != nil { logx.Errorf(tx.Error.Error()) + ch <- struct{}{} return } @@ -201,7 +233,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan var wg sync.WaitGroup for _, aitask := range aiTaskList { t := aitask - if t.Status == constants.Completed { + if t.Status == constants.Completed || t.Status == constants.Failed { continue } wg.Add(1) diff --git a/api/internal/types/types.go b/api/internal/types/types.go index a90e9057..54a3c5d1 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -345,7 +345,7 @@ type TaskModel struct { NsID string `json:"nsId,omitempty" db:"ns_id"` TenantId string `json:"tenantId,omitempty" db:"tenant_id"` CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time" gorm:"autoUpdateTime"` + UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"` AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值 }