diff --git a/plugin/task_manager/api.go b/plugin/task_manager/api.go
index e4f4cace..2279250a 100644
--- a/plugin/task_manager/api.go
+++ b/plugin/task_manager/api.go
@@ -20,6 +20,8 @@ func InitAPI() {
 	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite))
 	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
 	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/logging/:index", handler.RequirePermission(handler.searchIndexLevelTaskLogging, enum.PermissionMigrationTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/_search_values", handler.RequirePermission(handler.searchTaskFieldValues("cluster_migration"), enum.PermissionMigrationTaskRead))
 
 	api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead))
 	api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite))
diff --git a/plugin/task_manager/cluster_migration/cluster_migration.go b/plugin/task_manager/cluster_migration/cluster_migration.go
index c3f57291..4424c0cd 100644
--- a/plugin/task_manager/cluster_migration/cluster_migration.go
+++ b/plugin/task_manager/cluster_migration/cluster_migration.go
@@ -130,6 +130,13 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
 		}
 		if index.Incremental != nil {
 			incrementalFilter, err := index.Incremental.BuildFilter(current, step)
+			if source.Step == nil {
+				source.Step = step.String()
+				source.End = float64(current - index.Incremental.Delay.Milliseconds())
+				if !index.Incremental.Full {
+					source.Start = source.End - float64(step.Milliseconds())
+				}
+			}
 			if err != nil {
 				return err
 			}
diff --git a/plugin/task_manager/cluster_migration/orm.go b/plugin/task_manager/cluster_migration/orm.go
index 29b7464d..ca929772 100644
--- a/plugin/task_manager/cluster_migration/orm.go
+++ b/plugin/task_manager/cluster_migration/orm.go
@@ -78,7 +78,6 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 	config.Cluster.Source.Distribution = srcClusterCfg.Distribution
 	dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id)
 	config.Cluster.Target.Distribution = dstClusterCfg.Distribution
-	clearTaskConfig(config)
 
 	var totalDocs int64
 
@@ -102,6 +101,7 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 				"target_cluster_id": config.Cluster.Target.Id,
 				"source_total_docs": totalDocs,
 				"permit_nodes":      config.Settings.Execution.Nodes.Permit,
+				"name":              config.Name,
 			},
 		},
 		Cancellable: true,
@@ -109,6 +109,9 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 		Status:       task.StatusInit,
 		ConfigString: util.MustToJSON(config),
 	}
+	if len(config.Tags) > 0 {
+		t.Metadata.Labels["tags"] = config.Tags
+	}
 	t.ID = util.GetUUID()
 	return &t, nil
 }
diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go
index 08241dda..6d2c1bd5 100644
--- a/plugin/task_manager/common_api.go
+++ b/plugin/task_manager/common_api.go
@@ -3,6 +3,8 @@ package task_manager
 import (
 	"errors"
 	"fmt"
+	migration_model "infini.sh/console/plugin/task_manager/model"
+	"infini.sh/framework/core/global"
 	"net/http"
 	"strconv"
 	"strings"
@@ -31,6 +33,9 @@ type TaskInfoResponse struct {
 	CompletedPartitions int           `json:"completed_partitions"`
 	Partitions          []util.MapStr `json:"partitions"`
 	Repeating           bool          `json:"repeating"`
+	Workers             []util.MapStr `json:"workers"`
+	Incremental         *migration_model.IndexIncremental `json:"incremental"`
+	NextRunTime         int64         `json:"next_run_time"`
 }
 
 func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -105,6 +110,7 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req
 			h.WriteError(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
+
 		for _, hit := range searchRes.Hits.Hits {
 			sourceM := util.MapStr(hit.Source)
 			h.populateMajorTaskInfo(hit.ID, sourceM)
@@ -122,6 +128,12 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 		log.Errorf("failed to unmarshal major task info, err: %v", err)
 		return
 	}
+	_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
+	if err != nil {
+		log.Warnf("failed to calc repeat info, err: %v", err)
+		return
+	}
+	sourceM.Put("repeat", repeatStatus)
 	switch majorTask.Metadata.Type {
 	case "cluster_migration":
 		ts, _, err := h.getMigrationMajorTaskInfo(taskID)
@@ -138,6 +150,15 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 			return
 		}
 		sourceM.Put("running_children", count)
+		if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" {
+			ts, _, err = h.getMigrationMajorTaskInfo(repeatStatus.LastRunChildTaskID)
+			if err != nil {
+				log.Warnf("fetch progress info of task error: %v", err)
+				return
+			}
+			sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
+			sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs)
+		}
 	case "cluster_comparison":
 		ts, _, err := h.getComparisonMajorTaskInfo(taskID)
 		if err != nil {
@@ -156,12 +177,6 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 		}
 		sourceM.Put("running_children", count)
 	}
