diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go
index 2077cf7f..bf69eb26 100644
--- a/plugin/migration/pipeline.go
+++ b/plugin/migration/pipeline.go
@@ -328,7 +328,10 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
 		return nil
 	}

-	scrolledDocs := migration_util.GetMapIntValue(util.MapStr(scrollTask.Metadata.Labels), "scrolled_docs")
+	var (
+		scrollLabels = util.MapStr(scrollTask.Metadata.Labels)
+		scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs")
+	)
 	if scrolledDocs != totalDocs {
 		return fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs)
 	}
@@ -346,9 +349,16 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
 		return nil
 	}

-	successDocs := migration_util.GetMapIntValue(util.MapStr(bulkTask.Metadata.Labels), "success_docs")
+	var (
+		bulkLabels     = util.MapStr(bulkTask.Metadata.Labels)
+		successDocs    = migration_util.GetMapIntValue(bulkLabels, "success_docs")
+		invalidDocs    = migration_util.GetMapStringValue(bulkLabels, "invalid_docs")
+		invalidReasons = migration_util.GetMapStringValue(bulkLabels, "invalid_reasons")
+		failureDocs    = migration_util.GetMapStringValue(bulkLabels, "failure_docs")
+		failureReasons = migration_util.GetMapStringValue(bulkLabels, "failure_reasons")
+	)
 	if successDocs != totalDocs {
-		return fmt.Errorf("bulk complete but docs count unmatch: %d / %d", successDocs, totalDocs)
+		return fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
 	}

 	taskItem.Status = task2.StatusComplete
diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go
index a7143198..2f54beb5 100644
--- a/plugin/migration/pipeline_task/pipeline_task.go
+++ b/plugin/migration/pipeline_task/pipeline_task.go
@@ -3,6 +3,7 @@ package pipeline_task
 import (
 	"errors"
 	"fmt"
+	"strings"
 	"time"

 	log "github.com/cihub/seelog"
@@ -139,7 +140,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
 }

 func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
-	successDocs, indexDocs, bulked, err := p.getBulkIndexingTaskState(taskItem)
+	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, err := p.getBulkIndexingTaskState(taskItem)
 	if !bulked {
 		return nil
 	}
@@ -153,6 +154,10 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e
 	taskItem.CompletedTime = &now
 	taskItem.Metadata.Labels["index_docs"] = indexDocs
 	taskItem.Metadata.Labels["success_docs"] = successDocs
+	taskItem.Metadata.Labels["invalid_docs"] = strings.Join(totalInvalidDocs, ",")
+	taskItem.Metadata.Labels["invalid_reasons"] = strings.Join(totalInvalidReasons, ",")
+	taskItem.Metadata.Labels["failure_docs"] = strings.Join(totalFailureDocs, ",")
+	taskItem.Metadata.Labels["failure_reasons"] = strings.Join(totalFailureReasons, ",")
 	if errMsg != "" {
 		taskItem.Status = task.StatusError
 	} else {
@@ -303,13 +308,18 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
 	}
 	for _, hit := range hits {
 		scrolled = true
-		resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error")
-		if errStr, ok := resultErr.(string); ok && errStr != "" {
+		m := util.MapStr(hit.Source)
+
+		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
+		if errStr != "" {
 			err = errors.New(errStr)
 			return
 		}
-		m := util.MapStr(hit.Source)
-		scroll, total := migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.scrolled_docs"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.total_hits")
+
+		var (
+			scroll = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.scrolled_docs")
+			total  = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.total_hits")
+		)
 		scrolledDocs += scroll
 		totalHits += total
 	}
@@ -317,25 +327,42 @@
-func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, err error) {
+func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, err error) {
 	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
 	if err != nil {
 		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
 		err = nil
 		return
 	}
+
 	for _, hit := range hits {
 		bulked = true
-		resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error")
-		if errStr, ok := resultErr.(string); ok && errStr != "" {
+		m := util.MapStr(hit.Source)
+
+		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
+		if errStr != "" {
 			err = errors.New(errStr)
 			return
 		}
-		m := util.MapStr(hit.Source)
-		success, failure, invalid := migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.success.count"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.failure.count"), migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.invalid.count")
+		var (
+			success = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.success.count")
+			failure = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.failure.count")
+			invalid = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.invalid.count")
+		)
 		successDocs += success
 		indexDocs += success + invalid + failure
+
+		var (
+			invalidDocs    = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.documents")
+			invalidReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.reasons")
+			failureDocs    = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.documents")
+			failureReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.reasons")
+		)
+		totalInvalidDocs = append(totalInvalidDocs, invalidDocs...)
+		totalInvalidReasons = append(totalInvalidReasons, invalidReasons...)
+		totalFailureDocs = append(totalFailureDocs, failureDocs...)
+		totalFailureReasons = append(totalFailureReasons, failureReasons...)
 	}
 	return
 }
diff --git a/plugin/migration/util/util.go b/plugin/migration/util/util.go
index 386d2c6f..aa984ebb 100644
--- a/plugin/migration/util/util.go
+++ b/plugin/migration/util/util.go
@@ -64,3 +64,43 @@ func GetMapIntValue(m util.MapStr, key string) int64 {
 	}
 	return vv
 }
+
+func GetMapStringValue(m util.MapStr, key string) string {
+	v, err := m.GetValue(key)
+	if err != nil {
+		return ""
+	}
+	vv, err := util.ExtractString(v)
+	if err != nil {
+		log.Errorf("got %s but failed to extract, err: %v", key, err)
+		return ""
+	}
+	return vv
+}
+
+func GetMapStringSliceValue(m util.MapStr, key string) []string {
+	v, err := m.GetValue(key)
+	if err != nil {
+		return nil
+	}
+	vv, ok := v.([]string)
+	if !ok {
+		vv, ok := v.([]interface{})
+		if !ok {
+			log.Errorf("got %s but failed to extract, type: %T", key, v)
+			return nil
+		}
+		log.Info(key, vv)
+		ret := make([]string, len(vv))
+		var err error
+		for i := range vv {
+			ret[i], err = util.ExtractString(vv[i])
+			if err != nil {
+				log.Errorf("got %s but failed to extract, err: %v", key, err)
+				return nil
+			}
+		}
+		return ret
+	}
+	return vv
+}
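
Note (outside the patch): a minimal sketch of how the new GetMapStringSliceValue helper is expected to behave on decoded logging payloads. Assumptions: util.MapStr.GetValue resolves dotted key paths over nested maps (the pipeline lookups above rely on this), util.ExtractString passes plain strings through unchanged, and the import paths match the repository layout; the input map and its values are hypothetical.

package main

import (
	"fmt"

	migration_util "infini.sh/console/plugin/migration/util"
	"infini.sh/framework/core/util"
)

func main() {
	// Values decoded from JSON usually arrive as []interface{}; the helper
	// converts them element by element via util.ExtractString.
	m := util.MapStr{
		"labels": map[string]interface{}{
			"invalid_docs": []interface{}{"doc-1", "doc-2"},
		},
	}
	fmt.Println(migration_util.GetMapStringSliceValue(m, "labels.invalid_docs")) // [doc-1 doc-2]
	fmt.Println(migration_util.GetMapStringSliceValue(m, "labels.missing"))      // [] (nil: key not found)
}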