Merge branch 'master' into v2
This commit is contained in: commit 4c8ee4a3e2
@@ -0,0 +1,51 @@
+pipeline {
+    agent none
+
+    environment {
+        CI = 'true'
+    }
+
+    stages {
+
+        stage('Prepare Web Packages') {
+
+            agent {
+                label 'linux'
+            }
+
+            steps {
+                catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
+                    sh 'cd /home/jenkins/go/src/infini.sh/console && git stash && git pull origin master && make clean'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || rm -rif web'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || git clone ssh://git@git.infini.ltd:64221/infini/console-ui.git web'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/web && git pull origin master'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/web/src && true || git clone ssh://git@git.infini.ltd:64221/infini/common-ui.git common'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/web/src/common && git pull origin master'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/web && git stash'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm install'
+                    sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm run build'
+
+                    sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64'
+                    sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE'
+                    sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config'
+                    sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config'
+                    sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config'
+
+                    sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-arm'
+                    sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE'
+                    sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config'
+                    sh label: 'copy-certs', script: 'cd /home/jenkins/go/src/infini.sh/console && cp -rf config/certs bin/config'
+                    sh label: 'package-linux-arm64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-arm64.tar.gz console-linux-arm64 console.yml LICENSE NOTICE config'
+
+                    archiveArtifacts artifacts: 'console-$VERSION-$BUILD_NUMBER-*.*', fingerprint: true, followSymlinks: true, onlyIfSuccessful: false
+                }
+            }
+        }
+
+    }
+}
@@ -19,11 +19,13 @@ fi
 cd $WORKBASE/web
 git pull origin master
 
-cd $WORKBASE/web/common
+cd $WORKBASE/web/src/common
 git pull origin master
 
 git log --pretty=oneline -5
 
+cd $WORKBASE/web
+
 #--quiet
 cnpm install --quiet --no-progress
@@ -2547,6 +2547,251 @@ POST $[[INDEX_PREFIX]]widget/_doc/cji1ttq8go5i051pl1t0
 ]
 }
 }
+
+PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i1
+{
+  "id": "cji1sc28go5i051pl1i1",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Source Cluster Query QPS",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Total Query",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i2
+{
+  "id": "cji1sc28go5i051pl1i2",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Source Cluster Search Latency",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/b",
+          "name": "Query Latency",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      },
+      {
+        "metric": {
+          "formula": "a/b",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.scroll_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "name": "Scroll Latency",
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i3
+{
+  "id": "cji1sc28go5i051pl1i3",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Target Cluster Index QPS",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Total Indexing",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.indexing.index_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      },
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Primary Indexing",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/_doc/cji1sc28go5i051pl1i4
+{
+  "id": "cji1sc28go5i051pl1i4",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Target Cluster Index Latency",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/b",
+          "name": "Indexing Latency",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
 #The `id` value is consistent with the `_id` value
 POST $[[INDEX_PREFIX]]layout/_doc/cg2qqh28go5jqa6vvk70
@@ -2475,6 +2475,251 @@ POST $[[INDEX_PREFIX]]widget/doc/cji1ttq8go5i051pl1t0
 ]
 }
 }
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i1
+{
+  "id": "cji1sc28go5i051pl1i1",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Source Cluster Query QPS",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Total Query",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i2
+{
+  "id": "cji1sc28go5i051pl1i2",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Source Cluster Search Latency",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/b",
+          "name": "Query Latency",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      },
+      {
+        "metric": {
+          "formula": "a/b",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.scroll_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "name": "Scroll Latency",
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i3
+{
+  "id": "cji1sc28go5i051pl1i3",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Target Cluster Index QPS",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Total Indexing",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.indexing.index_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      },
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Primary Indexing",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i4
+{
+  "id": "cji1sc28go5i051pl1i4",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Target Cluster Index Latency",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/b",
+          "name": "Indexing Latency",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
 #The `id` value is consistent with the `_id` value
 POST $[[INDEX_PREFIX]]layout/doc/cg2qqh28go5jqa6vvk70
@@ -2551,6 +2551,251 @@ POST $[[INDEX_PREFIX]]widget/doc/cji1ttq8go5i051pl1t0
 }
 }
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i1
+{
+  "id": "cji1sc28go5i051pl1i1",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Source Cluster Query QPS",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Total Query",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i2
+{
+  "id": "cji1sc28go5i051pl1i2",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Source Cluster Search Latency",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/b",
+          "name": "Query Latency",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.query_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      },
+      {
+        "metric": {
+          "formula": "a/b",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.scroll_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.total.search.scroll_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "name": "Scroll Latency",
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i3
+{
+  "id": "cji1sc28go5i051pl1i3",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Target Cluster Index QPS",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Total Indexing",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.total.indexing.index_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      },
+      {
+        "metric": {
+          "formula": "a/{{.bucket_size_in_second}}",
+          "name": "Primary Indexing",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total",
+              "name": "a",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
+PUT $[[INDEX_PREFIX]]widget/doc/cji1sc28go5i051pl1i4
+{
+  "id": "cji1sc28go5i051pl1i4",
+  "created": "2023-09-20T10:32:16.8356774+08:00",
+  "updated": "2023-08-20T10:32:16.8356774+08:00",
+  "title": "Target Cluster Index Latency",
+  "config": {
+    "bucket_size": "auto",
+    "format": {
+      "pattern": "0.00",
+      "type": "number"
+    },
+    "series": [
+      {
+        "metric": {
+          "formula": "a/b",
+          "name": "Indexing Latency",
+          "items": [
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_time_in_millis",
+              "name": "a",
+              "statistic": "derivative"
+            },
+            {
+              "field": "payload.elasticsearch.index_stats.primaries.indexing.index_total",
+              "name": "b",
+              "statistic": "derivative"
+            }
+          ],
+          "sort": [
+            {
+              "direction": "desc",
+              "key": "_count"
+            }
+          ]
+        },
+        "queries": {
+          "cluster_id": "infini_default_system_cluster",
+          "indices": [
+            ".infini_metrics"
+          ],
+          "time_field": "timestamp"
+        },
+        "type": "line"
+      }
+    ]
+  }
+}
+
 #The `id` value is consistent with the `_id` value
 POST $[[INDEX_PREFIX]]layout/doc/cg2qqh28go5jqa6vvk70
 {
@@ -293,9 +293,15 @@
 {"name": "doc.update", "methods": ["put"],
   "path": "/:index_name/:doctype/:doc_id"
 },
+{"name": "doc.update", "methods": ["post"],
+  "path": "/:index_name/_update/:doc_id"
+},
 {"name": "doc.create", "methods": ["post"],
   "path": "/:index_name/:doctype"
 },
+{"name": "doc.create", "methods": ["post", "put"],
+  "path": "/:index_name/_create/:doc_id"
+},
 {"name": "doc.delete", "methods": ["delete"],
   "path": "/:index_name/:doctype/:doc_id"
@@ -303,6 +309,9 @@
 {"name": "doc.get", "methods": ["get"],
   "path": "/:index_name/:doctype/:doc_id"
 },
+{"name": "doc.get", "methods": ["get"],
+  "path": "/:index_name/_source/:doc_id"
+},
 {"name": "doc.exists", "methods": ["head"],
   "path": "/:index_name/:doctype/:doc_id"
 },
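The two hunks above register typeless, Elasticsearch-7-style document endpoints (_update, _create, _source) alongside the legacy :doctype routes. For reference, a minimal standalone sketch of how a client would exercise the new paths; the host, index name, and document ids below are made up for illustration:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	base := "http://localhost:9000" // hypothetical proxy address

	// POST /:index_name/_update/:doc_id -> typeless partial update (doc.update)
	body := bytes.NewBufferString(`{"doc": {"title": "renamed"}}`)
	resp, err := http.Post(base+"/myindex/_update/1", "application/json", body)
	if err != nil {
		panic(err)
	}
	fmt.Println("update:", resp.Status)

	// PUT /:index_name/_create/:doc_id -> create-only write (doc.create)
	req, _ := http.NewRequest(http.MethodPut, base+"/myindex/_create/2",
		bytes.NewBufferString(`{"title": "new doc"}`))
	req.Header.Set("Content-Type", "application/json")
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	fmt.Println("create:", resp.Status)

	// GET /:index_name/_source/:doc_id -> fetch only the document source (doc.get)
	resp, err = http.Get(base + "/myindex/_source/1")
	if err != nil {
		panic(err)
	}
	fmt.Println("source:", resp.Status)
}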
@@ -5,8 +5,10 @@
 package insight
 
 import (
+	"bytes"
 	"github.com/Knetic/govaluate"
 	log "github.com/cihub/seelog"
+	"text/template"
 	"infini.sh/console/model/insight"
 	httprouter "infini.sh/framework/core/api/router"
 	"infini.sh/framework/core/elastic"
@@ -224,6 +226,24 @@ func getMetricData(metric *insight.Metric) (interface{}, error) {
 	if len(metric.Items) == 1 && formula == "" {
 		targetMetricData = metricData
 	}else {
+		tpl, err := template.New("insight_formula").Parse(formula)
+		if err != nil {
+			return nil, err
+		}
+		msgBuffer := &bytes.Buffer{}
+		params := map[string]interface{}{}
+		if metric.BucketSize != "" {
+			du, err := util.ParseDuration(metric.BucketSize)
+			if err != nil {
+				return nil, err
+			}
+			params["bucket_size_in_second"] = du.Seconds()
+		}
+		err = tpl.Execute(msgBuffer, params)
+		if err != nil {
+			return nil, err
+		}
+		formula = msgBuffer.String()
 		for _, md := range metricData {
 			targetData := insight.MetricData{
 				Groups: md.Groups,
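The hunk above renders the metric formula through text/template before evaluation, which is what lets widget series reference the bucket width as {{.bucket_size_in_second}}. A minimal standalone sketch of that two-step flow, rendering the template and then evaluating the result with govaluate (already imported by this file); the literal numbers are illustrative:

package main

import (
	"bytes"
	"fmt"
	"text/template"

	"github.com/Knetic/govaluate"
)

func main() {
	formula := "a/{{.bucket_size_in_second}}"

	// Step 1: substitute template parameters (here a 10-second bucket).
	tpl, err := template.New("insight_formula").Parse(formula)
	if err != nil {
		panic(err)
	}
	buf := &bytes.Buffer{}
	if err := tpl.Execute(buf, map[string]interface{}{"bucket_size_in_second": 10.0}); err != nil {
		panic(err)
	}

	// Step 2: evaluate the rendered expression with the metric items as variables.
	expr, err := govaluate.NewEvaluableExpression(buf.String()) // e.g. "a/10"
	if err != nil {
		panic(err)
	}
	result, err := expr.Evaluate(map[string]interface{}{"a": 1200.0})
	if err != nil {
		panic(err)
	}
	fmt.Println(result) // 120: queries per second over a 10s bucket
}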
@@ -322,19 +322,32 @@ func (module *Module) initTempClient(r *http.Request) (error, elastic.API, Setup
 
 func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
 	if !global.Env().SetupRequired() {
-		module.WriteError(w, "setup not permitted", 500)
+		//handle setup timeout
+		rkey, err := keystore.GetValue(credential.SecretKey)
+		if err != nil {
+			module.WriteError(w, err.Error(), 500)
 			return
 		}
-		scheme := "http"
-		if r.TLS != nil {
-			scheme = "https"
-		}
-		consoleEndpoint := fmt.Sprintf("%s://%s", scheme, r.Host)
-		err := kv.AddValue("system", []byte("INFINI_CONSOLE_ENDPOINT"), []byte(consoleEndpoint))
+		request := SetupRequest{}
+		err = module.DecodeJSON(r, &request)
 		if err != nil {
-			log.Error(err)
+			module.WriteError(w, err.Error(), 500)
+			return
+		}
+		h := md5.New()
+		rawSecret := []byte(request.CredentialSecret)
+		h.Write(rawSecret)
+		secret := make([]byte, 32)
+		hex.Encode(secret, h.Sum(nil))
+		if bytes.Compare(rkey, secret) == 0 {
+			module.WriteJSON(w, util.MapStr{
+				"success": true,
+			}, 200)
+		}else{
+			module.WriteError(w, "invalid credential secret", 500)
+		}
+		return
 	}
 
 	success := false
 	var errType string
 	var fixTips string
@@ -373,7 +386,6 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
 		}
 		module.WriteJSON(w, result, code)
 	}()
-
 	err, client, request := module.initTempClient(r)
 	if err != nil {
 		panic(err)
@@ -381,6 +393,15 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
 	if request.CredentialSecret == "" {
 		panic("invalid credential secret")
 	}
+	scheme := "http"
+	if r.TLS != nil {
+		scheme = "https"
+	}
+	consoleEndpoint := fmt.Sprintf("%s://%s", scheme, r.Host)
+	err = kv.AddValue("system", []byte("INFINI_CONSOLE_ENDPOINT"), []byte(consoleEndpoint))
+	if err != nil {
+		log.Error(err)
+	}
 
 	if cfg1.IndexPrefix == "" {
 		cfg1.IndexPrefix = ".infini_"
@@ -586,5 +607,4 @@ func (module *Module) initialize(w http.ResponseWriter, r *http.Request, ps http
 	}
-
 	success = true
 
 }
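The setup-timeout branch above authorizes the caller by comparing an MD5-then-hex digest of the submitted credential secret against the value stored in the keystore. A standalone sketch of that derivation and comparison; the secret string is a placeholder:

package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// deriveSecret mirrors the handler: md5(raw) rendered as 32 hex bytes.
func deriveSecret(raw string) []byte {
	h := md5.New()
	h.Write([]byte(raw))
	secret := make([]byte, 32) // hex encoding doubles md5's 16 bytes
	hex.Encode(secret, h.Sum(nil))
	return secret
}

func main() {
	stored := deriveSecret("my-credential-secret") // stands in for the keystore value
	submitted := deriveSecret("my-credential-secret")
	fmt.Println(bytes.Compare(stored, submitted) == 0) // true: secrets match
}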
@@ -20,6 +20,8 @@ func InitAPI() {
 	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite))
 	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
 	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/logging/:index", handler.RequirePermission(handler.searchIndexLevelTaskLogging, enum.PermissionMigrationTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/_search_values", handler.RequirePermission(handler.searchTaskFieldValues("cluster_migration"), enum.PermissionMigrationTaskRead))
 
 	api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead))
 	api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite))
@@ -56,10 +56,12 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
 		return nil
 	}
 	taskItem.RetryTimes++
-	if taskItem.StartTimeInMillis == 0 {
 	taskItem.StartTimeInMillis = time.Now().UnixMilli()
-	}
 	taskItem.Status = task.StatusRunning
+	taskItem.Metadata.Labels["total_diff_docs"] = 0
+	taskItem.Metadata.Labels["only_in_source"] = 0
+	taskItem.Metadata.Labels["only_in_target"] = 0
+	taskItem.Metadata.Labels["diff_both"] = 0
 	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 		Success: true,
 	}, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID))
@@ -105,6 +105,7 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba
 				"source_total_docs": sourceTotalDocs,
 				"target_total_docs": targetTotalDocs,
 				"permit_nodes":      config.Settings.Execution.Nodes.Permit,
+				"name":              config.Name,
 			},
 		},
 		Cancellable: true,
@@ -112,6 +113,9 @@ func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rba
 		Status:       task.StatusInit,
 		ConfigString: util.MustToJSON(config),
 	}
+	if len(config.Tags) > 0 {
+		t.Metadata.Labels["tags"] = config.Tags
+	}
 	t.ID = util.GetUUID()
 	return &t, nil
 }
@@ -130,6 +130,13 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
 		}
 		if index.Incremental != nil {
 			incrementalFilter, err := index.Incremental.BuildFilter(current, step)
+			if source.Step == nil {
+				source.Step = step.String()
+				source.End = float64(current - index.Incremental.Delay.Milliseconds())
+				if !index.Incremental.Full {
+					source.Start = source.End - float64(step.Milliseconds())
+				}
+			}
 			if err != nil {
 				return err
 			}
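The added block above seeds the first incremental window: End is the current time minus the configured delay, and unless a full sync is requested, Start trails End by one step. A standalone sketch of the same arithmetic, with illustrative durations:

package main

import (
	"fmt"
	"time"
)

func main() {
	current := time.Now().UnixMilli() // "now" in epoch milliseconds
	delay := 5 * time.Minute          // lag behind real time to let writes settle
	step := 15 * time.Minute          // width of one incremental window
	full := false                     // not a full sync, so bound the start

	end := float64(current - delay.Milliseconds())
	start := 0.0
	if !full {
		start = end - float64(step.Milliseconds())
	}
	fmt.Printf("first window: [%.0f, %.0f] ms epoch\n", start, end)
}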
@@ -78,7 +78,6 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 	config.Cluster.Source.Distribution = srcClusterCfg.Distribution
 	dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id)
 	config.Cluster.Target.Distribution = dstClusterCfg.Distribution
-
 	clearTaskConfig(config)
 
 	var totalDocs int64
@@ -102,6 +101,7 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 				"target_cluster_id": config.Cluster.Target.Id,
 				"source_total_docs": totalDocs,
 				"permit_nodes":      config.Settings.Execution.Nodes.Permit,
+				"name":              config.Name,
 			},
 		},
 		Cancellable: true,
@@ -109,6 +109,9 @@ func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac
 		Status:       task.StatusInit,
 		ConfigString: util.MustToJSON(config),
 	}
+	if len(config.Tags) > 0 {
+		t.Metadata.Labels["tags"] = config.Tags
+	}
 	t.ID = util.GetUUID()
 	return &t, nil
 }
@@ -4,6 +4,8 @@ import (
 	"errors"
 	"fmt"
 	model2 "infini.sh/console/model"
+	migration_model "infini.sh/console/plugin/task_manager/model"
+	"infini.sh/framework/core/global"
 	"net/http"
 	"strconv"
 	"strings"
@@ -31,6 +33,9 @@ type TaskInfoResponse struct {
 	CompletedPartitions int           `json:"completed_partitions"`
 	Partitions          []util.MapStr `json:"partitions"`
 	Repeating           bool          `json:"repeating"`
+	Workers             []util.MapStr                     `json:"workers"`
+	Incremental         *migration_model.IndexIncremental `json:"incremental"`
+	NextRunTime         int64                             `json:"next_run_time"`
 }
 
 func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -105,6 +110,7 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req
 		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
+
 	for _, hit := range searchRes.Hits.Hits {
 		sourceM := util.MapStr(hit.Source)
 		h.populateMajorTaskInfo(hit.ID, sourceM)
@@ -122,6 +128,12 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 		log.Errorf("failed to unmarshal major task info, err: %v", err)
 		return
 	}
+	_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
+	if err != nil {
+		log.Warnf("failed to calc repeat info, err: %v", err)
+		return
+	}
+	sourceM.Put("repeat", repeatStatus)
 	switch majorTask.Metadata.Type {
 	case "cluster_migration":
 		ts, _, err := h.getMigrationMajorTaskInfo(taskID)
@@ -138,8 +150,21 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 			return
 		}
 		sourceM.Put("running_children", count)
+		if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" {
+			ts, _, err = h.getMigrationMajorTaskInfo(repeatStatus.LastRunChildTaskID)
+			if err != nil {
+				log.Warnf("fetch progress info of task error: %v", err)
+				return
+			}
+			sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
+			sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs)
+		}
 	case "cluster_comparison":
-		ts, _, err := h.getComparisonMajorTaskInfo(taskID)
+		targetTaskId := taskID
+		if repeatStatus.IsRepeat && repeatStatus.LastRunChildTaskID != "" {
+			targetTaskId = repeatStatus.LastRunChildTaskID
+		}
+		ts, _, err := h.getComparisonMajorTaskInfo(targetTaskId)
 		if err != nil {
 			log.Warnf("fetch progress info of task error: %v", err)
 			return
@@ -156,12 +181,6 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
 		}
 		sourceM.Put("running_children", count)
 	}
-	_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
-	if err != nil {
-		log.Warnf("failed to calc repeat info, err: %v", err)
-		return
-	}
-	sourceM.Put("repeat", repeatStatus)
 }
 
 func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -174,7 +193,7 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http
 		h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
 		return
 	}
-	if obj.Metadata.Type != "pipeline" && obj.Status == task.StatusComplete {
+	if obj.Metadata.Type != "pipeline" && (obj.Status == task.StatusComplete && obj.Metadata.Type != "cluster_comparison") {
 		h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError)
 		return
 	}
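The changed guard above loosens the restart rule: completed cluster_comparison tasks may now be started again, while other completed non-pipeline tasks are still refused. Restated as a small standalone predicate; the status strings are illustrative:

package main

import "fmt"

// restartBlocked restates the changed guard: a completed task is refused a
// restart only when it is neither a pipeline nor a cluster_comparison task.
func restartBlocked(taskType, status string) bool {
	return taskType != "pipeline" && status == "complete" && taskType != "cluster_comparison"
}

func main() {
	fmt.Println(restartBlocked("cluster_migration", "complete"))  // true: refused
	fmt.Println(restartBlocked("cluster_comparison", "complete")) // false: may restart
	fmt.Println(restartBlocked("pipeline", "complete"))           // false: may restart
}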
@@ -332,15 +351,104 @@ func (h *APIHandler) resumeTask(w http.ResponseWriter, req *http.Request, ps htt
 	return
 }
 
+// query index level task logging
+func (h *APIHandler) searchIndexLevelTaskLogging(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	id := ps.MustGetParameter("task_id")
+	uniqueIndexName := ps.MustGetParameter("index")
+	cfg := global.MustLookup("cluster_migration_config")
+	var (
+		migrationConfig *DispatcherConfig
+		ok              bool
+	)
+	if migrationConfig, ok = cfg.(*DispatcherConfig); !ok {
+		h.WriteJSON(w, elastic.SearchResponse{}, http.StatusOK)
+		return
+	}
+	client := elastic.GetClient(migrationConfig.Elasticsearch)
+	var (
+		strSize = h.GetParameterOrDefault(req, "size", "500")
+		min     = h.GetParameterOrDefault(req, "min", "")
+		max     = h.GetParameterOrDefault(req, "max", "")
+	)
+	size, _ := strconv.Atoi(strSize)
+	if size <= 0 {
+		size = 500
+	}
+	rangeObj := util.MapStr{}
+	if min != "" {
+		rangeObj["gte"] = min
+	}
+	if max != "" {
+		rangeObj["lt"] = max
+	}
+	mustQ := []util.MapStr{
+		{
+			"term": util.MapStr{
+				"metadata.category": util.MapStr{
+					"value": "task",
+				},
+			},
+		},
+		{
+			"term": util.MapStr{
+				"metadata.labels.parent_task_id": util.MapStr{
+					"value": id,
+				},
+			},
+		},
+		{
+			"term": util.MapStr{
+				"metadata.labels.unique_index_name": util.MapStr{
+					"value": uniqueIndexName,
+				},
+			},
+		},
+	}
+	if len(rangeObj) > 0 {
+		mustQ = append(mustQ, util.MapStr{
+			"range": util.MapStr{
+				"timestamp": rangeObj,
+			},
+		})
+	}
+	query := util.MapStr{
+		"size":    size,
+		"_source": []string{"payload.task.logging.message", "timestamp"},
+		"sort": []util.MapStr{
+			{
+				"timestamp": util.MapStr{
+					"order": "desc",
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": mustQ,
+			},
+		},
+	}
+	searchRes, err := client.SearchWithRawQueryDSL(migrationConfig.LogIndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		log.Error(err)
+		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	h.WriteJSON(w, searchRes, http.StatusOK)
+}
+
 type RepeatStatus struct {
 	IsRepeat  bool `json:"is_repeat"`
 	Done      bool `json:"done"`
 	Repeating bool `json:"repeating"`
+	LastRunTime        int64  `json:"last_run_time"`
+	NextRunTime        int64  `json:"next_run_time"`
+	LastRunChildTaskID string `json:"last_run_child_task_id"`
+	LastCompleteTime   int64  `json:"last_complete_time"`
 }
 
 func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) {
 	ret := &RepeatStatus{}
-	lastRepeatingChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type)
+	lastRepeatingChild, lastRunChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type)
 	if err != nil {
 		return nil, nil, err
 	}
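searchIndexLevelTaskLogging above assembles a bool query from three term filters plus an optional timestamp range, sorted newest first. A standalone sketch of the resulting query shape, using plain maps instead of util.MapStr; the task id, index name, and time bounds are placeholders:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	must := []map[string]interface{}{
		{"term": map[string]interface{}{"metadata.category": map[string]interface{}{"value": "task"}}},
		{"term": map[string]interface{}{"metadata.labels.parent_task_id": map[string]interface{}{"value": "task-id"}}},
		{"term": map[string]interface{}{"metadata.labels.unique_index_name": map[string]interface{}{"value": "myindex:uuid"}}},
	}
	// optional time window, mirroring the min/max request parameters
	must = append(must, map[string]interface{}{
		"range": map[string]interface{}{"timestamp": map[string]interface{}{"gte": "now-1h", "lt": "now"}},
	})
	query := map[string]interface{}{
		"size":    500,
		"_source": []string{"payload.task.logging.message", "timestamp"},
		"sort":    []map[string]interface{}{{"timestamp": map[string]interface{}{"order": "desc"}}},
		"query":   map[string]interface{}{"bool": map[string]interface{}{"must": must}},
	}
	out, _ := json.MarshalIndent(query, "", "  ")
	fmt.Println(string(out)) // the DSL sent to the logging index
}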
@@ -363,6 +471,17 @@ func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *Repe
 	if !repeatTriggered {
 		ret.Repeating = true
 	}
+	ret.NextRunTime = migration_util.GetMapIntValue(lastRepeatingChild.Metadata.Labels, "next_run_time")
+	ret.LastRunTime = lastRepeatingChild.StartTimeInMillis
+	if ret.LastRunTime == 0 && lastRunChild != nil {
+		ret.LastRunTime = lastRunChild.StartTimeInMillis
+		if lastRunChild.CompletedTime != nil && !lastRunChild.CompletedTime.IsZero(){
+			ret.LastCompleteTime = lastRunChild.CompletedTime.UnixMilli()
+		}
+	}
+	if lastRunChild != nil {
+		ret.LastRunChildTaskID = lastRunChild.ID
+	}
 	return lastRepeatingChild, ret, nil
 }
@@ -615,3 +734,71 @@ func (h *APIHandler) calcMajorTaskInfo(subTasks []task.Task, repeating bool) (st
 
 	return
 }
+
+func (h *APIHandler) searchTaskFieldValues(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+		var (
+			field   = h.GetParameterOrDefault(req, "field", "")
+			keyword = h.GetParameterOrDefault(req, "keyword", "")
+			mustQ   []interface{}
+		)
+		mustQ = append(mustQ, util.MapStr{
+			"term": util.MapStr{
+				"metadata.type": util.MapStr{
+					"value": taskType,
+				},
+			},
+		})
+
+		if v := strings.TrimSpace(keyword); v != ""{
+			mustQ = append(mustQ, util.MapStr{
+				"query_string": util.MapStr{
+					"default_field": field,
+					"query":         fmt.Sprintf("*%s*", v),
+				},
+			})
+		}
+		queryDSL := util.MapStr{
+			"aggs": util.MapStr{
+				"items": util.MapStr{
+					"terms": util.MapStr{
+						"field": field,
+						"size":  20,
+					},
+				},
+			},
+			"size": 0,
+			"query": util.MapStr{
+				"bool": util.MapStr{
+					"must": mustQ,
+				},
+			},
+		}
+		q := orm.Query{
+			RawQuery: util.MustToJSONBytes(queryDSL),
+		}
+		err, result := orm.Search(task.Task{}, &q)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		searchRes := elastic.SearchResponse{}
+		err = util.FromJSONBytes(result.Raw, &searchRes)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		items := []string{}
+		for _, bk := range searchRes.Aggregations["items"].Buckets {
+			if v, ok := bk["key"].(string); ok {
+				if strings.Contains(v, keyword){
+					items = append(items, v)
+				}
+			}
+		}
+		h.WriteJSON(w, items, http.StatusOK)
+	}
+}
@@ -74,7 +74,7 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
 	var completedIndices int
 	for i, index := range taskConfig.Indices {
 		indexName := index.Source.GetUniqueIndexName()
-		count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs
+		count := indexState[indexName].TotalScrollDocs
 		percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100
 		if percent > 100 {
 			percent = 100
@@ -82,7 +82,9 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
 		taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs
 		taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs
 		taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
+		taskConfig.Indices[i].TotalScrollDocs = count
 		taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
+		taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
 		if count == index.Source.Docs+index.Target.Docs {
 			completedIndices++
 		}
@@ -115,6 +117,7 @@ type ClusterComparisonTaskState struct {
 	TargetTotalDocs  int64
 	TargetScrollDocs int64
 	TotalDiffDocs    int64
+	RunningChildren  int
 }
 
 type ComparisonIndexStateInfo struct {
@@ -124,12 +127,21 @@ type ComparisonIndexStateInfo struct {
 	TargetTotalDocs  int64
 	TargetScrollDocs int64
 	TotalDiffDocs    int64
+	RunningChildren  int
+	TotalScrollDocs  int64
 }
 
-// TODO: calc realtime info from instance
 func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
+	var pipelineTaskIDs = map[string][]string{}
+	var pipelineIndexNames = map[string]string{}
 	indexState = map[string]ComparisonIndexStateInfo{}
+	const size = 500
+	var (
+		from    = -size
+		hasMore = true
+	)
+	for hasMore {
+		from += size
 	taskQuery := util.MapStr{
 		"size": 500,
 		"query": util.MapStr{
@@ -157,7 +169,11 @@ func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats Cluste
 	if err != nil {
 		return taskStats, indexState, err
 	}
+	if len(subTasks) < size {
+		hasMore = false
+	}
+
+	var indexMigrationTaskIDs []string
 	for _, subTask := range subTasks {
 		taskLabels := util.MapStr(subTask.Metadata.Labels)
 		indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
@ -171,26 +187,80 @@ func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats Cluste
|
||||||
log.Errorf("failed to get task config, err: %v", err)
|
log.Errorf("failed to get task config, err: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
|
|
||||||
targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
|
|
||||||
totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs")
|
totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs")
|
||||||
taskStats.SourceTotalDocs += cfg.Source.DocCount
|
taskStats.SourceTotalDocs += cfg.Source.DocCount
|
||||||
taskStats.SourceScrollDocs += sourceDocs
|
|
||||||
taskStats.TargetTotalDocs += cfg.Target.DocCount
|
taskStats.TargetTotalDocs += cfg.Target.DocCount
|
||||||
taskStats.TargetScrollDocs += targetDocs
|
|
||||||
taskStats.TotalDiffDocs += totalDiffDocs
|
taskStats.TotalDiffDocs += totalDiffDocs
|
||||||
st := indexState[indexName]
|
st := indexState[indexName]
|
||||||
st.SourceTotalDocs += cfg.Source.DocCount
|
st.SourceTotalDocs += cfg.Source.DocCount
|
||||||
st.SourceScrollDocs += sourceDocs
|
|
||||||
st.TargetTotalDocs += cfg.Target.DocCount
|
st.TargetTotalDocs += cfg.Target.DocCount
|
||||||
st.TargetScrollDocs += targetDocs
|
|
||||||
st.TotalDiffDocs += totalDiffDocs
|
st.TotalDiffDocs += totalDiffDocs
|
||||||
if subTask.Status == task.StatusError {
|
if subTask.Status == task.StatusError {
|
||||||
st.ErrorPartitions += 1
|
st.ErrorPartitions += 1
|
||||||
}
|
}
|
||||||
|
if subTask.Status == task.StatusRunning {
|
||||||
|
st.RunningChildren++
|
||||||
|
indexState[indexName] = st
|
||||||
|
indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
|
||||||
|
targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
|
||||||
|
st.SourceScrollDocs += sourceDocs
|
||||||
|
st.TargetScrollDocs += targetDocs
|
||||||
|
st.TotalScrollDocs += sourceDocs + targetDocs
|
||||||
|
taskStats.TargetScrollDocs += targetDocs
|
||||||
|
taskStats.SourceScrollDocs += sourceDocs
|
||||||
indexState[indexName] = st
|
indexState[indexName] = st
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(indexMigrationTaskIDs) == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
taskQuery = util.MapStr{
|
||||||
|
"size": len(indexMigrationTaskIDs) * 2,
|
||||||
|
"query": util.MapStr{
|
||||||
|
"bool": util.MapStr{
|
||||||
|
"must": []util.MapStr{
|
||||||
|
{
|
||||||
|
"terms": util.MapStr{
|
||||||
|
"parent_id": indexMigrationTaskIDs,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
subTasks, err = migration_util.GetTasks(taskQuery)
|
||||||
|
if err != nil {
|
||||||
|
return taskStats, indexState, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, subTask := range subTasks {
|
||||||
|
taskLabels := util.MapStr(subTask.Metadata.Labels)
|
||||||
|
indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
|
||||||
|
if indexName == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
pipelineIndexNames[subTask.ID] = indexName
|
||||||
|
|
||||||
|
if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
|
||||||
|
pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
|
||||||
|
for pipelineID, pipelineContext := range pipelineContexts {
|
||||||
|
// add scrolledDocs of running tasks
|
||||||
|
scrollDocs := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs")
|
||||||
|
indexName := pipelineIndexNames[pipelineID]
|
||||||
|
st := indexState[indexName]
|
||||||
|
st.TotalScrollDocs += scrollDocs
|
||||||
|
indexState[indexName] = st
|
||||||
|
}
|
||||||
return taskStats, indexState, nil
|
return taskStats, indexState, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
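
Note on the hunk above: the rewrite pages through comparison subtasks with a from/size loop instead of a single 500-hit query, stopping once a page comes back short. Below is a minimal runnable sketch of that pattern; fetchPage is a hypothetical stand-in for migration_util.GetTasks, and the counts are made up.

package main

import "fmt"

// fetchPage is a hypothetical stand-in for migration_util.GetTasks: it
// returns up to size items of all, starting at offset from.
func fetchPage(all []int, from, size int) []int {
	if from >= len(all) {
		return nil
	}
	end := from + size
	if end > len(all) {
		end = len(all)
	}
	return all[from:end]
}

func main() {
	all := make([]int, 1234)
	const size = 500
	var (
		from    = -size
		hasMore = true
		total   int
	)
	for hasMore {
		from += size // advance the window before each fetch, as in the patch
		page := fetchPage(all, from, size)
		total += len(page)
		if len(page) < size {
			hasMore = false // a short page means the result set is drained
		}
	}
	fmt.Println(total) // 1234
}
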
@@ -231,6 +301,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
 	startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)

 	var partitionTaskInfos []util.MapStr
+	var workers = map[string]struct{}{}

 	for i, subTask := range subTasks {
 		cfg := migration_model.IndexComparisonTaskConfig{}
@@ -242,6 +313,10 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
 		if i == 0 {
 			taskInfo.Step = cfg.Source.Step
 		}
+		instID := migration_util.GetMapStringValue(subTask.Metadata.Labels, "execution_instance_id")
+		if instID != "" {
+			workers[instID] = struct{}{}
+		}

 		var durationInMS int64
 		var subCompletedTime int64
@@ -256,6 +331,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
 		subTaskLabels := util.MapStr(subTask.Metadata.Labels)
 		sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled")
 		targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled")
+		onlyInSource := migration_util.GetMapIntValue(subTaskLabels, "only_in_source")
+		onlyInTarget := migration_util.GetMapIntValue(subTaskLabels, "only_in_target")
+		diffBoth := migration_util.GetMapIntValue(subTaskLabels, "diff_both")

 		partitionTaskInfo := util.MapStr{
 			"task_id": subTask.ID,
@@ -267,6 +345,9 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
 			"duration": durationInMS,
 			"source_total_docs": cfg.Source.DocCount,
 			"target_total_docs": cfg.Target.DocCount,
+			"only_in_source": onlyInSource,
+			"only_in_target": onlyInTarget,
+			"diff_both": diffBoth,
 		}
 		sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg)
 		if sourceDumpTask != nil {
@@ -305,6 +386,14 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
 	if taskInfo.StartTime == 0 {
 		taskInfo.StartTime = startTime
 	}
+	for _, node := range taskConfig.Settings.Execution.Nodes.Permit {
+		if _, ok := workers[node.ID]; ok {
+			taskInfo.Workers = append(taskInfo.Workers, util.MapStr{
+				"id":   node.ID,
+				"name": node.Name,
+			})
+		}
+	}
 	taskInfo.Partitions = partitionTaskInfos
 	taskInfo.CompletedPartitions = completedPartitions
 	h.WriteJSON(w, taskInfo, http.StatusOK)
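
The worker-reporting change above collects each subtask's execution_instance_id label into a set, then intersects it with the nodes the task config permits. A small self-contained sketch of that intersection; the node type and sample IDs are illustrative only.

package main

import "fmt"

// node mirrors the shape of taskConfig.Settings.Execution.Nodes.Permit
// entries; both the type and the sample data here are invented.
type node struct{ ID, Name string }

func main() {
	permit := []node{{"inst-a", "gateway-a"}, {"inst-b", "gateway-b"}}
	// workers is the set of execution_instance_id labels seen on subtasks.
	workers := map[string]struct{}{"inst-b": {}}

	// Keep only permitted nodes that actually executed a partition.
	var active []node
	for _, n := range permit {
		if _, ok := workers[n.ID]; ok {
			active = append(active, n)
		}
	}
	fmt.Println(active) // [{inst-b gateway-b}]
}
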
@@ -362,6 +362,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
 	if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 {
 		now := time.Now()
 		taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth
+		taskItem.Metadata.Labels["only_in_source"] = onlyInSource
+		taskItem.Metadata.Labels["only_in_target"] = onlyInTarget
+		taskItem.Metadata.Labels["diff_both"] = diffBoth
 		taskItem.CompletedTime = &now
 		taskItem.Status = task.StatusError
 		p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
@@ -390,6 +393,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
 		now := time.Now()
 		taskItem.CompletedTime = &now
 		taskItem.Status = task.StatusComplete
+		taskItem.Metadata.Labels["total_diff_docs"] = 0
 		p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 			Success: true,
 		}, "index comparison completed")
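
The processor hunks above persist the per-category diff counters as task labels and derive the final status from them. A detached sketch of that rule, assuming plain maps and strings in place of the real task model and status constants:

package main

import (
	"fmt"
	"time"
)

// finishComparison mirrors the decision in handleRunningSubTask: any
// difference marks the comparison as errored and records the per-category
// counts as labels; a clean run records zero diff docs and completes.
func finishComparison(labels map[string]interface{}, onlyInSource, onlyInTarget, diffBoth int64) (status string, completed time.Time) {
	completed = time.Now()
	if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 {
		labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth
		labels["only_in_source"] = onlyInSource
		labels["only_in_target"] = onlyInTarget
		labels["diff_both"] = diffBoth
		return "error", completed
	}
	labels["total_diff_docs"] = 0
	return "complete", completed
}

func main() {
	labels := map[string]interface{}{}
	status, _ := finishComparison(labels, 3, 0, 1)
	fmt.Println(status, labels["total_diff_docs"]) // error 4
}
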
@@ -3,6 +3,7 @@ package task_manager
 import (
 	"fmt"
 	"net/http"
+	"strings"
 	"time"

 	log "github.com/cihub/seelog"
@@ -78,18 +79,26 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
 	count := indexState[indexName].IndexDocs
 	sourceDocs := index.Source.Docs
 	var percent float64
+	var exportedPercent float64
 	if sourceDocs <= 0 {
 		percent = 100
+		exportedPercent = 100
 	}else{
 		percent = float64(count) / float64(sourceDocs) * 100
 		if percent > 100 {
 			percent = 100
 		}
+		exportedPercent = float64(indexState[indexName].ScrollDocs) / float64(sourceDocs) * 100
+		if exportedPercent > 100 {
+			exportedPercent = 100
+		}
 	}
 	//taskConfig.Indices[i].Source.Docs = sourceDocs
 	taskConfig.Indices[i].Target.Docs = count
 	taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
 	taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
+	taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
+	taskConfig.Indices[i].ExportedPercent = util.ToFixed(exportedPercent, 2)
 	if count == index.Source.Docs {
 		completedIndices++
 	}
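
The hunk above adds an exported (scrolled) percentage alongside the existing indexed percentage, both clamped at 100. A minimal sketch of the two figures, where toFixed stands in for util.ToFixed and the doc counts are invented:

package main

import (
	"fmt"
	"math"
)

// toFixed rounds v to n decimal places; a stand-in for util.ToFixed.
func toFixed(v float64, n int) float64 {
	p := math.Pow(10, float64(n))
	return math.Round(v*p) / p
}

// progress mirrors the patched logic: indexed percent from bulk-indexed
// docs, exported percent from scrolled docs, both clamped to 100.
func progress(indexDocs, scrollDocs, sourceDocs int64) (percent, exportedPercent float64) {
	if sourceDocs <= 0 {
		return 100, 100
	}
	percent = float64(indexDocs) / float64(sourceDocs) * 100
	if percent > 100 {
		percent = 100
	}
	exportedPercent = float64(scrollDocs) / float64(sourceDocs) * 100
	if exportedPercent > 100 {
		exportedPercent = 100
	}
	return toFixed(percent, 2), toFixed(exportedPercent, 2)
}

func main() {
	p, e := progress(7500, 9000, 10000)
	fmt.Println(p, e) // 75 90
}
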
@@ -141,6 +150,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		StartTime: majorTask.StartTimeInMillis,
 		Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels),
 	}
+	if taskInfo.Repeating {
+		_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
+		if err != nil {
+			log.Error(err)
+			h.WriteError(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		taskInfo.NextRunTime = repeatStatus.NextRunTime
+	}
+	indexParts := strings.Split(uniqueIndexName, ":")
+	for _, index := range taskConfig.Indices {
+		if index.Source.Name == indexParts[0] {
+			taskInfo.Incremental = index.Incremental
+		}
+	}

 	subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName)

@@ -167,6 +191,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)

 	var partitionTaskInfos []util.MapStr
+	var workers = map[string]struct{}{}

 	for i, ptask := range subTasks {
 		cfg := migration_model.IndexMigrationTaskConfig{}
@@ -178,7 +203,10 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 		if i == 0 {
 			taskInfo.Step = cfg.Source.Step
 		}
+		instID := migration_util.GetMapStringValue(ptask.Metadata.Labels, "execution_instance_id")
+		if instID != "" {
+			workers[instID] = struct{}{}
+		}
 		var durationInMS int64
 		var subCompletedTime int64
 		if ptask.StartTimeInMillis > 0 {
@@ -241,6 +269,14 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	}
 	taskInfo.Partitions = partitionTaskInfos
 	taskInfo.CompletedPartitions = completedPartitions
+	for _, node := range taskConfig.Settings.Execution.Nodes.Permit {
+		if _, ok := workers[node.ID]; ok {
+			taskInfo.Workers = append(taskInfo.Workers, util.MapStr{
+				"id":   node.ID,
+				"name": node.Name,
+			})
+		}
+	}
 	h.WriteJSON(w, taskInfo, http.StatusOK)
 }

@@ -248,6 +284,8 @@ type MigrationIndexStateInfo struct {
 	ErrorPartitions int
 	IndexDocs       int64
 	SourceDocs      int64
+	RunningChildren int
+	ScrollDocs      int64
 }

 /*
@@ -324,9 +362,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 	taskStats.SourceDocs += cfg.Source.DocCount
 	st := indexState[indexName]
 	st.SourceDocs += cfg.Source.DocCount
-	indexState[indexName] = st
+	scrollDocs := migration_util.GetMapIntValue(taskLabels, "scrolled_docs")
+	st.ScrollDocs += scrollDocs

 	if subTask.Status == task.StatusRunning {
+		st.RunningChildren++
+		indexState[indexName] = st
 		indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
 		continue
 	}
@@ -334,6 +375,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 	indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
 	taskStats.IndexDocs += indexDocs
 	st.IndexDocs += indexDocs
+
 	if subTask.Status == task.StatusError {
 		st.ErrorPartitions += 1
 		taskStats.ErrorPartitions += 1
@@ -347,7 +389,7 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 	}

 	taskQuery = util.MapStr{
-		"size": len(indexMigrationTaskIDs),
+		"size": len(indexMigrationTaskIDs) * 2,
 		"query": util.MapStr{
 			"bool": util.MapStr{
 				"must": []util.MapStr{
@@ -356,13 +398,13 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 						"parent_id": indexMigrationTaskIDs,
 					},
 				},
-				{
-					"term": util.MapStr{
-						"metadata.labels.pipeline_id": util.MapStr{
-							"value": "bulk_indexing",
-						},
-					},
-				},
+				//{
+				//	"term": util.MapStr{
+				//		"metadata.labels.pipeline_id": util.MapStr{
+				//			"value": "bulk_indexing",
+				//		},
+				//	},
+				//},
 			},
 		},
 	}
@@ -391,10 +433,12 @@ func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_m
 	for pipelineID, pipelineContext := range pipelineContexts {
 		// add indexDocs of running tasks
 		indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count")
+		scrollDocs := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs")
 		taskStats.IndexDocs += indexDocs
 		indexName := pipelineIndexNames[pipelineID]
 		st := indexState[indexName]
 		st.IndexDocs += indexDocs
+		st.ScrollDocs += scrollDocs
 		indexState[indexName] = st
 	}
 	return taskStats, indexState, nil
@@ -5,6 +5,8 @@ import (
 )

 type ClusterComparisonTaskConfig struct {
+	Name    string   `json:"name"`
+	Tags    []string `json:"tags"`
 	Cluster struct {
 		Source ClusterInfo `json:"source"`
 		Target ClusterInfo `json:"target"`
@@ -30,7 +32,9 @@ type ClusterComparisonIndexConfig struct {

 	// only used in API
 	ScrollPercent   float64 `json:"scroll_percent,omitempty"`
+	TotalScrollDocs int64   `json:"total_scroll_docs,omitempty"`
 	ErrorPartitions int     `json:"error_partitions,omitempty"`
+	RunningChildren int     `json:"running_children,omitempty"`
 }

 type IndexComparisonTaskConfig struct {
@@ -9,6 +9,8 @@ import (
 )

 type ClusterMigrationTaskConfig struct {
+	Name    string   `json:"name"`
+	Tags    []string `json:"tags"`
 	Cluster struct {
 		Source ClusterInfo `json:"source"`
 		Target ClusterInfo `json:"target"`
@@ -39,6 +41,8 @@ type ClusterMigrationIndexConfig struct {
 	// only used in API
 	Percent         float64 `json:"percent,omitempty"`
 	ErrorPartitions int     `json:"error_partitions,omitempty"`
+	RunningChildren int     `json:"running_children,omitempty"`
+	ExportedPercent float64 `json:"exported_percent,omitempty"`
 }

 type ClusterMigrationTaskState struct {
@@ -38,9 +38,9 @@ func DeleteChildTasks(taskID string, taskType string) error {
 	return nil
 }

-func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, error) {
+func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, *task.Task, error) {
 	queryDsl := util.MapStr{
-		"size": 1,
+		"size": 2,
 		"sort": []util.MapStr{
 			{
 				"metadata.labels.next_run_time": util.MapStr{
@@ -69,12 +69,21 @@ func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, erro
 	}
 	tasks, err := GetTasks(queryDsl)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	if len(tasks) == 0 {
-		return nil, nil
+		return nil, nil, nil
 	}
-	return &tasks[0], nil
+	var lastRunChildTask *task.Task
+	if tasks[0].StartTimeInMillis > 0 {
+		lastRunChildTask = &tasks[0]
+	}else{
+		if len(tasks) == 2 {
+			lastRunChildTask = &tasks[1]
+		}
+	}
+
+	return &tasks[0], lastRunChildTask, nil
 }

 func GetPendingChildTasks(taskID string, taskType string) ([]task.Task, error) {

@@ -233,7 +242,7 @@ func UpdateStoppedChildTasksToReady(taskItem *task.Task, taskType string) error
 			},
 			{
 				"terms": util.MapStr{
-					"status": []string{task.StatusError, task.StatusStopped},
+					"status": []string{task.StatusError, task.StatusStopped, task.StatusComplete},
 				},
 			},
 			{
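
GetLastRepeatingChildTask now returns two values plus the error: the newest repeating child and the child that most recently actually ran. A self-contained sketch of the selection rule; the Task type is trimmed down, and the input is assumed to be sorted by next_run_time descending, as the query in the diff requests.

package main

import "fmt"

// Task is a simplified stand-in for the real task.Task model.
type Task struct {
	ID                string
	StartTimeInMillis int64
}

// pickLastRun mirrors the patched logic: with at most two children sorted
// newest-first, the last run is the newest child if it already started,
// otherwise the previous child when one exists.
func pickLastRun(tasks []Task) (latest, lastRun *Task) {
	if len(tasks) == 0 {
		return nil, nil
	}
	latest = &tasks[0]
	if tasks[0].StartTimeInMillis > 0 {
		lastRun = &tasks[0]
	} else if len(tasks) == 2 {
		lastRun = &tasks[1]
	}
	return latest, lastRun
}

func main() {
	// Hypothetical data: the newest child has not started yet.
	tasks := []Task{{ID: "t-2"}, {ID: "t-1", StartTimeInMillis: 1690000000000}}
	latest, lastRun := pickLastRun(tasks)
	fmt.Println(latest.ID, lastRun.ID) // t-2 t-1
}
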