Merge pull request 'optimize comparison' (#175) from optimize_comparison into master
Reviewed-on: https://git.infini.ltd:64443/infini/console/pulls/175
This commit is contained in:
commit
242d5364a0
|
@ -56,10 +56,12 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
taskItem.RetryTimes++
|
taskItem.RetryTimes++
|
||||||
if taskItem.StartTimeInMillis == 0 {
|
taskItem.StartTimeInMillis = time.Now().UnixMilli()
|
||||||
taskItem.StartTimeInMillis = time.Now().UnixMilli()
|
|
||||||
}
|
|
||||||
taskItem.Status = task.StatusRunning
|
taskItem.Status = task.StatusRunning
|
||||||
|
taskItem.Metadata.Labels["total_diff_docs"] = 0
|
||||||
|
taskItem.Metadata.Labels["only_in_source"] = 0
|
||||||
|
taskItem.Metadata.Labels["only_in_target"] = 0
|
||||||
|
taskItem.Metadata.Labels["diff_both"] = 0
|
||||||
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
|
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
|
||||||
Success: true,
|
Success: true,
|
||||||
}, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID))
|
}, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID))
|
||||||
|
|
|
@ -105,6 +105,7 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba
|
||||||
"source_total_docs": sourceTotalDocs,
|
"source_total_docs": sourceTotalDocs,
|
||||||
"target_total_docs": targetTotalDocs,
|
"target_total_docs": targetTotalDocs,
|
||||||
"permit_nodes": config.Settings.Execution.Nodes.Permit,
|
"permit_nodes": config.Settings.Execution.Nodes.Permit,
|
||||||
|
"name": config.Name,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
Cancellable: true,
|
Cancellable: true,
|
||||||
|
@ -112,6 +113,9 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba
|
||||||
Status: task.StatusInit,
|
Status: task.StatusInit,
|
||||||
ConfigString: util.MustToJSON(config),
|
ConfigString: util.MustToJSON(config),
|
||||||
}
|
}
|
||||||
|
if len(config.Tags) > 0 {
|
||||||
|
t.Metadata.Labels["tags"] = config.Tags
|
||||||
|
}
|
||||||
t.ID = util.GetUUID()
|
t.ID = util.GetUUID()
|
||||||
return &t, nil
|
return &t, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -160,7 +160,11 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
|
||||||
sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs)
|
sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs)
|
||||||
}
|
}
|
||||||
case "cluster_comparison":
|
case "cluster_comparison":
|
||||||
ts, _, err := h.getComparisonMajorTaskInfo(taskID)
|
targetTaskId := taskID
|
||||||
|
if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" {
|
||||||
|
targetTaskId = repeatStatus.LastRunChildTaskID
|
||||||
|
}
|
||||||
|
ts, _, err := h.getComparisonMajorTaskInfo(targetTaskId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("fetch progress info of task error: %v", err)
|
log.Warnf("fetch progress info of task error: %v", err)
|
||||||
return
|
return
|
||||||
|
@ -189,7 +193,7 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http
|
||||||
h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
|
h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if obj.Metadata.Type != "pipeline" && obj.Status == task.StatusComplete {
|
if obj.Metadata.Type != "pipeline" && (obj.Status == task.StatusComplete && obj.Metadata.Type != "cluster_comparison") {
|
||||||
h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError)
|
h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
|
||||||
var completedIndices int
|
var completedIndices int
|
||||||
for i, index := range taskConfig.Indices {
|
for i, index := range taskConfig.Indices {
|
||||||
indexName := index.Source.GetUniqueIndexName()
|
indexName := index.Source.GetUniqueIndexName()
|
||||||
count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs
|
count := indexState[indexName].TotalScrollDocs
|
||||||
percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100
|
percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100
|
||||||
if percent > 100 {
|
if percent > 100 {
|
||||||
percent = 100
|
percent = 100
|
||||||
|
@ -82,7 +82,9 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
|
||||||
taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs
|
taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs
|
||||||
taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs
|
taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs
|
||||||
taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
|
taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
|
||||||
|
taskConfig.Indices[i].TotalScrollDocs = count
|
||||||
taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
|
taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
|
||||||
|
taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
|
||||||
if count == index.Source.Docs+index.Target.Docs {
|
if count == index.Source.Docs+index.Target.Docs {
|
||||||
completedIndices++
|
completedIndices++
|
||||||
}
|
}
|
||||||
|
@ -115,6 +117,7 @@ type ClusterComparisonTaskState struct {
|
||||||
TargetTotalDocs int64
|
TargetTotalDocs int64
|
||||||
TargetScrollDocs int64
|
TargetScrollDocs int64
|
||||||
TotalDiffDocs int64
|
TotalDiffDocs int64
|
||||||
|
RunningChildren int
|
||||||
}
|
}
|
||||||
|
|
||||||
type ComparisonIndexStateInfo struct {
|
type ComparisonIndexStateInfo struct {
|
||||||
|
@ -124,73 +127,140 @@ type ComparisonIndexStateInfo struct {
|
||||||
TargetTotalDocs int64
|
TargetTotalDocs int64
|
||||||
TargetScrollDocs int64
|
TargetScrollDocs int64
|
||||||
TotalDiffDocs int64
|
TotalDiffDocs int64
|
||||||
|
RunningChildren int
|
||||||
|
TotalScrollDocs int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: calc realtime info from instance
|
|
||||||
func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
|
func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
|
||||||
|
var pipelineTaskIDs = map[string][]string{}
|
||||||
|
var pipelineIndexNames = map[string]string{}
|
||||||
indexState = map[string]ComparisonIndexStateInfo{}
|
indexState = map[string]ComparisonIndexStateInfo{}
|
||||||
|
const size = 500
|
||||||
taskQuery := util.MapStr{
|
var (
|
||||||
"size": 500,
|
from = -size
|
||||||
"query": util.MapStr{
|
hasMore = true
|
||||||
"bool": util.MapStr{
|
)
|
||||||
"must": []util.MapStr{
|
for hasMore {
|
||||||
{
|
from += size
|
||||||
"term": util.MapStr{
|
taskQuery := util.MapStr{
|
||||||
"parent_id": util.MapStr{
|
"size": 500,
|
||||||
"value": taskID,
|
"query": util.MapStr{
|
||||||
|
"bool": util.MapStr{
|
||||||
|
"must": []util.MapStr{
|
||||||
|
{
|
||||||
|
"term": util.MapStr{
|
||||||
|
"parent_id": util.MapStr{
|
||||||
|
"value": taskID,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
{
|
||||||
{
|
"term": util.MapStr{
|
||||||
"term": util.MapStr{
|
"metadata.type": util.MapStr{
|
||||||
"metadata.type": util.MapStr{
|
"value": "index_comparison",
|
||||||
"value": "index_comparison",
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
|
||||||
}
|
|
||||||
subTasks, err := migration_util.GetTasks(taskQuery)
|
|
||||||
if err != nil {
|
|
||||||
return taskStats, indexState, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, subTask := range subTasks {
|
|
||||||
taskLabels := util.MapStr(subTask.Metadata.Labels)
|
|
||||||
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
|
|
||||||
if indexName == "" {
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
subTasks, err := migration_util.GetTasks(taskQuery)
|
||||||
cfg := migration_model.IndexComparisonTaskConfig{}
|
|
||||||
err = migration_util.GetTaskConfig(&subTask, &cfg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to get task config, err: %v", err)
|
return taskStats, indexState, err
|
||||||
|
}
|
||||||
|
if len(subTasks) < size {
|
||||||
|
hasMore = false
|
||||||
|
}
|
||||||
|
|
||||||
|
var indexMigrationTaskIDs []string
|
||||||
|
for _, subTask := range subTasks {
|
||||||
|
taskLabels := util.MapStr(subTask.Metadata.Labels)
|
||||||
|
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
|
||||||
|
if indexName == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg := migration_model.IndexComparisonTaskConfig{}
|
||||||
|
err = migration_util.GetTaskConfig(&subTask, &cfg)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to get task config, err: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs")
|
||||||
|
taskStats.SourceTotalDocs += cfg.Source.DocCount
|
||||||
|
taskStats.TargetTotalDocs += cfg.Target.DocCount
|
||||||
|
taskStats.TotalDiffDocs += totalDiffDocs
|
||||||
|
st := indexState[indexName]
|
||||||
|
st.SourceTotalDocs += cfg.Source.DocCount
|
||||||
|
st.TargetTotalDocs += cfg.Target.DocCount
|
||||||
|
st.TotalDiffDocs += totalDiffDocs
|
||||||
|
if subTask.Status == task.StatusError {
|
||||||
|
st.ErrorPartitions += 1
|
||||||
|
}
|
||||||
|
if subTask.Status == task.StatusRunning {
|
||||||
|
st.RunningChildren++
|
||||||
|
indexState[indexName] = st
|
||||||
|
indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
|
||||||
|
targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
|
||||||
|
st.SourceScrollDocs += sourceDocs
|
||||||
|
st.TargetScrollDocs += targetDocs
|
||||||
|
st.TotalScrollDocs += sourceDocs + targetDocs
|
||||||
|
taskStats.TargetScrollDocs += targetDocs
|
||||||
|
taskStats.SourceScrollDocs += sourceDocs
|
||||||
|
indexState[indexName] = st
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(indexMigrationTaskIDs) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
|
|
||||||
targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
|
taskQuery = util.MapStr{
|
||||||
totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs")
|
"size": len(indexMigrationTaskIDs) * 2,
|
||||||
taskStats.SourceTotalDocs += cfg.Source.DocCount
|
"query": util.MapStr{
|
||||||
taskStats.SourceScrollDocs += sourceDocs
|
"bool": util.MapStr{
|
||||||
taskStats.TargetTotalDocs += cfg.Target.DocCount
|
"must": []util.MapStr{
|
||||||
taskStats.TargetScrollDocs += targetDocs
|
{
|
||||||
taskStats.TotalDiffDocs += totalDiffDocs
|
"terms": util.MapStr{
|
||||||
st := indexState[indexName]
|
"parent_id": indexMigrationTaskIDs,
|
||||||
st.SourceTotalDocs += cfg.Source.DocCount
|
},
|
||||||
st.SourceScrollDocs += sourceDocs
|
},
|
||||||
st.TargetTotalDocs += cfg.Target.DocCount
|
},
|
||||||
st.TargetScrollDocs += targetDocs
|
},
|
||||||
st.TotalDiffDocs += totalDiffDocs
|
},
|
||||||
if subTask.Status == task.StatusError {
|
|
||||||
st.ErrorPartitions += 1
|
|
||||||
}
|
}
|
||||||
|
subTasks, err = migration_util.GetTasks(taskQuery)
|
||||||
|
if err != nil {
|
||||||
|
return taskStats, indexState, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, subTask := range subTasks {
|
||||||
|
taskLabels := util.MapStr(subTask.Metadata.Labels)
|
||||||
|
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
|
||||||
|
if indexName == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pipelineIndexNames[subTask.ID] = indexName
|
||||||
|
|
||||||
|
if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
|
||||||
|
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
|
||||||
|
for pipelineID, pipelineContext := range pipelineContexts {
|
||||||
|
// add scrolledDocs of running tasks
|
||||||
|
scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs")
|
||||||
|
indexName := pipelineIndexNames[pipelineID]
|
||||||
|
st := indexState[indexName]
|
||||||
|
st.TotalScrollDocs += scrollDocs
|
||||||
indexState[indexName] = st
|
indexState[indexName] = st
|
||||||
}
|
}
|
||||||
|
|
||||||
return taskStats, indexState, nil
|
return taskStats, indexState, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -231,6 +301,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
|
||||||
startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)
|
startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)
|
||||||
|
|
||||||
var partitionTaskInfos []util.MapStr
|
var partitionTaskInfos []util.MapStr
|
||||||
|
var workers = map[string]struct{}{}
|
||||||
|
|
||||||
for i, subTask := range subTasks {
|
for i, subTask := range subTasks {
|
||||||
cfg := migration_model.IndexComparisonTaskConfig{}
|
cfg := migration_model.IndexComparisonTaskConfig{}
|
||||||
|
@ -242,6 +313,10 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
taskInfo.Step = cfg.Source.Step
|
taskInfo.Step = cfg.Source.Step
|
||||||
}
|
}
|
||||||
|
instID := migration_util.GetMapStringValue(subTask.Metadata.Labels, "execution_instance_id")
|
||||||
|
if instID != "" {
|
||||||
|
workers[instID] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
var durationInMS int64
|
var durationInMS int64
|
||||||
var subCompletedTime int64
|
var subCompletedTime int64
|
||||||
|
@ -256,6 +331,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
|
||||||
subTaskLabels := util.MapStr(subTask.Metadata.Labels)
|
subTaskLabels := util.MapStr(subTask.Metadata.Labels)
|
||||||
sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled")
|
sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled")
|
||||||
targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled")
|
targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled")
|
||||||
|
onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source")
|
||||||
|
onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target")
|
||||||
|
diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both")
|
||||||
|
|
||||||
partitionTaskInfo := util.MapStr{
|
partitionTaskInfo := util.MapStr{
|
||||||
"task_id": subTask.ID,
|
"task_id": subTask.ID,
|
||||||
|
@ -267,6 +345,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
|
||||||
"duration": durationInMS,
|
"duration": durationInMS,
|
||||||
"source_total_docs": cfg.Source.DocCount,
|
"source_total_docs": cfg.Source.DocCount,
|
||||||
"target_total_docs": cfg.Target.DocCount,
|
"target_total_docs": cfg.Target.DocCount,
|
||||||
|
"only_in_source": onlyInSource,
|
||||||
|
"only_in_target": onlyInTarget,
|
||||||
|
"diff_both": diffBoth,
|
||||||
}
|
}
|
||||||
sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg)
|
sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg)
|
||||||
if sourceDumpTask != nil {
|
if sourceDumpTask != nil {
|
||||||
|
@ -305,6 +386,14 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
|
||||||
if taskInfo.StartTime == 0 {
|
if taskInfo.StartTime == 0 {
|
||||||
taskInfo.StartTime = startTime
|
taskInfo.StartTime = startTime
|
||||||
}
|
}
|
||||||
|
for _, node := range taskConfig.Settings.Execution.Nodes.Permit {
|
||||||
|
if _, ok := workers[node.ID]; ok {
|
||||||
|
taskInfo.Workers = append(taskInfo.Workers, util.MapStr{
|
||||||
|
"id": node.ID,
|
||||||
|
"name": node.Name,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
taskInfo.Partitions = partitionTaskInfos
|
taskInfo.Partitions = partitionTaskInfos
|
||||||
taskInfo.CompletedPartitions = completedPartitions
|
taskInfo.CompletedPartitions = completedPartitions
|
||||||
h.WriteJSON(w, taskInfo, http.StatusOK)
|
h.WriteJSON(w, taskInfo, http.StatusOK)
|
||||||
|
|
|
@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
|
||||||
if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 {
|
if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth
|
taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth
|
||||||
|
taskItem.Metadata.Labels["only_in_source"] = onlyInSource
|
||||||
|
taskItem.Metadata.Labels["only_in_target"] = onlyInTarget
|
||||||
|
taskItem.Metadata.Labels["diff_both"] = diffBoth
|
||||||
taskItem.CompletedTime = &now
|
taskItem.CompletedTime = &now
|
||||||
taskItem.Status = task.StatusError
|
taskItem.Status = task.StatusError
|
||||||
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
|
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
|
||||||
|
@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
taskItem.CompletedTime = &now
|
taskItem.CompletedTime = &now
|
||||||
taskItem.Status = task.StatusComplete
|
taskItem.Status = task.StatusComplete
|
||||||
|
taskItem.Metadata.Labels["total_diff_docs"] = 0
|
||||||
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
|
p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
|
||||||
Success: true,
|
Success: true,
|
||||||
}, "index comparison completed")
|
}, "index comparison completed")
|
||||||
|
|
|
@ -5,6 +5,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type ClusterComparisonTaskConfig struct {
|
type ClusterComparisonTaskConfig struct {
|
||||||
|
Name string `json:"name"`
|
||||||
|
Tags []string `json:"tags"`
|
||||||
Cluster struct {
|
Cluster struct {
|
||||||
Source ClusterInfo `json:"source"`
|
Source ClusterInfo `json:"source"`
|
||||||
Target ClusterInfo `json:"target"`
|
Target ClusterInfo `json:"target"`
|
||||||
|
@ -29,8 +31,10 @@ type ClusterComparisonIndexConfig struct {
|
||||||
Partition *IndexPartition `json:"partition,omitempty"`
|
Partition *IndexPartition `json:"partition,omitempty"`
|
||||||
|
|
||||||
// only used in API
|
// only used in API
|
||||||
ScrollPercent float64 `json:"scroll_percent,omitempty"`
|
ScrollPercent float64 `json:"scroll_percent,omitempty"`
|
||||||
|
TotalScrollDocs int64 `json:"total_scroll_docs,omitempty"`
|
||||||
ErrorPartitions int `json:"error_partitions,omitempty"`
|
ErrorPartitions int `json:"error_partitions,omitempty"`
|
||||||
|
RunningChildren int `json:"running_children,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type IndexComparisonTaskConfig struct {
|
type IndexComparisonTaskConfig struct {
|
||||||
|
|
|
@ -242,7 +242,7 @@ func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"terms": util.MapStr{
|
"terms": util.MapStr{
|
||||||
"status": []string{task.StatusError, task.StatusStopped},
|
"status": []string{task.StatusError, task.StatusStopped, task.StatusComplete},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue