Merge pull request '[migration][pipeline] filter duplicated logs' (#86) from fix/migration into master

This commit is contained in:
silenceqi 2023-05-16 14:43:09 +08:00
commit 4b21008c4b
1 changed file with 21 additions and 6 deletions

View File

@@ -3,6 +3,7 @@ package pipeline_task
import ( import (
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
@@ -312,9 +313,8 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
return return
} }
// NOTE: we only check the last run of es_scroll // NOTE: we only check the last run of es_scroll
for _, hit := range hits { for _, m := range hits {
scrolled = true scrolled = true
m := util.MapStr(hit.Source)
errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error") errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
if errStr != "" { if errStr != "" {
@@ -350,9 +350,8 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
return return
} }
for _, hit := range hits { for _, m := range hits {
bulked = true bulked = true
m := util.MapStr(hit.Source)
errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error") errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
if errStr != "" { if errStr != "" {
@@ -381,7 +380,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
return return
} }
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) { func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) {
query := util.MapStr{ query := util.MapStr{
"size": 999, "size": 999,
"sort": []util.MapStr{ "sort": []util.MapStr{
@@ -433,7 +432,23 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timest
log.Errorf("search task log from es failed, err: %v", err) log.Errorf("search task log from es failed, err: %v", err)
return nil, err return nil, err
} }
return res.Hits.Hits, nil var ret []util.MapStr
dups := map[string]struct{}{}
for _, hit := range res.Hits.Hits {
m := util.MapStr(hit.Source)
ctxID := migration_util.GetMapStringValue(m, "metadata.labels.context_id")
step := migration_util.GetMapIntValue(m, "payload.pipeline.logging.steps")
// NOTE: gateway <= 1.13.0 will not generate context_id, skip duplicate checks
if ctxID != "" {
dupKey := ctxID + "-" + strconv.Itoa(int(step))
if _, ok := dups[dupKey]; ok {
continue
}
dups[dupKey] = struct{}{}
}
ret = append(ret, m)
}
return ret, nil
} }
func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) {