[migration] fix bulk_indexing counting logic (#60)

Co-authored-by: Kassian Sun <kassiansun@outlook.com>
sunjiacheng 2023-04-11 15:58:24 +08:00 committed by medcl
parent 9c11312f40
commit dc606e457d
2 changed files with 76 additions and 58 deletions
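
The heart of the fix, for context: pipeline log counters arrive as interface{} values decoded from JSON, and the old code counted them only when a v.(float64) type assertion succeeded. This commit switches to util.ExtractInt and stores the counters as int64. The helper below is a hypothetical stand-in for util.ExtractInt (the real implementation lives in the INFINI framework and is not shown in this diff); it sketches why assertion-based counting can drop values:

package main

import "fmt"

// extractInt is a hypothetical stand-in for util.ExtractInt: numbers decoded
// from JSON or other dynamic sources may arrive as float64, int, int64, or
// uint64, so a bare v.(float64) assertion silently skips integer-typed ones.
func extractInt(v interface{}) (int64, error) {
	switch n := v.(type) {
	case float64:
		return int64(n), nil
	case int:
		return int64(n), nil
	case int64:
		return n, nil
	case uint64:
		return int64(n), nil
	default:
		return 0, fmt.Errorf("unexpected numeric type %T", v)
	}
}

func main() {
	var indexDocs int64
	for _, v := range []interface{}{float64(100), int64(42)} {
		// The old v.(float64) assertion would have ignored int64(42).
		if fv, err := extractInt(v); err == nil {
			indexDocs += fv
		}
	}
	fmt.Println(indexDocs) // 142
}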

View File

@@ -11,22 +11,22 @@ type ElasticDataConfig struct {
 		Source ClusterInfo `json:"source"`
 		Target ClusterInfo `json:"target"`
 	} `json:"cluster"`
 	Indices  []IndexConfig `json:"indices"`
 	Settings struct {
 		ParallelIndices      int `json:"parallel_indices"`
 		ParallelTaskPerIndex int `json:"parallel_task_per_index"`
 		Scroll struct {
 			SliceSize int    `json:"slice_size"`
 			Docs      int    `json:"docs"`
 			Timeout   string `json:"timeout"`
 		} `json:"scroll"`
 		Bulk struct {
 			Docs                 int  `json:"docs"`
 			StoreSizeInMB        int  `json:"store_size_in_mb"`
 			MaxWorkerSize        int  `json:"max_worker_size"`
 			IdleTimeoutInSeconds int  `json:"idle_timeout_in_seconds"`
 			SliceSize            int  `json:"slice_size"`
 			Compress             bool `json:"compress"`
 		} `json:"bulk"`
 		Execution ExecutionConfig `json:"execution"`
 	} `json:"settings"`
@@ -38,71 +38,71 @@ type ElasticDataConfig struct {
 type ExecutionConfig struct {
 	TimeWindow []TimeWindowItem `json:"time_window"`
-	Nodes struct{
+	Nodes struct {
 		Permit []ExecutionNode `json:"permit"`
 	} `json:"nodes"`
 }
 type ExecutionNode struct {
 	ID   string `json:"id"`
 	Name string `json:"name"`
 }
 type TimeWindowItem struct {
 	Start string `json:"start"`
 	End   string `json:"end"`
 }
 type IndexConfig struct {
 	Source      IndexInfo              `json:"source"`
 	Target      IndexInfo              `json:"target"`
 	RawFilter   interface{}            `json:"raw_filter"`
 	IndexRename map[string]interface{} `json:"index_rename"`
 	TypeRename  map[string]interface{} `json:"type_rename"`
 	Partition   *IndexPartition        `json:"partition,omitempty"`
 	//TaskID string `json:"task_id,omitempty"`
 	//Status string `json:"status,omitempty"`
 	Percent         float64 `json:"percent,omitempty"`
 	ErrorPartitions int     `json:"error_partitions,omitempty"`
 }
 type IndexPartition struct {
 	FieldType string      `json:"field_type"`
 	FieldName string      `json:"field_name"`
 	Step      interface{} `json:"step"`
 }
 type IndexInfo struct {
 	Name             string `json:"name"`
 	DocType          string `json:"doc_type"`
 	Docs             int64  `json:"docs"`
 	StoreSizeInBytes int    `json:"store_size_in_bytes"`
 }
-func (ii *IndexInfo) GetUniqueIndexName() string{
+func (ii *IndexInfo) GetUniqueIndexName() string {
 	return fmt.Sprintf("%s:%s", ii.Name, ii.DocType)
 }
 type ClusterInfo struct {
 	Id           string `json:"id"`
 	Name         string `json:"name"`
 	Distribution string `json:"distribution,omitempty"`
 }
 type TaskCompleteState struct {
 	IsComplete    bool
 	Error         string
 	ClearPipeline bool
 	PipelineIds   []string
-	SuccessDocs   float64
-	ScrolledDocs  float64
-	RunningPhase  int
-	TotalDocs     interface{}
+	RunningPhase  int
+	TotalDocs     int64
+	SuccessDocs   int64
+	ScrolledDocs  int64
 }
-type MajorTaskState struct{
+type MajorTaskState struct {
 	ScrolledDocs float64
 	IndexDocs    float64
 	Status       string
 }
 type IndexStateInfo struct {
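
A side note on the TaskCompleteState changes above (general Go behavior, not something the diff spells out): keeping large document counts in float64 risks precision loss, since a float64 mantissa holds 53 bits and integers above 2^53 are no longer exact. A quick demonstration:

package main

import "fmt"

func main() {
	// float64 represents integers exactly only up to 2^53.
	big := int64(1) << 53
	f := float64(big)
	fmt.Println(int64(f) == big)   // true: 2^53 itself is representable
	fmt.Println(int64(f+1) == big) // true: the +1 was rounded away
	// Tracking TotalDocs/SuccessDocs/ScrolledDocs as int64 avoids this drift.
}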

View File

@@ -1262,46 +1262,64 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
 	if cfg, ok = subTask.Config.(map[string]interface{}); !ok {
 		return nil, fmt.Errorf("got wrong config of task %v", *subTask)
 	}
-	totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count")
+	totalDocsVal, err := util.MapStr(cfg).GetValue("source.doc_count")
 	if err != nil {
 		log.Errorf("failed to get source.doc_count, err: %v", err)
 		return nil, err
 	}
+	totalDocs, err := util.ExtractInt(totalDocsVal)
+	if err != nil {
+		log.Errorf("failed to extract source.doc_count, err: %v", err)
+		return nil, err
+	}
 	var (
-		indexDocs    float64
-		successDocs  float64
-		scrolledDocs interface{}
+		indexDocs    int64
+		successDocs  int64
+		scrolledDocs int64
 		state        TaskCompleteState
 	)
 	state.TotalDocs = totalDocs
 	state.PipelineIds = pids
+	var bulked, scrolled bool
 	for _, hit := range res.Hits.Hits {
+		if bulked && scrolled {
+			break
+		}
 		resultErr, _ := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.result.error")
 		if errStr, ok := resultErr.(string); ok && errStr != "" {
 			state.Error = errStr
 			state.IsComplete = true
 			state.ClearPipeline = true
 		}
-		for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} {
-			v, err := util.MapStr(hit.Source).GetValue(key)
-			if err == nil {
-				if fv, ok := v.(float64); ok {
-					indexDocs += fv
-					if key == "payload.pipeline.logging.context.bulk_indexing.success.count" {
-						successDocs = fv
-						state.SuccessDocs = successDocs
-					}
-				}
-			} else {
-				break
-			}
-		}
-		v, err := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.context.es_scroll.scrolled_docs")
-		if err == nil {
-			scrolledDocs = v
-			if vv, ok := v.(float64); ok {
-				state.ScrolledDocs = vv
-			}
-		}
+		if !bulked {
+			for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} {
+				v, err := util.MapStr(hit.Source).GetValue(key)
+				if err == nil {
+					bulked = true
+					if fv, err := util.ExtractInt(v); err == nil {
+						indexDocs += fv
+						if key == "payload.pipeline.logging.context.bulk_indexing.success.count" {
+							successDocs = fv
+							state.SuccessDocs = successDocs
+						}
+					} else {
+						log.Errorf("got %s but failed to extract, err: %v", key, err)
+					}
+				}
+			}
+		}
+		if !scrolled {
+			v, err := util.MapStr(hit.Source).GetValue("payload.pipeline.logging.context.es_scroll.scrolled_docs")
+			if err == nil {
+				scrolled = true
+				if vv, err := util.ExtractInt(v); err == nil {
+					scrolledDocs = vv
+					state.ScrolledDocs = vv
+				} else {
+					log.Errorf("got payload.pipeline.logging.context.es_scroll.scrolled_docs but failed to extract, err: %v", err)
+				}
+			}
+		}
 	}
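
Reading the rewritten loop as a whole, here is one interpretation of the counting bug (the commit message does not spell it out): each hit is a pipeline logging record, and the old code added bulk_indexing counters from every record it found, so repeated log entries for the same pipeline inflated indexDocs. The new bulked/scrolled flags take each counter group from the first record that carries it and break out once both are filled. A self-contained sketch of that first-match-wins pattern, with hypothetical record data:

package main

import "fmt"

// record mimics one pipeline logging hit; either counter group may be absent.
type record struct {
	bulkSuccess  *int64 // ...bulk_indexing.success.count, if present
	scrolledDocs *int64 // ...es_scroll.scrolled_docs, if present
}

func i64(v int64) *int64 { return &v }

func main() {
	// Hypothetical hits, newest snapshot first.
	hits := []record{
		{bulkSuccess: i64(1000), scrolledDocs: i64(1000)},
		{bulkSuccess: i64(400), scrolledDocs: i64(500)}, // an older snapshot
	}
	var bulked, scrolled bool
	var successDocs, scrolledDocs int64
	for _, hit := range hits {
		if bulked && scrolled {
			break // both groups already filled; stop scanning
		}
		if !bulked && hit.bulkSuccess != nil {
			bulked = true
			successDocs = *hit.bulkSuccess // latest snapshot, not a running sum
		}
		if !scrolled && hit.scrolledDocs != nil {
			scrolled = true
			scrolledDocs = *hit.scrolledDocs
		}
	}
	// Summing across hits, as the old loop did, would report 1400 and 1500.
	fmt.Println(successDocs, scrolledDocs) // 1000 1000
}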