diff --git a/Jenkinsfile-linux-amd64 b/Jenkinsfile-linux-amd64 new file mode 100644 index 00000000..86249276 --- /dev/null +++ b/Jenkinsfile-linux-amd64 @@ -0,0 +1,41 @@ +pipeline { + agent none + + environment { + CI = 'true' + } + + stages { + + + stage('Prepare Web Packages') { + + agent { + label 'linux' + } + + steps { + catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE'){ + sh 'cd /home/jenkins/go/src/infini.sh/console && git stash && git pull origin master && make clean' + sh 'cd /home/jenkins/go/src/infini.sh/console/ && true|| rm -rif web' + sh 'cd /home/jenkins/go/src/infini.sh/console/ && true || git clone ssh://git@git.infini.ltd:64221/infini/console-ui.git web' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && git pull origin master' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && git stash' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm install' + sh 'cd /home/jenkins/go/src/infini.sh/console/web && cnpm run build' + sh 'cd /home/jenkins/go/src/infini.sh/console && git pull origin master && make config build-linux-amd64' + sh label: 'copy-license', script: 'cd /home/jenkins/go/src/infini.sh/console && cp ../framework/LICENSE bin && cat ../framework/NOTICE NOTICE > bin/NOTICE' + sh label: 'copy-configs', script: 'cd /home/jenkins/go/src/infini.sh/console && mkdir -p bin/config && cp config/*.json bin/config && cp config/*.tpl bin/config' + + sh label: 'package-linux-amd64', script: 'cd /home/jenkins/go/src/infini.sh/console/bin && tar cfz ${WORKSPACE}/console-$VERSION-$BUILD_NUMBER-linux-amd64.tar.gz console-linux-amd64 console.yml LICENSE NOTICE config' + archiveArtifacts artifacts: 'console-$VERSION-$BUILD_NUMBER-*.*', fingerprint: true, followSymlinks: true, onlyIfSuccessful: false + } + } + } + + + + +} + +} diff --git a/console.yml b/console.yml index 09a5a601..590a4206 100644 --- a/console.yml +++ b/console.yml @@ -64,4 +64,4 @@ badger: # authorize_url: "https://github.com/login/oauth/authorize" # token_url: "https://github.com/login/oauth/access_token" # redirect_url: "" -# scopes: ["repo"] \ No newline at end of file +# scopes: [] \ No newline at end of file diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 4c9faadc..d2ad25ce 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -53,7 +53,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro cfg := DispatcherConfig{} if err := c.Unpack(&cfg); err != nil { - log.Error(err) + log.Errorf("failed to unpack config, err: %v", err) return nil, fmt.Errorf("failed to unpack the configuration of migration dispatcher processor: %s", err) } if cfg.IndexName == "" || cfg.LogIndexName == "" { @@ -68,7 +68,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro } } else { err = fmt.Errorf("parse config elastic.orm error: %w", err) - log.Error(err) + log.Errorf("failed to parse elastic.orm, err: %v", err) return nil, err } } @@ -88,7 +88,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro } state, err := processor.getInstanceTaskState() if err != nil { - log.Error(err) + log.Errorf("failed to get instance task state, err: %v", err) return nil, err } processor.state = state @@ -118,7 +118,7 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { return nil } if t.Metadata.Labels == nil { - log.Error("got migration task with empty labels, skip handling: %v", t) + log.Errorf("got migration task with empty labels, skip handling: %v", t) continue } if t.Metadata.Labels["business_id"] == "cluster_migration" { @@ -131,6 +131,9 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { case task2.StatusPendingStop: err = p.handlePendingStopMajorTask(&t) } + if err != nil { + log.Errorf("failed to handling major task [%s]: [%v]", t.ID, err) + } } else if t.Metadata.Labels["business_id"] == "index_migration" { //handle sub migration task switch t.Status { @@ -140,6 +143,9 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { err = p.handleRunningSubTask(&t) case task2.StatusPendingStop: err = p.handlePendingStopSubTask(&t) + if err != nil { + log.Errorf("failed to handling sub task [%s]: [%v]", t.ID, err) + } } } if err != nil { @@ -314,7 +320,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { for _, pipelineID := range state.PipelineIds { err = inst.DeletePipeline(pipelineID) if err != nil { - log.Error(err) + log.Errorf("delete pipeline failed, err: %v", err) continue } selector := util.MapStr{ @@ -325,7 +331,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { //clear queue err = inst.DeleteQueueBySelector(selector) if err != nil { - log.Error(err) + log.Errorf("delete queue failed, err: %v", err) } } } @@ -466,7 +472,7 @@ MainLoop: hasStopped = false break } - log.Error(err) + log.Errorf("failed to stop pipeline, err: %v", err) } } if hasStopped { @@ -489,7 +495,7 @@ MainLoop: for _, pipelineID := range taskIDs { err = inst.DeletePipeline(pipelineID) if err != nil { - log.Error(err) + log.Errorf("failed to delete pipeline, err: %v", err) continue } selector := util.MapStr{ @@ -500,7 +506,7 @@ MainLoop: //clear queue err = inst.DeleteQueueBySelector(selector) if err != nil { - log.Error(err) + log.Errorf("failed to delete queue, err: %v", err) } } if st, ok := p.state[instanceID]; ok { @@ -532,6 +538,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { //query split pipeline task ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { + log.Errorf("getPipelineTasks failed, err: %+v", err) return err } for i, t := range ptasks { @@ -727,6 +734,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { //call instance api to create pipeline task err = instance.CreatePipeline(util.MustToJSONBytes(scrollTask.Config)) if err != nil { + log.Errorf("create scroll pipeline failed, err: %+v", err) return err } //err = instance.CreatePipeline(util.MustToJSONBytes(bulkTask.Config)) @@ -813,7 +821,7 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc tempInst.ID = node.ID _, err = orm.Get(&tempInst) if err != nil { - log.Error(err) + log.Errorf("failed to get instance, err: %v", err) continue } err = tempInst.TryConnectWithTimeout(time.Second) @@ -894,7 +902,7 @@ func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) if err != nil { - log.Error(err) + log.Errorf("failed to update task, err: %v", err) } if logItem != nil { event.SaveLog(event.Event{ @@ -1179,6 +1187,7 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) esClient := elastic.GetClient(p.config.Elasticsearch) res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) if err != nil { + log.Errorf("query tasks from es failed, err: %v", err) return nil, err } if res.GetTotal() == 0 { @@ -1188,11 +1197,13 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) for _, hit := range res.Hits.Hits { buf, err := util.ToJSONBytes(hit.Source) if err != nil { + log.Errorf("marshal task json failed, err: %v", err) return nil, err } tk := task2.Task{} err = util.FromJSONBytes(buf, &tk) if err != nil { + log.Errorf("unmarshal task json failed, err: %v", err) return nil, err } migrationTasks = append(migrationTasks, tk) @@ -1203,6 +1214,7 @@ func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCompleteState, error) { ptasks, err := p.getPipelineTasks(subTask.ID) if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) return nil, err } var pids []string @@ -1251,6 +1263,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo esClient := elastic.GetClient(p.config.Elasticsearch) res, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(query)) if err != nil { + log.Errorf("search task log from es failed, err: %v", err) return nil, err } var ( @@ -1262,6 +1275,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo } totalDocs, err := util.MapStr(cfg).GetValue("source.doc_count") if err != nil { + log.Errorf("failed to get source.doc_count, err: %v", err) return nil, err } @@ -1325,6 +1339,7 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo inst.ID = instID _, err = orm.Get(&inst) if err != nil { + log.Errorf("get instance failed, err: %v", err) return nil, err } err = inst.TryConnectWithTimeout(time.Second * 3) @@ -1376,6 +1391,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat esClient := elastic.GetClient(p.config.Elasticsearch) res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) if err != nil { + log.Errorf("search es failed, err: %v", err) return taskState, err } if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { @@ -1436,6 +1452,7 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState esClient := elastic.GetClient(p.config.Elasticsearch) res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) if err != nil { + log.Errorf("search es failed, err: %v", err) return nil, err } state := map[string]DispatcherState{}