From 5825bd0f719cc3e4984192d97542802e40c54ef1 Mon Sep 17 00:00:00 2001
From: Kassian Sun
Date: Wed, 17 May 2023 19:11:03 +0800
Subject: [PATCH] [migration] split pipeline code, clear labels for each type

---
 .../migration/pipeline_task/bulk_indexing.go |  99 +++++++++++
 plugin/migration/pipeline_task/dump_hash.go  |   4 +
 plugin/migration/pipeline_task/es_scroll.go  |  81 +++++++++
 plugin/migration/pipeline_task/index_diff.go |   9 +
 .../migration/pipeline_task/pipeline_task.go | 163 ++----------------
 5 files changed, 203 insertions(+), 153 deletions(-)
 create mode 100644 plugin/migration/pipeline_task/bulk_indexing.go
 create mode 100644 plugin/migration/pipeline_task/es_scroll.go

diff --git a/plugin/migration/pipeline_task/bulk_indexing.go b/plugin/migration/pipeline_task/bulk_indexing.go
new file mode 100644
index 00000000..a2029a94
--- /dev/null
+++ b/plugin/migration/pipeline_task/bulk_indexing.go
@@ -0,0 +1,99 @@
+package pipeline_task
+
+import (
+	"fmt"
+	"strings"
+	"time"
+
+	log "github.com/cihub/seelog"
+	migration_util "infini.sh/console/plugin/migration/util"
+	"infini.sh/framework/core/task"
+)
+
+func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
+	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, errs := p.getBulkIndexingTaskState(taskItem)
+	if !bulked {
+		return nil
+	}
+
+	var errMsg string
+	if len(errs) > 0 {
+		errMsg = fmt.Sprintf("bulk finished with error(s): %v", errs)
+	}
+	now := time.Now()
+	taskItem.CompletedTime = &now
+	taskItem.Metadata.Labels["index_docs"] = indexDocs
+	taskItem.Metadata.Labels["success_docs"] = successDocs
+	taskItem.Metadata.Labels["invalid_docs"] = strings.Join(totalInvalidDocs, ",")
+	taskItem.Metadata.Labels["invalid_reasons"] = strings.Join(totalInvalidReasons, ",")
+	taskItem.Metadata.Labels["failure_docs"] = strings.Join(totalFailureDocs, ",")
+	taskItem.Metadata.Labels["failure_reasons"] = strings.Join(totalFailureReasons, ",")
+	if errMsg != "" {
+		taskItem.Status = task.StatusError
+	} else {
+		taskItem.Status = task.StatusComplete
+	}
+
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: errMsg == "",
+		Error:   errMsg,
+	}, fmt.Sprintf("[bulk_indexing] pipeline task [%s] completed", taskItem.ID))
+	p.cleanGatewayPipeline(taskItem)
+	return nil
+}
+
+func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
+	newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
+	if err != nil {
+		log.Errorf("failed to get latest pipeline logs for task [%s], err: %v", taskItem.ID, err)
+		return
+	}
+	if len(newHits) == 0 {
+		log.Debugf("bulk task [%s] not finished yet since last start", taskItem.ID)
+		return
+	}
+
+	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
+	if err != nil {
+		log.Errorf("failed to get all pipeline logs for task [%s], err: %v", taskItem.ID, err)
+		return
+	}
+
+	for _, m := range hits {
+		bulked = true
+
+		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
+		if errStr != "" {
+			errs = append(errs, errStr)
+		}
+
+		var (
+			success = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.success.count")
+			failure = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.failure.count")
+			invalid = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.invalid.count")
+		)
+		successDocs += success
+		indexDocs += success + invalid + failure
+
+		var (
+			invalidDocs    = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.documents")
+			invalidReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.reasons")
+			failureDocs    = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.documents")
+			failureReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.reasons")
+		)
+		totalInvalidDocs = append(totalInvalidDocs, invalidDocs...)
+		totalInvalidReasons = append(totalInvalidReasons, invalidReasons...)
+		totalFailureDocs = append(totalFailureDocs, failureDocs...)
+		totalFailureReasons = append(totalFailureReasons, failureReasons...)
+	}
+	return
+}
+
+func (p *processor) clearBulkIndexLabels(labels map[string]interface{}) {
+	delete(labels, "index_docs")
+	delete(labels, "success_docs")
+	delete(labels, "invalid_docs")
+	delete(labels, "invalid_reasons")
+	delete(labels, "failure_docs")
+	delete(labels, "failure_reasons")
+}
diff --git a/plugin/migration/pipeline_task/dump_hash.go b/plugin/migration/pipeline_task/dump_hash.go
index 9cd93870..b001bb87 100644
--- a/plugin/migration/pipeline_task/dump_hash.go
+++ b/plugin/migration/pipeline_task/dump_hash.go
@@ -74,3 +74,7 @@ func (p *processor) getDumpHashTaskState(taskItem *task.Task) (scrolledDocs int6
 	}
 	return
 }
