Merge branch 'master' of ssh://git.infini.ltd:64221/infini/console

This commit is contained in:
liugq 2023-04-04 11:57:38 +08:00
commit 061b528309
6 changed files with 189 additions and 332 deletions

View File

@ -131,7 +131,6 @@ func main() {
orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization") orm.RegisterSchemaWithIndexName(insight.Visualization{}, "visualization")
orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard") orm.RegisterSchemaWithIndexName(insight.Dashboard{}, "dashboard")
orm.RegisterSchemaWithIndexName(task1.Task{}, "task") orm.RegisterSchemaWithIndexName(task1.Task{}, "task")
orm.RegisterSchemaWithIndexName(task1.Log{}, "task-log")
orm.RegisterSchemaWithIndexName(model.Layout{}, "layout") orm.RegisterSchemaWithIndexName(model.Layout{}, "layout")
orm.RegisterSchemaWithIndexName(model.Notification{}, "notification") orm.RegisterSchemaWithIndexName(model.Notification{}, "notification")
api.RegisterSchema() api.RegisterSchema()

View File

@ -7,33 +7,33 @@ import (
type NotificationType string type NotificationType string
const ( const (
NotificationTypeNotification NotificationType = "NOTIFICATION" NotificationTypeNotification NotificationType = "notification"
NotificationTypeTodo NotificationType = "TODO" NotificationTypeTodo NotificationType = "todo"
) )
type MessageType string type MessageType string
const ( const (
MessageTypeNews MessageType = "NEWS" MessageTypeNews MessageType = "news"
MessageTypeAlerting MessageType = "ALERTING" MessageTypeAlerting MessageType = "alerting"
MessageTypeMigration MessageType = "MIGRATION" MessageTypeMigration MessageType = "migration"
) )
type NotificationStatus string type NotificationStatus string
const ( const (
NotificationStatusNew NotificationStatus = "NEW" NotificationStatusNew NotificationStatus = "new"
NotificationStatusRead NotificationStatus = "READ" NotificationStatusRead NotificationStatus = "read"
) )
type Notification struct { type Notification struct {
orm.ORMObjectBase orm.ORMObjectBase
UserId string `json:"user_id,omitempty" elastic_mapping:"user_id: { type: keyword }"` UserId string `json:"user_id,omitempty" elastic_mapping:"user_id: { type: keyword }"`
NotificationType NotificationType `json:"notification_type,omitempty" elastic_mapping:"notification_type:{type:keyword,fields:{text: {type: text}}}"` Type NotificationType `json:"type,omitempty" elastic_mapping:"type:{type:keyword,fields:{text: {type: text}}}"`
MessageType MessageType `json:"message_type,omitempty" elastic_mapping:"message_type:{type:keyword,fields:{text: {type: text}}}"` MessageType MessageType `json:"message_type,omitempty" elastic_mapping:"message_type:{type:keyword,fields:{text: {type: text}}}"`
Status NotificationStatus `json:"status,omitempty" elastic_mapping:"status: { type: keyword }"` Status NotificationStatus `json:"status,omitempty" elastic_mapping:"status: { type: keyword }"`
Title string `json:"title,omitempty" elastic_mapping:"title: { type: keyword }"` Title string `json:"title,omitempty" elastic_mapping:"title: { type: keyword }"`
Body string `json:"body,omitempty" elastic_mapping:"body: { type: keyword }"` Body string `json:"body,omitempty" elastic_mapping:"body: { type: keyword }"`
Link string `json:"link,omitempty" elastic_mapping:"link: { type: keyword }"` Link string `json:"link,omitempty" elastic_mapping:"link: { type: keyword }"`
} }

View File

@ -10,6 +10,6 @@ type NotificationAPI struct {
func InitAPI() { func InitAPI() {
notification := NotificationAPI{} notification := NotificationAPI{}
api.HandleAPIMethod(api.GET, "/notification/_search", notification.RequireLogin(notification.listNotifications)) api.HandleAPIMethod(api.POST, "/notification/_search", notification.RequireLogin(notification.listNotifications))
api.HandleAPIMethod(api.POST, "/notification/read", notification.RequireLogin(notification.setNotificationsRead)) api.HandleAPIMethod(api.POST, "/notification/read", notification.RequireLogin(notification.setNotificationsRead))
} }

View File

@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
"strconv"
"time" "time"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
@ -15,6 +14,13 @@ import (
"infini.sh/framework/core/util" "infini.sh/framework/core/util"
) )
type ListNotificationsRequest struct {
From int `json:"from"`
Size int `json:"size"`
Status []model.NotificationStatus `json:"status"`
Types []model.NotificationType `json:"types"`
}
func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
user, err := rbac.FromUserContext(req.Context()) user, err := rbac.FromUserContext(req.Context())
if err != nil { if err != nil {
@ -29,34 +35,40 @@ func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Req
return return
} }
var ( var reqData = ListNotificationsRequest{
queryDSL = `{ From: 0,
"sort": [ Size: 20,
{ "created": {"order": "desc"} } Status: []model.NotificationStatus{model.NotificationStatusNew},
], Types: []model.NotificationType{model.NotificationTypeNotification},
"query": {
"bool": { "must": [
{ "term": {"user_id": { "value": "%s" } } },
{ "term": {"status": { "value": "%s" } } }
] }
},
"size": %d, "from": %d
}`
strSize = h.GetParameterOrDefault(req, "size", "20")
strFrom = h.GetParameterOrDefault(req, "from", "0")
)
size, _ := strconv.Atoi(strSize)
if size <= 0 {
size = 20
} }
from, _ := strconv.Atoi(strFrom) err = h.DecodeJSON(req, &reqData)
if from < 0 { if err != nil {
from = 0 log.Error("failed to parse request: ", err)
h.WriteError(w, err.Error(), http.StatusBadRequest)
return
} }
var (
queryDSL = util.MapStr{
"sort": []util.MapStr{
{"created": util.MapStr{"order": "desc"}},
},
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{"term": util.MapStr{"user_id": util.MapStr{"value": user.UserId}}},
{"terms": util.MapStr{"status": reqData.Status}},
{"terms": util.MapStr{"type": reqData.Types}},
},
},
},
"size": reqData.Size, "from": reqData.From,
}
)
q := orm.Query{} q := orm.Query{}
queryDSL = fmt.Sprintf(queryDSL, user.UserId, model.NotificationStatusNew, size, from) log.Infof(util.MustToJSON(queryDSL))
q.RawQuery = util.UnsafeStringToBytes(queryDSL) q.RawQuery = util.MustToJSONBytes(queryDSL)
err, res := orm.Search(&model.Notification{}, &q) err, res := orm.Search(&model.Notification{}, &q)
if err != nil { if err != nil {
@ -69,7 +81,8 @@ func (h *NotificationAPI) listNotifications(w http.ResponseWriter, req *http.Req
} }
type SetNotificationsReadRequest struct { type SetNotificationsReadRequest struct {
Ids []string `json:"ids"` Ids []string `json:"ids"`
Types []model.NotificationType `json:"types"`
} }
func (h *NotificationAPI) setNotificationsRead(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *NotificationAPI) setNotificationsRead(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@ -98,20 +111,45 @@ func (h *NotificationAPI) setNotificationsRead(w http.ResponseWriter, req *http.
queryDsl := util.MapStr{ queryDsl := util.MapStr{
"query": util.MapStr{ "query": util.MapStr{
"bool": util.MapStr{ "bool": util.MapStr{
"must": []util.MapStr{ "should": []util.MapStr{
{ util.MapStr{
"terms": util.MapStr{ "bool": util.MapStr{
"_id": reqData.Ids, "must": []util.MapStr{
{
"terms": util.MapStr{
"_id": reqData.Ids,
},
},
{
"term": util.MapStr{
"status": util.MapStr{
"value": model.NotificationStatusNew,
},
},
},
},
}, },
}, },
{ util.MapStr{
"term": util.MapStr{ "bool": util.MapStr{
"status": util.MapStr{ "must": []util.MapStr{
"value": model.NotificationStatusNew, {
"terms": util.MapStr{
"notification_type": reqData.Types,
},
},
{
"term": util.MapStr{
"status": util.MapStr{
"value": model.NotificationStatusNew,
},
},
},
}, },
}, },
}, },
}, },
"minimum_should_match": 1,
}, },
}, },
"script": util.MapStr{ "script": util.MapStr{

View File

@ -7,12 +7,13 @@ package migration
import ( import (
"context" "context"
"fmt" "fmt"
"infini.sh/framework/core/api/rbac/enum"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"infini.sh/framework/core/api/rbac/enum"
log "github.com/cihub/seelog" log "github.com/cihub/seelog"
"infini.sh/console/model" "infini.sh/console/model"
"infini.sh/framework/core/api" "infini.sh/framework/core/api"
@ -81,7 +82,7 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re
clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution
t := task2.Task{ t := task2.Task{
Metadata: task2.Metadata{ Metadata: task2.Metadata{
Type: "pipeline", Type: "cluster_migration",
Labels: util.MapStr{ Labels: util.MapStr{
"business_id": "cluster_migration", "business_id": "cluster_migration",
"source_cluster_id": clusterTaskConfig.Cluster.Source.Id, "source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
@ -289,62 +290,12 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ
}, http.StatusNotFound) }, http.StatusNotFound)
return return
} }
//if task2.IsEnded(obj.Status) { if task2.IsEnded(obj.Status) {
// h.WriteJSON(w, util.MapStr{ h.WriteJSON(w, util.MapStr{
// "success": true, "success": true,
// }, 200) }, 200)
// return
//}
//query all pipeline task(scroll/bulk_indexing) and then stop it
err = stopPipelineTasks(id)
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return return
} }
if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "cluster_migration" {
//update status of subtask to pending stop
query := util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": obj.ID,
},
},
},
{
"term": util.MapStr{
"status": util.MapStr{
"value": task2.StatusRunning,
},
},
},
{
"term": util.MapStr{
"metadata.labels.business_id": util.MapStr{
"value": "index_migration",
},
},
},
},
},
}
queryDsl := util.MapStr{
"query": query,
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop),
},
}
err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl))
if err != nil {
log.Error(err)
h.WriteError(w, err.Error(), http.StatusInternalServerError)
return
}
}
obj.Status = task2.StatusPendingStop obj.Status = task2.StatusPendingStop
err = orm.Update(nil, &obj) err = orm.Update(nil, &obj)
if err != nil { if err != nil {
@ -912,48 +863,6 @@ func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps
h.WriteJSON(w, countRes, http.StatusOK) h.WriteJSON(w, countRes, http.StatusOK)
} }
func (h *APIHandler) getMigrationTaskLog(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id")
query := util.MapStr{
"sort": []util.MapStr{
{
"timestamp": util.MapStr{
"order": "asc",
},
},
},
"query": util.MapStr{
"bool": util.MapStr{
"minimum_should_match": 1,
"should": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": id,
},
},
}, {
"term": util.MapStr{
"id": util.MapStr{
"value": id,
},
},
},
},
},
},
}
q := &orm.Query{
RawQuery: util.MustToJSONBytes(query),
}
err, _ := orm.Search(task2.Log{}, q)
if err != nil {
h.WriteError(w, err.Error(), http.StatusInternalServerError)
log.Error(err)
}
}
func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { func (h *APIHandler) updateDataMigrationTaskStatus(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
id := ps.MustGetParameter("task_id") id := ps.MustGetParameter("task_id")
@ -1029,65 +938,6 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request,
}, http.StatusOK) }, http.StatusOK)
} }
func stopPipelineTasks(parentID string) error {
queryDsl := util.MapStr{
"size": 1000,
"query": util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": parentID,
},
},
},
{
"terms": util.MapStr{
"metadata.labels.pipeline_id": []string{"es_scroll", "bulk_indexing"},
},
},
},
},
},
}
q := orm.Query{
RawQuery: util.MustToJSONBytes(queryDsl),
}
err, result := orm.Search(task2.Task{}, &q)
if err != nil {
return err
}
for _, hit := range result.Result {
buf, err := util.ToJSONBytes(hit)
if err != nil {
return err
}
tk := task2.Task{}
err = util.FromJSONBytes(buf, &tk)
if err != nil {
return err
}
if tk.Metadata.Labels != nil {
if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok {
inst := model.Instance{}
inst.ID = instID
_, err = orm.Get(&inst)
if err != nil {
return err
}
err = inst.StopPipeline(context.Background(), tk.ID)
if err != nil {
log.Error(err)
continue
}
}
}
}
return nil
}
func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) { func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) {
taskQuery := util.MapStr{ taskQuery := util.MapStr{
"size": 500, "size": 500,

View File

@ -154,19 +154,10 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
t.Status = task2.StatusError t.Status = task2.StatusError
tn := time.Now() tn := time.Now()
t.CompletedTime = &tn t.CompletedTime = &tn
p.saveTaskAndWriteLog(&t, &task2.Log{ p.saveTaskAndWriteLog(&t, "", &task2.TaskResult{
ID: util.GetUUID(), Success: false,
TaskId: t.ID, Error: err.Error(),
Status: task2.StatusError, }, fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err))
Type: t.Metadata.Type,
Config: t.Config,
Result: &task2.LogResult{
Success: false,
Error: err.Error(),
},
Message: fmt.Sprintf("failed to handling task [%s]: [%v]", t.ID, err),
Timestamp: time.Now().UTC(),
}, "")
} }
} }
//es index refresh //es index refresh
@ -220,30 +211,63 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
}, },
} }
// saved is_split if the following steps failed
defer func() {
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
Success: true,
}, fmt.Sprintf("success to start task [%s]", taskItem.ID))
}()
esClient := elastic.GetClient(p.config.Elasticsearch) esClient := elastic.GetClient(p.config.Elasticsearch)
_, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl)) _, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl))
if err != nil { if err != nil {
return err log.Errorf("failed to update sub task status, err: %v", err)
} return nil
taskLog := &task2.Log{
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: task2.StatusRunning,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Result: &task2.LogResult{
Success: true,
},
Message: fmt.Sprintf("success to start task [%s]", taskItem.ID),
Timestamp: time.Now().UTC(),
} }
taskItem.Status = task2.StatusRunning taskItem.Status = task2.StatusRunning
p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, taskLog, "")
return nil return nil
} }
func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error { func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error {
//update status of subtask to pending stop
query := util.MapStr{
"bool": util.MapStr{
"must": []util.MapStr{
{
"term": util.MapStr{
"parent_id": util.MapStr{
"value": taskItem.ID,
},
},
},
{
"terms": util.MapStr{
"status": []string{task2.StatusRunning, task2.StatusReady},
},
},
{
"term": util.MapStr{
"metadata.labels.business_id": util.MapStr{
"value": "index_migration",
},
},
},
},
},
}
queryDsl := util.MapStr{
"query": query,
"script": util.MapStr{
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop),
},
}
err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl))
if err != nil {
log.Errorf("failed to update sub task status, err: %v", err)
return nil
}
//check whether all pipeline task is stopped or not, then update task status //check whether all pipeline task is stopped or not, then update task status
q := util.MapStr{ q := util.MapStr{
"size": 200, "size": 200,
@ -257,7 +281,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
}, },
{ {
"terms": util.MapStr{ "terms": util.MapStr{
"status": []string{task2.StatusRunning, task2.StatusPendingStop}, "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady},
}, },
}, },
}, },
@ -266,21 +290,14 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
} }
tasks, err := p.getTasks(q) tasks, err := p.getTasks(q)
if err != nil { if err != nil {
return err log.Errorf("failed to get sub tasks, err: %v", err)
return nil
} }
// all subtask stopped or error or complete // all subtask stopped or error or complete
if len(tasks) == 0 { if len(tasks) == 0 {
taskItem.Status = task2.StatusStopped taskItem.Status = task2.StatusStopped
p.sendMajorTaskNotification(taskItem) p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, &task2.Log{ p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID))
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: task2.StatusStopped,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
} }
return nil return nil
} }
@ -295,15 +312,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
tn := time.Now() tn := time.Now()
taskItem.CompletedTime = &tn taskItem.CompletedTime = &tn
p.sendMajorTaskNotification(taskItem) p.sendMajorTaskNotification(taskItem)
p.saveTaskAndWriteLog(taskItem, &task2.Log{ p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is complete", taskItem.ID))
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: taskItem.Status,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Message: fmt.Sprintf("task [%s] is complete", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
} }
return nil return nil
} }
@ -355,24 +364,16 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
tn := time.Now() tn := time.Now()
taskItem.CompletedTime = &tn taskItem.CompletedTime = &tn
p.saveTaskAndWriteLog(taskItem, &task2.Log{ p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
ID: util.GetUUID(), Success: state.Error == "",
TaskId: taskItem.ID, Error: state.Error,
Status: taskItem.Status, }, fmt.Sprintf("task [%s] is complete", taskItem.ID))
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Result: &task2.LogResult{
Success: state.Error == "",
Error: state.Error,
},
Message: fmt.Sprintf("task [%s] is complete", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
} else { } else {
if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
ptasks, err := p.getPipelineTasks(taskItem.ID) ptasks, err := p.getPipelineTasks(taskItem.ID)
if err != nil { if err != nil {
return err log.Errorf("failed to get pipeline tasks, err: %v", err)
return nil
} }
var bulkTask *task2.Task var bulkTask *task2.Task
for i, t := range ptasks { for i, t := range ptasks {
@ -391,16 +392,18 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
inst.ID = instID inst.ID = instID
_, err = orm.Get(inst) _, err = orm.Get(inst)
if err != nil { if err != nil {
log.Errorf("failed to get instance, err: %v", err)
return err return err
} }
err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config)) err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
if err != nil { if err != nil {
log.Errorf("failed to create bulk_indexing pipeline, err: %v", err)
return err return err
} }
taskItem.Metadata.Labels["running_phase"] = 2 taskItem.Metadata.Labels["running_phase"] = 2
} }
} }
p.saveTaskAndWriteLog(taskItem, nil, "wait_for") p.saveTaskAndWriteLog(taskItem, "wait_for", nil, "")
} }
} }
return nil return nil
@ -410,6 +413,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
//check whether all pipeline task is stopped or not, then update task status //check whether all pipeline task is stopped or not, then update task status
ptasks, err := p.getPipelineTasks(taskItem.ID) ptasks, err := p.getPipelineTasks(taskItem.ID)
if err != nil { if err != nil {
log.Errorf("failed to get pipeline tasks, err: %v", err)
return err return err
} }
var taskIDs []string var taskIDs []string
@ -437,24 +441,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
} }
searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q)) searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q))
if err != nil { if err != nil {
return err log.Errorf("failed to get latest pipeline status, err: %v", err)
}
if len(searchRes.Hits.Hits) == 0 {
//check instance available
if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
inst := model.Instance{}
inst.ID = instID
_, err = orm.Get(&inst)
if err != nil {
return err
}
err = inst.TryConnectWithTimeout(time.Second)
if err != nil {
if errors.Is(err, syscall.ECONNREFUSED) {
return fmt.Errorf("stoping task [%s] error: %w", taskItem.ID, err)
}
}
}
return nil return nil
} }
MainLoop: MainLoop:
@ -467,7 +454,8 @@ MainLoop:
inst.ID = instID inst.ID = instID
_, err = orm.Get(&inst) _, err = orm.Get(&inst)
if err != nil { if err != nil {
return err log.Errorf("failed to get instance, err: %v", err)
return nil
} }
hasStopped := true hasStopped := true
for _, pipelineID := range taskIDs { for _, pipelineID := range taskIDs {
@ -519,15 +507,7 @@ MainLoop:
p.state[instanceID] = st p.state[instanceID] = st
} }
} }
p.saveTaskAndWriteLog(taskItem, &task2.Log{ p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] is stopped", taskItem.ID))
ID: util.GetUUID(),
TaskId: taskItem.ID,
Status: task2.StatusStopped,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Message: fmt.Sprintf("task [%s] is stopped", taskItem.ID),
Timestamp: time.Now().UTC(),
}, "")
return nil return nil
} }
@ -544,7 +524,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
ptasks, err := p.getPipelineTasks(taskItem.ID) ptasks, err := p.getPipelineTasks(taskItem.ID)
if err != nil { if err != nil {
log.Errorf("getPipelineTasks failed, err: %+v", err) log.Errorf("getPipelineTasks failed, err: %+v", err)
return err return nil
} }
for i, t := range ptasks { for i, t := range ptasks {
if t.Metadata.Labels != nil { if t.Metadata.Labels != nil {
@ -779,19 +759,9 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
taskItem.Status = task2.StatusRunning taskItem.Status = task2.StatusRunning
taskItem.StartTimeInMillis = time.Now().UnixMilli() taskItem.StartTimeInMillis = time.Now().UnixMilli()
taskLog := &task2.Log{ p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
ID: util.GetUUID(), Success: true,
TaskId: taskItem.ID, }, fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID))
Status: task2.StatusRunning,
Type: taskItem.Metadata.Type,
Config: taskItem.Config,
Result: &task2.LogResult{
Success: true,
},
Message: fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID),
Timestamp: time.Now().UTC(),
}
p.saveTaskAndWriteLog(taskItem, taskLog, "wait_for")
return nil return nil
} }
@ -903,32 +873,32 @@ func (p *DispatcherProcessor) getMigrationTasks(size int) ([]task2.Task, error)
return p.getTasks(queryDsl) return p.getTasks(queryDsl)
} }
func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, logItem *task2.Log, refresh string) { func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, refresh string, taskResult *task2.TaskResult, message string) {
esClient := elastic.GetClient(p.config.Elasticsearch) esClient := elastic.GetClient(p.config.Elasticsearch)
_, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh) _, err := esClient.Index(p.config.IndexName, "", taskItem.ID, taskItem, refresh)
if err != nil { if err != nil {
log.Errorf("failed to update task, err: %v", err) log.Errorf("failed to update task, err: %v", err)
} }
if logItem != nil { if message != "" {
event.SaveLog(event.Event{ event.SaveLog(event.Event{
Metadata: event.EventMetadata{ Metadata: event.EventMetadata{
Category: "migration", Category: "task",
Name: "logging", Name: "logging",
Datatype: "event", Datatype: "event",
Labels: util.MapStr{ Labels: util.MapStr{
"task_id": logItem.TaskId, "task_type": taskItem.Metadata.Type,
"task_id": taskItem.ID,
"parent_task_id": taskItem.ParentId, "parent_task_id": taskItem.ParentId,
"retry_no": taskItem.RetryTimes, "retry_no": taskItem.RetryTimes,
}, },
}, },
Fields: util.MapStr{ Fields: util.MapStr{
"migration": util.MapStr{ "task": util.MapStr{
"logging": util.MapStr{ "logging": util.MapStr{
"config": logItem.Config, "config": util.MustToJSON(taskItem.Config),
"context": logItem.Context, "status": taskItem.Status,
"status": logItem.Status, "message": message,
"message": logItem.Message, "result": taskResult,
"result": logItem.Result,
}, },
}, },
}, },
@ -1046,7 +1016,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
Status: task2.StatusReady, Status: task2.StatusReady,
StartTimeInMillis: time.Now().UnixMilli(), StartTimeInMillis: time.Now().UnixMilli(),
Metadata: task2.Metadata{ Metadata: task2.Metadata{
Type: "pipeline", Type: "index_migration",
Labels: util.MapStr{ Labels: util.MapStr{
"business_id": "index_migration", "business_id": "index_migration",
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id, "source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
@ -1134,7 +1104,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
Runnable: true, Runnable: true,
Status: task2.StatusReady, Status: task2.StatusReady,
Metadata: task2.Metadata{ Metadata: task2.Metadata{
Type: "pipeline", Type: "index_migration",
Labels: util.MapStr{ Labels: util.MapStr{
"business_id": "index_migration", "business_id": "index_migration",
"source_cluster_id": clusterMigrationTask.Cluster.Source.Id, "source_cluster_id": clusterMigrationTask.Cluster.Source.Id,
@ -1397,7 +1367,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
if err != nil { if err != nil {
log.Errorf("search es failed, err: %v", err) log.Errorf("search es failed, err: %v", err)
return taskState, err return taskState, nil
} }
if v, ok := res.Aggregations["total_docs"].Value.(float64); ok { if v, ok := res.Aggregations["total_docs"].Value.(float64); ok {
taskState.IndexDocs = v taskState.IndexDocs = v
@ -1502,13 +1472,13 @@ func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) {
return return
} }
notification := &model.Notification{ notification := &model.Notification{
UserId: util.ToString(creatorID), UserId: util.ToString(creatorID),
NotificationType: model.NotificationTypeNotification, Type: model.NotificationTypeNotification,
MessageType: model.MessageTypeMigration, MessageType: model.MessageTypeMigration,
Status: model.NotificationStatusNew, Status: model.NotificationStatusNew,
Title: title, Title: title,
Body: body, Body: body,
Link: link, Link: link,
} }
err = orm.Create(nil, notification) err = orm.Create(nil, notification)
if err != nil { if err != nil {