migration optimize (#145)

- add task name and tags fields

- add index-level task logging API

- other migration optimizations

Reviewed-on: https://git.infini.ltd:64443/infini/console/pulls/145
Co-authored-by: liugq <silenceqi@hotmail.com>
Co-committed-by: liugq <silenceqi@hotmail.com>
liugq 2023-09-19 15:32:50 +08:00 committed by medcl
parent 91dad0b302
commit 6557286bdd
7 changed files with 271 additions and 23 deletions


@@ -20,6 +20,8 @@ func InitAPI() {
 	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite))
 	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
 	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/logging/:index", handler.RequirePermission(handler.searchIndexLevelTaskLogging, enum.PermissionMigrationTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/_search_values", handler.RequirePermission(handler.searchTaskFieldValues("cluster_migration"), enum.PermissionMigrationTaskRead))
 	api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead))
 	api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite))
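
For orientation, the two routes added above can be exercised over HTTP roughly as follows. A minimal sketch, ignoring authentication; host, port, task ID, and index name are placeholders, not values from this commit:

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Index-level task logs for one index of a migration task; the handler
	// registered above also honors optional "min" and "max" timestamp bounds.
	resp, err := http.Get("http://localhost:9000/migration/data/task-123/logging/myindex:myindex?size=100")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // raw search response from the logging index
}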


@@ -130,6 +130,13 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
 		}
 		if index.Incremental != nil {
 			incrementalFilter, err := index.Incremental.BuildFilter(current, step)
+			if source.Step == nil {
+				source.Step = step.String()
+				source.End = float64(current - index.Incremental.Delay.Milliseconds())
+				if !index.Incremental.Full {
+					source.Start = source.End - float64(step.Milliseconds())
+				}
+			}
 			if err != nil {
 				return err
 			}
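
The block added above seeds the incremental window the first time a source is split: End is the current timestamp minus the configured delay, and unless Full is set, Start trails End by exactly one step. A worked sketch of that arithmetic, with invented step and delay values:

package main

import (
	"fmt"
	"time"
)

func main() {
	current := time.Now().UnixMilli() // "current" as used by the splitter, in ms
	delay := 5 * time.Minute          // stand-in for index.Incremental.Delay
	step := 15 * time.Minute          // stand-in for the partition step

	end := float64(current - delay.Milliseconds())
	start := end - float64(step.Milliseconds()) // only when !index.Incremental.Full
	fmt.Printf("first window: [%.0f, %.0f)\n", start, end)
}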


@@ -78,7 +78,6 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 	config.Cluster.Source.Distribution = srcClusterCfg.Distribution
 	dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id)
 	config.Cluster.Target.Distribution = dstClusterCfg.Distribution
-
 	clearTaskConfig(config)

 	var totalDocs int64
@@ -102,6 +101,7 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 			"target_cluster_id": config.Cluster.Target.Id,
 			"source_total_docs": totalDocs,
 			"permit_nodes":      config.Settings.Execution.Nodes.Permit,
+			"name":              config.Name,
 		},
 	},
 	Cancellable: true,
@@ -109,6 +109,9 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 		Status:       task.StatusInit,
 		ConfigString: util.MustToJSON(config),
 	}
+	if len(config.Tags) > 0 {
+		t.Metadata.Labels["tags"] = config.Tags
+	}
 	t.ID = util.GetUUID()
 	return &t, nil
 }
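
Taken together, the two additions above mean a task built from a config carrying a name and tags ends up with labels along these lines. Values are illustrative; util.MapStr in the framework is essentially a map[string]interface{}:

package main

import "fmt"

func main() {
	labels := map[string]interface{}{
		"source_cluster_id": "src-cluster",
		"target_cluster_id": "dst-cluster",
		"source_total_docs": int64(1200000),
		"name":              "nightly-orders-sync",      // from config.Name
		"tags":              []string{"prod", "orders"}, // set only when len(config.Tags) > 0
	}
	fmt.Println(labels)
}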


@@ -3,6 +3,8 @@ package task_manager
 import (
 	"errors"
 	"fmt"
+	migration_model "infini.sh/console/plugin/task_manager/model"
+	"infini.sh/framework/core/global"
 	"net/http"
 	"strconv"
 	"strings"
@@ -31,6 +33,9 @@ type TaskInfoResponse struct {
 	CompletedPartitions int           `json:"completed_partitions"`
 	Partitions          []util.MapStr `json:"partitions"`
 	Repeating           bool          `json:"repeating"`
+	Workers             []util.MapStr                     `json:"workers"`
+	Incremental         *migration_model.IndexIncremental `json:"incremental"`
+	NextRunTime         int64                             `json:"next_run_time"`
 }

 func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -105,6 +110,7 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
+
 	for _, hit := range searchRes.Hits.Hits {
 		sourceM := util.MapStr(hit.Source)
 		h.populateMajorTaskInfo(hit.ID, sourceM)
@@ -122,6 +128,12 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 		log.Errorf("failed to unmarshal major task info, err: %v", err)
 		return
 	}
+	_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
+	if err != nil {
+		log.Warnf("failed to calc repeat info, err: %v", err)
+		return
+	}
+	sourceM.Put("repeat", repeatStatus)
 	switch majorTask.Metadata.Type {
 	case "cluster_migration":
 		ts, _, err := h.getMigrationMajorTaskInfo(taskID)
@@ -138,6 +150,15 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 			return
 		}
 		sourceM.Put("running_children", count)
+		if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" {
+			ts, _, err = h.getMigrationMajorTaskInfo(repeatStatus.LastRunChildTaskID)
+			if err != nil {
+				log.Warnf("fetch progress info of task error: %v", err)
+				return
+			}
+			sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
+			sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs)
+		}
 	case "cluster_comparison":
 		ts, _, err := h.getComparisonMajorTaskInfo(taskID)
 		if err != nil {
@@ -156,12 +177,6 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 		}
 		sourceM.Put("running_children", count)
 	}
-	_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
-	if err != nil {
-		log.Warnf("failed to calc repeat info, err: %v", err)
-		return
-	}
-	sourceM.Put("repeat", repeatStatus)
 }

 func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -332,15 +347,103 @@ func (h *APIHandler) resumeTask(w http.ResponseWriter, req *http.Request, ps htt
 	return
 }

+// query index level task logging
+func (h *APIHandler) searchIndexLevelTaskLogging(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	id := ps.MustGetParameter("task_id")
+	uniqueIndexName := ps.MustGetParameter("index")
+	cfg := global.MustLookup("cluster_migration_config")
+	var (
+		migrationConfig *DispatcherConfig
+		ok              bool
+	)
+	if migrationConfig, ok = cfg.(*DispatcherConfig); !ok {
+		h.WriteJSON(w, elastic.SearchResponse{}, http.StatusOK)
+		return
+	}
+	client := elastic.GetClient(migrationConfig.Elasticsearch)
+	var (
+		strSize = h.GetParameterOrDefault(req, "size", "500")
+		min     = h.GetParameterOrDefault(req, "min", "")
+		max     = h.GetParameterOrDefault(req, "max", "")
+	)
+	size, _ := strconv.Atoi(strSize)
+	if size <= 0 {
+		size = 500
+	}
+	rangeObj := util.MapStr{}
+	if min != "" {
+		rangeObj["gte"] = min
+	}
+	if max != "" {
+		rangeObj["lt"] = max
+	}
+	mustQ := []util.MapStr{
+		{
+			"term": util.MapStr{
+				"metadata.category": util.MapStr{
+					"value": "task",
+				},
+			},
+		},
+		{
+			"term": util.MapStr{
+				"metadata.labels.parent_task_id": util.MapStr{
+					"value": id,
+				},
+			},
+		},
+		{
+			"term": util.MapStr{
+				"metadata.labels.unique_index_name": util.MapStr{
+					"value": uniqueIndexName,
+				},
+			},
+		},
+	}
+	if len(rangeObj) > 0 {
+		mustQ = append(mustQ, util.MapStr{
+			"range": util.MapStr{
+				"timestamp": rangeObj,
+			},
+		})
+	}
+	query := util.MapStr{
+		"size":    size,
+		"_source": []string{"payload.task.logging.message", "timestamp"},
+		"sort": []util.MapStr{
+			{
+				"timestamp": util.MapStr{
+					"order": "desc",
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": mustQ,
+			},
+		},
+	}
+	searchRes, err := client.SearchWithRawQueryDSL(migrationConfig.LogIndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	h.WriteJSON(w, searchRes, http.StatusOK)
+}

 type RepeatStatus struct {
 	IsRepeat  bool `json:"is_repeat"`
 	Done      bool `json:"done"`
 	Repeating bool `json:"repeating"`
+	LastRunTime        int64  `json:"last_run_time"`
+	NextRunTime        int64  `json:"next_run_time"`
+	LastRunChildTaskID string `json:"last_run_child_task_id"`
 }

 func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) {
 	ret := &RepeatStatus{}
-	lastRepeatingChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type)
+	lastRepeatingChild, lastRunChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -363,6 +466,14 @@ func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *Repe
 	if !repeatTriggered {
 		ret.Repeating = true
 	}
+	ret.NextRunTime = migration_util.GetMapIntValue(lastRepeatingChild.Metadata.Labels, "next_run_time")
+	ret.LastRunTime = lastRepeatingChild.StartTimeInMillis
+	if ret.LastRunTime == 0 && lastRunChild != nil {
+		ret.LastRunTime = lastRunChild.StartTimeInMillis
+	}
+	if lastRunChild != nil {
+		ret.LastRunChildTaskID = lastRunChild.ID
+	}
 	return lastRepeatingChild, ret, nil
 }
@@ -615,3 +726,71 @@ func (h *APIHandler) calcMajorTaskInfo(subTasks []task.Task, repeating bool) (st
 	return
 }

+func (h *APIHandler) searchTaskFieldValues(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+		var (
+			field   = h.GetParameterOrDefault(req, "field", "")
+			keyword = h.GetParameterOrDefault(req, "keyword", "")
+			mustQ   []interface{}
+		)
+		mustQ = append(mustQ, util.MapStr{
+			"term": util.MapStr{
+				"metadata.type": util.MapStr{
+					"value": taskType,
+				},
+			},
+		})
+		if v := strings.TrimSpace(keyword); v != ""{
+			mustQ = append(mustQ, util.MapStr{
+				"query_string": util.MapStr{
+					"default_field": field,
+					"query":         fmt.Sprintf("*%s*", v),
+				},
+			})
+		}
+		queryDSL := util.MapStr{
+			"aggs": util.MapStr{
+				"items": util.MapStr{
+					"terms": util.MapStr{
+						"field": field,
+						"size":  20,
+					},
+				},
+			},
+			"size": 0,
+			"query": util.MapStr{
+				"bool": util.MapStr{
+					"must": mustQ,
+				},
+			},
+		}
+		q := orm.Query{
+			RawQuery: util.MustToJSONBytes(queryDSL),
+		}
+		err, result := orm.Search(task.Task{}, &q)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		searchRes := elastic.SearchResponse{}
+		err = util.FromJSONBytes(result.Raw, &searchRes)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		items := []string{}
+		for _, bk := range searchRes.Aggregations["items"].Buckets {
+			if v, ok := bk["key"].(string); ok {
+				if strings.Contains(v, keyword){
+					items = append(items, v)
+				}
+			}
+		}
+		h.WriteJSON(w, items, http.StatusOK)
+	}
+}
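
Calling the endpoint registered in the first file returns up to 20 distinct values of the requested field, narrowed by the optional keyword both in the query and again over the returned bucket keys. A usage sketch; host, port, and the field path are placeholders:

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Suggest existing task names matching a substring.
	q := url.Values{}
	q.Set("field", "metadata.labels.name")
	q.Set("keyword", "nightly")
	resp, err := http.Get("http://localhost:9000/migration/data/_search_values?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // e.g. ["nightly-orders-sync"]
}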


@@ -3,6 +3,7 @@ package task_manager
 import (
 	"fmt"
 	"net/http"
+	"strings"
 	"time"

 	log "github.com/cihub/seelog"
@@ -78,18 +79,26 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
 		count := indexState[indexName].IndexDocs
 		sourceDocs := index.Source.Docs
 		var percent float64
+		var exportedPercent float64
 		if sourceDocs <= 0 {
 			percent = 100
+			exportedPercent = 100
 		}else{
 			percent = float64(count) / float64(sourceDocs) * 100
 			if percent > 100 {
 				percent = 100
 			}
+			exportedPercent = float64(indexState[indexName].ScrollDocs)/float64(sourceDocs) * 100
+			if exportedPercent > 100 {
+				exportedPercent = 100
+			}
 		}
 		//taskConfig.Indices[i].Source.Docs = sourceDocs
 		taskConfig.Indices[i].Target.Docs = count
 		taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
 		taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
+		taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
+		taskConfig.Indices[i].ExportedPercent = util.ToFixed(exportedPercent, 2)
 		if count == index.Source.Docs {
 			completedIndices++
 		}
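
ExportedPercent mirrors Percent on the read side: scrolled (exported) docs over source docs, capped at 100. A quick worked example with invented counts:

package main

import "fmt"

func main() {
	sourceDocs := int64(2000)
	scrollDocs := int64(1500) // read from the source so far
	indexDocs := int64(900)   // written to the target so far

	percent := float64(indexDocs) / float64(sourceDocs) * 100          // 45.00
	exportedPercent := float64(scrollDocs) / float64(sourceDocs) * 100 // 75.00
	if exportedPercent > 100 {
		exportedPercent = 100
	}
	fmt.Printf("indexed %.2f%%, exported %.2f%%\n", percent, exportedPercent)
}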
@@ -141,6 +150,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		StartTime: majorTask.StartTimeInMillis,
 		Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels),
 	}
+	if taskInfo.Repeating {
+		_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		taskInfo.NextRunTime = repeatStatus.NextRunTime
+	}
+	indexParts := strings.Split(uniqueIndexName, ":")
+	for _, index := range taskConfig.Indices {
+		if index.Source.Name == indexParts[0] {
+			taskInfo.Incremental = index.Incremental
+		}
+	}
 	subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName)

@@ -167,6 +191,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)

 	var partitionTaskInfos []util.MapStr
+	var workers = map[string]struct{}{}

 	for i, ptask := range subTasks {
 		cfg := migration_model.IndexMigrationTaskConfig{}
@@ -178,7 +203,10 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		if i == 0 {
 			taskInfo.Step = cfg.Source.Step
 		}
-
+		instID := migration_util.GetMapStringValue(ptask.Metadata.Labels, "execution_instance_id")
+		if instID != "" {
+			workers[instID] = struct{}{}
+		}
 		var durationInMS int64
 		var subCompletedTime int64
 		if ptask.StartTimeInMillis > 0 {
@@ -241,6 +269,14 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	}
 	taskInfo.Partitions = partitionTaskInfos
 	taskInfo.CompletedPartitions = completedPartitions
+	for _, node := range taskConfig.Settings.Execution.Nodes.Permit {
+		if _, ok := workers[node.ID]; ok {
+			taskInfo.Workers = append(taskInfo.Workers, util.MapStr{
+				"id":   node.ID,
+				"name": node.Name,
+			})
+		}
+	}
 	h.WriteJSON(w, taskInfo, http.StatusOK)
 }

@@ -248,6 +284,8 @@ type MigrationIndexStateInfo struct {
 	ErrorPartitions int
 	IndexDocs       int64
 	SourceDocs      int64
+	RunningChildren int
+	ScrollDocs      int64
 }

 /*
@@ -324,9 +362,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 		taskStats.SourceDocs += cfg.Source.DocCount
 		st := indexState[indexName]
 		st.SourceDocs += cfg.Source.DocCount
-		indexState[indexName] = st
+		scrollDocs := migration_util.GetMapIntValue(taskLabels, "scrolled_docs")
+		st.ScrollDocs += scrollDocs
 		if subTask.Status == task.StatusRunning {
+			st.RunningChildren++
+			indexState[indexName] = st
 			indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
 			continue
 		}
@@ -334,6 +375,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 		indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
 		taskStats.IndexDocs += indexDocs
 		st.IndexDocs += indexDocs
+
 		if subTask.Status == task.StatusError {
 			st.ErrorPartitions += 1
 			taskStats.ErrorPartitions += 1
@@ -347,7 +389,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 	}

 	taskQuery = util.MapStr{
-		"size": len(indexMigrationTaskIDs),
+		"size": len(indexMigrationTaskIDs) * 2,
 		"query": util.MapStr{
 			"bool": util.MapStr{
 				"must": []util.MapStr{
@@ -356,13 +398,13 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 						"parent_id": indexMigrationTaskIDs,
 					},
 				},
-				{
-					"term": util.MapStr{
-						"metadata.labels.pipeline_id": util.MapStr{
-							"value": "bulk_indexing",
-						},
-					},
-				},
+				//{
+				//	"term": util.MapStr{
+				//		"metadata.labels.pipeline_id": util.MapStr{
+				//			"value": "bulk_indexing",
+				//		},
+				//	},
+				//},
 			},
 		},
 	},
@@ -391,10 +433,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 	for pipelineID, pipelineContext := range pipelineContexts {
 		// add indexDocs of running tasks
 		indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count")
+		scrollDocs := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs")
 		taskStats.IndexDocs += indexDocs
 		indexName := pipelineIndexNames[pipelineID]
 		st := indexState[indexName]
 		st.IndexDocs += indexDocs
+		st.ScrollDocs += scrollDocs
 		indexState[indexName] = st
 	}
 	return taskStats, indexState, nil


@@ -9,6 +9,8 @@ import (
 )

 type ClusterMigrationTaskConfig struct {
+	Name    string   `json:"name"`
+	Tags    []string `json:"tags"`
 	Cluster struct {
 		Source ClusterInfo `json:"source"`
 		Target ClusterInfo `json:"target"`
@@ -39,6 +41,8 @@ type ClusterMigrationIndexConfig struct {
 	// only used in API
 	Percent         float64 `json:"percent,omitempty"`
 	ErrorPartitions int     `json:"error_partitions,omitempty"`
+	RunningChildren int     `json:"running_children,omitempty"`
+	ExportedPercent float64 `json:"exported_percent,omitempty"`
 }

 type ClusterMigrationTaskState struct {
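
A config exercising the two new top-level fields deserializes as expected. A self-contained sketch with a pared-down stand-in for ClusterMigrationTaskConfig:

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for ClusterMigrationTaskConfig; the real struct has many more fields.
type taskConfig struct {
	Name string   `json:"name"`
	Tags []string `json:"tags"`
}

func main() {
	raw := `{"name": "nightly-orders-sync", "tags": ["prod", "orders"]}`
	var cfg taskConfig
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg) // {Name:nightly-orders-sync Tags:[prod orders]}
}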


@@ -38,9 +38,9 @@ func DeleteChildTasks(taskID string, taskType string) error {
 	return nil
 }

-func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, error) {
+func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, *task.Task, error) {
 	queryDsl := util.MapStr{
-		"size": 1,
+		"size": 2,
 		"sort": []util.MapStr{
 			{
 				"metadata.labels.next_run_time": util.MapStr{
@@ -69,12 +69,21 @@ func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, erro
 	}
 	tasks, err := GetTasks(queryDsl)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	if len(tasks) == 0 {
-		return nil, nil
+		return nil, nil, nil
 	}
-	return &tasks[0], nil
+	var lastRunChildTask *task.Task
+	if tasks[0].StartTimeInMillis > 0 {
+		lastRunChildTask = &tasks[0]
+	}else{
+		if len(tasks) == 2 {
+			lastRunChildTask = &tasks[1]
+		}
+	}
+	return &tasks[0], lastRunChildTask, nil
 }

 func GetPendingChildTasks(taskID string, taskType string) ([]task.Task, error) {
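
The helper now returns two tasks: the child with the greatest next_run_time and the most recent child that actually started; they coincide once the newest child begins running. The added selection logic, restated as a self-contained sketch:

package main

import "fmt"

// Simplified stand-in for task.Task.
type Task struct {
	ID                string
	StartTimeInMillis int64
}

// tasks arrive sorted by metadata.labels.next_run_time descending, at most two.
func pickLastRun(tasks []Task) (lastRepeating, lastRun *Task) {
	if len(tasks) == 0 {
		return nil, nil
	}
	lastRepeating = &tasks[0]
	if tasks[0].StartTimeInMillis > 0 {
		lastRun = &tasks[0]
	} else if len(tasks) == 2 {
		lastRun = &tasks[1]
	}
	return lastRepeating, lastRun
}

func main() {
	// Newest child not started yet; the previous one carries the real last run.
	tasks := []Task{{ID: "t2"}, {ID: "t1", StartTimeInMillis: 1695100000000}}
	repeating, run := pickLastRun(tasks)
	fmt.Println(repeating.ID, run.ID) // t2 t1
}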