[migration] scroll pipeline only check last run pipeline log

This commit is contained in:
Kassian Sun 2023-04-24 16:57:16 +08:00
parent d1e947f826
commit 17a5ad6daa
1 changed files with 7 additions and 2 deletions

View File

@ -289,12 +289,17 @@ func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.T
} }
func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) { func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) {
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0) hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
if err != nil { if err != nil {
log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
err = nil err = nil
return return
} }
if len(hits) == 0 {
log.Debugf("scroll task [%s] not finished yet since last start", taskItem.ID)
return
}
// NOTE: we only check the last run of es_scroll
for _, hit := range hits { for _, hit := range hits {
scrolled = true scrolled = true
m := util.MapStr(hit.Source) m := util.MapStr(hit.Source)
@ -323,7 +328,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
return return
} }
if len(newHits) == 0 { if len(newHits) == 0 {
log.Debugf("task [%s] not finished yet since last start", taskItem.ID) log.Debugf("bulk task [%s] not finished yet since last start", taskItem.ID)
return return
} }