diff --git a/plugin/migration/model.go b/plugin/migration/model.go index 5bed1192..695bb798 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -11,22 +11,22 @@ type ElasticDataConfig struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` } `json:"cluster"` - Indices []IndexConfig `json:"indices"` + Indices []IndexConfig `json:"indices"` Settings struct { ParallelIndices int `json:"parallel_indices"` ParallelTaskPerIndex int `json:"parallel_task_per_index"` - Scroll struct { - SliceSize int `json:"slice_size"` - Docs int `json:"docs"` + Scroll struct { + SliceSize int `json:"slice_size"` + Docs int `json:"docs"` Timeout string `json:"timeout"` } `json:"scroll"` Bulk struct { - Docs int `json:"docs"` - StoreSizeInMB int `json:"store_size_in_mb"` - MaxWorkerSize int `json:"max_worker_size"` - IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` - SliceSize int `json:"slice_size"` - Compress bool `json:"compress"` + Docs int `json:"docs"` + StoreSizeInMB int `json:"store_size_in_mb"` + MaxWorkerSize int `json:"max_worker_size"` + IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` + SliceSize int `json:"slice_size"` + Compress bool `json:"compress"` } `json:"bulk"` Execution ExecutionConfig `json:"execution"` } `json:"settings"` @@ -38,74 +38,74 @@ type ElasticDataConfig struct { type ExecutionConfig struct { TimeWindow []TimeWindowItem `json:"time_window"` - Nodes struct{ + Nodes struct { Permit []ExecutionNode `json:"permit"` } `json:"nodes"` } type ExecutionNode struct { - ID string `json:"id"` + ID string `json:"id"` Name string `json:"name"` } type TimeWindowItem struct { Start string `json:"start"` - End string `json:"end"` + End string `json:"end"` } type IndexConfig struct { - Source IndexInfo `json:"source"` - Target IndexInfo `json:"target"` - RawFilter interface{} `json:"raw_filter"` + Source IndexInfo `json:"source"` + Target IndexInfo `json:"target"` + RawFilter interface{} `json:"raw_filter"` IndexRename map[string]interface{} `json:"index_rename"` - TypeRename map[string]interface{} `json:"type_rename"` - Partition *IndexPartition `json:"partition,omitempty"` + TypeRename map[string]interface{} `json:"type_rename"` + Partition *IndexPartition `json:"partition,omitempty"` //TaskID string `json:"task_id,omitempty"` //Status string `json:"status,omitempty"` - Percent float64 `json:"percent,omitempty"` - ErrorPartitions int `json:"error_partitions,omitempty"` + Percent float64 `json:"percent,omitempty"` + ErrorPartitions int `json:"error_partitions,omitempty"` } type IndexPartition struct { - FieldType string `json:"field_type"` - FieldName string `json:"field_name"` - Step interface{} `json:"step"` + FieldType string `json:"field_type"` + FieldName string `json:"field_name"` + Step interface{} `json:"step"` } type IndexInfo struct { Name string `json:"name"` - DocType string `json:"doc_type"` - Docs int64 `json:"docs"` + DocType string `json:"doc_type"` + Docs int64 `json:"docs"` StoreSizeInBytes int `json:"store_size_in_bytes"` } -func (ii *IndexInfo) GetUniqueIndexName() string{ +func (ii *IndexInfo) GetUniqueIndexName() string { return fmt.Sprintf("%s:%s", ii.Name, ii.DocType) } type ClusterInfo struct { - Id string `json:"id"` - Name string `json:"name"` + Id string `json:"id"` + Name string `json:"name"` Distribution string `json:"distribution,omitempty"` } type TaskCompleteState struct { - IsComplete bool - Error string + IsComplete bool + Error string ClearPipeline bool - PipelineIds []string - SuccessDocs float64 - ScrolledDocs float64 - RunningPhase int - TotalDocs interface{} + PipelineIds []string + RunningPhase int + TotalDocs int64 + SuccessDocs int64 + ScrolledDocs int64 } -type MajorTaskState struct{ +type MajorTaskState struct { ScrolledDocs float64 - IndexDocs float64 - Status string + IndexDocs float64 + Status string } type IndexStateInfo struct { ErrorPartitions int IndexDocs float64 -} \ No newline at end of file +} diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index f0623e19..a4503d83 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -1262,46 +1262,64 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo if cfg, ok = subTask.Config.(map[string]interface{}); !ok { return nil, fmt.Errorf("got wrong config of task %v", *subTask) } - totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count") + totalDocsVal, err := util.MapStr(cfg).GetValue("source.doc_count") if err != nil { log.Errorf("failed to get source.doc_count, err: %v", err) return nil, err } + totalDocs, err := util.ExtractInt(totalDocsVal) + if err != nil { + log.Errorf("failed to extract source.doc_count, err: %v", err) + return nil, err + } var ( - indexDocs float64 - successDocs float64 - scrolledDocs interface{} + indexDocs int64 + successDocs int64 + scrolledDocs int64 state TaskCompleteState ) state.TotalDocs = totalDocs state.PipelineIds = pids + var bulked, scrolled bool for _, hit := range res.Hits.Hits { + if bulked && scrolled { + break + } resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error") if errStr, ok := resultErr.(string); ok && errStr != "" { state.Error = errStr state.IsComplete = true state.ClearPipeline = true } - for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} { - v, err := util.MapStr(hit.Source).GetValue(key) - if err == nil { - if fv, ok := v.(float64); ok { - indexDocs += fv - if key == "payload.pipeline.logging.context.bulk_indexing.success.count" { - successDocs = fv - state.SuccessDocs = successDocs + if !bulked { + for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} { + v, err := util.MapStr(hit.Source).GetValue(key) + if err == nil { + bulked = true + if fv, err := util.ExtractInt(v); err == nil { + indexDocs += fv + if key == "payload.pipeline.logging.context.bulk_indexing.success.count" { + successDocs = fv + state.SuccessDocs = successDocs + } + } else { + log.Errorf("got %s but failed to extract, err: %v", key, err) } } - } else { - break } } - v, err := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.context.es_scroll.scrolled_docs") - if err == nil { - scrolledDocs = v - if vv, ok := v.(float64); ok { - state.ScrolledDocs = vv + + if !scrolled { + v, err := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.context.es_scroll.scrolled_docs") + if err == nil { + scrolled = true + if vv, err := util.ExtractInt(v); err == nil { + scrolledDocs = vv + state.ScrolledDocs = vv + } else { + log.Errorf("got payload.pipeline.logging.context.es_scroll.scrolled_docs but failed to extract, err: %v", err) + } } } }