From 6557286bddb727fdeb8f68bcb9148e6cdb6a8cb8 Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 19 Sep 2023 15:32:50 +0800 Subject: [PATCH 01/20] migration optimize (#145) - add task field name and tags - add index logging level api - other migration optimize Reviewed-on: https://git.infini.ltd:64443/infini/console/pulls/145 Co-authored-by: liugq Co-committed-by: liugq --- plugin/task_manager/api.go | 2 + .../cluster_migration/cluster_migration.go | 7 + plugin/task_manager/cluster_migration/orm.go | 5 +- plugin/task_manager/common_api.go | 193 +++++++++++++++++- plugin/task_manager/migration_api.go | 64 +++++- plugin/task_manager/model/migration.go | 4 + plugin/task_manager/util/orm.go | 19 +- 7 files changed, 271 insertions(+), 23 deletions(-) diff --git a/plugin/task_manager/api.go b/plugin/task_manager/api.go index e4f4cace..2279250a 100644 --- a/plugin/task_manager/api.go +++ b/plugin/task_manager/api.go @@ -20,6 +20,8 @@ func InitAPI() { api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/logging/:index", handler.RequirePermission(handler.searchIndexLevelTaskLogging, enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.GET, "/migration/data/_search_values", handler.RequirePermission(handler.searchTaskFieldValues("cluster_migration"), enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) diff --git a/plugin/task_manager/cluster_migration/cluster_migration.go b/plugin/task_manager/cluster_migration/cluster_migration.go index c3f57291..4424c0cd 100644 --- a/plugin/task_manager/cluster_migration/cluster_migration.go +++ b/plugin/task_manager/cluster_migration/cluster_migration.go @@ -130,6 +130,13 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } if index.Incremental != nil { incrementalFilter, err := index.Incremental.BuildFilter(current, step) + if source.Step == nil { + source.Step = step.String() + source.End = float64(current - index.Incremental.Delay.Milliseconds()) + if !index.Incremental.Full { + source.Start = source.End - float64(step.Milliseconds()) + } + } if err != nil { return err } diff --git a/plugin/task_manager/cluster_migration/orm.go b/plugin/task_manager/cluster_migration/orm.go index 29b7464d..ca929772 100644 --- a/plugin/task_manager/cluster_migration/orm.go +++ b/plugin/task_manager/cluster_migration/orm.go @@ -78,7 +78,6 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac config.Cluster.Source.Distribution = srcClusterCfg.Distribution dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id) config.Cluster.Target.Distribution = dstClusterCfg.Distribution - clearTaskConfig(config) var totalDocs int64 @@ -102,6 +101,7 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac "target_cluster_id": config.Cluster.Target.Id, "source_total_docs": totalDocs, "permit_nodes": config.Settings.Execution.Nodes.Permit, + "name": config.Name, }, }, Cancellable: true, @@ -109,6 +109,9 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac Status: task.StatusInit, ConfigString: util.MustToJSON(config), } + if len(config.Tags) > 0 { + t.Metadata.Labels["tags"] = config.Tags + } t.ID = util.GetUUID() return &t, nil } diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 08241dda..6d2c1bd5 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -3,6 +3,8 @@ package task_manager import ( "errors" "fmt" + migration_model "infini.sh/console/plugin/task_manager/model" + "infini.sh/framework/core/global" "net/http" "strconv" "strings" @@ -31,6 +33,9 @@ type TaskInfoResponse struct { CompletedPartitions int `json:"completed_partitions"` Partitions []util.MapStr `json:"partitions"` Repeating bool `json:"repeating"` + Workers []util.MapStr `json:"workers"` + Incremental *migration_model.IndexIncremental `json:"incremental"` + NextRunTime int64 `json:"next_run_time"` } func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -105,6 +110,7 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) h.populateMajorTaskInfo(hit.ID, sourceM) @@ -122,6 +128,12 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { log.Errorf("failed to unmarshal major task info, err: %v", err) return } + _, repeatStatus, err := h.calcRepeatingStatus(&majorTask) + if err != nil { + log.Warnf("failed to calc repeat info, err: %v", err) + return + } + sourceM.Put("repeat", repeatStatus) switch majorTask.Metadata.Type { case "cluster_migration": ts, _, err := h.getMigrationMajorTaskInfo(taskID) @@ -138,6 +150,15 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { return } sourceM.Put("running_children", count) + if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" { + ts, _, err = h.getMigrationMajorTaskInfo(repeatStatus.LastRunChildTaskID) + if err != nil { + log.Warnf("fetch progress info of task error: %v", err) + return + } + sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) + sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs) + } case "cluster_comparison": ts, _, err := h.getComparisonMajorTaskInfo(taskID) if err != nil { @@ -156,12 +177,6 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { } sourceM.Put("running_children", count) } - _, repeatStatus, err := h.calcRepeatingStatus(&majorTask) - if err != nil { - log.Warnf("failed to calc repeat info, err: %v", err) - return - } - sourceM.Put("repeat", repeatStatus) } func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -332,15 +347,103 @@ func (h *APIHandler) resumeTask(w http.ResponseWriter, req *http.Request, ps htt return } +// query index level task logging +func (h *APIHandler) searchIndexLevelTaskLogging(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + uniqueIndexName := ps.MustGetParameter("index") + cfg := global.MustLookup("cluster_migration_config") + var ( + migrationConfig *DispatcherConfig + ok bool + ) + if migrationConfig, ok = cfg.(*DispatcherConfig); !ok { + h.WriteJSON(w, elastic.SearchResponse{}, http.StatusOK) + return + } + client := elastic.GetClient(migrationConfig.Elasticsearch) + var ( + strSize = h.GetParameterOrDefault(req, "size", "500") + min = h.GetParameterOrDefault(req, "min", "") + max = h.GetParameterOrDefault(req, "max", "") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 500 + } + rangeObj := util.MapStr{} + if min != "" { + rangeObj["gte"] = min + } + if max != "" { + rangeObj["lt"] = max + } + mustQ := []util.MapStr{ + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "task", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.parent_task_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.unique_index_name": util.MapStr{ + "value": uniqueIndexName, + }, + }, + }, + } + if len(rangeObj) > 0 { + mustQ = append(mustQ, util.MapStr{ + "range": util.MapStr{ + "timestamp": rangeObj, + }, + }) + } + query := util.MapStr{ + "size": size, + "_source": []string{"payload.task.logging.message", "timestamp"}, + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + } + searchRes, err := client.SearchWithRawQueryDSL(migrationConfig.LogIndexName, util.MustToJSONBytes(query)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, searchRes, http.StatusOK) +} + type RepeatStatus struct { IsRepeat bool `json:"is_repeat"` Done bool `json:"done"` Repeating bool `json:"repeating"` + LastRunTime int64 `json:"last_run_time"` + NextRunTime int64 `json:"next_run_time"` + LastRunChildTaskID string `json:"last_run_child_task_id"` } func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) { ret := &RepeatStatus{} - lastRepeatingChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type) + lastRepeatingChild, lastRunChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type) if err != nil { return nil, nil, err } @@ -363,6 +466,14 @@ func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *Repe if !repeatTriggered { ret.Repeating = true } + ret.NextRunTime = migration_util.GetMapIntValue(lastRepeatingChild.Metadata.Labels, "next_run_time") + ret.LastRunTime = lastRepeatingChild.StartTimeInMillis + if ret.LastRunTime == 0 && lastRunChild != nil { + ret.LastRunTime = lastRunChild.StartTimeInMillis + } + if lastRunChild != nil { + ret.LastRunChildTaskID = lastRunChild.ID + } return lastRepeatingChild, ret, nil } @@ -615,3 +726,71 @@ func (h *APIHandler) calcMajorTaskInfo(subTasks []task.Task, repeating bool) (st return } + +func (h *APIHandler) searchTaskFieldValues(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ( + field = h.GetParameterOrDefault(req, "field", "") + keyword = h.GetParameterOrDefault(req, "keyword", "") + mustQ []interface{} + ) + mustQ = append(mustQ, util.MapStr{ + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": taskType, + }, + }, + }) + + if v := strings.TrimSpace(keyword); v != ""{ + mustQ = append(mustQ, util.MapStr{ + "query_string": util.MapStr{ + "default_field": field, + "query": fmt.Sprintf("*%s*", v), + }, + }) + } + queryDSL := util.MapStr{ + "aggs": util.MapStr{ + "items": util.MapStr{ + "terms": util.MapStr{ + "field": field, + "size": 20, + }, + }, + }, + "size": 0, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDSL), + } + err, result := orm.Search(task.Task{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + items := []string{} + for _, bk := range searchRes.Aggregations["items"].Buckets { + if v, ok := bk["key"].(string); ok { + if strings.Contains(v, keyword){ + items = append(items, v) + } + } + } + h.WriteJSON(w, items, http.StatusOK) + } +} + diff --git a/plugin/task_manager/migration_api.go b/plugin/task_manager/migration_api.go index 0c702365..cfe63465 100644 --- a/plugin/task_manager/migration_api.go +++ b/plugin/task_manager/migration_api.go @@ -3,6 +3,7 @@ package task_manager import ( "fmt" "net/http" + "strings" "time" log "github.com/cihub/seelog" @@ -78,18 +79,26 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R count := indexState[indexName].IndexDocs sourceDocs := index.Source.Docs var percent float64 + var exportedPercent float64 if sourceDocs <= 0 { percent = 100 + exportedPercent = 100 }else{ percent = float64(count) / float64(sourceDocs) * 100 if percent > 100 { percent = 100 } + exportedPercent = float64(indexState[indexName].ScrollDocs)/float64(sourceDocs) * 100 + if exportedPercent > 100 { + exportedPercent = 100 + } } //taskConfig.Indices[i].Source.Docs = sourceDocs taskConfig.Indices[i].Target.Docs = count taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren + taskConfig.Indices[i].ExportedPercent = util.ToFixed(exportedPercent, 2) if count == index.Source.Docs { completedIndices++ } @@ -141,6 +150,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt StartTime: majorTask.StartTimeInMillis, Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels), } + if taskInfo.Repeating { + _, repeatStatus, err := h.calcRepeatingStatus(&majorTask) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + taskInfo.NextRunTime = repeatStatus.NextRunTime + } + indexParts := strings.Split(uniqueIndexName, ":") + for _, index := range taskConfig.Indices { + if index.Source.Name == indexParts[0] { + taskInfo.Incremental = index.Incremental + } + } subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName) @@ -167,6 +191,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr + var workers = map[string]struct{}{} for i, ptask := range subTasks { cfg := migration_model.IndexMigrationTaskConfig{} @@ -178,7 +203,10 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt if i == 0 { taskInfo.Step = cfg.Source.Step } - + instID := migration_util.GetMapStringValue(ptask.Metadata.Labels, "execution_instance_id") + if instID != "" { + workers[instID] = struct{}{} + } var durationInMS int64 var subCompletedTime int64 if ptask.StartTimeInMillis > 0 { @@ -241,6 +269,14 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt } taskInfo.Partitions = partitionTaskInfos taskInfo.CompletedPartitions = completedPartitions + for _, node := range taskConfig.Settings.Execution.Nodes.Permit { + if _, ok := workers[node.ID]; ok { + taskInfo.Workers = append(taskInfo.Workers, util.MapStr{ + "id": node.ID, + "name": node.Name, + }) + } + } h.WriteJSON(w, taskInfo, http.StatusOK) } @@ -248,6 +284,8 @@ type MigrationIndexStateInfo struct { ErrorPartitions int IndexDocs int64 SourceDocs int64 + RunningChildren int + ScrollDocs int64 } /* @@ -324,9 +362,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m taskStats.SourceDocs += cfg.Source.DocCount st := indexState[indexName] st.SourceDocs += cfg.Source.DocCount - indexState[indexName] = st + scrollDocs := migration_util.GetMapIntValue(taskLabels, "scrolled_docs") + st.ScrollDocs += scrollDocs if subTask.Status == task.StatusRunning { + st.RunningChildren++ + indexState[indexName] = st indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) continue } @@ -334,6 +375,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") taskStats.IndexDocs += indexDocs st.IndexDocs += indexDocs + if subTask.Status == task.StatusError { st.ErrorPartitions += 1 taskStats.ErrorPartitions += 1 @@ -347,7 +389,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m } taskQuery = util.MapStr{ - "size": len(indexMigrationTaskIDs), + "size": len(indexMigrationTaskIDs) * 2, "query": util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{ @@ -356,13 +398,13 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m "parent_id": indexMigrationTaskIDs, }, }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "bulk_indexing", - }, - }, - }, + //{ + // "term": util.MapStr{ + // "metadata.labels.pipeline_id": util.MapStr{ + // "value": "bulk_indexing", + // }, + // }, + //}, }, }, }, @@ -391,10 +433,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m for pipelineID, pipelineContext := range pipelineContexts { // add indexDocs of running tasks indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count") + scrollDocs := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs") taskStats.IndexDocs += indexDocs indexName := pipelineIndexNames[pipelineID] st := indexState[indexName] st.IndexDocs += indexDocs + st.ScrollDocs += scrollDocs indexState[indexName] = st } return taskStats, indexState, nil diff --git a/plugin/task_manager/model/migration.go b/plugin/task_manager/model/migration.go index 5e011112..e8e9e1ab 100644 --- a/plugin/task_manager/model/migration.go +++ b/plugin/task_manager/model/migration.go @@ -9,6 +9,8 @@ import ( ) type ClusterMigrationTaskConfig struct { + Name string `json:"name"` + Tags []string `json:"tags"` Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -39,6 +41,8 @@ type ClusterMigrationIndexConfig struct { // only used in API Percent float64 `json:"percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` + RunningChildren int `json:"running_children,omitempty"` + ExportedPercent float64 `json:"exported_percent,omitempty"` } type ClusterMigrationTaskState struct { diff --git a/plugin/task_manager/util/orm.go b/plugin/task_manager/util/orm.go index 55992057..99506721 100644 --- a/plugin/task_manager/util/orm.go +++ b/plugin/task_manager/util/orm.go @@ -38,9 +38,9 @@ func DeleteChildTasks(taskID string, taskType string) error { return nil } -func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, error) { +func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, *task.Task, error) { queryDsl := util.MapStr{ - "size": 1, + "size": 2, "sort": []util.MapStr{ { "metadata.labels.next_run_time": util.MapStr{ @@ -69,12 +69,21 @@ func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, erro } tasks, err := GetTasks(queryDsl) if err != nil { - return nil, err + return nil, nil, err } if len(tasks) == 0 { - return nil, nil + return nil, nil, nil } - return &tasks[0], nil + var lastRunChildTask *task.Task + if tasks[0].StartTimeInMillis > 0 { + lastRunChildTask = &tasks[0] + }else{ + if len(tasks) == 2 { + lastRunChildTask = &tasks[1] + } + } + + return &tasks[0], lastRunChildTask, nil } func GetPendingChildTasks(taskID string, taskType string) ([]task.Task, error) { From e5d06ffb9c2b1bf8fe4cc73a01fdfca2ebbe785d Mon Sep 17 00:00:00 2001 From: hardy Date: Tue, 19 Sep 2023 17:32:33 +0800 Subject: [PATCH 02/20] fix aarch64 build for ci --- Jenkinsfile-linux | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 Jenkinsfile-linux diff --git a/Jenkinsfile-linux b/Jenkinsfile-linux new file mode 100644 index 00000000..4e2099e4 --- /dev/null +++ b/Jenkinsfile-linux @@ -0,0 +1,45 @@ +pipeline { + agent none + + environment { + CI = 'true' + } + + stages { + + + stage('Prepare Web Packages') { + + agent { + label 'linux' + } + + steps { + catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE'){ + sh 'cd /home/jenkins/go/src/infini.sh/console && git stash && git pull origin master && make clean' + sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || rm -rif web' + sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || git clone ssh://git@git.infini.ltd:64221/infini/console-ui.git web' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && git pull origin master' + sh 'cd /home/jenkins/go/src/infini.sh/console/web/src && true || git clone ssh://git@git.infini.ltd:64221/infini/common-ui.git common' + sh 'cd /home/jenkins/go/src/infini.sh/console/web/src/common && git pull origin master' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && git stash' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm install' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm run build' + sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64 build-arm' + sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE' + sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config' + sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config' + + sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config' + sh label: 'package-linux-arm64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-arm64.tar.gz console-linux-arm64 console.yml LICENSE NOTICE config' + archiveArtifacts artifacts: 'console-$VERSION-$BUILD_NUMBER-*.*', fingerprint: true, followSymlinks: true, onlyIfSuccessful: false + } + } + } + + + + +} + +} From 288a2e0304d50b88c5f5a82f5df5be6a0781916e Mon Sep 17 00:00:00 2001 From: hardy Date: Thu, 21 Sep 2023 09:09:18 +0800 Subject: [PATCH 03/20] fix: arm build eol error --- Jenkinsfile-linux | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile-linux b/Jenkinsfile-linux index 4e2099e4..fbb68fc1 100644 --- a/Jenkinsfile-linux +++ b/Jenkinsfile-linux @@ -25,13 +25,19 @@ pipeline { sh 'cd /home/jenkins/go/src/infini.sh/console/web && git stash' sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm install' sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm run build' - sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64 build-arm' + + sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64' sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE' sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config' sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config' - sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config' + + sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-arm' + sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE' + sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config' + sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config' sh label: 'package-linux-arm64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-arm64.tar.gz console-linux-arm64 console.yml LICENSE NOTICE config' + archiveArtifacts artifacts: 'console-$VERSION-$BUILD_NUMBER-*.*', fingerprint: true, followSymlinks: true, onlyIfSuccessful: false } } From d0ab79f47de51806f6b4ddc5f1f6071fa888c75d Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 21 Sep 2023 16:08:57 +0800 Subject: [PATCH 04/20] insight metric support formula template (#158) only support template variable 'bucket_size_in_second' currently, eg frmula: `a/{{.bucket_size_in_second}}` Reviewed-on: https://git.infini.ltd:64443/infini/console/pulls/158 Co-authored-by: liugq Co-committed-by: liugq --- plugin/api/insight/metadata.go | 20 ++++++++++++++++++++ plugin/task_manager/common_api.go | 4 ++++ 2 files changed, 24 insertions(+) diff --git a/plugin/api/insight/metadata.go b/plugin/api/insight/metadata.go index 8b4c27c1..013eaa57 100644 --- a/plugin/api/insight/metadata.go +++ b/plugin/api/insight/metadata.go @@ -5,8 +5,10 @@ package insight import ( + "bytes" "github.com/Knetic/govaluate" log "github.com/cihub/seelog" + "text/template" "infini.sh/console/model/insight" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" @@ -224,6 +226,24 @@ func getMetricData(metric *insight.Metric) (interface{}, error) { if len(metric.Items) == 1 && formula == "" { targetMetricData = metricData }else { + tpl, err := template.New("insight_formula").Parse(formula) + if err != nil { + return nil, err + } + msgBuffer := &bytes.Buffer{} + params := map[string]interface{}{} + if metric.BucketSize != "" { + du, err := util.ParseDuration(metric.BucketSize) + if err != nil { + return nil, err + } + params["bucket_size_in_second"] = du.Seconds() + } + err = tpl.Execute(msgBuffer, params) + if err != nil { + return nil, err + } + formula = msgBuffer.String() for _, md := range metricData { targetData := insight.MetricData{ Groups: md.Groups, diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 6d2c1bd5..6d6bf075 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -439,6 +439,7 @@ type RepeatStatus struct { LastRunTime int64 `json:"last_run_time"` NextRunTime int64 `json:"next_run_time"` LastRunChildTaskID string `json:"last_run_child_task_id"` + LastCompleteTime int64 `json:"last_complete_time"` } func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) { @@ -470,6 +471,9 @@ func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *Repe ret.LastRunTime = lastRepeatingChild.StartTimeInMillis if ret.LastRunTime == 0 && lastRunChild != nil { ret.LastRunTime = lastRunChild.StartTimeInMillis + if !lastRunChild.CompletedTime.IsZero(){ + ret.LastCompleteTime = lastRunChild.CompletedTime.UnixMilli() + } } if lastRunChild != nil { ret.LastRunChildTaskID = lastRunChild.ID From 7a5d3b1a87f554ac48dafe0016477d37d7adfe4d Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 21 Sep 2023 17:54:11 +0800 Subject: [PATCH 05/20] add metrics widget template of migration --- config/initialization.tpl | 245 +++++++++++++++++++++++++++++++++++ config/initialization_v5.tpl | 245 +++++++++++++++++++++++++++++++++++ config/initialization_v6.tpl | 245 +++++++++++++++++++++++++++++++++++ 3 files changed, 735 insertions(+) diff --git a/config/initialization.tpl b/config/initialization.tpl index 27953449..abac320b 100644 --- a/config/initialization.tpl +++ b/config/initialization.tpl @@ -2545,6 +2545,251 @@ POST $[[INDEX_PREFIX]]widget/_doc/cji1ttq8go5i051pl1t0 ] } } +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i1 +{ + "id": "cji1sc28go5i051pl1i1", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Query QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Query", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + + +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i2 +{ + "id": "cji1sc28go5i051pl1i2", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Search Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Query Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/b", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_total", + "name": "b", + "statistic": "derivative" + } + ], + "name": "Scroll Latency", + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i3 +{ + "id": "cji1sc28go5i051pl1i3", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Primary Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i4 +{ + "id": "cji1sc28go5i051pl1i4", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Indexing Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} #The `id` value is consistent with the `_id` value POST $[[INDEX_PREFIX]]layout/_doc/cg2qqh28go5jqa6vvk70 diff --git a/config/initialization_v5.tpl b/config/initialization_v5.tpl index 9aeb3319..5b2bee11 100644 --- a/config/initialization_v5.tpl +++ b/config/initialization_v5.tpl @@ -2475,6 +2475,251 @@ POST $[[INDEX_PREFIX]]widget/doc/cji1ttq8go5i051pl1t0 ] } } +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i1 +{ + "id": "cji1sc28go5i051pl1i1", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Query QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Query", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i2 +{ + "id": "cji1sc28go5i051pl1i2", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Search Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Query Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/b", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_total", + "name": "b", + "statistic": "derivative" + } + ], + "name": "Scroll Latency", + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i3 +{ + "id": "cji1sc28go5i051pl1i3", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Primary Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i4 +{ + "id": "cji1sc28go5i051pl1i4", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Indexing Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} #The `id` value is consistent with the `_id` value POST $[[INDEX_PREFIX]]layout/doc/cg2qqh28go5jqa6vvk70 diff --git a/config/initialization_v6.tpl b/config/initialization_v6.tpl index 62c8d96b..2e3a1402 100644 --- a/config/initialization_v6.tpl +++ b/config/initialization_v6.tpl @@ -2551,6 +2551,251 @@ POST $[[INDEX_PREFIX]]widget/doc/cji1ttq8go5i051pl1t0 } } +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i1 +{ + "id": "cji1sc28go5i051pl1i1", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Query QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Query", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i2 +{ + "id": "cji1sc28go5i051pl1i2", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Search Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Query Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/b", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_total", + "name": "b", + "statistic": "derivative" + } + ], + "name": "Scroll Latency", + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i3 +{ + "id": "cji1sc28go5i051pl1i3", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Primary Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i4 +{ + "id": "cji1sc28go5i051pl1i4", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Indexing Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + #The `id` value is consistent with the `_id` value POST $[[INDEX_PREFIX]]layout/doc/cg2qqh28go5jqa6vvk70 { From a54cb2c9a1afe18126ac839b9c2d2f233c7d516c Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 25 Sep 2023 18:09:55 +0800 Subject: [PATCH 06/20] fix nil pointer (#164) Reviewed-on: https://git.infini.ltd:64443/infini/console/pulls/164 Co-authored-by: liugq Co-committed-by: liugq --- plugin/task_manager/common_api.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 6d6bf075..4400cf7b 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -471,7 +471,7 @@ func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *Repe ret.LastRunTime = lastRepeatingChild.StartTimeInMillis if ret.LastRunTime == 0 && lastRunChild != nil { ret.LastRunTime = lastRunChild.StartTimeInMillis - if !lastRunChild.CompletedTime.IsZero(){ + if lastRunChild.CompletedTime != nil && !lastRunChild.CompletedTime.IsZero(){ ret.LastCompleteTime = lastRunChild.CompletedTime.UnixMilli() } } From 6b9e08be2c5c3b2f5521535fd9927073f2e15982 Mon Sep 17 00:00:00 2001 From: hardy Date: Tue, 26 Sep 2023 11:59:05 +0800 Subject: [PATCH 07/20] fix path --- build-web.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build-web.sh b/build-web.sh index 20077ee7..0199db50 100644 --- a/build-web.sh +++ b/build-web.sh @@ -19,7 +19,7 @@ fi cd $WORKBASE/web git pull origin master -cd $WORKBASE/web/common +cd $WORKBASE/web/src/common git pull origin master git log --pretty=oneline -5 From 7311547c54b758155374ecd8d3f2a04fed6e3443 Mon Sep 17 00:00:00 2001 From: hardy Date: Tue, 26 Sep 2023 14:07:21 +0800 Subject: [PATCH 08/20] fix: build path --- build-web.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build-web.sh b/build-web.sh index 0199db50..cdb7d37c 100644 --- a/build-web.sh +++ b/build-web.sh @@ -24,6 +24,8 @@ git pull origin master git log --pretty=oneline -5 +cd $WORKBASE/web + #--quiet cnpm install --quiet --no-progress From 4356ac4ddfffb4ab1121a4b0a531a7c3e2ffa46b Mon Sep 17 00:00:00 2001 From: liugq Date: Wed, 27 Sep 2023 15:59:45 +0800 Subject: [PATCH 09/20] add missing es doc api path (#170) Reviewed-on: https://git.infini.ltd:64443/infini/console/pulls/170 Co-authored-by: liugq Co-committed-by: liugq --- config/permission.json | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/config/permission.json b/config/permission.json index 63426b91..1a7cf336 100644 --- a/config/permission.json +++ b/config/permission.json @@ -293,9 +293,15 @@ {"name": "doc.update", "methods": ["put"], "path": "/:index_name/:doctype/:doc_id" }, + {"name": "doc.update", "methods": ["post"], + "path": "/:index_name/_update/:doc_id" + }, {"name": "doc.create", "methods": ["post"], "path": "/:index_name/:doctype" }, + {"name": "doc.create", "methods": ["post", "put"], + "path": "/:index_name/_create/:doc_id" + }, {"name": "doc.delete", "methods": ["delete"], "path": "/:index_name/:doctype/:doc_id" @@ -303,6 +309,9 @@ {"name": "doc.get", "methods": ["get"], "path": "/:index_name/:doctype/:doc_id" }, + {"name": "doc.get", "methods": ["get"], + "path": "/:index_name/_source/:doc_id" + }, {"name": "doc.exists", "methods": ["head"], "path": "/:index_name/:doctype/:doc_id" }, From 97731564c1a9350676a7cda161ec1a1e171bbae3 Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 8 Oct 2023 11:00:04 +0800 Subject: [PATCH 10/20] add name and tags config --- plugin/task_manager/cluster_comparison/orm.go | 4 ++++ plugin/task_manager/model/comparison.go | 3 +++ 2 files changed, 7 insertions(+) diff --git a/plugin/task_manager/cluster_comparison/orm.go b/plugin/task_manager/cluster_comparison/orm.go index f3d37c57..5682a61d 100644 --- a/plugin/task_manager/cluster_comparison/orm.go +++ b/plugin/task_manager/cluster_comparison/orm.go @@ -105,6 +105,7 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba "source_total_docs": sourceTotalDocs, "target_total_docs": targetTotalDocs, "permit_nodes": config.Settings.Execution.Nodes.Permit, + "name": config.Name, }, }, Cancellable: true, @@ -112,6 +113,9 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba Status: task.StatusInit, ConfigString: util.MustToJSON(config), } + if len(config.Tags) > 0 { + t.Metadata.Labels["tags"] = config.Tags + } t.ID = util.GetUUID() return &t, nil } diff --git a/plugin/task_manager/model/comparison.go b/plugin/task_manager/model/comparison.go index ba9d046a..07a4085a 100644 --- a/plugin/task_manager/model/comparison.go +++ b/plugin/task_manager/model/comparison.go @@ -5,6 +5,8 @@ import ( ) type ClusterComparisonTaskConfig struct { + Name string `json:"name"` + Tags []string `json:"tags"` Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -31,6 +33,7 @@ type ClusterComparisonIndexConfig struct { // only used in API ScrollPercent float64 `json:"scroll_percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` + RunningChildren int `json:"running_children,omitempty"` } type IndexComparisonTaskConfig struct { From 2f457924801c234493eaa6bb24649be775ed11ff Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 8 Oct 2023 11:02:39 +0800 Subject: [PATCH 11/20] add diff result info and reset total_diff_docs after task restarted --- .../cluster_comparison/cluster_comparison.go | 4 ++++ plugin/task_manager/comparison_api.go | 12 ++++++++++++ .../index_comparison/index_comparison.go | 4 ++++ 3 files changed, 20 insertions(+) diff --git a/plugin/task_manager/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go index ef7ac7ea..9dcaf070 100644 --- a/plugin/task_manager/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -60,6 +60,10 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { taskItem.StartTimeInMillis = time.Now().UnixMilli() } taskItem.Status = task.StatusRunning + taskItem.Metadata.Labels["total_diff_docs"] = 0 + taskItem.Metadata.Labels["only_in_source"] = 0 + taskItem.Metadata.Labels["only_in_target"] = 0 + taskItem.Metadata.Labels["diff_both"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID)) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 439aa0c6..05ad2a41 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -83,6 +83,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { completedIndices++ } @@ -115,6 +116,7 @@ type ClusterComparisonTaskState struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } type ComparisonIndexStateInfo struct { @@ -124,6 +126,7 @@ type ComparisonIndexStateInfo struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } // TODO: calc realtime info from instance @@ -188,6 +191,9 @@ func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats Cluste if subTask.Status == task.StatusError { st.ErrorPartitions += 1 } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + } indexState[indexName] = st } @@ -256,6 +262,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht subTaskLabels := util.MapStr(subTask.Metadata.Labels) sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled") targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled") + onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source") + onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target") + diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both") partitionTaskInfo := util.MapStr{ "task_id": subTask.ID, @@ -267,6 +276,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht "duration": durationInMS, "source_total_docs": cfg.Source.DocCount, "target_total_docs": cfg.Target.DocCount, + "only_in_source": onlyInSource, + "only_in_target": onlyInTarget, + "diff_both": diffBoth, } sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg) if sourceDumpTask != nil { diff --git a/plugin/task_manager/index_comparison/index_comparison.go b/plugin/task_manager/index_comparison/index_comparison.go index c14d902a..cdcb1bc5 100644 --- a/plugin/task_manager/index_comparison/index_comparison.go +++ b/plugin/task_manager/index_comparison/index_comparison.go @@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 { now := time.Now() taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth + taskItem.Metadata.Labels["only_in_source"] = onlyInSource + taskItem.Metadata.Labels["only_in_target"] = onlyInTarget + taskItem.Metadata.Labels["diff_both"] = diffBoth taskItem.CompletedTime = &now taskItem.Status = task.StatusError p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ @@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { now := time.Now() taskItem.CompletedTime = &now taskItem.Status = task.StatusComplete + taskItem.Metadata.Labels["total_diff_docs"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, "index comparison completed") From 71ba08d266a1f6b84426cb4f2992b855ac7f2814 Mon Sep 17 00:00:00 2001 From: liugq Date: Sun, 8 Oct 2023 17:46:55 +0800 Subject: [PATCH 12/20] refactor getComparisonMajorTaskInfo --- plugin/task_manager/comparison_api.go | 170 ++++++++++++++++-------- plugin/task_manager/model/comparison.go | 3 +- 2 files changed, 119 insertions(+), 54 deletions(-) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 05ad2a41..bca13d5d 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. var completedIndices int for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() - count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs + count := indexState[indexName].TotalScrollDocs percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100 if percent > 100 { percent = 100 @@ -82,6 +82,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) + taskConfig.Indices[i].TotalScrollDocs = count taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { @@ -127,76 +128,139 @@ type ComparisonIndexStateInfo struct { TargetScrollDocs int64 TotalDiffDocs int64 RunningChildren int + TotalScrollDocs int64 } -// TODO: calc realtime info from instance func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} indexState = map[string]ComparisonIndexStateInfo{} - - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, + const size = 500 + var ( + from = -size + hasMore = true + ) + for hasMore { + from += size + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, }, }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_comparison", + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", + }, }, }, }, }, }, - }, - } - subTasks, err := migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue } - - cfg := migration_model.IndexComparisonTaskConfig{} - err = migration_util.GetTaskConfig(&subTask, &cfg) + subTasks, err := migration_util.GetTasks(taskQuery) if err != nil { - log.Errorf("failed to get task config, err: %v", err) + return taskStats, indexState, err + } + if len(subTasks) < size { + hasMore = false + } + + var indexMigrationTaskIDs []string + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + cfg := migration_model.IndexComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") + taskStats.SourceTotalDocs += cfg.Source.DocCount + taskStats.TargetTotalDocs += cfg.Target.DocCount + taskStats.TotalDiffDocs += totalDiffDocs + st := indexState[indexName] + st.SourceTotalDocs += cfg.Source.DocCount + st.TargetTotalDocs += cfg.Target.DocCount + st.TotalDiffDocs += totalDiffDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + continue + } + sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") + targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") + st.SourceScrollDocs += sourceDocs + st.TargetScrollDocs += targetDocs + st.TotalScrollDocs += sourceDocs + targetDocs + taskStats.TargetScrollDocs += targetDocs + taskStats.SourceScrollDocs += sourceDocs + indexState[indexName] = st + } + + if len(indexMigrationTaskIDs) == 0 { continue } - sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") - targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") - totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") - taskStats.SourceTotalDocs += cfg.Source.DocCount - taskStats.SourceScrollDocs += sourceDocs - taskStats.TargetTotalDocs += cfg.Target.DocCount - taskStats.TargetScrollDocs += targetDocs - taskStats.TotalDiffDocs += totalDiffDocs + + taskQuery = util.MapStr{ + "size": len(indexMigrationTaskIDs) * 2, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + }, + }, + }, + } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + // add scrolledDocs of running tasks + scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs") + indexName := pipelineIndexNames[pipelineID] st := indexState[indexName] - st.SourceTotalDocs += cfg.Source.DocCount - st.SourceScrollDocs += sourceDocs - st.TargetTotalDocs += cfg.Target.DocCount - st.TargetScrollDocs += targetDocs - st.TotalDiffDocs += totalDiffDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 - } - if subTask.Status == task.StatusRunning { - st.RunningChildren++ - } + st.TotalScrollDocs += scrollDocs indexState[indexName] = st } - return taskStats, indexState, nil } diff --git a/plugin/task_manager/model/comparison.go b/plugin/task_manager/model/comparison.go index 07a4085a..b64d61ce 100644 --- a/plugin/task_manager/model/comparison.go +++ b/plugin/task_manager/model/comparison.go @@ -31,7 +31,8 @@ type ClusterComparisonIndexConfig struct { Partition *IndexPartition `json:"partition,omitempty"` // only used in API - ScrollPercent float64 `json:"scroll_percent,omitempty"` + ScrollPercent float64 `json:"scroll_percent,omitempty"` + TotalScrollDocs int64 `json:"total_scroll_docs,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` RunningChildren int `json:"running_children,omitempty"` } From f3569c2701ce5fa0ea4dff8d7c000084f3ee0b37 Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 9 Oct 2023 09:36:50 +0800 Subject: [PATCH 13/20] support restart cluster comparison task --- plugin/task_manager/common_api.go | 2 +- plugin/task_manager/util/orm.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 4400cf7b..6bed3af8 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -189,7 +189,7 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) return } - if obj.Metadata.Type != "pipeline" && obj.Status == task.StatusComplete { + if obj.Metadata.Type != "pipeline" && (obj.Status == task.StatusComplete && obj.Metadata.Type != "cluster_comparison") { h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError) return } diff --git a/plugin/task_manager/util/orm.go b/plugin/task_manager/util/orm.go index 99506721..faa65d00 100644 --- a/plugin/task_manager/util/orm.go +++ b/plugin/task_manager/util/orm.go @@ -242,7 +242,7 @@ func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error }, { "terms": util.MapStr{ - "status": []string{task.StatusError, task.StatusStopped}, + "status": []string{task.StatusError, task.StatusStopped, task.StatusComplete}, }, }, { From 9f60c4685fdfa7601f06bef0e35207a8ad5ab92d Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 9 Oct 2023 10:56:22 +0800 Subject: [PATCH 14/20] return workers info of index level --- plugin/task_manager/comparison_api.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index bca13d5d..a4382f0f 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -301,6 +301,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr + var workers = map[string]struct{}{} for i, subTask := range subTasks { cfg := migration_model.IndexComparisonTaskConfig{} @@ -312,6 +313,10 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if i == 0 { taskInfo.Step = cfg.Source.Step } + instID := migration_util.GetMapStringValue(subTask.Metadata.Labels, "execution_instance_id") + if instID != "" { + workers[instID] = struct{}{} + } var durationInMS int64 var subCompletedTime int64 @@ -381,6 +386,14 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if taskInfo.StartTime == 0 { taskInfo.StartTime = startTime } + for _, node := range taskConfig.Settings.Execution.Nodes.Permit { + if _, ok := workers[node.ID]; ok { + taskInfo.Workers = append(taskInfo.Workers, util.MapStr{ + "id": node.ID, + "name": node.Name, + }) + } + } taskInfo.Partitions = partitionTaskInfos taskInfo.CompletedPartitions = completedPartitions h.WriteJSON(w, taskInfo, http.StatusOK) From 12d4a8eef0e9366f1940ec2cbc78fa63fa3477fd Mon Sep 17 00:00:00 2001 From: liugq Date: Tue, 10 Oct 2023 08:59:44 +0800 Subject: [PATCH 15/20] return last running task info --- .../task_manager/cluster_comparison/cluster_comparison.go | 4 +--- plugin/task_manager/common_api.go | 6 +++++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/plugin/task_manager/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go index 9dcaf070..982419f2 100644 --- a/plugin/task_manager/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -56,9 +56,7 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { return nil } taskItem.RetryTimes++ - if taskItem.StartTimeInMillis == 0 { - taskItem.StartTimeInMillis = time.Now().UnixMilli() - } + taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.Status = task.StatusRunning taskItem.Metadata.Labels["total_diff_docs"] = 0 taskItem.Metadata.Labels["only_in_source"] = 0 diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 6bed3af8..e828a4a5 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -160,7 +160,11 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs) } case "cluster_comparison": - ts, _, err := h.getComparisonMajorTaskInfo(taskID) + targetTaskId := taskID + if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" { + targetTaskId = repeatStatus.LastRunChildTaskID + } + ts, _, err := h.getComparisonMajorTaskInfo(targetTaskId) if err != nil { log.Warnf("fetch progress info of task error: %v", err) return From a19354bcb5f820676525f070daac8a3e94066082 Mon Sep 17 00:00:00 2001 From: hardy Date: Wed, 18 Oct 2023 10:49:35 +0800 Subject: [PATCH 16/20] fix case macos tar error when overwrite --- config/install_agent.tpl | 45 +++++++++++++++++++++++++++++++--------- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/config/install_agent.tpl b/config/install_agent.tpl index d7174559..14b74d93 100644 --- a/config/install_agent.tpl +++ b/config/install_agent.tpl @@ -52,19 +52,31 @@ function print_footprint() { echo "" } -__try() { +function __try() { if [[ $try_status -eq 0 ]]; then ! exception=$( $@ 2>&1 >/dev/null ) try_status=${PIPESTATUS[0]} fi } -__catch() { +function __catch() { _old_try=$try_status try_status=0 [[ $_old_try -ne 0 ]] } +function confirm() { + display_str=$1 + default_ans=$2 + if [[ $default_ans == 'y/N' ]]; then + must_match='[yY]' + else + must_match='[nN]' + fi + read -p"${display_str} [${default_ans}]:" ans + [[ $ans == $must_match ]] +} + function get_latest_version() { echo $(curl -m3 -s "https://release.infinilabs.com/.latest" |sed 's/",/"/;s/"//g;s/://1' |grep -Ev '^[{}]' |grep "$program_name" |awk '{print $NF}') } @@ -82,9 +94,11 @@ function check_dir() { echo -e "Error: The installation directory ${install_dir} should be owner by current user.\nsudo chown -R \$(whoami) ${install_dir}" >&2; exit 1; fi - #if [[ "$(ls -A ${install_dir})" ]]; then - # echo "Error: The installation directory ${install_dir} should be clean." >&2; exit 1; - #fi + if [[ "$(ls -A ${install_dir})" ]]; then + confirm "RISK WARN: Replace or upgrade exists agent version, Proceed?" 'y/N' && echo || exit 1; + uninstall_service + rm -rf ${install_dir}/* + fi } function check_platform() { @@ -241,7 +255,7 @@ agent: EOF } -function install_service() { +function uninstall_service() { agent_svc=${install_dir}/${program_name}-${file_ext%%.*} chmod 755 $agent_svc @@ -253,18 +267,28 @@ function install_service() { $agent_svc -service stop &>/dev/null $agent_svc -service uninstall &>/dev/null fi + sleep 3 +} +function install_service() { + agent_svc=${install_dir}/${program_name}-${file_ext%%.*} + chmod 755 $agent_svc echo "[agent] waiting service install & start" $agent_svc -service install &>/dev/null $agent_svc -service start &>/dev/null - sleep 5 + sleep 3 } function register_agent() { - token={{token}} console_endpoint="{{console_endpoint}}" - echo "[agent] waiting registering to INFINI Console" - __try curl -s --retry 1 --retry-delay 3 -m30 -XPOST -o ${install_dir}/setup.log "${console_endpoint}/agent/instance?token=${token}" + token={{token}} + echo '[agent] waiting registering to INFINI Console' + until curl -s -m30 -XPOST "${console_endpoint}/agent/instance?token=${token}"; + do + echo -n '.'; sleep 3; + done; + echo + #__try curl -s --retry 1 --retry-delay 3 -m30 -XPOST -o ${install_dir}/setup.log "${console_endpoint}/agent/instance?token=${token}" } function main() { @@ -295,6 +319,7 @@ function main() { install_binary install_certs install_config + uninstall_service install_service register_agent From f83c0d274f4ce56598449e3543c9476211e754de Mon Sep 17 00:00:00 2001 From: liugq Date: Mon, 16 Oct 2023 16:10:07 +0800 Subject: [PATCH 17/20] handle initialize setup timeout error --- plugin/setup/setup.go | 46 +++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index 7cd0f6e0..6e925c05 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -321,19 +321,32 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API, Setup func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { if !global.Env().SetupRequired() { - module.WriteError(w, "setup not permitted", 500) + //handle setup timeout + rkey, err := keystore.GetValue(credential.SecretKey) + if err != nil { + module.WriteError(w, err.Error(), 500) + return + } + request := SetupRequest{} + err = module.DecodeJSON(r, &request) + if err != nil { + module.WriteError(w, err.Error(), 500) + return + } + h := md5.New() + rawSecret := []byte(request.CredentialSecret) + h.Write(rawSecret) + secret := make([]byte, 32) + hex.Encode(secret, h.Sum(nil)) + if bytes.Compare(rkey, secret) == 0 { + module.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + }else{ + module.WriteError(w, "invalid credential secret", 500) + } return } - scheme := "http" - if r.TLS != nil { - scheme = "https" - } - consoleEndpoint := fmt.Sprintf("%s://%s", scheme, r.Host) - err := kv.AddValue("system", []byte("INFINI_CONSOLE_ENDPOINT"), []byte(consoleEndpoint)) - if err != nil { - log.Error(err) - } - success := false var errType string var fixTips string @@ -372,7 +385,6 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http } module.WriteJSON(w, result, code) }() - err, client, request := module.initTempClient(r) if err != nil { panic(err) @@ -380,6 +392,15 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http if request.CredentialSecret == "" { panic("invalid credential secret") } + scheme := "http" + if r.TLS != nil { + scheme = "https" + } + consoleEndpoint := fmt.Sprintf("%s://%s", scheme, r.Host) + err = kv.AddValue("system", []byte("INFINI_CONSOLE_ENDPOINT"), []byte(consoleEndpoint)) + if err != nil { + log.Error(err) + } if cfg1.IndexPrefix == "" { cfg1.IndexPrefix = ".infini_" @@ -589,5 +610,4 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http } success = true - } From 5679f3d42659fbb47750a8965fd98808c01ef2be Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 12 Oct 2023 15:47:45 +0800 Subject: [PATCH 18/20] refactor search instance api to support sort and wildcard sarch --- plugin/api/gateway/instance.go | 45 ++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 8 deletions(-) diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index e07ccca2..a0c97c7b 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -208,16 +208,17 @@ func (h *GatewayAPI) deleteInstance(w http.ResponseWriter, req *http.Request, ps } func (h *GatewayAPI) searchInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - keyword = h.GetParameterOrDefault(req, "keyword", "") - queryDSL = `{"query":{"bool":{"must":[%s]}}, "size": %d, "from": %d}` + keyword = h.GetParameterOrDefault(req, "keyword", "") strSize = h.GetParameterOrDefault(req, "size", "20") strFrom = h.GetParameterOrDefault(req, "from", "0") - mustBuilder = &strings.Builder{} + sort = h.GetParameterOrDefault(req, "sort", "created:desc") ) + mustQ := []interface{}{} if keyword != "" { - mustBuilder.WriteString(fmt.Sprintf(`{"query_string":{"default_field":"*","query": "%s"}}`, keyword)) + mustQ = append(mustQ, util.MapStr{ + "query_string": util.MapStr{"default_field":"*","query": fmt.Sprintf("*%s*", keyword)}, + }) } size, _ := strconv.Atoi(strSize) if size <= 0 { @@ -227,10 +228,38 @@ func (h *GatewayAPI) searchInstance(w http.ResponseWriter, req *http.Request, ps if from < 0 { from = 0 } + var ( + sortField string + sortDirection string + ) + sortParts := strings.Split(sort, ":") + sortField = sortParts[0] + if len(sortParts) >= 2 { + sortDirection = sortParts[1] + } + if sortDirection == "" { + sortDirection = "asc" + } + query := util.MapStr{ + "size": size, + "from": from, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + "sort": []util.MapStr{ + { + sortField: util.MapStr{ + "order": sortDirection, + }, + }, + }, + } - q := orm.Query{} - queryDSL = fmt.Sprintf(queryDSL, mustBuilder.String(), size, from) - q.RawQuery = []byte(queryDSL) + q := orm.Query{ + RawQuery: util.MustToJSONBytes(query), + } err, res := orm.Search(&model.Instance{}, &q) if err != nil { From 3dc47e3b9a238a0253b8d6c2d16dae394516d66c Mon Sep 17 00:00:00 2001 From: liugq Date: Thu, 12 Oct 2023 15:55:36 +0800 Subject: [PATCH 19/20] remove api /_platform/nodes, reuse search instance api --- plugin/api/gateway/api.go | 5 +- plugin/api/gateway/instance.go | 96 ---------------------------------- 2 files changed, 2 insertions(+), 99 deletions(-) diff --git a/plugin/api/gateway/api.go b/plugin/api/gateway/api.go index 6ef82a9e..9ad5b75e 100644 --- a/plugin/api/gateway/api.go +++ b/plugin/api/gateway/api.go @@ -6,12 +6,12 @@ package gateway import ( "crypto/tls" + log "github.com/cihub/seelog" "infini.sh/framework/core/api" "infini.sh/framework/core/api/rbac/enum" "net" "net/http" "net/url" - log "github.com/cihub/seelog" "time" ) @@ -30,8 +30,7 @@ func InitAPI() { api.HandleAPIMethod(api.POST, "/gateway/instance/status", gateway.RequirePermission(gateway.getInstanceStatus, enum.PermissionGatewayInstanceRead)) api.HandleAPIMethod(api.POST, "/gateway/instance/:instance_id/_proxy", gateway.RequirePermission(gateway.proxy, enum.PermissionGatewayInstanceRead)) - - api.HandleAPIMethod(api.GET, "/_platform/nodes", gateway.getExecutionNodes) + api.HandleAPIFunc("/ws_proxy", func(w http.ResponseWriter, req *http.Request) { log.Debug(req.RequestURI) endpoint := req.URL.Query().Get("endpoint") diff --git a/plugin/api/gateway/instance.go b/plugin/api/gateway/instance.go index a0c97c7b..8dee19e0 100644 --- a/plugin/api/gateway/instance.go +++ b/plugin/api/gateway/instance.go @@ -11,17 +11,14 @@ import ( "infini.sh/console/model" "infini.sh/framework/core/agent" httprouter "infini.sh/framework/core/api/router" - elastic2 "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" "infini.sh/framework/core/proxy" "infini.sh/framework/core/task" "infini.sh/framework/core/util" "infini.sh/framework/modules/elastic" "net/http" - "net/url" "strconv" "strings" - "time" ) func (h *GatewayAPI) createInstance(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -429,96 +426,3 @@ func (h *GatewayAPI) tryConnect(w http.ResponseWriter, req *http.Request, ps htt } h.WriteJSON(w, connectRes, http.StatusOK) } - -func (h *GatewayAPI) getExecutionNodes(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ - var ( - keyword = h.GetParameterOrDefault(req, "keyword", "") - strSize = h.GetParameterOrDefault(req, "size", "10") - strFrom = h.GetParameterOrDefault(req, "from", "0") - ) - size, _ := strconv.Atoi(strSize) - if size <= 0 { - size = 10 - } - from, _ := strconv.Atoi(strFrom) - if from < 0 { - from = 0 - } - gatewayIndexName := orm.GetIndexName(model.Instance{}) - - query := util.MapStr{ - "size": size, - "from": from, - "sort": []util.MapStr{ - { - "created": util.MapStr{ - "order": "desc", - }, - }, - }, - } - if keyword != "" { - query["query"] = util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "prefix": util.MapStr{ - "name": util.MapStr{ - "value": keyword, - }, - }, - }, - }, - }, - } - } - q := orm.Query{ - IndexName: gatewayIndexName, - RawQuery: util.MustToJSONBytes(query), - } - err, result := orm.Search(nil, &q) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - searchRes := elastic2.SearchResponse{} - err = util.FromJSONBytes(result.Raw, &searchRes) - if err != nil||searchRes.ESError!=nil { - msg:=fmt.Sprintf("%v,%v",err,searchRes.ESError) - h.WriteError(w, msg, http.StatusInternalServerError) - return - } - var nodes = []util.MapStr{} - - for _, hit := range searchRes.Hits.Hits { - buf := util.MustToJSONBytes(hit.Source) - inst := model.Instance{} - err = util.FromJSONBytes(buf, &inst) - if err != nil { - log.Error(err) - continue - } - node := util.MapStr{ - "id": inst.ID, - "name": inst.Name, - "available": false, - "type": "gateway", - } - ul, err := url.Parse(inst.Endpoint) - if err != nil { - log.Error(err) - continue - } - node["host"] = ul.Host - err = inst.TryConnectWithTimeout(time.Second) - if err != nil { - log.Error(err) - }else{ - node["available"] = true - } - - nodes = append(nodes, node) - } - h.WriteJSON(w, nodes, http.StatusOK) -} From 7b59e0dc3ef81d582fb202e9d220d5cefb15ff44 Mon Sep 17 00:00:00 2001 From: hardy Date: Fri, 20 Oct 2023 13:55:02 +0800 Subject: [PATCH 20/20] change domain --- plugin/api/license/api.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/plugin/api/license/api.go b/plugin/api/license/api.go index fc9e9eb1..5225df20 100644 --- a/plugin/api/license/api.go +++ b/plugin/api/license/api.go @@ -5,16 +5,18 @@ package license import ( + "net/http" + "infini.sh/framework/core/api" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/util" "infini.sh/license" - "net/http" ) type LicenseAPI struct { api.Handler } + func InitAPI() { handler := LicenseAPI{} api.HandleAPIMethod(api.POST, "/_license/request_trial", handler.RequestTrialLicense) @@ -28,36 +30,36 @@ func (handler *LicenseAPI) RequestTrialLicense(w http.ResponseWriter, req *http. } v := license.TrialRequest{} - err=util.FromJSONBytes(body, &v) + err = util.FromJSONBytes(body, &v) if err != nil { handler.Error500(w, err.Error()) return } //TODO implement config for the api endpoint - request:=util.NewPostRequest("https://api.infini.sh/_license/request_trial", util.MustToJSONBytes(v)) - response,err:=util.ExecuteRequest(request) - if err!=nil{ - handler.WriteError(w,err.Error(),response.StatusCode) + request := util.NewPostRequest("https://api.infini.cloud/_license/request_trial", util.MustToJSONBytes(v)) + response, err := util.ExecuteRequest(request) + if err != nil { + handler.WriteError(w, err.Error(), response.StatusCode) return } - r:=license.TrialResponse{} - err=util.FromJSONBytes(response.Body, &r) + r := license.TrialResponse{} + err = util.FromJSONBytes(response.Body, &r) if err != nil { handler.Error500(w, err.Error()) return } - if r.License!=""{ + if r.License != "" { ok := license.ApplyLicense(r.License) if ok { license.PersistLicense(r.License) - }else{ - r.License="" + } else { + r.License = "" } } w.WriteHeader(response.StatusCode) w.Write(util.MustToJSONBytes(r)) -} \ No newline at end of file +}