diff --git a/main.go b/main.go
index eef232f8..d383d906 100644
--- a/main.go
+++ b/main.go
@@ -131,7 +131,6 @@ func main() {
 			orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization")
 			orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard")
 			orm.RegisterSchemaWithIndexName(task1.Task{}, "task")
-			orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log")
 			orm.RegisterSchemaWithIndexName(model.Layout{}, "layout")
 			orm.RegisterSchemaWithIndexName(model.Notification{}, "notification")
 			api.RegisterSchema()
diff --git a/plugin/migration/api.go b/plugin/migration/api.go
index e988ef76..a458e7f4 100644
--- a/plugin/migration/api.go
+++ b/plugin/migration/api.go
@@ -7,12 +7,13 @@ package migration
 import (
 	"context"
 	"fmt"
-	"infini.sh/framework/core/api/rbac/enum"
 	"net/http"
 	"strconv"
 	"strings"
 	"time"
 
+	"infini.sh/framework/core/api/rbac/enum"
+
 	log "github.com/cihub/seelog"
 	"infini.sh/console/model"
 	"infini.sh/framework/core/api"
@@ -81,7 +82,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
 	clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution
 	t := task2.Task{
 		Metadata: task2.Metadata{
-			Type: "pipeline",
+			Type: "cluster_migration",
 			Labels: util.MapStr{
 				"business_id":       "cluster_migration",
 				"source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
@@ -289,62 +290,12 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ
 		}, http.StatusNotFound)
 		return
 	}
-	//if task2.IsEnded(obj.Status) {
-	//	h.WriteJSON(w, util.MapStr{
-	//		"success": true,
-	//	}, 200)
-	//	return
-	//}
-	//query all pipeline task(scroll/bulk_indexing) and then stop it
-	err = stopPipelineTasks(id)
-	if err != nil {
-		log.Error(err)
-		h.WriteError(w, err.Error(), http.StatusInternalServerError)
+	if task2.IsEnded(obj.Status) {
+		h.WriteJSON(w, util.MapStr{
+			"success": true,
+		}, 200)
 		return
 	}
-	if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "cluster_migration" {
-		//update status of subtask to pending stop
-		query := util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"parent_id": util.MapStr{
-								"value": obj.ID,
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"status": util.MapStr{
-								"value": task2.StatusRunning,
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.labels.business_id": util.MapStr{
-								"value": "index_migration",
-							},
-						},
-					},
-				},
-			},
-		}
-		queryDsl := util.MapStr{
-			"query": query,
-			"script": util.MapStr{
-				"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop),
-			},
-		}
-
-		err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl))
-		if err != nil {
-			log.Error(err)
-			h.WriteError(w, err.Error(), http.StatusInternalServerError)
-			return
-		}
-	}
 	obj.Status = task2.StatusPendingStop
 	err = orm.Update(nil, &obj)
 	if err != nil {
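Note: the hunk above re-enables the early return for tasks already in a terminal state, and drops the API-side stopping of pipeline subtasks (that responsibility moves to the dispatcher in pipeline.go below). A minimal sketch of the semantics task2.IsEnded presumably implements; the concrete status strings are assumptions based on the constants used elsewhere in this diff, not the framework source:

// isEnded is a hypothetical stand-in for task2.IsEnded: a task in any
// terminal state needs no stop request, so the handler can report
// success immediately.
func isEnded(status string) bool {
	switch status {
	case "complete", "error", "stopped": // assumed terminal statuses
		return true
	default:
		return false
	}
}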
@@ -912,48 +863,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps
 	h.WriteJSON(w, countRes, http.StatusOK)
 }
 
-func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-	id := ps.MustGetParameter("task_id")
-	query := util.MapStr{
-		"sort": []util.MapStr{
-			{
-				"timestamp": util.MapStr{
-					"order": "asc",
-				},
-			},
-		},
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"minimum_should_match": 1,
-				"should": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"parent_id": util.MapStr{
-								"value": id,
-							},
-						},
-					}, {
-						"term": util.MapStr{
-							"id": util.MapStr{
-								"value": id,
-							},
-						},
-					},
-				},
-			},
-		},
-	}
-
-	q := &orm.Query{
-		RawQuery: util.MustToJSONBytes(query),
-	}
-	err, _ := orm.Search(task2.Log{}, q)
-	if err != nil {
-		h.WriteError(w, err.Error(), http.StatusInternalServerError)
-		log.Error(err)
-	}
-}
-
 func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 	id := ps.MustGetParameter("task_id")
 
@@ -1029,65 +938,6 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request,
 	}, http.StatusOK)
 }
 
-func stopPipelineTasks(parentID string) error {
-	queryDsl := util.MapStr{
-		"size": 1000,
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"parent_id": util.MapStr{
-								"value": parentID,
-							},
-						},
-					},
-					{
-						"terms": util.MapStr{
-							"metadata.labels.pipeline_id": []string{"es_scroll", "bulk_indexing"},
-						},
-					},
-				},
-			},
-		},
-	}
-	q := orm.Query{
-		RawQuery: util.MustToJSONBytes(queryDsl),
-	}
-	err, result := orm.Search(task2.Task{}, &q)
-	if err != nil {
-		return err
-	}
-
-	for _, hit := range result.Result {
-		buf, err := util.ToJSONBytes(hit)
-		if err != nil {
-			return err
-		}
-		tk := task2.Task{}
-		err = util.FromJSONBytes(buf, &tk)
-		if err != nil {
-			return err
-		}
-		if tk.Metadata.Labels != nil {
-			if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok {
-				inst := model.Instance{}
-				inst.ID = instID
-				_, err = orm.Get(&inst)
-				if err != nil {
-					return err
-				}
-				err = inst.StopPipeline(context.Background(), tk.ID)
-				if err != nil {
-					log.Error(err)
-					continue
-				}
-			}
-		}
-	}
-	return nil
-}
-
 func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) {
 	taskQuery := util.MapStr{
 		"size": 500,
diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go
index f7ccfccd..695ade42 100644
--- a/plugin/migration/pipeline.go
+++ b/plugin/migration/pipeline.go
@@ -154,19 +154,10 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
 				t.Status = task2.StatusError
 				tn := time.Now()
 				t.CompletedTime = &tn
-				p.saveTaskAndWriteLog(&t, &task2.Log{
-					ID:     util.GetUUID(),
-					TaskId: t.ID,
-					Status: task2.StatusError,
-					Type:   t.Metadata.Type,
-					Config: t.Config,
-					Result: &task2.LogResult{
-						Success: false,
-						Error:   err.Error(),
-					},
-					Message:   fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err),
-					Timestamp: time.Now().UTC(),
-				}, "")
+				p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{
+					Success: false,
+					Error:   err.Error(),
+				}, fmt.Sprintf("failed to handle task [%s]: [%v]", t.ID, err))
 			}
 		}
 		//es index refresh
@@ -220,30 +211,63 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
 		},
 	}
 
+	// make sure is_split is saved even if the following steps fail
+	defer func() {
+		p.sendMajorTaskNotification(taskItem)
+		p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
+			Success: true,
+		}, fmt.Sprintf("successfully started task [%s]", taskItem.ID))
+	}()
+
 	esClient := elastic.GetClient(p.config.Elasticsearch)
 	_, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl))
 	if err != nil {
-		return err
-	}
-	taskLog := &task2.Log{
-		ID:     util.GetUUID(),
-		TaskId: taskItem.ID,
-		Status: task2.StatusRunning,
-		Type:   taskItem.Metadata.Type,
-		Config: taskItem.Config,
-		Result: &task2.LogResult{
-			Success: true,
-		},
-		Message:   fmt.Sprintf("success to start task [%s]", taskItem.ID),
-		Timestamp: time.Now().UTC(),
+		log.Errorf("failed to update sub task status, err: %v", err)
+		return nil
 	}
 	taskItem.Status = task2.StatusRunning
-	p.sendMajorTaskNotification(taskItem)
-	p.saveTaskAndWriteLog(taskItem, taskLog, "")
 	return nil
 }
 
 func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error {
+	//update status of subtask to pending stop
+	query := util.MapStr{
+		"bool": util.MapStr{
+			"must": []util.MapStr{
+				{
+					"term": util.MapStr{
+						"parent_id": util.MapStr{
+							"value": taskItem.ID,
+						},
+					},
+				},
+				{
+					"terms": util.MapStr{
+						"status": []string{task2.StatusRunning, task2.StatusReady},
+					},
+				},
+				{
+					"term": util.MapStr{
+						"metadata.labels.business_id": util.MapStr{
+							"value": "index_migration",
+						},
+					},
+				},
+			},
+		},
+	}
+	queryDsl := util.MapStr{
+		"query": query,
+		"script": util.MapStr{
+			"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop),
+		},
+	}
+
+	err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl))
+	if err != nil {
+		log.Errorf("failed to update sub task status, err: %v", err)
+		return nil
+	}
 	//check whether all pipeline task is stopped or not, then update task status
 	q := util.MapStr{
 		"size": 200,
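Note: propagating pending_stop to subtasks now happens in the dispatcher via a scripted update-by-query instead of in the API handler. Rendered as raw JSON, the body built above would look roughly like this; the field names come straight from the MapStr literals, while the parent id "abc" and the literal status strings ("running", "ready", "pending_stop") are illustrative assumptions about how the constants serialize:

// Approximate JSON produced by util.MustToJSONBytes(queryDsl) above.
const pendingStopQuery = `{
  "query": {
    "bool": {
      "must": [
        { "term": { "parent_id": { "value": "abc" } } },
        { "terms": { "status": ["running", "ready"] } },
        { "term": { "metadata.labels.business_id": { "value": "index_migration" } } }
      ]
    }
  },
  "script": { "source": "ctx._source['status'] = 'pending_stop'" }
}`

The script runs server-side in Elasticsearch, so all matching subtasks flip to pending_stop in a single request rather than one update per document.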
@@ -257,7 +281,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
 				},
 				{
 					"terms": util.MapStr{
-						"status": []string{task2.StatusRunning, task2.StatusPendingStop},
+						"status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady},
 					},
 				},
 			},
@@ -266,21 +290,14 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
 	}
 	tasks, err := p.getTasks(q)
 	if err != nil {
-		return err
+		log.Errorf("failed to get sub tasks, err: %v", err)
+		return nil
 	}
 	// all subtask stopped or error or complete
 	if len(tasks) == 0 {
 		taskItem.Status = task2.StatusStopped
 		p.sendMajorTaskNotification(taskItem)
-		p.saveTaskAndWriteLog(taskItem, &task2.Log{
-			ID:        util.GetUUID(),
-			TaskId:    taskItem.ID,
-			Status:    task2.StatusStopped,
-			Type:      taskItem.Metadata.Type,
-			Config:    taskItem.Config,
-			Message:   fmt.Sprintf("task [%s] is stopped", taskItem.ID),
-			Timestamp: time.Now().UTC(),
-		}, "")
+		p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID))
 	}
 	return nil
 }
@@ -295,15 +312,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
 		tn := time.Now()
 		taskItem.CompletedTime = &tn
 		p.sendMajorTaskNotification(taskItem)
-		p.saveTaskAndWriteLog(taskItem, &task2.Log{
-			ID:        util.GetUUID(),
-			TaskId:    taskItem.ID,
-			Status:    taskItem.Status,
-			Type:      taskItem.Metadata.Type,
-			Config:    taskItem.Config,
-			Message:   fmt.Sprintf("task [%s] is complete", taskItem.ID),
-			Timestamp: time.Now().UTC(),
-		}, "")
+		p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is complete", taskItem.ID))
 	}
 	return nil
 }
@@ -355,24 +364,16 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
 
 		tn := time.Now()
 		taskItem.CompletedTime = &tn
-		p.saveTaskAndWriteLog(taskItem, &task2.Log{
-			ID:     util.GetUUID(),
-			TaskId: taskItem.ID,
-			Status: taskItem.Status,
-			Type:   taskItem.Metadata.Type,
-			Config: taskItem.Config,
-			Result: &task2.LogResult{
-				Success: state.Error == "",
-				Error:   state.Error,
-			},
-			Message:   fmt.Sprintf("task [%s] is complete", taskItem.ID),
-			Timestamp: time.Now().UTC(),
-		}, "")
+		p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
+			Success: state.Error == "",
+			Error:   state.Error,
+		}, fmt.Sprintf("task [%s] is complete", taskItem.ID))
 	} else {
 		if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
 			ptasks, err := p.getPipelineTasks(taskItem.ID)
 			if err != nil {
-				return err
+				log.Errorf("failed to get pipeline tasks, err: %v", err)
+				return nil
 			}
 			var bulkTask *task2.Task
 			for i, t := range ptasks {
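Note: a recurring change in this file is the error policy: lookups that used to propagate err now log and return nil, so a single failed Elasticsearch round trip no longer aborts Process(); the task keeps its current status and is retried on the next dispatch cycle. A minimal, self-contained sketch of the pattern, where fetchSubTasks is a hypothetical stand-in for p.getTasks and the standard log package stands in for seelog:

package main

import "log"

// fetchSubTasks is a hypothetical stand-in for p.getTasks(query).
func fetchSubTasks(taskID string) ([]string, error) { return nil, nil }

// handleOnce mirrors the policy above: transient failures are logged and
// swallowed so the periodic dispatcher picks the task up again next cycle.
func handleOnce(taskID string) error {
	tasks, err := fetchSubTasks(taskID)
	if err != nil {
		log.Printf("failed to get sub tasks, err: %v", err)
		return nil // task status unchanged; retried on the next run
	}
	_ = tasks
	return nil
}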
@@ -391,16 +392,18 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
 				inst.ID = instID
 				_, err = orm.Get(inst)
 				if err != nil {
+					log.Errorf("failed to get instance, err: %v", err)
 					return err
 				}
 				err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
 				if err != nil {
+					log.Errorf("failed to create bulk_indexing pipeline, err: %v", err)
 					return err
 				}
 				taskItem.Metadata.Labels["running_phase"] = 2
 			}
 		}
-		p.saveTaskAndWriteLog(taskItem, nil, "wait_for")
+		p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "")
 	}
 	return nil
@@ -410,6 +413,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
 	//check whether all pipeline task is stopped or not, then update task status
 	ptasks, err := p.getPipelineTasks(taskItem.ID)
 	if err != nil {
+		log.Errorf("failed to get pipeline tasks, err: %v", err)
 		return err
 	}
 	var taskIDs []string
@@ -437,24 +441,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
 	}
 	searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q))
 	if err != nil {
-		return err
-	}
-	if len(searchRes.Hits.Hits) == 0 {
-		//check instance available
-		if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
-			inst := model.Instance{}
-			inst.ID = instID
-			_, err = orm.Get(&inst)
-			if err != nil {
-				return err
-			}
-			err = inst.TryConnectWithTimeout(time.Second)
-			if err != nil {
-				if errors.Is(err, syscall.ECONNREFUSED) {
-					return fmt.Errorf("stoping task [%s] error: %w", taskItem.ID, err)
-				}
-			}
-		}
+		log.Errorf("failed to get latest pipeline status, err: %v", err)
 		return nil
 	}
 MainLoop:
@@ -467,7 +454,8 @@ MainLoop:
 			inst.ID = instID
 			_, err = orm.Get(&inst)
 			if err != nil {
-				return err
+				log.Errorf("failed to get instance, err: %v", err)
+				return nil
 			}
 			hasStopped := true
 			for _, pipelineID := range taskIDs {
@@ -519,15 +507,7 @@ MainLoop:
 			p.state[instanceID] = st
 		}
 	}
-	p.saveTaskAndWriteLog(taskItem, &task2.Log{
-		ID:        util.GetUUID(),
-		TaskId:    taskItem.ID,
-		Status:    task2.StatusStopped,
-		Type:      taskItem.Metadata.Type,
-		Config:    taskItem.Config,
-		Message:   fmt.Sprintf("task [%s] is stopped", taskItem.ID),
-		Timestamp: time.Now().UTC(),
-	}, "")
+	p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID))
 	return nil
 }
@@ -544,7 +524,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
 	ptasks, err := p.getPipelineTasks(taskItem.ID)
 	if err != nil {
 		log.Errorf("getPipelineTasks failed, err: %+v", err)
-		return err
+		return nil
 	}
 	for i, t := range ptasks {
 		if t.Metadata.Labels != nil {
@@ -779,19 +759,9 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
 
 	taskItem.Status = task2.StatusRunning
 	taskItem.StartTimeInMillis = time.Now().UnixMilli()
-	taskLog := &task2.Log{
-		ID:     util.GetUUID(),
-		TaskId: taskItem.ID,
-		Status: task2.StatusRunning,
-		Type:   taskItem.Metadata.Type,
-		Config: taskItem.Config,
-		Result: &task2.LogResult{
-			Success: true,
-		},
-		Message:   fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID),
-		Timestamp: time.Now().UTC(),
-	}
-	p.saveTaskAndWriteLog(taskItem, taskLog, "wait_for")
+	p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
+		Success: true,
+	}, fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID))
 	return nil
 }
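Note: handleRunningSubTask drives a two-phase migration: phase 1 runs the es_scroll pipeline, and only once it completes does the dispatcher create the bulk_indexing pipeline on the execution instance and bump the running_phase label. Because the labels round-trip through JSON, the numeric label deserializes as float64, which is why the code above compares against float64(1) yet writes the literal 2. A reduced sketch of that transition; the label key and phase values come from the diff, while startBulkIndexing is hypothetical shorthand for inst.CreatePipeline:

// advancePhase condenses the phase transition above.
func advancePhase(labels map[string]interface{}, scrollDone bool) error {
	if scrollDone && labels["running_phase"] == float64(1) { // JSON numbers decode to float64
		if err := startBulkIndexing(); err != nil { // hypothetical: inst.CreatePipeline(...)
			return err
		}
		labels["running_phase"] = 2 // phase 2: bulk_indexing is now running
	}
	return nil
}

func startBulkIndexing() error { return nil } // hypothetical stand-in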
@@ -903,32 +873,32 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error)
 	return p.getTasks(queryDsl)
 }
 
-func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) {
+func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh string, taskResult *task2.TaskResult, message string) {
 	esClient := elastic.GetClient(p.config.Elasticsearch)
 	_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
 	if err != nil {
 		log.Errorf("failed to update task, err: %v", err)
 	}
-	if logItem != nil {
+	if message != "" {
 		event.SaveLog(event.Event{
 			Metadata: event.EventMetadata{
-				Category: "migration",
+				Category: "task",
 				Name:     "logging",
 				Datatype: "event",
 				Labels: util.MapStr{
-					"task_id":        logItem.TaskId,
+					"task_type":      taskItem.Metadata.Type,
+					"task_id":        taskItem.ID,
 					"parent_task_id": taskItem.ParentId,
 					"retry_no":       taskItem.RetryTimes,
 				},
 			},
 			Fields: util.MapStr{
-				"migration": util.MapStr{
+				"task": util.MapStr{
 					"logging": util.MapStr{
-						"config":  logItem.Config,
-						"context": logItem.Context,
-						"status":  logItem.Status,
-						"message": logItem.Message,
-						"result":  logItem.Result,
+						"config":  util.MustToJSON(taskItem.Config),
+						"status":  taskItem.Status,
+						"message": message,
+						"result":  taskResult,
 					},
 				},
 			},
@@ -1046,7 +1016,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
 				Status:            task2.StatusReady,
 				StartTimeInMillis: time.Now().UnixMilli(),
 				Metadata: task2.Metadata{
-					Type: "pipeline",
+					Type: "index_migration",
 					Labels: util.MapStr{
 						"business_id":       "index_migration",
 						"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
@@ -1134,7 +1104,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
 				Runnable: true,
 				Status:   task2.StatusReady,
 				Metadata: task2.Metadata{
-					Type: "pipeline",
+					Type: "index_migration",
 					Labels: util.MapStr{
 						"business_id":       "index_migration",
 						"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
@@ -1397,7 +1367,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat
 	res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
 	if err != nil {
 		log.Errorf("search es failed, err: %v", err)
-		return taskState, err
+		return taskState, nil
 	}
 	if v, ok := res.Aggregations["total_docs"].Value.(float64); ok {
 		taskState.IndexDocs = v
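Note: the net effect of the saveTaskAndWriteLog refactor (together with the task-log schema registration dropped from main.go and the removed getMigrationTaskLog handler) is that task logs no longer live as task2.Log documents in a dedicated task-log index; they are emitted through the generic event system under the "task" category. Roughly the event shape the new code produces; the label and field names mirror the MapStr literals above, while the lower-case casing of event.Event's own fields and all concrete values are assumptions:

// Illustrative payload for one log event written by saveTaskAndWriteLog.
const taskLogEvent = `{
  "metadata": {
    "category": "task",
    "name": "logging",
    "datatype": "event",
    "labels": {
      "task_type": "index_migration",
      "task_id": "abc",
      "parent_task_id": "xyz",
      "retry_no": 0
    }
  },
  "fields": {
    "task": {
      "logging": {
        "config": "{\"source\":{},\"target\":{}}",
        "status": "running",
        "message": "task [abc] is complete",
        "result": { "success": true }
      }
    }
  }
}`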