From 99cf8b769fa85aac7474ee2d0071cc0e61598896 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Mon, 24 Apr 2023 16:31:44 +0800 Subject: [PATCH] [migration] check STOPPED logs after pending_stop --- plugin/migration/pipeline_task/pipeline_task.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 186fb560..07bfd629 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -182,7 +182,8 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"]) } - hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"}) + // we only check STOPPED log after the last task status update + hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"}, taskItem.Updated.UnixMilli()) if err != nil { log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) return nil @@ -288,7 +289,7 @@ func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.T } func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) { - hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}) + hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0) if err != nil { log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) err = nil @@ -316,7 +317,7 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6 } func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) { - hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}) + hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0) if err != nil { log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) return @@ -353,7 +354,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i return } -func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]elastic.IndexDocument, error) { +func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) { query := util.MapStr{ "sort": []util.MapStr{ { @@ -390,6 +391,13 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela }, }, }, + { + "range": util.MapStr{ + "timestamp": util.MapStr{ + "gte": timestampGte, + }, + }, + }, }, }, },