-	_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
-	if err != nil {
-		log.Warnf("failed to calc repeat info, err: %v", err)
-		return
-	}
-	sourceM.Put("repeat", repeatStatus)
 }
 
 func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -332,15 +347,103 @@ func (h *APIHandler) resumeTask(w http.ResponseWriter, req *http.Request, ps htt
 	return
 }
 
+// query index level task logging
+func (h *APIHandler) searchIndexLevelTaskLogging(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	id := ps.MustGetParameter("task_id")
+	uniqueIndexName := ps.MustGetParameter("index")
+	cfg := global.MustLookup("cluster_migration_config")
+	var (
+		migrationConfig *DispatcherConfig
+		ok              bool
+	)
+	if migrationConfig, ok = cfg.(*DispatcherConfig); !ok {
+		h.WriteJSON(w, elastic.SearchResponse{}, http.StatusOK)
+		return
+	}
+	client := elastic.GetClient(migrationConfig.Elasticsearch)
+	var (
+		strSize = h.GetParameterOrDefault(req, "size", "500")
+		min     = h.GetParameterOrDefault(req, "min", "")
+		max     = h.GetParameterOrDefault(req, "max", "")
+	)
+	size, _ := strconv.Atoi(strSize)
+	if size <= 0 {
+		size = 500
+	}
+	rangeObj := util.MapStr{}
+	if min != "" {
+		rangeObj["gte"] = min
+	}
+	if max != "" {
+		rangeObj["lt"] = max
+	}
+	mustQ := []util.MapStr{
+		{
+			"term": util.MapStr{
+				"metadata.category": util.MapStr{
+					"value": "task",
+				},
+			},
+		},
+		{
+			"term": util.MapStr{
+				"metadata.labels.parent_task_id": util.MapStr{
+					"value": id,
+				},
+			},
+		},
+		{
+			"term": util.MapStr{
+				"metadata.labels.unique_index_name": util.MapStr{
+					"value": uniqueIndexName,
+				},
+			},
+		},
+	}
+	if len(rangeObj) > 0 {
+		mustQ = append(mustQ, util.MapStr{
+			"range": util.MapStr{
+				"timestamp": rangeObj,
+			},
+		})
+	}
+	query := util.MapStr{
+		"size":    size,
+		"_source": []string{"payload.task.logging.message", "timestamp"},
+		"sort": []util.MapStr{
+			{
+				"timestamp": util.MapStr{
+					"order": "desc",
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": mustQ,
+			},
+		},
+	}
+	searchRes, err := client.SearchWithRawQueryDSL(migrationConfig.LogIndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	h.WriteJSON(w, searchRes, http.StatusOK)
+}
+
 type RepeatStatus struct {
 	IsRepeat  bool `json:"is_repeat"`
 	Done      bool `json:"done"`
 	Repeating bool `json:"repeating"`
+	LastRunTime        int64  `json:"last_run_time"`
+	NextRunTime        int64  `json:"next_run_time"`
+	LastRunChildTaskID string `json:"last_run_child_task_id"`
 }
 
 func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) {
 	ret := &RepeatStatus{}
-	lastRepeatingChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type)
+	lastRepeatingChild, lastRunChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -363,6 +466,14 @@ func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *Repe
 	if !repeatTriggered {
 		ret.Repeating = true
 	}
+	ret.NextRunTime = migration_util.GetMapIntValue(lastRepeatingChild.Metadata.Labels, "next_run_time")
+	ret.LastRunTime = lastRepeatingChild.StartTimeInMillis
+	if ret.LastRunTime == 0 && lastRunChild != nil {
+		ret.LastRunTime = lastRunChild.StartTimeInMillis
+	}
+	if lastRunChild != nil {
+		ret.LastRunChildTaskID = lastRunChild.ID
+	}
 	return lastRepeatingChild, ret, nil
 }
 
@@ -615,3 +726,71 @@ func (h *APIHandler) calcMajorTaskInfo(subTasks []task.Task, repeating bool) (st
 
 	return
 }
