Merge branch 'master' of ssh://git.infini.ltd:64221/infini/console
This commit is contained in:
commit
3987c5db37
|
@ -0,0 +1,41 @@
|
||||||
|
pipeline {
|
||||||
|
agent none
|
||||||
|
|
||||||
|
environment {
|
||||||
|
CI = 'true'
|
||||||
|
}
|
||||||
|
|
||||||
|
stages {
|
||||||
|
|
||||||
|
|
||||||
|
stage('Prepare Web Packages') {
|
||||||
|
|
||||||
|
agent {
|
||||||
|
label 'linux'
|
||||||
|
}
|
||||||
|
|
||||||
|
steps {
|
||||||
|
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE'){
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console && git stash && git pull origin master && make clean'
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console/ && true|| rm -rif web'
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || git clone ssh://git@git.infini.ltd:64221/infini/console-ui.git web'
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console/web && git pull origin master'
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console/web && git stash'
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm install'
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm run build'
|
||||||
|
sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64'
|
||||||
|
sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE'
|
||||||
|
sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config'
|
||||||
|
|
||||||
|
sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config'
|
||||||
|
archiveArtifacts artifacts: 'console-$VERSION-$BUILD_NUMBER-*.*', fingerprint: true, followSymlinks: true, onlyIfSuccessful: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -64,4 +64,4 @@ badger:
|
||||||
# authorize_url: "https://github.com/login/oauth/authorize"
|
# authorize_url: "https://github.com/login/oauth/authorize"
|
||||||
# token_url: "https://github.com/login/oauth/access_token"
|
# token_url: "https://github.com/login/oauth/access_token"
|
||||||
# redirect_url: ""
|
# redirect_url: ""
|
||||||
# scopes: ["repo"]
|
# scopes: []
|
|
@ -53,7 +53,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
|
||||||
|
|
||||||
cfg := DispatcherConfig{}
|
cfg := DispatcherConfig{}
|
||||||
if err := c.Unpack(&cfg); err != nil {
|
if err := c.Unpack(&cfg); err != nil {
|
||||||
log.Error(err)
|
log.Errorf("failed to unpack config, err: %v", err)
|
||||||
return nil, fmt.Errorf("failed to unpack the configuration of migration dispatcher processor: %s", err)
|
return nil, fmt.Errorf("failed to unpack the configuration of migration dispatcher processor: %s", err)
|
||||||
}
|
}
|
||||||
if cfg.IndexName == "" || cfg.LogIndexName == "" {
|
if cfg.IndexName == "" || cfg.LogIndexName == "" {
|
||||||
|
@ -68,7 +68,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err = fmt.Errorf("parse config elastic.orm error: %w", err)
|
err = fmt.Errorf("parse config elastic.orm error: %w", err)
|
||||||
log.Error(err)
|
log.Errorf("failed to parse elastic.orm, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -88,7 +88,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
|
||||||
}
|
}
|
||||||
state, err := processor.getInstanceTaskState()
|
state, err := processor.getInstanceTaskState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("failed to get instance task state, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
processor.state = state
|
processor.state = state
|
||||||
|
@ -118,7 +118,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if t.Metadata.Labels == nil {
|
if t.Metadata.Labels == nil {
|
||||||
log.Error("got migration task with empty labels, skip handling: %v", t)
|
log.Errorf("got migration task with empty labels, skip handling: %v", t)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if t.Metadata.Labels["business_id"] == "cluster_migration" {
|
if t.Metadata.Labels["business_id"] == "cluster_migration" {
|
||||||
|
@ -131,6 +131,9 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
|
||||||
case task2.StatusPendingStop:
|
case task2.StatusPendingStop:
|
||||||
err = p.handlePendingStopMajorTask(&t)
|
err = p.handlePendingStopMajorTask(&t)
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err)
|
||||||
|
}
|
||||||
} else if t.Metadata.Labels["business_id"] == "index_migration" {
|
} else if t.Metadata.Labels["business_id"] == "index_migration" {
|
||||||
//handle sub migration task
|
//handle sub migration task
|
||||||
switch t.Status {
|
switch t.Status {
|
||||||
|
@ -140,6 +143,9 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
|
||||||
err = p.handleRunningSubTask(&t)
|
err = p.handleRunningSubTask(&t)
|
||||||
case task2.StatusPendingStop:
|
case task2.StatusPendingStop:
|
||||||
err = p.handlePendingStopSubTask(&t)
|
err = p.handlePendingStopSubTask(&t)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -314,7 +320,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
|
||||||
for _, pipelineID := range state.PipelineIds {
|
for _, pipelineID := range state.PipelineIds {
|
||||||
err = inst.DeletePipeline(pipelineID)
|
err = inst.DeletePipeline(pipelineID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("delete pipeline failed, err: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
selector := util.MapStr{
|
selector := util.MapStr{
|
||||||
|
@ -325,7 +331,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
|
||||||
//clear queue
|
//clear queue
|
||||||
err = inst.DeleteQueueBySelector(selector)
|
err = inst.DeleteQueueBySelector(selector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("delete queue failed, err: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -466,7 +472,7 @@ MainLoop:
|
||||||
hasStopped = false
|
hasStopped = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
log.Error(err)
|
log.Errorf("failed to stop pipeline, err: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if hasStopped {
|
if hasStopped {
|
||||||
|
@ -489,7 +495,7 @@ MainLoop:
|
||||||
for _, pipelineID := range taskIDs {
|
for _, pipelineID := range taskIDs {
|
||||||
err = inst.DeletePipeline(pipelineID)
|
err = inst.DeletePipeline(pipelineID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("failed to delete pipeline, err: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
selector := util.MapStr{
|
selector := util.MapStr{
|
||||||
|
@ -500,7 +506,7 @@ MainLoop:
|
||||||
//clear queue
|
//clear queue
|
||||||
err = inst.DeleteQueueBySelector(selector)
|
err = inst.DeleteQueueBySelector(selector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("failed to delete queue, err: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if st, ok := p.state[instanceID]; ok {
|
if st, ok := p.state[instanceID]; ok {
|
||||||
|
@ -532,6 +538,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
|
||||||
//query split pipeline task
|
//query split pipeline task
|
||||||
ptasks, err := p.getPipelineTasks(taskItem.ID)
|
ptasks, err := p.getPipelineTasks(taskItem.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("getPipelineTasks failed, err: %+v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for i, t := range ptasks {
|
for i, t := range ptasks {
|
||||||
|
@ -727,6 +734,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
|
||||||
//call instance api to create pipeline task
|
//call instance api to create pipeline task
|
||||||
err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config))
|
err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("create scroll pipeline failed, err: %+v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
//err = instance.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
|
//err = instance.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
|
||||||
|
@ -813,7 +821,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc
|
||||||
tempInst.ID = node.ID
|
tempInst.ID = node.ID
|
||||||
_, err = orm.Get(&tempInst)
|
_, err = orm.Get(&tempInst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("failed to get instance, err: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = tempInst.TryConnectWithTimeout(time.Second)
|
err = tempInst.TryConnectWithTimeout(time.Second)
|
||||||
|
@ -894,7 +902,7 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem
|
||||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||||
_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
|
_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Errorf("failed to update task, err: %v", err)
|
||||||
}
|
}
|
||||||
if logItem != nil {
|
if logItem != nil {
|
||||||
event.SaveLog(event.Event{
|
event.SaveLog(event.Event{
|
||||||
|
@ -1179,6 +1187,7 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error)
|
||||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("query tasks from es failed, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if res.GetTotal() == 0 {
|
if res.GetTotal() == 0 {
|
||||||
|
@ -1188,11 +1197,13 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error)
|
||||||
for _, hit := range res.Hits.Hits {
|
for _, hit := range res.Hits.Hits {
|
||||||
buf, err := util.ToJSONBytes(hit.Source)
|
buf, err := util.ToJSONBytes(hit.Source)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("marshal task json failed, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
tk := task2.Task{}
|
tk := task2.Task{}
|
||||||
err = util.FromJSONBytes(buf, &tk)
|
err = util.FromJSONBytes(buf, &tk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("unmarshal task json failed, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
migrationTasks = append(migrationTasks, tk)
|
migrationTasks = append(migrationTasks, tk)
|
||||||
|
@ -1203,6 +1214,7 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error)
|
||||||
func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) {
|
func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) {
|
||||||
ptasks, err := p.getPipelineTasks(subTask.ID)
|
ptasks, err := p.getPipelineTasks(subTask.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("failed to get pipeline tasks, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var pids []string
|
var pids []string
|
||||||
|
@ -1251,6 +1263,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
|
||||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||||
res, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(query))
|
res, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(query))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("search task log from es failed, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
|
@ -1262,6 +1275,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
|
||||||
}
|
}
|
||||||
totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count")
|
totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("failed to get source.doc_count, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1325,6 +1339,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
|
||||||
inst.ID = instID
|
inst.ID = instID
|
||||||
_, err = orm.Get(&inst)
|
_, err = orm.Get(&inst)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("get instance failed, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = inst.TryConnectWithTimeout(time.Second * 3)
|
err = inst.TryConnectWithTimeout(time.Second * 3)
|
||||||
|
@ -1376,6 +1391,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat
|
||||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("search es failed, err: %v", err)
|
||||||
return taskState, err
|
return taskState, err
|
||||||
}
|
}
|
||||||
if v, ok := res.Aggregations["total_docs"].Value.(float64); ok {
|
if v, ok := res.Aggregations["total_docs"].Value.(float64); ok {
|
||||||
|
@ -1436,6 +1452,7 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState
|
||||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Errorf("search es failed, err: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
state := map[string]DispatcherState{}
|
state := map[string]DispatcherState{}
|
||||||
|
|
Loading…
Reference in New Issue