+
+func (p *processor) clearDumpHashLabels(labels map[string]interface{}) {
+	delete(labels, "scrolled_docs")
+}
diff --git a/plugin/migration/pipeline_task/es_scroll.go b/plugin/migration/pipeline_task/es_scroll.go
new file mode 100644
index 00000000..2f41ddae
--- /dev/null
+++ b/plugin/migration/pipeline_task/es_scroll.go
@@ -0,0 +1,81 @@
+package pipeline_task
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	log "github.com/cihub/seelog"
+	migration_util "infini.sh/console/plugin/migration/util"
+	"infini.sh/framework/core/task"
+)
+
+func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error {
+	scrolledDocs, totalHits, scrolled, err := p.getEsScrollTaskState(taskItem)
+
+	if !scrolled {
+		return nil
+	}
+
+	var errMsg string
+	if err != nil {
+		errMsg = err.Error()
+	}
+	if errMsg == "" {
+		if scrolledDocs < totalHits {
+			errMsg = fmt.Sprintf("scroll finished but docs count mismatch: %d / %d", scrolledDocs, totalHits)
+		}
+	}
+
+	now := time.Now()
+	taskItem.CompletedTime = &now
+	taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs
+	if errMsg != "" {
+		taskItem.Status = task.StatusError
+	} else {
+		taskItem.Status = task.StatusComplete
+	}
+
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: errMsg == "",
+		Error:   errMsg,
+	}, fmt.Sprintf("[es_scroll] pipeline task [%s] completed", taskItem.ID))
+	p.cleanGatewayPipeline(taskItem)
+	return nil
+}
+
+func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) {
+	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
+	if err != nil {
+		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
+		err = nil
+		return
+	}
+	if len(hits) == 0 {
+		log.Debugf("scroll task [%s] not finished yet since last start", taskItem.ID)
+		return
+	}
+	// NOTE: we only check the last run of es_scroll
+	for _, m := range hits {
+		scrolled = true
+
+		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
+		if errStr != "" {
+			err = errors.New(errStr)
+			return
+		}
+
+		var (
+			scroll = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.scrolled_docs")
+			total  = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.total_hits")
+		)
+
+		scrolledDocs += scroll
+		totalHits += total
+	}
+	return
+}
+
+func (p *processor) clearEsScrollLabels(labels map[string]interface{}) {
+	delete(labels, "scrolled_docs")
+}
diff --git a/plugin/migration/pipeline_task/index_diff.go b/plugin/migration/pipeline_task/index_diff.go
index 5622eff3..6f969643 100644
--- a/plugin/migration/pipeline_task/index_diff.go
+++ b/plugin/migration/pipeline_task/index_diff.go
@@ -91,3 +91,12 @@ func (p *processor) getIndexDiffTaskState(taskItem *task.Task) (diffed bool, onl
 	}
 	return
 }
+
+func (p *processor) clearIndexDiffLabels(labels map[string]interface{}) {
+	delete(labels, "only_in_source_count")
+	delete(labels, "only_in_source_keys")
+	delete(labels, "only_in_target_count")
+	delete(labels, "only_in_target_keys")
+	delete(labels, "diff_both_count")
+	delete(labels, "diff_both_keys")
+}
diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go
index b8566bb4..b213c207 100644
--- a/plugin/migration/pipeline_task/pipeline_task.go
+++ b/plugin/migration/pipeline_task/pipeline_task.go
@@ -91,14 +91,16 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
 		return err
 	}
 
-	// TODO: find a better way to handle this
-	taskItem.Metadata.Labels["index_docs"] = 0
-	taskItem.Metadata.Labels["success_docs"] = 0
-	taskItem.Metadata.Labels["invalid_docs"] = ""
-	taskItem.Metadata.Labels["invalid_reasons"] = ""
-	taskItem.Metadata.Labels["failure_docs"] = ""
-	taskItem.Metadata.Labels["failure_reasons"] = ""
-	taskItem.Metadata.Labels["scrolled_docs"] = 0
+	switch taskItem.Metadata.Labels["pipeline_id"] {
+	case "es_scroll":
+		p.clearEsScrollLabels(taskItem.Metadata.Labels)
+	case "bulk_indexing":
+		p.clearBulkIndexLabels(taskItem.Metadata.Labels)
+	case "dump_hash":
+		p.clearDumpHashLabels(taskItem.Metadata.Labels)
+	case "index_diff":
+		p.clearIndexDiffLabels(taskItem.Metadata.Labels)
+	}
 
 	taskItem.Status = task.StatusRunning
 	taskItem.StartTimeInMillis = time.Now().UnixMilli()
@@ -125,72 +127,6 @@ func (p *processor) handleRunningPipelineTask(taskItem *task.Task) error {
 	return nil
 }
 
