[migration] check STOPPED logs after pending_stop

This commit is contained in:
Kassian Sun 2023-04-24 16:31:44 +08:00
parent cbfe890982
commit 99cf8b769f
1 changed files with 12 additions and 4 deletions

View File

@ -182,7 +182,8 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"])
}
hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"})
// we only check STOPPED log after the last task status update
hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"}, taskItem.Updated.UnixMilli())
if err != nil {
log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
return nil
@ -288,7 +289,7 @@ func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.T
}
func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) {
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
if err != nil {
log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
err = nil
@ -316,7 +317,7 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
}
func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
if err != nil {
log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
return
@ -353,7 +354,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
return
}
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]elastic.IndexDocument, error) {
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) {
query := util.MapStr{
"sort": []util.MapStr{
{
@ -390,6 +391,13 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela
},
},
},
{
"range": util.MapStr{
"timestamp": util.MapStr{
"gte": timestampGte,
},
},
},
},
},
},