From e8a0b43fba8bec86f2a3bf0b7a103736b668abc1 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Tue, 16 May 2023 14:34:47 +0800 Subject: [PATCH] [migration][pipeline] filter duplicated logs --- .../migration/pipeline_task/pipeline_task.go | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 93b0a57c..f3b0b8e0 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -3,6 +3,7 @@ package pipeline_task import ( "errors" "fmt" + "strconv" "strings" "time" @@ -312,9 +313,8 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6 return } // NOTE: we only check the last run of es_scroll - for _, hit := range hits { + for _, m := range hits { scrolled = true - m := util.MapStr(hit.Source) errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error") if errStr != "" { @@ -350,9 +350,8 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i return } - for _, hit := range hits { + for _, m := range hits { bulked = true - m := util.MapStr(hit.Source) errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error") if errStr != "" { @@ -381,7 +380,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i return } -func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) { +func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) { query := util.MapStr{ "size": 999, "sort": []util.MapStr{ @@ -433,7 +432,23 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timest log.Errorf("search task log from es failed, err: %v", err) return nil, err } - return res.Hits.Hits, nil + var ret []util.MapStr + dups := map[string]struct{}{} + for _, hit := range res.Hits.Hits { + m := util.MapStr(hit.Source) + ctxID := migration_util.GetMapStringValue(m, "metadata.labels.context_id") + step := migration_util.GetMapIntValue(m, "payload.pipeline.logging.steps") + // NOTE: gateway <= 1.13.0 will not generate context_id, skip duplicate checks + if ctxID != "" { + dupKey := ctxID + "-" + strconv.Itoa(int(step)) + if _, ok := dups[dupKey]; ok { + continue + } + dups[dupKey] = struct{}{} + } + ret = append(ret, m) + } + return ret, nil } func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) {