+
+func (h *APIHandler) searchTaskFieldValues(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+		var (
+			field   = h.GetParameterOrDefault(req, "field", "")
+			keyword = h.GetParameterOrDefault(req, "keyword", "")
+			mustQ   []interface{}
+		)
+		mustQ = append(mustQ, util.MapStr{
+			"term": util.MapStr{
+				"metadata.type": util.MapStr{
+					"value": taskType,
+				},
+			},
+		})
+
+		if v := strings.TrimSpace(keyword); v != "" {
+			mustQ = append(mustQ, util.MapStr{
+				"query_string": util.MapStr{
+					"default_field": field,
+					"query":         fmt.Sprintf("*%s*", v),
+				},
+			})
+		}
+		queryDSL := util.MapStr{
+			"aggs": util.MapStr{
+				"items": util.MapStr{
+					"terms": util.MapStr{
+						"field": field,
+						"size":  20,
+					},
+				},
+			},
+			"size": 0,
+			"query": util.MapStr{
+				"bool": util.MapStr{
+					"must": mustQ,
+				},
+			},
+		}
+		q := orm.Query{
+			RawQuery: util.MustToJSONBytes(queryDSL),
+		}
+		err, result := orm.Search(task.Task{}, &q)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		searchRes := elastic.SearchResponse{}
+		err = util.FromJSONBytes(result.Raw, &searchRes)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		items := []string{}
+		for _, bk := range searchRes.Aggregations["items"].Buckets {
+			if v, ok := bk["key"].(string); ok {
+				if strings.Contains(v, keyword) {
+					items = append(items, v)
+				}
+			}
+		}
+		h.WriteJSON(w, items, http.StatusOK)
+	}
+}
+
diff --git a/plugin/task_manager/migration_api.go b/plugin/task_manager/migration_api.go
index 0c702365..cfe63465 100644
--- a/plugin/task_manager/migration_api.go
+++ b/plugin/task_manager/migration_api.go
@@ -3,6 +3,7 @@ package task_manager
 import (
 	"fmt"
 	"net/http"
+	"strings"
 	"time"
 
 	log "github.com/cihub/seelog"
@@ -78,18 +79,26 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
 		count := indexState[indexName].IndexDocs
 		sourceDocs := index.Source.Docs
 		var percent float64
+		var exportedPercent float64
 		if sourceDocs <= 0 {
 			percent = 100
+			exportedPercent = 100
 		}else{
 			percent = float64(count) / float64(sourceDocs) * 100
 			if percent > 100 {
 				percent = 100
 			}
+			exportedPercent = float64(indexState[indexName].ScrollDocs) / float64(sourceDocs) * 100
+			if exportedPercent > 100 {
+				exportedPercent = 100
+			}
 		}
 		//taskConfig.Indices[i].Source.Docs = sourceDocs
 		taskConfig.Indices[i].Target.Docs = count
 		taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
 		taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
+		taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
+		taskConfig.Indices[i].ExportedPercent = util.ToFixed(exportedPercent, 2)
 		if count == index.Source.Docs {
 			completedIndices++
 		}
@@ -141,6 +150,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		StartTime: majorTask.StartTimeInMillis,
 		Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels),
 	}
+	if taskInfo.Repeating {
+		_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		taskInfo.NextRunTime = repeatStatus.NextRunTime
+	}
+	indexParts := strings.Split(uniqueIndexName, ":")
+	for _, index := range taskConfig.Indices {
+		if index.Source.Name == indexParts[0] {
+			taskInfo.Incremental = index.Incremental
+		}
+	}
 
 	subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName)
 
@@ -167,6 +191,7 @@
 	startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)
 
 	var partitionTaskInfos []util.MapStr
+	var workers = map[string]struct{}{}
 
 	for i, ptask := range subTasks {
 		cfg := migration_model.IndexMigrationTaskConfig{}
@@ -178,7 +203,10 @@
 		if i == 0 {
 			taskInfo.Step = cfg.Source.Step
 		}
-
+		instID := migration_util.GetMapStringValue(ptask.Metadata.Labels, "execution_instance_id")
+		if instID != "" {
+			workers[instID] = struct{}{}
+		}
 		var durationInMS int64
 		var subCompletedTime int64
 		if ptask.StartTimeInMillis > 0 {
@@ -241,6 +269,14 @@
 	}
 	taskInfo.Partitions = partitionTaskInfos
 	taskInfo.CompletedPartitions = completedPartitions
+	for _, node := range taskConfig.Settings.Execution.Nodes.Permit {
+		if _, ok := workers[node.ID]; ok {
+			taskInfo.Workers = append(taskInfo.Workers, util.MapStr{
+				"id":   node.ID,
+				"name": node.Name,
+			})
+		}
+	}
 	h.WriteJSON(w, taskInfo, http.StatusOK)
 }
 
