From d5d46de854b18134f2507b90a577a84369d6d06e Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Mon, 3 Apr 2023 17:52:24 +0800 Subject: [PATCH] [migration] improve pending_stop & es error handling --- plugin/migration/api.go | 120 ++--------------------------------- plugin/migration/pipeline.go | 92 ++++++++++++++++++--------- 2 files changed, 68 insertions(+), 144 deletions(-) diff --git a/plugin/migration/api.go b/plugin/migration/api.go index e368b94e..a458e7f4 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -7,12 +7,13 @@ package migration import ( "context" "fmt" - "infini.sh/framework/core/api/rbac/enum" "net/http" "strconv" "strings" "time" + "infini.sh/framework/core/api/rbac/enum" + log "github.com/cihub/seelog" "infini.sh/console/model" "infini.sh/framework/core/api" @@ -289,62 +290,12 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ }, http.StatusNotFound) return } - //if task2.IsEnded(obj.Status) { - // h.WriteJSON(w, util.MapStr{ - // "success": true, - // }, 200) - // return - //} - //query all pipeline task(scroll/bulk_indexing) and then stop it - err = stopPipelineTasks(id) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) + if task2.IsEnded(obj.Status) { + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) return } - if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "cluster_migration" { - //update status of subtask to pending stop - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": obj.ID, - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": task2.StatusRunning, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.labels.business_id": util.MapStr{ - "value": "index_migration", - }, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), - }, - } - - err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl)) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - } obj.Status = task2.StatusPendingStop err = orm.Update(nil, &obj) if err != nil { @@ -987,65 +938,6 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, }, http.StatusOK) } -func stopPipelineTasks(parentID string) error { - queryDsl := util.MapStr{ - "size": 1000, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": parentID, - }, - }, - }, - { - "terms": util.MapStr{ - "metadata.labels.pipeline_id": []string{"es_scroll", "bulk_indexing"}, - }, - }, - }, - }, - }, - } - q := orm.Query{ - RawQuery: util.MustToJSONBytes(queryDsl), - } - err, result := orm.Search(task2.Task{}, &q) - if err != nil { - return err - } - - for _, hit := range result.Result { - buf, err := util.ToJSONBytes(hit) - if err != nil { - return err - } - tk := task2.Task{} - err = util.FromJSONBytes(buf, &tk) - if err != nil { - return err - } - if tk.Metadata.Labels != nil { - if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instID - _, err = orm.Get(&inst) - if err != nil { - return err - } - err = inst.StopPipeline(context.Background(), tk.ID) - if err != nil { - log.Error(err) - continue - } - } - } - } - return nil -} - func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) { taskQuery := util.MapStr{ "size": 500, diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index eae7ce8c..695ade42 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -211,20 +211,63 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { }, } + // saved is_split if the following steps failed + defer func() { + p.sendMajorTaskNotification(taskItem) + p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("success to start task [%s]", taskItem.ID)) + }() + esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl)) if err != nil { - return err + log.Errorf("failed to update sub task status, err: %v", err) + return nil } taskItem.Status = task2.StatusRunning - p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("success to start task [%s]", taskItem.ID)) return nil } func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error { + //update status of subtask to pending stop + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task2.StatusRunning, task2.StatusReady}, + }, + }, + { + "term": util.MapStr{ + "metadata.labels.business_id": util.MapStr{ + "value": "index_migration", + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), + }, + } + + err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Errorf("failed to update sub task status, err: %v", err) + return nil + } //check whether all pipeline task is stopped or not, then update task status q := util.MapStr{ "size": 200, @@ -238,7 +281,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e }, { "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusPendingStop}, + "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady}, }, }, }, @@ -247,7 +290,8 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e } tasks, err := p.getTasks(q) if err != nil { - return err + log.Errorf("failed to get sub tasks, err: %v", err) + return nil } // all subtask stopped or error or complete if len(tasks) == 0 { @@ -328,7 +372,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { - return err + log.Errorf("failed to get pipeline tasks, err: %v", err) + return nil } var bulkTask *task2.Task for i, t := range ptasks { @@ -347,10 +392,12 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { inst.ID = instID _, err = orm.Get(inst) if err != nil { + log.Errorf("failed to get instance, err: %v", err) return err } err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config)) if err != nil { + log.Errorf("failed to create bulk_indexing pipeline, err: %v", err) return err } taskItem.Metadata.Labels["running_phase"] = 2 @@ -366,6 +413,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err //check whether all pipeline task is stopped or not, then update task status ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) return err } var taskIDs []string @@ -393,24 +441,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err } searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q)) if err != nil { - return err - } - if len(searchRes.Hits.Hits) == 0 { - //check instance available - if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instID - _, err = orm.Get(&inst) - if err != nil { - return err - } - err = inst.TryConnectWithTimeout(time.Second) - if err != nil { - if errors.Is(err, syscall.ECONNREFUSED) { - return fmt.Errorf("stoping task [%s] error: %w", taskItem.ID, err) - } - } - } + log.Errorf("failed to get latest pipeline status, err: %v", err) return nil } MainLoop: @@ -423,7 +454,8 @@ MainLoop: inst.ID = instID _, err = orm.Get(&inst) if err != nil { - return err + log.Errorf("failed to get instance, err: %v", err) + return nil } hasStopped := true for _, pipelineID := range taskIDs { @@ -492,7 +524,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { ptasks, err := p.getPipelineTasks(taskItem.ID) if err != nil { log.Errorf("getPipelineTasks failed, err: %+v", err) - return err + return nil } for i, t := range ptasks { if t.Metadata.Labels != nil { @@ -729,7 +761,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ Success: true, - }, fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID)) + }, fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID)) return nil } @@ -1335,7 +1367,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) if err != nil { log.Errorf("search es failed, err: %v", err) - return taskState, err + return taskState, nil } if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { taskState.IndexDocs = v