-func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error {
-	scrolledDocs, totalHits, scrolled, err := p.getEsScrollTaskState(taskItem)
-
-	if !scrolled {
-		return nil
-	}
-
-	var errMsg string
-	if err != nil {
-		errMsg = err.Error()
-	}
-	if errMsg == "" {
-		if scrolledDocs < totalHits {
-			errMsg = fmt.Sprintf("scrolled finished but docs count unmatch: %d / %d", scrolledDocs, totalHits)
-		}
-	}
-
-	now := time.Now()
-	taskItem.CompletedTime = &now
-	taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs
-	if errMsg != "" {
-		taskItem.Status = task.StatusError
-	} else {
-		taskItem.Status = task.StatusComplete
-	}
-
-	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
-		Success: errMsg == "",
-		Error:   errMsg,
-	}, fmt.Sprintf("[es_scroll] pipeline task [%s] completed", taskItem.ID))
-	p.cleanGatewayPipeline(taskItem)
-	return nil
-}
-
-func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
-	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, errs := p.getBulkIndexingTaskState(taskItem)
-	if !bulked {
-		return nil
-	}
-
-	var errMsg string
-	if len(errs) > 0 {
-		errMsg = fmt.Sprintf("bulk finished with error(s): %v", errs)
-	}
-	now := time.Now()
-	taskItem.CompletedTime = &now
-	taskItem.Metadata.Labels["index_docs"] = indexDocs
-	taskItem.Metadata.Labels["success_docs"] = successDocs
-	taskItem.Metadata.Labels["invalid_docs"] = strings.Join(totalInvalidDocs, ",")
-	taskItem.Metadata.Labels["invalid_reasons"] = strings.Join(totalInvalidReasons, ",")
-	taskItem.Metadata.Labels["failure_docs"] = strings.Join(totalFailureDocs, ",")
-	taskItem.Metadata.Labels["failure_reasons"] = strings.Join(totalFailureReasons, ",")
-	if errMsg != "" {
-		taskItem.Status = task.StatusError
-	} else {
-		taskItem.Status = task.StatusComplete
-	}
-
-	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
-		Success: errMsg == "",
-		Error:   errMsg,
-	}, fmt.Sprintf("[bulk_indexing] pipeline task [%s] completed", taskItem.ID))
-	p.cleanGatewayPipeline(taskItem)
-	return nil
-}
-
 func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
 	switch taskItem.Metadata.Labels["pipeline_id"] {
 	case "es_scroll":
@@ -313,85 +249,6 @@ func (p *processor) getParentTask(taskItem *task.Task) (*task.Task, error) {
 	return nil, errors.New("not reachable")
 }
 
-func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) {
-	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
-	if err != nil {
-		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
-		err = nil
-		return
-	}
-	if len(hits) == 0 {
-		log.Debugf("scroll task [%s] not finished yet since last start", taskItem.ID)
-		return
-	}
-	// NOTE: we only check the last run of es_scroll
-	for _, m := range hits {
-		scrolled = true
-
-		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
-		if errStr != "" {
-			err = errors.New(errStr)
-			return
-		}
-
-		var (
-			scroll = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.scrolled_docs")
-			total  = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.es_scroll.total_hits")
-		)
-
-		scrolledDocs += scroll
-		totalHits += total
-	}
-	return
-}
-
-func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
-	newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
-	if err != nil {
-		log.Errorf("failed to get latest pipeline logs for task [%s], err: %v", taskItem.ID, err)
-		return
-	}
-	if len(newHits) == 0 {
-		log.Debugf("bulk task [%s] not finished yet since last start", taskItem.ID)
-		return
-	}
-
-	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
-	if err != nil {
-		log.Errorf("failed to get all pipeline logs for task [%s], err: %v", taskItem.ID, err)
-		return
-	}
-
-	for _, m := range hits {
-		bulked = true
-
-		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
-		if errStr != "" {
-			errs = append(errs, errStr)
-		}
-
-		var (
-			success = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.success.count")
-			failure = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.failure.count")
-			invalid = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.bulk_indexing.invalid.count")
-		)
-		successDocs += success
-		indexDocs += success + invalid + failure
-
-		var (
-			invalidDocs    = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.documents")
-			invalidReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.invalid.reasons")
-			failureDocs    = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.documents")
-			failureReasons = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.bulk_indexing.detail.failure.reasons")
-		)
-		totalInvalidDocs = append(totalInvalidDocs, invalidDocs...)
-		totalInvalidReasons = append(invalidReasons, invalidReasons...)
-		totalFailureDocs = append(totalFailureDocs, failureDocs...)
-		totalFailureReasons = append(totalFailureReasons, failureReasons...)
-	}
-	return
-}
-
 func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) {
 	query := util.MapStr{
 		"size": 999,