From 17a5ad6daa859864be1736fe84e23194e12cf4dc Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Mon, 24 Apr 2023 16:57:16 +0800 Subject: [PATCH] [migration] scroll pipeline only check last run pipeline log --- plugin/migration/pipeline_task/pipeline_task.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 3ed46aba..3925473b 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -289,12 +289,17 @@ func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.T } func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) { - hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0) + hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli()) if err != nil { log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) err = nil return } + if len(hits) == 0 { + log.Debugf("scroll task [%s] not finished yet since last start", taskItem.ID) + return + } + // NOTE: we only check the last run of es_scroll for _, hit := range hits { scrolled = true m := util.MapStr(hit.Source) @@ -323,7 +328,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i return } if len(newHits) == 0 { - log.Debugf("task [%s] not finished yet since last start", taskItem.ID) + log.Debugf("bulk task [%s] not finished yet since last start", taskItem.ID) return }