[migration] update logging logic

This commit is contained in:
Kassian Sun 2023-04-03 10:51:07 +08:00
parent 325022cc1e
commit b31bb6c090
3 changed files with 31 additions and 136 deletions

View File

@ -131,7 +131,6 @@ func main() {
orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization")
orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard")
orm.RegisterSchemaWithIndexName(task1.Task{}, "task")
orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log")
orm.RegisterSchemaWithIndexName(model.Layout{}, "layout")
orm.RegisterSchemaWithIndexName(model.Notification{}, "notification")
api.RegisterSchema()

View File

@ -81,7 +81,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution
t := task2.Task{
Metadata: task2.Metadata{
Type: "pipeline",
Type: "cluster_migration",
Labels: util.MapStr{
"business_id": "cluster_migration",
"source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
@ -912,48 +912,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps
h.WriteJSON(w, countRes, http.StatusOK)
}
// getMigrationTaskLog searches the task-log store for entries belonging to the
// task identified by the "task_id" path parameter: documents whose parent_id
// OR id equals that value, sorted by timestamp ascending.
// NOTE(review): this handler was deleted by this commit (logs moved to the
// event framework); annotations below describe the removed behavior.
func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
// Raw ES DSL: bool/should with minimum_should_match=1 matches either the
// task's own log entries (id) or those of its children (parent_id),
// oldest first.
query := util.MapStr{
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "asc",
},
},
},
"query": util.MapStr{
"bool": util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": id,
},
},
}, {
"term": util.MapStr{
"id": util.MapStr{
"value": id,
},
},
},
},
},
},
}
// Send the DSL verbatim via the orm layer.
q := &orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
// orm.Search here returns (error, result); the result is discarded —
// NOTE(review): the matched log documents are never written to the
// response, so the success path sends an empty 200 with no body.
err, _ := orm.Search(task2.Log{}, q)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
// NOTE(review): no return after WriteError; benign only because the
// function ends immediately after this block.
}
}
func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")

View File

@ -154,19 +154,10 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
t.Status = task2.StatusError
tn := time.Now()
t.CompletedTime = &tn
p.saveTaskAndWriteLog(&t, &task2.Log{
ID: util.GetUUID(),
TaskId: t.ID,
Status: task2.StatusError,
Type: t.Metadata.Type,
Config: t.Config,
Result: &task2.LogResult{
Success: false,
Error: err.Error(),
},
Message: fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err),
Timestamp: time.Now().UTC(),
}, "")
p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{
Success: false,
Error: err.Error(),
}, fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err))
}
}
//es index refresh
@ -225,21 +216,11 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
if err != nil {
return err
}
taskLog := &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: task2.StatusRunning,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Result: &task2.LogResult{
Success: true,
},
Message: fmt.Sprintf("success to start task [%s]", taskItem.ID),
Timestamp: time.Now().UTC(),
}
taskItem.Status = task2.StatusRunning
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, taskLog, "")
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: true,
}, fmt.Sprintf("success to start task [%s]", taskItem.ID))
return nil
}
@ -272,15 +253,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
if len(tasks) == 0 {
taskItem.Status = task2.StatusStopped
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: task2.StatusStopped,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID))
}
return nil
}
@ -295,15 +268,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
tn := time.Now()
taskItem.CompletedTime = &tn
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: taskItem.Status,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Message: fmt.Sprintf("task [%s] is complete", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is complete", taskItem.ID))
}
return nil
}
@ -355,19 +320,10 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
tn := time.Now()
taskItem.CompletedTime = &tn
p.saveTaskAndWriteLog(taskItem, &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: taskItem.Status,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Result: &task2.LogResult{
Success: state.Error == "",
Error: state.Error,
},
Message: fmt.Sprintf("task [%s] is complete", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: state.Error == "",
Error: state.Error,
}, fmt.Sprintf("task [%s] is complete", taskItem.ID))
} else {
if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
ptasks, err := p.getPipelineTasks(taskItem.ID)
@ -400,7 +356,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
taskItem.Metadata.Labels["running_phase"] = 2
}
}
p.saveTaskAndWriteLog(taskItem, nil, "wait_for")
p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "")
}
}
return nil
@ -519,15 +475,7 @@ MainLoop:
p.state[instanceID] = st
}
}
p.saveTaskAndWriteLog(taskItem, &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: task2.StatusStopped,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID))
return nil
}
@ -779,19 +727,9 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
taskItem.Status = task2.StatusRunning
taskItem.StartTimeInMillis = time.Now().UnixMilli()
taskLog := &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: task2.StatusRunning,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Result: &task2.LogResult{
Success: true,
},
Message: fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID),
Timestamp: time.Now().UTC(),
}
p.saveTaskAndWriteLog(taskItem, taskLog, "wait_for")
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
Success: true,
}, fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID))
return nil
}
@ -903,32 +841,32 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error)
return p.getTasks(queryDsl)
}
func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) {
func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh string, taskResult *task2.TaskResult, message string) {
esClient := elastic.GetClient(p.config.Elasticsearch)
_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
if err != nil {
log.Errorf("failed to update task, err: %v", err)
}
if logItem != nil {
if message != "" {
event.SaveLog(event.Event{
Metadata: event.EventMetadata{
Category: "migration",
Category: "task",
Name: "logging",
Datatype: "event",
Labels: util.MapStr{
"task_id": logItem.TaskId,
"task_type": taskItem.Metadata.Type,
"task_id": taskItem.ID,
"parent_task_id": taskItem.ParentId,
"retry_no": taskItem.RetryTimes,
},
},
Fields: util.MapStr{
"migration": util.MapStr{
"task": util.MapStr{
"logging": util.MapStr{
"config": logItem.Config,
"context": logItem.Context,
"status": logItem.Status,
"message": logItem.Message,
"result": logItem.Result,
"config": util.MustToJSON(taskItem.Config),
"status": taskItem.Status,
"message": message,
"result": taskResult,
},
},
},
@ -1046,7 +984,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
Status: task2.StatusReady,
StartTimeInMillis: time.Now().UnixMilli(),
Metadata: task2.Metadata{
Type: "pipeline",
Type: "index_migration",
Labels: util.MapStr{
"business_id": "index_migration",
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
@ -1134,7 +1072,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
Runnable: true,
Status: task2.StatusReady,
Metadata: task2.Metadata{
Type: "pipeline",
Type: "index_migration",
Labels: util.MapStr{
"business_id": "index_migration",
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id,