[migration] optimize bulk error message

This commit is contained in:
Kassian Sun 2023-04-19 10:24:42 +08:00 committed by Gitea
parent cd85d21ef0
commit 7472052d90
3 changed files with 90 additions and 13 deletions

View File

@ -328,7 +328,10 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
return nil
}
scrolledDocs := migration_util.GetMapIntValue(util.MapStr(scrollTask.Metadata.Labels), "scrolled_docs")
var (
scrollLabels = util.MapStr(scrollTask.Metadata.Labels)
scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs")
)
if scrolledDocs != totalDocs {
return fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs)
}
@ -346,9 +349,16 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
return nil
}
successDocs := migration_util.GetMapIntValue(util.MapStr(bulkTask.Metadata.Labels), "success_docs")
var (
bulkLabels = util.MapStr(bulkTask.Metadata.Labels)
successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs")
invalidDocs = migration_util.GetMapStringValue(bulkLabels, "invalid_docs")
invalidReasons = migration_util.GetMapStringValue(bulkLabels, "invalid_reasons")
failureDocs = migration_util.GetMapStringValue(bulkLabels, "failure_docs")
failureReasons = migration_util.GetMapStringValue(bulkLabels, "failure_reasons")
)
if successDocs != totalDocs {
return fmt.Errorf("bulk complete but docs count unmatch: %d / %d", successDocs, totalDocs)
return fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
}
taskItem.Status = task2.StatusComplete

View File

@ -3,6 +3,7 @@ package pipeline_task
import (
"errors"
"fmt"
"strings"
"time"
log "github.com/cihub/seelog"
@ -139,7 +140,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
}
func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
successDocs, indexDocs, bulked, err := p.getBulkIndexingTaskState(taskItem)
successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, err := p.getBulkIndexingTaskState(taskItem)
if !bulked {
return nil
}
@ -153,6 +154,10 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e
taskItem.CompletedTime = &now
taskItem.Metadata.Labels["index_docs"] = indexDocs
taskItem.Metadata.Labels["success_docs"] = successDocs
taskItem.Metadata.Labels["invalid_docs"] = strings.Join(totalInvalidDocs, ",")
taskItem.Metadata.Labels["invalid_reasons"] = strings.Join(totalInvalidReasons, ",")
taskItem.Metadata.Labels["failure_docs"] = strings.Join(totalFailureDocs, ",")
taskItem.Metadata.Labels["failure_reasons"] = strings.Join(totalFailureReasons, ",")
if errMsg != "" {
taskItem.Status = task.StatusError
} else {
@ -303,13 +308,18 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
}
for _, hit := range hits {
scrolled = true
resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error")
if errStr, ok := resultErr.(string); ok && errStr != "" {
m := util.MapStr(hit.Source)
errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
if errStr != "" {
err = errors.New(errStr)
return
}
m := util.MapStr(hit.Source)
scroll, total := migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.scrolled_docs"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.total_hits")
var (
scroll = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.scrolled_docs")
total = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.total_hits")
)
scrolledDocs += scroll
totalHits += total
@ -317,25 +327,42 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
return
}
func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, err error) {
func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, err error) {
hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
if err != nil {
log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
err = nil
return
}
for _, hit := range hits {
bulked = true
resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error")
if errStr, ok := resultErr.(string); ok && errStr != "" {
m := util.MapStr(hit.Source)
errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
if errStr != "" {
err = errors.New(errStr)
return
}
m := util.MapStr(hit.Source)
success, failure, invalid := migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.success.count"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.failure.count"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.invalid.count")
var (
success = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.success.count")
failure = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.failure.count")
invalid = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.invalid.count")
)
successDocs += success
indexDocs += success + invalid + failure
var (
invalidDocs = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.documents")
invalidReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.reasons")
failureDocs = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.documents")
failureReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.reasons")
)
totalInvalidDocs = append(totalInvalidDocs, invalidDocs...)
totalInvalidReasons = append(invalidReasons, invalidReasons...)
totalFailureDocs = append(totalFailureDocs, failureDocs...)
totalFailureReasons = append(totalFailureReasons, failureReasons...)
}
return
}

View File

@ -64,3 +64,43 @@ func GetMapIntValue(m util.MapStr, key string) int64 {
}
return vv
}
// GetMapStringValue fetches key from m and coerces it to a string.
// A missing key yields ""; a value that cannot be coerced is logged
// and also yields "".
func GetMapStringValue(m util.MapStr, key string) string {
	raw, getErr := m.GetValue(key)
	if getErr != nil {
		// key absent — treat as empty
		return ""
	}
	s, extractErr := util.ExtractString(raw)
	if extractErr != nil {
		log.Errorf("got %s but failed to extract, err: %v", key, extractErr)
		return ""
	}
	return s
}
// GetMapStringSliceValue fetches key from m and coerces it to a []string.
// A missing key yields nil. A []interface{} value is converted element by
// element; any element that cannot be coerced to a string (or a value of an
// unexpected type) is logged and yields nil.
func GetMapStringSliceValue(m util.MapStr, key string) []string {
	v, err := m.GetValue(key)
	if err != nil {
		// key absent — treat as empty
		return nil
	}
	// Type switch replaces the original nested assertions and removes the
	// shadowed `vv`/`err` locals; also drops a leftover debug log.Info call.
	switch vv := v.(type) {
	case []string:
		return vv
	case []interface{}:
		ret := make([]string, len(vv))
		for i := range vv {
			s, extractErr := util.ExtractString(vv[i])
			if extractErr != nil {
				log.Errorf("got %s but failed to extract, err: %v", key, extractErr)
				return nil
			}
			ret[i] = s
		}
		return ret
	default:
		log.Errorf("got %s but failed to extract, type: %T", key, v)
		return nil
	}
}