Merge pull request 'fix task updatetime bugs' (#186) from tzwang/pcm-coordinator:master into master
Former-commit-id: 5dda4aebfeba58b5f2174af4b78554c9e3906452
This commit is contained in:
commit
4852740e70
|
@ -409,7 +409,7 @@ type (
|
||||||
NsID string `json:"nsId,omitempty" db:"ns_id"`
|
NsID string `json:"nsId,omitempty" db:"ns_id"`
|
||||||
TenantId string `json:"tenantId,omitempty" db:"tenant_id"`
|
TenantId string `json:"tenantId,omitempty" db:"tenant_id"`
|
||||||
CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
|
CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
|
||||||
UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time" gorm:"autoUpdateTime"`
|
UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"`
|
||||||
AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
|
AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
|
@ -76,7 +76,7 @@ func (l *GetCenterOverviewLogic) GetCenterOverview() (resp *types.CenterOverview
|
||||||
select {
|
select {
|
||||||
case _ = <-ch:
|
case _ = <-ch:
|
||||||
return resp, nil
|
return resp, nil
|
||||||
case <-time.After(2 * time.Second):
|
case <-time.After(1 * time.Second):
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -101,7 +101,7 @@ func (l *GetCenterTaskListLogic) updateAiTaskStatus(mu *sync.RWMutex, ch chan<-
|
||||||
}
|
}
|
||||||
for _, task := range taskList {
|
for _, task := range taskList {
|
||||||
t := task
|
t := task
|
||||||
if t.Status == constants.Completed || task.Status == constants.Failed {
|
if t.Status == constants.Completed || task.Status == constants.Failed || task.Status == constants.Stopped {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
|
@ -80,94 +80,14 @@ func (l *PageListTaskLogic) PageListTask(req *types.PageTaskReq) (resp *types.Pa
|
||||||
for _, ch := range chs {
|
for _, ch := range chs {
|
||||||
select {
|
select {
|
||||||
case <-ch:
|
case <-ch:
|
||||||
case <-time.After(2 * time.Second):
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
|
func (l *PageListTaskLogic) updateTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
|
||||||
for _, task := range tasks {
|
|
||||||
if task.AdapterTypeDict != 1 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if task.Status == constants.Succeeded || task.Status == constants.Failed {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var aiTask []*models.TaskAi
|
|
||||||
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
|
|
||||||
if tx.Error != nil {
|
|
||||||
logx.Errorf(tx.Error.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(aiTask) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
|
|
||||||
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
|
|
||||||
|
|
||||||
var status string
|
|
||||||
var count int
|
|
||||||
for _, a := range aiTask {
|
|
||||||
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
|
|
||||||
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
|
|
||||||
|
|
||||||
if s.Before(start) {
|
|
||||||
start = s
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.After(end) {
|
|
||||||
end = e
|
|
||||||
}
|
|
||||||
|
|
||||||
if a.Status == constants.Failed {
|
|
||||||
status = a.Status
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if a.Status == constants.Pending {
|
|
||||||
status = a.Status
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if a.Status == constants.Running {
|
|
||||||
status = a.Status
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if a.Status == constants.Completed {
|
|
||||||
count++
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if count == len(aiTask) {
|
|
||||||
status = constants.Succeeded
|
|
||||||
}
|
|
||||||
|
|
||||||
if status != "" {
|
|
||||||
task.Status = status
|
|
||||||
task.StartTime = start.Format(constants.Layout)
|
|
||||||
task.EndTime = end.Format(constants.Layout)
|
|
||||||
}
|
|
||||||
|
|
||||||
task.UpdatedTime = time.Now().Format(constants.Layout)
|
|
||||||
tx = l.svcCtx.DbEngin.Table("task").Updates(task)
|
|
||||||
if tx.Error != nil {
|
|
||||||
logx.Errorf(tx.Error.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ch <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
|
|
||||||
for i := len(tasks) - 1; i >= 0; i-- {
|
for i := len(tasks) - 1; i >= 0; i-- {
|
||||||
if tasks[i].AdapterTypeDict == 0 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed {
|
if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed {
|
||||||
tasks = append(tasks[:i], tasks[i+1:]...)
|
tasks = append(tasks[:i], tasks[i+1:]...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -181,7 +101,118 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan
|
||||||
for i, _ := range tasks {
|
for i, _ := range tasks {
|
||||||
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
|
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
|
||||||
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime)
|
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime)
|
||||||
if earliest.Before(latest) {
|
if latest.Before(earliest) {
|
||||||
|
task = tasks[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var aiTask []*models.TaskAi
|
||||||
|
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTask)
|
||||||
|
if tx.Error != nil {
|
||||||
|
logx.Errorf(tx.Error.Error())
|
||||||
|
ch <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(aiTask) == 0 {
|
||||||
|
ch <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(aiTask) == 1 {
|
||||||
|
task.Status = aiTask[0].Status
|
||||||
|
task.StartTime = aiTask[0].StartTime
|
||||||
|
task.EndTime = aiTask[0].EndTime
|
||||||
|
tx = l.svcCtx.DbEngin.Model(task).Table("task").Where("deleted_at is null").Updates(task)
|
||||||
|
if tx.Error != nil {
|
||||||
|
logx.Errorf(tx.Error.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ch <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := len(aiTask) - 1; i >= 0; i-- {
|
||||||
|
if aiTask[i].StartTime == "" {
|
||||||
|
aiTask = append(aiTask[:i], aiTask[i+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
start, _ := time.ParseInLocation(constants.Layout, aiTask[0].StartTime, time.Local)
|
||||||
|
end, _ := time.ParseInLocation(constants.Layout, aiTask[0].EndTime, time.Local)
|
||||||
|
|
||||||
|
var status string
|
||||||
|
var count int
|
||||||
|
for _, a := range aiTask {
|
||||||
|
s, _ := time.ParseInLocation(constants.Layout, a.StartTime, time.Local)
|
||||||
|
e, _ := time.ParseInLocation(constants.Layout, a.EndTime, time.Local)
|
||||||
|
|
||||||
|
if s.Before(start) {
|
||||||
|
start = s
|
||||||
|
}
|
||||||
|
|
||||||
|
if e.After(end) {
|
||||||
|
end = e
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Status == constants.Failed {
|
||||||
|
status = a.Status
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Status == constants.Pending {
|
||||||
|
status = a.Status
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Status == constants.Running {
|
||||||
|
status = a.Status
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.Status == constants.Completed {
|
||||||
|
count++
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if count == len(aiTask) {
|
||||||
|
status = constants.Succeeded
|
||||||
|
}
|
||||||
|
|
||||||
|
if status != "" {
|
||||||
|
task.Status = status
|
||||||
|
task.StartTime = start.Format(constants.Layout)
|
||||||
|
task.EndTime = end.Format(constants.Layout)
|
||||||
|
}
|
||||||
|
|
||||||
|
task.UpdatedTime = time.Now().Format(constants.Layout)
|
||||||
|
tx = l.svcCtx.DbEngin.Table("task").Model(task).Updates(task)
|
||||||
|
if tx.Error != nil {
|
||||||
|
logx.Errorf(tx.Error.Error())
|
||||||
|
ch <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ch <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan<- struct{}) {
|
||||||
|
for i := len(tasks) - 1; i >= 0; i-- {
|
||||||
|
if tasks[i].AdapterTypeDict != 1 || tasks[i].Status == constants.Succeeded || tasks[i].Status == constants.Failed {
|
||||||
|
tasks = append(tasks[:i], tasks[i+1:]...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
ch <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
task := tasks[0]
|
||||||
|
for i, _ := range tasks {
|
||||||
|
earliest, _ := time.Parse(constants.Layout, task.UpdatedTime)
|
||||||
|
latest, _ := time.Parse(constants.Layout, tasks[i].UpdatedTime)
|
||||||
|
if latest.Before(earliest) {
|
||||||
task = tasks[i]
|
task = tasks[i]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -190,6 +221,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan
|
||||||
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
|
tx := l.svcCtx.DbEngin.Raw("select * from task_ai where `task_id` = ? ", task.Id).Scan(&aiTaskList)
|
||||||
if tx.Error != nil {
|
if tx.Error != nil {
|
||||||
logx.Errorf(tx.Error.Error())
|
logx.Errorf(tx.Error.Error())
|
||||||
|
ch <- struct{}{}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,7 +233,7 @@ func (l *PageListTaskLogic) updateAiTaskStatus(tasks []*types.TaskModel, ch chan
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for _, aitask := range aiTaskList {
|
for _, aitask := range aiTaskList {
|
||||||
t := aitask
|
t := aitask
|
||||||
if t.Status == constants.Completed {
|
if t.Status == constants.Completed || t.Status == constants.Failed {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
|
|
@ -345,7 +345,7 @@ type TaskModel struct {
|
||||||
NsID string `json:"nsId,omitempty" db:"ns_id"`
|
NsID string `json:"nsId,omitempty" db:"ns_id"`
|
||||||
TenantId string `json:"tenantId,omitempty" db:"tenant_id"`
|
TenantId string `json:"tenantId,omitempty" db:"tenant_id"`
|
||||||
CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
|
CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"`
|
||||||
UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time" gorm:"autoUpdateTime"`
|
UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"`
|
||||||
AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
|
AdapterTypeDict int `json:"adapterTypeDict" db:"create_time" gorm:"adapter_type_dict"` //任务类型(对应字典表的值
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue