diff --git a/main.go b/main.go index eef232f8..d383d906 100644 --- a/main.go +++ b/main.go @@ -131,7 +131,6 @@ func main() { orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") orm.RegisterSchemaWithIndexName(task1.Task{}, "task") - orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") api.RegisterSchema() diff --git a/plugin/migration/api.go b/plugin/migration/api.go index e988ef76..e368b94e 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -81,7 +81,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution t := task2.Task{ Metadata: task2.Metadata{ - Type: "pipeline", + Type: "cluster_migration", Labels: util.MapStr{ "business_id": "cluster_migration", "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, @@ -912,48 +912,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps h.WriteJSON(w, countRes, http.StatusOK) } -func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - query := util.MapStr{ - "sort": []util.MapStr{ - { - "timestamp": util.MapStr{ - "order": "asc", - }, - }, - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, { - "term": util.MapStr{ - "id": util.MapStr{ - "value": id, - }, - }, - }, - }, - }, - }, - } - - q := &orm.Query{ - RawQuery: util.MustToJSONBytes(query), - } - err, _ := orm.Search(task2.Log{}, q) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - } -} - func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { id := ps.MustGetParameter("task_id") diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index f7ccfccd..eae7ce8c 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -154,19 +154,10 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { t.Status = task2.StatusError tn := time.Now() t.CompletedTime = &tn - p.saveTaskAndWriteLog(&t, &task2.Log{ - ID: util.GetUUID(), - TaskId: t.ID, - Status: task2.StatusError, - Type: t.Metadata.Type, - Config: t.Config, - Result: &task2.LogResult{ - Success: false, - Error: err.Error(), - }, - Message: fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{ + Success: false, + Error: err.Error(), + }, fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err)) } } //es index refresh @@ -225,21 +216,11 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { if err != nil { return err } - taskLog := &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusRunning, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Result: &task2.LogResult{ - Success: true, - }, - Message: fmt.Sprintf("success to start task [%s]", taskItem.ID), - Timestamp: time.Now().UTC(), - } taskItem.Status = task2.StatusRunning p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, taskLog, "") + p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("success to start task [%s]", taskItem.ID)) return nil } @@ -272,15 +253,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e if len(tasks) == 0 { taskItem.Status = task2.StatusStopped p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusStopped, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) } return nil } @@ -295,15 +268,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error tn := time.Now() taskItem.CompletedTime = &tn p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: taskItem.Status, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is complete", taskItem.ID)) } return nil } @@ -355,19 +320,10 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { tn := time.Now() taskItem.CompletedTime = &tn - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: taskItem.Status, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Result: &task2.LogResult{ - Success: state.Error == "", - Error: state.Error, - }, - Message: fmt.Sprintf("task [%s] is complete", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{ + Success: state.Error == "", + Error: state.Error, + }, fmt.Sprintf("task [%s] is complete", taskItem.ID)) } else { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { ptasks, err := p.getPipelineTasks(taskItem.ID) @@ -400,7 +356,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { taskItem.Metadata.Labels["running_phase"] = 2 } } - p.saveTaskAndWriteLog(taskItem, nil, "wait_for") + p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "") } } return nil @@ -519,15 +475,7 @@ MainLoop: p.state[instanceID] = st } } - p.saveTaskAndWriteLog(taskItem, &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusStopped, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID), - Timestamp: time.Now().UTC(), - }, "") + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID)) return nil } @@ -779,19 +727,9 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { taskItem.Status = task2.StatusRunning taskItem.StartTimeInMillis = time.Now().UnixMilli() - taskLog := &task2.Log{ - ID: util.GetUUID(), - TaskId: taskItem.ID, - Status: task2.StatusRunning, - Type: taskItem.Metadata.Type, - Config: taskItem.Config, - Result: &task2.LogResult{ - Success: true, - }, - Message: fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID), - Timestamp: time.Now().UTC(), - } - p.saveTaskAndWriteLog(taskItem, taskLog, "wait_for") + p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{ + Success: true, + }, fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID)) return nil } @@ -903,32 +841,32 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error) return p.getTasks(queryDsl) } -func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) { +func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh string, taskResult *task2.TaskResult, message string) { esClient := elastic.GetClient(p.config.Elasticsearch) _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) if err != nil { log.Errorf("failed to update task, err: %v", err) } - if logItem != nil { + if message != "" { event.SaveLog(event.Event{ Metadata: event.EventMetadata{ - Category: "migration", + Category: "task", Name: "logging", Datatype: "event", Labels: util.MapStr{ - "task_id": logItem.TaskId, + "task_type": taskItem.Metadata.Type, + "task_id": taskItem.ID, "parent_task_id": taskItem.ParentId, "retry_no": taskItem.RetryTimes, }, }, Fields: util.MapStr{ - "migration": util.MapStr{ + "task": util.MapStr{ "logging": util.MapStr{ - "config": logItem.Config, - "context": logItem.Context, - "status": logItem.Status, - "message": logItem.Message, - "result": logItem.Result, + "config": util.MustToJSON(taskItem.Config), + "status": taskItem.Status, + "message": message, + "result": taskResult, }, }, }, @@ -1046,7 +984,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro Status: task2.StatusReady, StartTimeInMillis: time.Now().UnixMilli(), Metadata: task2.Metadata{ - Type: "pipeline", + Type: "index_migration", Labels: util.MapStr{ "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id, @@ -1134,7 +1072,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro Runnable: true, Status: task2.StatusReady, Metadata: task2.Metadata{ - Type: "pipeline", + Type: "index_migration", Labels: util.MapStr{ "business_id": "index_migration", "source_cluster_id": clusterMigrationTask.Cluster.Source.Id,