From ce0d5a9b323e2d90ce0719be35ad5e99df530097 Mon Sep 17 00:00:00 2001 From: tzwang Date: Fri, 17 May 2024 19:43:25 +0800 Subject: [PATCH 1/3] fix db bugs Former-commit-id: 55a997bc7669f4b207da153a3fdd1cb3b1b2935d --- api/internal/logic/core/pagelisttasklogic.go | 22 ++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index abcbde00..f2f6d9e8 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -80,8 +80,6 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa for _, ch := range chs { select { case <-ch: - case <-time.After(2 * time.Second): - return } } return @@ -107,6 +105,26 @@ func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- continue } + if len(aiTask) == 1 { + task.Status = aiTask[0].Status + task.StartTime = aiTask[0].StartTime + task.EndTime = aiTask[0].EndTime + tx = l.svcCtx.DbEngin.Model(&types.TaskModel{}).Table("task") + tx = tx.Where("deleted_at is null") + tx = tx.Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + continue + } + + for i := len(aiTask) - 1; i >= 0; i-- { + if aiTask[i].StartTime == "" { + aiTask = append(aiTask[:i], aiTask[i+1:]...) + } + } + start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) From a977f63f6374a3ec832a8a35dfef22c07e1ff78f Mon Sep 17 00:00:00 2001 From: devad Date: Fri, 17 May 2024 21:06:32 +0800 Subject: [PATCH 2/3] fix Former-commit-id: 443ebf0046a411542e28d142c4784a4be8a827ae --- api/desc/core/pcm-core.api | 2 +- api/internal/logic/core/pagelisttasklogic.go | 6 ++---- api/internal/types/types.go | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/api/desc/core/pcm-core.api b/api/desc/core/pcm-core.api index 4c42c45b..9d9a5a20 100644 --- a/api/desc/core/pcm-core.api +++ b/api/desc/core/pcm-core.api @@ -409,7 +409,7 @@ type ( NsID string `json:"nsId,omitempty" db:"ns_id"` TenantId string `json:"tenantId,omitempty" db:"tenant_id"` CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time" gorm:"autoUpdateTime"` + UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"` AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值 } ) diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index f2f6d9e8..09b731df 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -109,9 +109,7 @@ func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- task.Status = aiTask[0].Status task.StartTime = aiTask[0].StartTime task.EndTime = aiTask[0].EndTime - tx = l.svcCtx.DbEngin.Model(&types.TaskModel{}).Table("task") - tx = tx.Where("deleted_at is null") - tx = tx.Updates(task) + tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) if tx.Error != nil { logx.Errorf(tx.Error.Error()) return @@ -174,7 +172,7 @@ func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- } task.UpdatedTime = time.Now().Format(constants.Layout) - tx = l.svcCtx.DbEngin.Table("task").Updates(task) + tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) if tx.Error != nil { logx.Errorf(tx.Error.Error()) return diff --git a/api/internal/types/types.go b/api/internal/types/types.go index a90e9057..54a3c5d1 100644 --- a/api/internal/types/types.go +++ b/api/internal/types/types.go @@ -345,7 +345,7 @@ type TaskModel struct { NsID string `json:"nsId,omitempty" db:"ns_id"` TenantId string `json:"tenantId,omitempty" db:"tenant_id"` CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"` - UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time" gorm:"autoUpdateTime"` + UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"` AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值 } From 6c724b4ab96e6d1e83560e5214b64a40fe7fbf81 Mon Sep 17 00:00:00 2001 From: tzwang Date: Sun, 19 May 2024 22:52:41 +0800 Subject: [PATCH 3/3] fix task updatedtime bugs Former-commit-id: 226201747e9a5aabd47cb93a57d66448df3248b5 --- .../logic/ai/getcenteroverviewlogic.go | 2 +- .../logic/ai/getcentertasklistlogic.go | 2 +- api/internal/logic/core/pagelisttasklogic.go | 214 ++++++++++-------- 3 files changed, 117 insertions(+), 101 deletions(-) diff --git a/api/internal/logic/ai/getcenteroverviewlogic.go b/api/internal/logic/ai/getcenteroverviewlogic.go index d2669709..a4ecf111 100644 --- a/api/internal/logic/ai/getcenteroverviewlogic.go +++ b/api/internal/logic/ai/getcenteroverviewlogic.go @@ -76,7 +76,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview select { case _ = <-ch: return resp, nil - case <-time.After(2 * time.Second): + case <-time.After(1 * time.Second): return resp, nil } diff --git a/api/internal/logic/ai/getcentertasklistlogic.go b/api/internal/logic/ai/getcentertasklistlogic.go index 28393c71..bbae384b 100644 --- a/api/internal/logic/ai/getcentertasklistlogic.go +++ b/api/internal/logic/ai/getcentertasklistlogic.go @@ -101,7 +101,7 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<- } for _, task := range taskList { t := task - if t.Status == constants.Completed || task.Status == constants.Failed { + if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped { continue } wg.Add(1) diff --git a/api/internal/logic/core/pagelisttasklogic.go b/api/internal/logic/core/pagelisttasklogic.go index 09b731df..ba0c4382 100644 --- a/api/internal/logic/core/pagelisttasklogic.go +++ b/api/internal/logic/core/pagelisttasklogic.go @@ -86,104 +86,8 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa } func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { - for _, task := range tasks { - if task.AdapterTypeDict != 1 { - continue - } - if task.Status == constants.Succeeded || task.Status == constants.Failed { - continue - } - - var aiTask []*models.TaskAi - tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - - if len(aiTask) == 0 { - continue - } - - if len(aiTask) == 1 { - task.Status = aiTask[0].Status - task.StartTime = aiTask[0].StartTime - task.EndTime = aiTask[0].EndTime - tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - continue - } - - for i := len(aiTask) - 1; i >= 0; i-- { - if aiTask[i].StartTime == "" { - aiTask = append(aiTask[:i], aiTask[i+1:]...) - } - } - - start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) - end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) - - var status string - var count int - for _, a := range aiTask { - s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) - e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) - - if s.Before(start) { - start = s - } - - if e.After(end) { - end = e - } - - if a.Status == constants.Failed { - status = a.Status - break - } - - if a.Status == constants.Pending { - status = a.Status - continue - } - - if a.Status == constants.Running { - status = a.Status - continue - } - - if a.Status == constants.Completed { - count++ - continue - } - } - - if count == len(aiTask) { - status = constants.Succeeded - } - - if status != "" { - task.Status = status - task.StartTime = start.Format(constants.Layout) - task.EndTime = end.Format(constants.Layout) - } - - task.UpdatedTime = time.Now().Format(constants.Layout) - tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) - if tx.Error != nil { - logx.Errorf(tx.Error.Error()) - return - } - } - ch <- struct{}{} -} - -func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { for i := len(tasks) - 1; i >= 0; i-- { - if tasks[i].AdapterTypeDict == 0 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { + if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { tasks = append(tasks[:i], tasks[i+1:]...) } } @@ -197,7 +101,118 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan for i, _ := range tasks { earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) - if earliest.Before(latest) { + if latest.Before(earliest) { + task = tasks[i] + } + } + + var aiTask []*models.TaskAi + tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + ch <- struct{}{} + return + } + + if len(aiTask) == 0 { + ch <- struct{}{} + return + } + + if len(aiTask) == 1 { + task.Status = aiTask[0].Status + task.StartTime = aiTask[0].StartTime + task.EndTime = aiTask[0].EndTime + tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + return + } + ch <- struct{}{} + return + } + + for i := len(aiTask) - 1; i >= 0; i-- { + if aiTask[i].StartTime == "" { + aiTask = append(aiTask[:i], aiTask[i+1:]...) + } + } + + start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local) + end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local) + + var status string + var count int + for _, a := range aiTask { + s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local) + e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local) + + if s.Before(start) { + start = s + } + + if e.After(end) { + end = e + } + + if a.Status == constants.Failed { + status = a.Status + break + } + + if a.Status == constants.Pending { + status = a.Status + continue + } + + if a.Status == constants.Running { + status = a.Status + continue + } + + if a.Status == constants.Completed { + count++ + continue + } + } + + if count == len(aiTask) { + status = constants.Succeeded + } + + if status != "" { + task.Status = status + task.StartTime = start.Format(constants.Layout) + task.EndTime = end.Format(constants.Layout) + } + + task.UpdatedTime = time.Now().Format(constants.Layout) + tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task) + if tx.Error != nil { + logx.Errorf(tx.Error.Error()) + ch <- struct{}{} + return + } + ch <- struct{}{} +} + +func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) { + for i := len(tasks) - 1; i >= 0; i-- { + if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed { + tasks = append(tasks[:i], tasks[i+1:]...) + } + } + + if len(tasks) == 0 { + ch <- struct{}{} + return + } + + task := tasks[0] + for i, _ := range tasks { + earliest, _ := time.Parse(constants.Layout, task.UpdatedTime) + latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime) + if latest.Before(earliest) { task = tasks[i] } } @@ -206,6 +221,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList) if tx.Error != nil { logx.Errorf(tx.Error.Error()) + ch <- struct{}{} return } @@ -217,7 +233,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan var wg sync.WaitGroup for _, aitask := range aiTaskList { t := aitask - if t.Status == constants.Completed { + if t.Status == constants.Completed || t.Status == constants.Failed { continue } wg.Add(1)