From d1e947f826341403efd4508bfb5c186f0d116e03 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Mon, 24 Apr 2023 16:52:51 +0800 Subject: [PATCH] [migration] check finished pipeline logs by updated time --- plugin/migration/pipeline_task/pipeline_task.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 07bfd629..3ed46aba 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -317,9 +317,19 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6 } func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) { + newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli()) + if err != nil { + log.Errorf("failed to get latest pipeline logs for task [%s], err: %v", taskItem.ID, err) + return + } + if len(newHits) == 0 { + log.Debugf("task [%s] not finished yet since last start", taskItem.ID) + return + } + hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0) if err != nil { - log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err) + log.Errorf("failed to get all pipeline logs for task [%s], err: %v", taskItem.ID, err) return } @@ -356,6 +366,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) { query := util.MapStr{ + "size": 999, "sort": []util.MapStr{ { "timestamp": util.MapStr{ @@ -368,9 +379,6 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timest }, }, }, - "collapse": util.MapStr{ - "field": "metadata.labels.task_id", - }, "query": util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{