@@ -248,6 +284,8 @@ type MigrationIndexStateInfo struct {
 	ErrorPartitions int
 	IndexDocs       int64
 	SourceDocs      int64
+	RunningChildren int
+	ScrollDocs      int64
 }
 
 /*
@@ -324,9 +362,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 		taskStats.SourceDocs += cfg.Source.DocCount
 		st := indexState[indexName]
 		st.SourceDocs += cfg.Source.DocCount
-		indexState[indexName] = st
+		scrollDocs := migration_util.GetMapIntValue(taskLabels, "scrolled_docs")
+		st.ScrollDocs += scrollDocs
 		if subTask.Status == task.StatusRunning {
+			st.RunningChildren++
+			indexState[indexName] = st
 			indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
 			continue
 		}
@@ -334,6 +375,7 @@
 		indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
 		taskStats.IndexDocs += indexDocs
 		st.IndexDocs += indexDocs
+
 		if subTask.Status == task.StatusError {
 			st.ErrorPartitions += 1
 			taskStats.ErrorPartitions += 1
@@ -347,7 +389,7 @@
 	}
 
 	taskQuery = util.MapStr{
-		"size": len(indexMigrationTaskIDs),
+		"size": len(indexMigrationTaskIDs) * 2,
 		"query": util.MapStr{
 			"bool": util.MapStr{
 				"must": []util.MapStr{
@@ -356,13 +398,13 @@
 					{
 						"terms": util.MapStr{
 							"parent_id": indexMigrationTaskIDs,
 						},
 					},
-					{
-						"term": util.MapStr{
-							"metadata.labels.pipeline_id": util.MapStr{
-								"value": "bulk_indexing",
-							},
-						},
-					},
+					//{
+					//	"term": util.MapStr{
+					//		"metadata.labels.pipeline_id": util.MapStr{
+					//			"value": "bulk_indexing",
+					//		},
+					//	},
+					//},
 				},
 			},
 		},
@@ -391,10 +433,12 @@
 	for pipelineID, pipelineContext := range pipelineContexts {
 		// add indexDocs of running tasks
 		indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count")
+		scrollDocs := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs")
 		taskStats.IndexDocs += indexDocs
 		indexName := pipelineIndexNames[pipelineID]
 		st := indexState[indexName]
 		st.IndexDocs += indexDocs
+		st.ScrollDocs += scrollDocs
 		indexState[indexName] = st
 	}
 	return taskStats, indexState, nil
diff --git a/plugin/task_manager/model/migration.go b/plugin/task_manager/model/migration.go
index 5e011112..e8e9e1ab 100644
--- a/plugin/task_manager/model/migration.go
+++ b/plugin/task_manager/model/migration.go
@@ -9,6 +9,8 @@ import (
 )
 
 type ClusterMigrationTaskConfig struct {
+	Name    string   `json:"name"`
+	Tags    []string `json:"tags"`
 	Cluster struct {
 		Source ClusterInfo `json:"source"`
 		Target ClusterInfo `json:"target"`
@@ -39,6 +41,8 @@ type ClusterMigrationIndexConfig struct {
 	// only used in API
 	Percent         float64 `json:"percent,omitempty"`
 	ErrorPartitions int     `json:"error_partitions,omitempty"`
+	RunningChildren int     `json:"running_children,omitempty"`
+	ExportedPercent float64 `json:"exported_percent,omitempty"`
 }
 
 type ClusterMigrationTaskState struct {
diff --git a/plugin/task_manager/util/orm.go b/plugin/task_manager/util/orm.go
index 55992057..99506721 100644
--- a/plugin/task_manager/util/orm.go
+++ b/plugin/task_manager/util/orm.go
@@ -38,9 +38,9 @@ func DeleteChildTasks(taskID string, taskType string) error {
 	return nil
 }
 
-func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, error) {
+func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, *task.Task, error) {
 	queryDsl := util.MapStr{
-		"size": 1,
+		"size": 2,
 		"sort": []util.MapStr{
 			{
 				"metadata.labels.next_run_time": util.MapStr{
@@ -69,12 +69,21 @@ func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, erro
 	}
 	tasks, err := GetTasks(queryDsl)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	if len(tasks) == 0 {
-		return nil, nil
+		return nil, nil, nil
 	}
-	return &tasks[0], nil
+	var lastRunChildTask *task.Task
+	if tasks[0].StartTimeInMillis > 0 {
+		lastRunChildTask = &tasks[0]
+	} else {
+		if len(tasks) == 2 {
+			lastRunChildTask = &tasks[1]
+		}
+	}
+
+	return &tasks[0], lastRunChildTask, nil
 }
 
 func GetPendingChildTasks(taskID string, taskType string) ([]task.Task, error) {