diff --git a/Jenkinsfile-linux b/Jenkinsfile-linux new file mode 100644 index 00000000..fbb68fc1 --- /dev/null +++ b/Jenkinsfile-linux @@ -0,0 +1,51 @@ +pipeline { + agent none + + environment { + CI = 'true' + } + + stages { + + + stage('Prepare Web Packages') { + + agent { + label 'linux' + } + + steps { + catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE'){ + sh 'cd /home/jenkins/go/src/infini.sh/console && git stash && git pull origin master && make clean' + sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || rm -rif web' + sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || git clone ssh://git@git.infini.ltd:64221/infini/console-ui.git web' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && git pull origin master' + sh 'cd /home/jenkins/go/src/infini.sh/console/web/src && true || git clone ssh://git@git.infini.ltd:64221/infini/common-ui.git common' + sh 'cd /home/jenkins/go/src/infini.sh/console/web/src/common && git pull origin master' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && git stash' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm install' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm run build' + + sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64' + sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE' + sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config' + sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config' + sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config' + + sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-arm' + sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE' + sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config' + sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config' + sh label: 'package-linux-arm64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-arm64.tar.gz console-linux-arm64 console.yml LICENSE NOTICE config' + + archiveArtifacts artifacts: 'console-$VERSION-$BUILD_NUMBER-*.*', fingerprint: true, followSymlinks: true, onlyIfSuccessful: false + } + } + } + + + + +} + +} diff --git a/build-web.sh b/build-web.sh index 20077ee7..cdb7d37c 100644 --- a/build-web.sh +++ b/build-web.sh @@ -19,11 +19,13 @@ fi cd $WORKBASE/web git pull origin master -cd $WORKBASE/web/common +cd $WORKBASE/web/src/common git pull origin master git log --pretty=oneline -5 +cd $WORKBASE/web + #--quiet cnpm install --quiet --no-progress diff --git a/config/initialization.tpl b/config/initialization.tpl index 3e7c29bb..a9eec148 100644 --- a/config/initialization.tpl +++ b/config/initialization.tpl @@ -2547,6 +2547,251 @@ POST $[[INDEX_PREFIX]]widget/_doc/cji1ttq8go5i051pl1t0 ] } } +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i1 +{ + "id": "cji1sc28go5i051pl1i1", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Query QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Query", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + + +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i2 +{ + "id": "cji1sc28go5i051pl1i2", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Search Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Query Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/b", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_total", + "name": "b", + "statistic": "derivative" + } + ], + "name": "Scroll Latency", + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i3 +{ + "id": "cji1sc28go5i051pl1i3", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Primary Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i4 +{ + "id": "cji1sc28go5i051pl1i4", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Indexing Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} #The `id` value is consistent with the `_id` value POST $[[INDEX_PREFIX]]layout/_doc/cg2qqh28go5jqa6vvk70 diff --git a/config/initialization_v5.tpl b/config/initialization_v5.tpl index 9aeb3319..5b2bee11 100644 --- a/config/initialization_v5.tpl +++ b/config/initialization_v5.tpl @@ -2475,6 +2475,251 @@ POST $[[INDEX_PREFIX]]widget/doc/cji1ttq8go5i051pl1t0 ] } } +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i1 +{ + "id": "cji1sc28go5i051pl1i1", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Query QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Query", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i2 +{ + "id": "cji1sc28go5i051pl1i2", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Search Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Query Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/b", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_total", + "name": "b", + "statistic": "derivative" + } + ], + "name": "Scroll Latency", + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i3 +{ + "id": "cji1sc28go5i051pl1i3", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Primary Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i4 +{ + "id": "cji1sc28go5i051pl1i4", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Indexing Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} #The `id` value is consistent with the `_id` value POST $[[INDEX_PREFIX]]layout/doc/cg2qqh28go5jqa6vvk70 diff --git a/config/initialization_v6.tpl b/config/initialization_v6.tpl index 62c8d96b..2e3a1402 100644 --- a/config/initialization_v6.tpl +++ b/config/initialization_v6.tpl @@ -2551,6 +2551,251 @@ POST $[[INDEX_PREFIX]]widget/doc/cji1ttq8go5i051pl1t0 } } +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i1 +{ + "id": "cji1sc28go5i051pl1i1", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Query QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Query", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i2 +{ + "id": "cji1sc28go5i051pl1i2", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Source Cluster Search Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Query Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.query_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/b", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.total.search.scroll_total", + "name": "b", + "statistic": "derivative" + } + ], + "name": "Scroll Latency", + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i3 +{ + "id": "cji1sc28go5i051pl1i3", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index QPS", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Total Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.total.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + }, + { + "metric": { + "formula": "a/{{.bucket_size_in_second}}", + "name": "Primary Indexing", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "a", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} +PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i4 +{ + "id": "cji1sc28go5i051pl1i4", + "created": "2023-09-20T10:32:16.8356774+08:00", + "updated": "2023-08-20T10:32:16.8356774+08:00", + "title": "Target Cluster Index Latency", + "config": { + "bucket_size": "auto", + "format": { + "pattern": "0.00", + "type": "number" + }, + "series": [ + { + "metric": { + "formula": "a/b", + "name": "Indexing Latency", + "items": [ + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis", + "name": "a", + "statistic": "derivative" + }, + { + "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total", + "name": "b", + "statistic": "derivative" + } + ], + "sort": [ + { + "direction": "desc", + "key": "_count" + } + ] + }, + "queries": { + "cluster_id": "infini_default_system_cluster", + "indices": [ + ".infini_metrics" + ], + "time_field": "timestamp" + }, + "type": "line" + } + ] + } +} + #The `id` value is consistent with the `_id` value POST $[[INDEX_PREFIX]]layout/doc/cg2qqh28go5jqa6vvk70 { diff --git a/config/permission.json b/config/permission.json index 63426b91..1a7cf336 100644 --- a/config/permission.json +++ b/config/permission.json @@ -293,9 +293,15 @@ {"name": "doc.update", "methods": ["put"], "path": "/:index_name/:doctype/:doc_id" }, + {"name": "doc.update", "methods": ["post"], + "path": "/:index_name/_update/:doc_id" + }, {"name": "doc.create", "methods": ["post"], "path": "/:index_name/:doctype" }, + {"name": "doc.create", "methods": ["post", "put"], + "path": "/:index_name/_create/:doc_id" + }, {"name": "doc.delete", "methods": ["delete"], "path": "/:index_name/:doctype/:doc_id" @@ -303,6 +309,9 @@ {"name": "doc.get", "methods": ["get"], "path": "/:index_name/:doctype/:doc_id" }, + {"name": "doc.get", "methods": ["get"], + "path": "/:index_name/_source/:doc_id" + }, {"name": "doc.exists", "methods": ["head"], "path": "/:index_name/:doctype/:doc_id" }, diff --git a/plugin/api/insight/metadata.go b/plugin/api/insight/metadata.go index 8b4c27c1..013eaa57 100644 --- a/plugin/api/insight/metadata.go +++ b/plugin/api/insight/metadata.go @@ -5,8 +5,10 @@ package insight import ( + "bytes" "github.com/Knetic/govaluate" log "github.com/cihub/seelog" + "text/template" "infini.sh/console/model/insight" httprouter "infini.sh/framework/core/api/router" "infini.sh/framework/core/elastic" @@ -224,6 +226,24 @@ func getMetricData(metric *insight.Metric) (interface{}, error) { if len(metric.Items) == 1 && formula == "" { targetMetricData = metricData }else { + tpl, err := template.New("insight_formula").Parse(formula) + if err != nil { + return nil, err + } + msgBuffer := &bytes.Buffer{} + params := map[string]interface{}{} + if metric.BucketSize != "" { + du, err := util.ParseDuration(metric.BucketSize) + if err != nil { + return nil, err + } + params["bucket_size_in_second"] = du.Seconds() + } + err = tpl.Execute(msgBuffer, params) + if err != nil { + return nil, err + } + formula = msgBuffer.String() for _, md := range metricData { targetData := insight.MetricData{ Groups: md.Groups, diff --git a/plugin/setup/setup.go b/plugin/setup/setup.go index 565f4ecc..fde8abd7 100644 --- a/plugin/setup/setup.go +++ b/plugin/setup/setup.go @@ -322,19 +322,32 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API, Setup func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps httprouter.Params) { if !global.Env().SetupRequired() { - module.WriteError(w, "setup not permitted", 500) + //handle setup timeout + rkey, err := keystore.GetValue(credential.SecretKey) + if err != nil { + module.WriteError(w, err.Error(), 500) + return + } + request := SetupRequest{} + err = module.DecodeJSON(r, &request) + if err != nil { + module.WriteError(w, err.Error(), 500) + return + } + h := md5.New() + rawSecret := []byte(request.CredentialSecret) + h.Write(rawSecret) + secret := make([]byte, 32) + hex.Encode(secret, h.Sum(nil)) + if bytes.Compare(rkey, secret) == 0 { + module.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + }else{ + module.WriteError(w, "invalid credential secret", 500) + } return } - scheme := "http" - if r.TLS != nil { - scheme = "https" - } - consoleEndpoint := fmt.Sprintf("%s://%s", scheme, r.Host) - err := kv.AddValue("system", []byte("INFINI_CONSOLE_ENDPOINT"), []byte(consoleEndpoint)) - if err != nil { - log.Error(err) - } - success := false var errType string var fixTips string @@ -373,7 +386,6 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http } module.WriteJSON(w, result, code) }() - err, client, request := module.initTempClient(r) if err != nil { panic(err) @@ -381,6 +393,15 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http if request.CredentialSecret == "" { panic("invalid credential secret") } + scheme := "http" + if r.TLS != nil { + scheme = "https" + } + consoleEndpoint := fmt.Sprintf("%s://%s", scheme, r.Host) + err = kv.AddValue("system", []byte("INFINI_CONSOLE_ENDPOINT"), []byte(consoleEndpoint)) + if err != nil { + log.Error(err) + } if cfg1.IndexPrefix == "" { cfg1.IndexPrefix = ".infini_" @@ -586,5 +607,4 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http } success = true - } diff --git a/plugin/task_manager/api.go b/plugin/task_manager/api.go index e4f4cace..2279250a 100644 --- a/plugin/task_manager/api.go +++ b/plugin/task_manager/api.go @@ -20,6 +20,8 @@ func InitAPI() { api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.GET, "/migration/data/:task_id/logging/:index", handler.RequirePermission(handler.searchIndexLevelTaskLogging, enum.PermissionMigrationTaskRead)) + api.HandleAPIMethod(api.GET, "/migration/data/_search_values", handler.RequirePermission(handler.searchTaskFieldValues("cluster_migration"), enum.PermissionMigrationTaskRead)) api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead)) api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite)) diff --git a/plugin/task_manager/cluster_comparison/cluster_comparison.go b/plugin/task_manager/cluster_comparison/cluster_comparison.go index ef7ac7ea..982419f2 100644 --- a/plugin/task_manager/cluster_comparison/cluster_comparison.go +++ b/plugin/task_manager/cluster_comparison/cluster_comparison.go @@ -56,10 +56,12 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { return nil } taskItem.RetryTimes++ - if taskItem.StartTimeInMillis == 0 { - taskItem.StartTimeInMillis = time.Now().UnixMilli() - } + taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.Status = task.StatusRunning + taskItem.Metadata.Labels["total_diff_docs"] = 0 + taskItem.Metadata.Labels["only_in_source"] = 0 + taskItem.Metadata.Labels["only_in_target"] = 0 + taskItem.Metadata.Labels["diff_both"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID)) diff --git a/plugin/task_manager/cluster_comparison/orm.go b/plugin/task_manager/cluster_comparison/orm.go index f3d37c57..5682a61d 100644 --- a/plugin/task_manager/cluster_comparison/orm.go +++ b/plugin/task_manager/cluster_comparison/orm.go @@ -105,6 +105,7 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba "source_total_docs": sourceTotalDocs, "target_total_docs": targetTotalDocs, "permit_nodes": config.Settings.Execution.Nodes.Permit, + "name": config.Name, }, }, Cancellable: true, @@ -112,6 +113,9 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba Status: task.StatusInit, ConfigString: util.MustToJSON(config), } + if len(config.Tags) > 0 { + t.Metadata.Labels["tags"] = config.Tags + } t.ID = util.GetUUID() return &t, nil } diff --git a/plugin/task_manager/cluster_migration/cluster_migration.go b/plugin/task_manager/cluster_migration/cluster_migration.go index c3f57291..4424c0cd 100644 --- a/plugin/task_manager/cluster_migration/cluster_migration.go +++ b/plugin/task_manager/cluster_migration/cluster_migration.go @@ -130,6 +130,13 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } if index.Incremental != nil { incrementalFilter, err := index.Incremental.BuildFilter(current, step) + if source.Step == nil { + source.Step = step.String() + source.End = float64(current - index.Incremental.Delay.Milliseconds()) + if !index.Incremental.Full { + source.Start = source.End - float64(step.Milliseconds()) + } + } if err != nil { return err } diff --git a/plugin/task_manager/cluster_migration/orm.go b/plugin/task_manager/cluster_migration/orm.go index 29b7464d..ca929772 100644 --- a/plugin/task_manager/cluster_migration/orm.go +++ b/plugin/task_manager/cluster_migration/orm.go @@ -78,7 +78,6 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac config.Cluster.Source.Distribution = srcClusterCfg.Distribution dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id) config.Cluster.Target.Distribution = dstClusterCfg.Distribution - clearTaskConfig(config) var totalDocs int64 @@ -102,6 +101,7 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac "target_cluster_id": config.Cluster.Target.Id, "source_total_docs": totalDocs, "permit_nodes": config.Settings.Execution.Nodes.Permit, + "name": config.Name, }, }, Cancellable: true, @@ -109,6 +109,9 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac Status: task.StatusInit, ConfigString: util.MustToJSON(config), } + if len(config.Tags) > 0 { + t.Metadata.Labels["tags"] = config.Tags + } t.ID = util.GetUUID() return &t, nil } diff --git a/plugin/task_manager/common_api.go b/plugin/task_manager/common_api.go index 087f6ce3..73cb9b26 100644 --- a/plugin/task_manager/common_api.go +++ b/plugin/task_manager/common_api.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" model2 "infini.sh/console/model" + migration_model "infini.sh/console/plugin/task_manager/model" + "infini.sh/framework/core/global" "net/http" "strconv" "strings" @@ -31,6 +33,9 @@ type TaskInfoResponse struct { CompletedPartitions int `json:"completed_partitions"` Partitions []util.MapStr `json:"partitions"` Repeating bool `json:"repeating"` + Workers []util.MapStr `json:"workers"` + Incremental *migration_model.IndexIncremental `json:"incremental"` + NextRunTime int64 `json:"next_run_time"` } func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -105,6 +110,7 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req h.WriteError(w, err.Error(), http.StatusInternalServerError) return } + for _, hit := range searchRes.Hits.Hits { sourceM := util.MapStr(hit.Source) h.populateMajorTaskInfo(hit.ID, sourceM) @@ -122,6 +128,12 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { log.Errorf("failed to unmarshal major task info, err: %v", err) return } + _, repeatStatus, err := h.calcRepeatingStatus(&majorTask) + if err != nil { + log.Warnf("failed to calc repeat info, err: %v", err) + return + } + sourceM.Put("repeat", repeatStatus) switch majorTask.Metadata.Type { case "cluster_migration": ts, _, err := h.getMigrationMajorTaskInfo(taskID) @@ -138,8 +150,21 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { return } sourceM.Put("running_children", count) + if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" { + ts, _, err = h.getMigrationMajorTaskInfo(repeatStatus.LastRunChildTaskID) + if err != nil { + log.Warnf("fetch progress info of task error: %v", err) + return + } + sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) + sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs) + } case "cluster_comparison": - ts, _, err := h.getComparisonMajorTaskInfo(taskID) + targetTaskId := taskID + if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" { + targetTaskId = repeatStatus.LastRunChildTaskID + } + ts, _, err := h.getComparisonMajorTaskInfo(targetTaskId) if err != nil { log.Warnf("fetch progress info of task error: %v", err) return @@ -156,12 +181,6 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) { } sourceM.Put("running_children", count) } - _, repeatStatus, err := h.calcRepeatingStatus(&majorTask) - if err != nil { - log.Warnf("failed to calc repeat info, err: %v", err) - return - } - sourceM.Put("repeat", repeatStatus) } func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -174,7 +193,7 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) return } - if obj.Metadata.Type != "pipeline" && obj.Status == task.StatusComplete { + if obj.Metadata.Type != "pipeline" && (obj.Status == task.StatusComplete && obj.Metadata.Type != "cluster_comparison") { h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError) return } @@ -332,15 +351,104 @@ func (h *APIHandler) resumeTask(w http.ResponseWriter, req *http.Request, ps htt return } +// query index level task logging +func (h *APIHandler) searchIndexLevelTaskLogging(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + uniqueIndexName := ps.MustGetParameter("index") + cfg := global.MustLookup("cluster_migration_config") + var ( + migrationConfig *DispatcherConfig + ok bool + ) + if migrationConfig, ok = cfg.(*DispatcherConfig); !ok { + h.WriteJSON(w, elastic.SearchResponse{}, http.StatusOK) + return + } + client := elastic.GetClient(migrationConfig.Elasticsearch) + var ( + strSize = h.GetParameterOrDefault(req, "size", "500") + min = h.GetParameterOrDefault(req, "min", "") + max = h.GetParameterOrDefault(req, "max", "") + ) + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 500 + } + rangeObj := util.MapStr{} + if min != "" { + rangeObj["gte"] = min + } + if max != "" { + rangeObj["lt"] = max + } + mustQ := []util.MapStr{ + { + "term": util.MapStr{ + "metadata.category": util.MapStr{ + "value": "task", + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.parent_task_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.unique_index_name": util.MapStr{ + "value": uniqueIndexName, + }, + }, + }, + } + if len(rangeObj) > 0 { + mustQ = append(mustQ, util.MapStr{ + "range": util.MapStr{ + "timestamp": rangeObj, + }, + }) + } + query := util.MapStr{ + "size": size, + "_source": []string{"payload.task.logging.message", "timestamp"}, + "sort": []util.MapStr{ + { + "timestamp": util.MapStr{ + "order": "desc", + }, + }, + }, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + } + searchRes, err := client.SearchWithRawQueryDSL(migrationConfig.LogIndexName, util.MustToJSONBytes(query)) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + h.WriteJSON(w, searchRes, http.StatusOK) +} + type RepeatStatus struct { IsRepeat bool `json:"is_repeat"` Done bool `json:"done"` Repeating bool `json:"repeating"` + LastRunTime int64 `json:"last_run_time"` + NextRunTime int64 `json:"next_run_time"` + LastRunChildTaskID string `json:"last_run_child_task_id"` + LastCompleteTime int64 `json:"last_complete_time"` } func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) { ret := &RepeatStatus{} - lastRepeatingChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type) + lastRepeatingChild, lastRunChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type) if err != nil { return nil, nil, err } @@ -363,6 +471,17 @@ func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *Repe if !repeatTriggered { ret.Repeating = true } + ret.NextRunTime = migration_util.GetMapIntValue(lastRepeatingChild.Metadata.Labels, "next_run_time") + ret.LastRunTime = lastRepeatingChild.StartTimeInMillis + if ret.LastRunTime == 0 && lastRunChild != nil { + ret.LastRunTime = lastRunChild.StartTimeInMillis + if lastRunChild.CompletedTime != nil && !lastRunChild.CompletedTime.IsZero(){ + ret.LastCompleteTime = lastRunChild.CompletedTime.UnixMilli() + } + } + if lastRunChild != nil { + ret.LastRunChildTaskID = lastRunChild.ID + } return lastRepeatingChild, ret, nil } @@ -615,3 +734,71 @@ func (h *APIHandler) calcMajorTaskInfo(subTasks []task.Task, repeating bool) (st return } + +func (h *APIHandler) searchTaskFieldValues(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ( + field = h.GetParameterOrDefault(req, "field", "") + keyword = h.GetParameterOrDefault(req, "keyword", "") + mustQ []interface{} + ) + mustQ = append(mustQ, util.MapStr{ + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": taskType, + }, + }, + }) + + if v := strings.TrimSpace(keyword); v != ""{ + mustQ = append(mustQ, util.MapStr{ + "query_string": util.MapStr{ + "default_field": field, + "query": fmt.Sprintf("*%s*", v), + }, + }) + } + queryDSL := util.MapStr{ + "aggs": util.MapStr{ + "items": util.MapStr{ + "terms": util.MapStr{ + "field": field, + "size": 20, + }, + }, + }, + "size": 0, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + } + q := orm.Query{ + RawQuery: util.MustToJSONBytes(queryDSL), + } + err, result := orm.Search(task.Task{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + searchRes := elastic.SearchResponse{} + err = util.FromJSONBytes(result.Raw, &searchRes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + items := []string{} + for _, bk := range searchRes.Aggregations["items"].Buckets { + if v, ok := bk["key"].(string); ok { + if strings.Contains(v, keyword){ + items = append(items, v) + } + } + } + h.WriteJSON(w, items, http.StatusOK) + } +} + diff --git a/plugin/task_manager/comparison_api.go b/plugin/task_manager/comparison_api.go index 439aa0c6..a4382f0f 100644 --- a/plugin/task_manager/comparison_api.go +++ b/plugin/task_manager/comparison_api.go @@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. var completedIndices int for i, index := range taskConfig.Indices { indexName := index.Source.GetUniqueIndexName() - count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs + count := indexState[indexName].TotalScrollDocs percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100 if percent > 100 { percent = 100 @@ -82,7 +82,9 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http. taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2) + taskConfig.Indices[i].TotalScrollDocs = count taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren if count == index.Source.Docs+index.Target.Docs { completedIndices++ } @@ -115,6 +117,7 @@ type ClusterComparisonTaskState struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int } type ComparisonIndexStateInfo struct { @@ -124,73 +127,140 @@ type ComparisonIndexStateInfo struct { TargetTotalDocs int64 TargetScrollDocs int64 TotalDiffDocs int64 + RunningChildren int + TotalScrollDocs int64 } -// TODO: calc realtime info from instance func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) { + var pipelineTaskIDs = map[string][]string{} + var pipelineIndexNames = map[string]string{} indexState = map[string]ComparisonIndexStateInfo{} - - taskQuery := util.MapStr{ - "size": 500, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskID, + const size = 500 + var ( + from = -size + hasMore = true + ) + for hasMore { + from += size + taskQuery := util.MapStr{ + "size": 500, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskID, + }, }, }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_comparison", + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", + }, }, }, }, }, }, - }, - } - subTasks, err := migration_util.GetTasks(taskQuery) - if err != nil { - return taskStats, indexState, err - } - - for _, subTask := range subTasks { - taskLabels := util.MapStr(subTask.Metadata.Labels) - indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") - if indexName == "" { - continue } - - cfg := migration_model.IndexComparisonTaskConfig{} - err = migration_util.GetTaskConfig(&subTask, &cfg) + subTasks, err := migration_util.GetTasks(taskQuery) if err != nil { - log.Errorf("failed to get task config, err: %v", err) + return taskStats, indexState, err + } + if len(subTasks) < size { + hasMore = false + } + + var indexMigrationTaskIDs []string + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + cfg := migration_model.IndexComparisonTaskConfig{} + err = migration_util.GetTaskConfig(&subTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + continue + } + totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") + taskStats.SourceTotalDocs += cfg.Source.DocCount + taskStats.TargetTotalDocs += cfg.Target.DocCount + taskStats.TotalDiffDocs += totalDiffDocs + st := indexState[indexName] + st.SourceTotalDocs += cfg.Source.DocCount + st.TargetTotalDocs += cfg.Target.DocCount + st.TotalDiffDocs += totalDiffDocs + if subTask.Status == task.StatusError { + st.ErrorPartitions += 1 + } + if subTask.Status == task.StatusRunning { + st.RunningChildren++ + indexState[indexName] = st + indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) + continue + } + sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") + targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") + st.SourceScrollDocs += sourceDocs + st.TargetScrollDocs += targetDocs + st.TotalScrollDocs += sourceDocs + targetDocs + taskStats.TargetScrollDocs += targetDocs + taskStats.SourceScrollDocs += sourceDocs + indexState[indexName] = st + } + + if len(indexMigrationTaskIDs) == 0 { continue } - sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled") - targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled") - totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs") - taskStats.SourceTotalDocs += cfg.Source.DocCount - taskStats.SourceScrollDocs += sourceDocs - taskStats.TargetTotalDocs += cfg.Target.DocCount - taskStats.TargetScrollDocs += targetDocs - taskStats.TotalDiffDocs += totalDiffDocs - st := indexState[indexName] - st.SourceTotalDocs += cfg.Source.DocCount - st.SourceScrollDocs += sourceDocs - st.TargetTotalDocs += cfg.Target.DocCount - st.TargetScrollDocs += targetDocs - st.TotalDiffDocs += totalDiffDocs - if subTask.Status == task.StatusError { - st.ErrorPartitions += 1 + + taskQuery = util.MapStr{ + "size": len(indexMigrationTaskIDs) * 2, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "terms": util.MapStr{ + "parent_id": indexMigrationTaskIDs, + }, + }, + }, + }, + }, } + subTasks, err = migration_util.GetTasks(taskQuery) + if err != nil { + return taskStats, indexState, err + } + + for _, subTask := range subTasks { + taskLabels := util.MapStr(subTask.Metadata.Labels) + indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name") + if indexName == "" { + continue + } + + pipelineIndexNames[subTask.ID] = indexName + + if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" { + pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID) + } + } + } + + pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs) + for pipelineID, pipelineContext := range pipelineContexts { + // add scrolledDocs of running tasks + scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs") + indexName := pipelineIndexNames[pipelineID] + st := indexState[indexName] + st.TotalScrollDocs += scrollDocs indexState[indexName] = st } - return taskStats, indexState, nil } @@ -231,6 +301,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr + var workers = map[string]struct{}{} for i, subTask := range subTasks { cfg := migration_model.IndexComparisonTaskConfig{} @@ -242,6 +313,10 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if i == 0 { taskInfo.Step = cfg.Source.Step } + instID := migration_util.GetMapStringValue(subTask.Metadata.Labels, "execution_instance_id") + if instID != "" { + workers[instID] = struct{}{} + } var durationInMS int64 var subCompletedTime int64 @@ -256,6 +331,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht subTaskLabels := util.MapStr(subTask.Metadata.Labels) sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled") targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled") + onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source") + onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target") + diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both") partitionTaskInfo := util.MapStr{ "task_id": subTask.ID, @@ -267,6 +345,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht "duration": durationInMS, "source_total_docs": cfg.Source.DocCount, "target_total_docs": cfg.Target.DocCount, + "only_in_source": onlyInSource, + "only_in_target": onlyInTarget, + "diff_both": diffBoth, } sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg) if sourceDumpTask != nil { @@ -305,6 +386,14 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht if taskInfo.StartTime == 0 { taskInfo.StartTime = startTime } + for _, node := range taskConfig.Settings.Execution.Nodes.Permit { + if _, ok := workers[node.ID]; ok { + taskInfo.Workers = append(taskInfo.Workers, util.MapStr{ + "id": node.ID, + "name": node.Name, + }) + } + } taskInfo.Partitions = partitionTaskInfos taskInfo.CompletedPartitions = completedPartitions h.WriteJSON(w, taskInfo, http.StatusOK) diff --git a/plugin/task_manager/index_comparison/index_comparison.go b/plugin/task_manager/index_comparison/index_comparison.go index c14d902a..cdcb1bc5 100644 --- a/plugin/task_manager/index_comparison/index_comparison.go +++ b/plugin/task_manager/index_comparison/index_comparison.go @@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 { now := time.Now() taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth + taskItem.Metadata.Labels["only_in_source"] = onlyInSource + taskItem.Metadata.Labels["only_in_target"] = onlyInTarget + taskItem.Metadata.Labels["diff_both"] = diffBoth taskItem.CompletedTime = &now taskItem.Status = task.StatusError p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ @@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error { now := time.Now() taskItem.CompletedTime = &now taskItem.Status = task.StatusComplete + taskItem.Metadata.Labels["total_diff_docs"] = 0 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, }, "index comparison completed") diff --git a/plugin/task_manager/migration_api.go b/plugin/task_manager/migration_api.go index 0c702365..cfe63465 100644 --- a/plugin/task_manager/migration_api.go +++ b/plugin/task_manager/migration_api.go @@ -3,6 +3,7 @@ package task_manager import ( "fmt" "net/http" + "strings" "time" log "github.com/cihub/seelog" @@ -78,18 +79,26 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R count := indexState[indexName].IndexDocs sourceDocs := index.Source.Docs var percent float64 + var exportedPercent float64 if sourceDocs <= 0 { percent = 100 + exportedPercent = 100 }else{ percent = float64(count) / float64(sourceDocs) * 100 if percent > 100 { percent = 100 } + exportedPercent = float64(indexState[indexName].ScrollDocs)/float64(sourceDocs) * 100 + if exportedPercent > 100 { + exportedPercent = 100 + } } //taskConfig.Indices[i].Source.Docs = sourceDocs taskConfig.Indices[i].Target.Docs = count taskConfig.Indices[i].Percent = util.ToFixed(percent, 2) taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions + taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren + taskConfig.Indices[i].ExportedPercent = util.ToFixed(exportedPercent, 2) if count == index.Source.Docs { completedIndices++ } @@ -141,6 +150,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt StartTime: majorTask.StartTimeInMillis, Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels), } + if taskInfo.Repeating { + _, repeatStatus, err := h.calcRepeatingStatus(&majorTask) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + taskInfo.NextRunTime = repeatStatus.NextRunTime + } + indexParts := strings.Split(uniqueIndexName, ":") + for _, index := range taskConfig.Indices { + if index.Source.Name == indexParts[0] { + taskInfo.Incremental = index.Incremental + } + } subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName) @@ -167,6 +191,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating) var partitionTaskInfos []util.MapStr + var workers = map[string]struct{}{} for i, ptask := range subTasks { cfg := migration_model.IndexMigrationTaskConfig{} @@ -178,7 +203,10 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt if i == 0 { taskInfo.Step = cfg.Source.Step } - + instID := migration_util.GetMapStringValue(ptask.Metadata.Labels, "execution_instance_id") + if instID != "" { + workers[instID] = struct{}{} + } var durationInMS int64 var subCompletedTime int64 if ptask.StartTimeInMillis > 0 { @@ -241,6 +269,14 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt } taskInfo.Partitions = partitionTaskInfos taskInfo.CompletedPartitions = completedPartitions + for _, node := range taskConfig.Settings.Execution.Nodes.Permit { + if _, ok := workers[node.ID]; ok { + taskInfo.Workers = append(taskInfo.Workers, util.MapStr{ + "id": node.ID, + "name": node.Name, + }) + } + } h.WriteJSON(w, taskInfo, http.StatusOK) } @@ -248,6 +284,8 @@ type MigrationIndexStateInfo struct { ErrorPartitions int IndexDocs int64 SourceDocs int64 + RunningChildren int + ScrollDocs int64 } /* @@ -324,9 +362,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m taskStats.SourceDocs += cfg.Source.DocCount st := indexState[indexName] st.SourceDocs += cfg.Source.DocCount - indexState[indexName] = st + scrollDocs := migration_util.GetMapIntValue(taskLabels, "scrolled_docs") + st.ScrollDocs += scrollDocs if subTask.Status == task.StatusRunning { + st.RunningChildren++ + indexState[indexName] = st indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID) continue } @@ -334,6 +375,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs") taskStats.IndexDocs += indexDocs st.IndexDocs += indexDocs + if subTask.Status == task.StatusError { st.ErrorPartitions += 1 taskStats.ErrorPartitions += 1 @@ -347,7 +389,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m } taskQuery = util.MapStr{ - "size": len(indexMigrationTaskIDs), + "size": len(indexMigrationTaskIDs) * 2, "query": util.MapStr{ "bool": util.MapStr{ "must": []util.MapStr{ @@ -356,13 +398,13 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m "parent_id": indexMigrationTaskIDs, }, }, - { - "term": util.MapStr{ - "metadata.labels.pipeline_id": util.MapStr{ - "value": "bulk_indexing", - }, - }, - }, + //{ + // "term": util.MapStr{ + // "metadata.labels.pipeline_id": util.MapStr{ + // "value": "bulk_indexing", + // }, + // }, + //}, }, }, }, @@ -391,10 +433,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m for pipelineID, pipelineContext := range pipelineContexts { // add indexDocs of running tasks indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count") + scrollDocs := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs") taskStats.IndexDocs += indexDocs indexName := pipelineIndexNames[pipelineID] st := indexState[indexName] st.IndexDocs += indexDocs + st.ScrollDocs += scrollDocs indexState[indexName] = st } return taskStats, indexState, nil diff --git a/plugin/task_manager/model/comparison.go b/plugin/task_manager/model/comparison.go index ba9d046a..b64d61ce 100644 --- a/plugin/task_manager/model/comparison.go +++ b/plugin/task_manager/model/comparison.go @@ -5,6 +5,8 @@ import ( ) type ClusterComparisonTaskConfig struct { + Name string `json:"name"` + Tags []string `json:"tags"` Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -29,8 +31,10 @@ type ClusterComparisonIndexConfig struct { Partition *IndexPartition `json:"partition,omitempty"` // only used in API - ScrollPercent float64 `json:"scroll_percent,omitempty"` + ScrollPercent float64 `json:"scroll_percent,omitempty"` + TotalScrollDocs int64 `json:"total_scroll_docs,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` + RunningChildren int `json:"running_children,omitempty"` } type IndexComparisonTaskConfig struct { diff --git a/plugin/task_manager/model/migration.go b/plugin/task_manager/model/migration.go index 5e011112..e8e9e1ab 100644 --- a/plugin/task_manager/model/migration.go +++ b/plugin/task_manager/model/migration.go @@ -9,6 +9,8 @@ import ( ) type ClusterMigrationTaskConfig struct { + Name string `json:"name"` + Tags []string `json:"tags"` Cluster struct { Source ClusterInfo `json:"source"` Target ClusterInfo `json:"target"` @@ -39,6 +41,8 @@ type ClusterMigrationIndexConfig struct { // only used in API Percent float64 `json:"percent,omitempty"` ErrorPartitions int `json:"error_partitions,omitempty"` + RunningChildren int `json:"running_children,omitempty"` + ExportedPercent float64 `json:"exported_percent,omitempty"` } type ClusterMigrationTaskState struct { diff --git a/plugin/task_manager/util/orm.go b/plugin/task_manager/util/orm.go index 55992057..faa65d00 100644 --- a/plugin/task_manager/util/orm.go +++ b/plugin/task_manager/util/orm.go @@ -38,9 +38,9 @@ func DeleteChildTasks(taskID string, taskType string) error { return nil } -func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, error) { +func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, *task.Task, error) { queryDsl := util.MapStr{ - "size": 1, + "size": 2, "sort": []util.MapStr{ { "metadata.labels.next_run_time": util.MapStr{ @@ -69,12 +69,21 @@ func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, erro } tasks, err := GetTasks(queryDsl) if err != nil { - return nil, err + return nil, nil, err } if len(tasks) == 0 { - return nil, nil + return nil, nil, nil } - return &tasks[0], nil + var lastRunChildTask *task.Task + if tasks[0].StartTimeInMillis > 0 { + lastRunChildTask = &tasks[0] + }else{ + if len(tasks) == 2 { + lastRunChildTask = &tasks[1] + } + } + + return &tasks[0], lastRunChildTask, nil } func GetPendingChildTasks(taskID string, taskType string) ([]task.Task, error) { @@ -233,7 +242,7 @@ func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error }, { "terms": util.MapStr{ - "status": []string{task.StatusError, task.StatusStopped}, + "status": []string{task.StatusError, task.StatusStopped, task.StatusComplete}, }, }, {