[migration] improve pending_stop & es error handling
This commit is contained in:
parent
b31bb6c090
commit
d5d46de854
|
@ -7,12 +7,13 @@ package migration
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"infini.sh/framework/core/api/rbac/enum"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"infini.sh/framework/core/api/rbac/enum"
|
||||
|
||||
log "github.com/cihub/seelog"
|
||||
"infini.sh/console/model"
|
||||
"infini.sh/framework/core/api"
|
||||
|
@ -289,62 +290,12 @@ func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Requ
|
|||
}, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
//if task2.IsEnded(obj.Status) {
|
||||
// h.WriteJSON(w, util.MapStr{
|
||||
// "success": true,
|
||||
// }, 200)
|
||||
// return
|
||||
//}
|
||||
//query all pipeline task(scroll/bulk_indexing) and then stop it
|
||||
err = stopPipelineTasks(id)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
if task2.IsEnded(obj.Status) {
|
||||
h.WriteJSON(w, util.MapStr{
|
||||
"success": true,
|
||||
}, 200)
|
||||
return
|
||||
}
|
||||
if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "cluster_migration" {
|
||||
//update status of subtask to pending stop
|
||||
query := util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"parent_id": util.MapStr{
|
||||
"value": obj.ID,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"status": util.MapStr{
|
||||
"value": task2.StatusRunning,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.labels.business_id": util.MapStr{
|
||||
"value": "index_migration",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
queryDsl := util.MapStr{
|
||||
"query": query,
|
||||
"script": util.MapStr{
|
||||
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop),
|
||||
},
|
||||
}
|
||||
|
||||
err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
obj.Status = task2.StatusPendingStop
|
||||
err = orm.Update(nil, &obj)
|
||||
if err != nil {
|
||||
|
@ -987,65 +938,6 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request,
|
|||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func stopPipelineTasks(parentID string) error {
|
||||
queryDsl := util.MapStr{
|
||||
"size": 1000,
|
||||
"query": util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"parent_id": util.MapStr{
|
||||
"value": parentID,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"terms": util.MapStr{
|
||||
"metadata.labels.pipeline_id": []string{"es_scroll", "bulk_indexing"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
q := orm.Query{
|
||||
RawQuery: util.MustToJSONBytes(queryDsl),
|
||||
}
|
||||
err, result := orm.Search(task2.Task{}, &q)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, hit := range result.Result {
|
||||
buf, err := util.ToJSONBytes(hit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tk := task2.Task{}
|
||||
err = util.FromJSONBytes(buf, &tk)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if tk.Metadata.Labels != nil {
|
||||
if instID, ok := tk.Metadata.Labels["execution_instance_id"].(string); ok {
|
||||
inst := model.Instance{}
|
||||
inst.ID = instID
|
||||
_, err = orm.Get(&inst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = inst.StopPipeline(context.Background(), tk.ID)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats MajorTaskState, err error) {
|
||||
taskQuery := util.MapStr{
|
||||
"size": 500,
|
||||
|
|
|
@ -211,20 +211,63 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
|
|||
},
|
||||
}
|
||||
|
||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
_, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
taskItem.Status = task2.StatusRunning
|
||||
// saved is_split if the following steps failed
|
||||
defer func() {
|
||||
p.sendMajorTaskNotification(taskItem)
|
||||
p.saveTaskAndWriteLog(taskItem, "", &task2.TaskResult{
|
||||
Success: true,
|
||||
}, fmt.Sprintf("success to start task [%s]", taskItem.ID))
|
||||
}()
|
||||
|
||||
esClient := elastic.GetClient(p.config.Elasticsearch)
|
||||
_, err := esClient.UpdateByQuery(p.config.IndexName, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
log.Errorf("failed to update sub task status, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
taskItem.Status = task2.StatusRunning
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error {
|
||||
//update status of subtask to pending stop
|
||||
query := util.MapStr{
|
||||
"bool": util.MapStr{
|
||||
"must": []util.MapStr{
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"parent_id": util.MapStr{
|
||||
"value": taskItem.ID,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"terms": util.MapStr{
|
||||
"status": []string{task2.StatusRunning, task2.StatusReady},
|
||||
},
|
||||
},
|
||||
{
|
||||
"term": util.MapStr{
|
||||
"metadata.labels.business_id": util.MapStr{
|
||||
"value": "index_migration",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
queryDsl := util.MapStr{
|
||||
"query": query,
|
||||
"script": util.MapStr{
|
||||
"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop),
|
||||
},
|
||||
}
|
||||
|
||||
err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl))
|
||||
if err != nil {
|
||||
log.Errorf("failed to update sub task status, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
//check whether all pipeline task is stopped or not, then update task status
|
||||
q := util.MapStr{
|
||||
"size": 200,
|
||||
|
@ -238,7 +281,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
|
|||
},
|
||||
{
|
||||
"terms": util.MapStr{
|
||||
"status": []string{task2.StatusRunning, task2.StatusPendingStop},
|
||||
"status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -247,7 +290,8 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e
|
|||
}
|
||||
tasks, err := p.getTasks(q)
|
||||
if err != nil {
|
||||
return err
|
||||
log.Errorf("failed to get sub tasks, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
// all subtask stopped or error or complete
|
||||
if len(tasks) == 0 {
|
||||
|
@ -328,7 +372,8 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
|
|||
if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
|
||||
ptasks, err := p.getPipelineTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
log.Errorf("failed to get pipeline tasks, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
var bulkTask *task2.Task
|
||||
for i, t := range ptasks {
|
||||
|
@ -347,10 +392,12 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
|
|||
inst.ID = instID
|
||||
_, err = orm.Get(inst)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get instance, err: %v", err)
|
||||
return err
|
||||
}
|
||||
err = inst.CreatePipeline(util.MustToJSONBytes(bulkTask.Config))
|
||||
if err != nil {
|
||||
log.Errorf("failed to create bulk_indexing pipeline, err: %v", err)
|
||||
return err
|
||||
}
|
||||
taskItem.Metadata.Labels["running_phase"] = 2
|
||||
|
@ -366,6 +413,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
|
|||
//check whether all pipeline task is stopped or not, then update task status
|
||||
ptasks, err := p.getPipelineTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
log.Errorf("failed to get pipeline tasks, err: %v", err)
|
||||
return err
|
||||
}
|
||||
var taskIDs []string
|
||||
|
@ -393,24 +441,7 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
|
|||
}
|
||||
searchRes, err := esClient.SearchWithRawQueryDSL(p.config.LogIndexName, util.MustToJSONBytes(q))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(searchRes.Hits.Hits) == 0 {
|
||||
//check instance available
|
||||
if instID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
|
||||
inst := model.Instance{}
|
||||
inst.ID = instID
|
||||
_, err = orm.Get(&inst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = inst.TryConnectWithTimeout(time.Second)
|
||||
if err != nil {
|
||||
if errors.Is(err, syscall.ECONNREFUSED) {
|
||||
return fmt.Errorf("stoping task [%s] error: %w", taskItem.ID, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
log.Errorf("failed to get latest pipeline status, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
MainLoop:
|
||||
|
@ -423,7 +454,8 @@ MainLoop:
|
|||
inst.ID = instID
|
||||
_, err = orm.Get(&inst)
|
||||
if err != nil {
|
||||
return err
|
||||
log.Errorf("failed to get instance, err: %v", err)
|
||||
return nil
|
||||
}
|
||||
hasStopped := true
|
||||
for _, pipelineID := range taskIDs {
|
||||
|
@ -492,7 +524,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
|
|||
ptasks, err := p.getPipelineTasks(taskItem.ID)
|
||||
if err != nil {
|
||||
log.Errorf("getPipelineTasks failed, err: %+v", err)
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
for i, t := range ptasks {
|
||||
if t.Metadata.Labels != nil {
|
||||
|
@ -729,7 +761,7 @@ func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
|
|||
|
||||
p.saveTaskAndWriteLog(taskItem, "wait_for", &task2.TaskResult{
|
||||
Success: true,
|
||||
}, fmt.Sprintf("dispatch task [%s] to instance ", taskItem.ID))
|
||||
}, fmt.Sprintf("dispatch task [%s] to instance [%s]", taskItem.ID, instance.ID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1335,7 +1367,7 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat
|
|||
res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query))
|
||||
if err != nil {
|
||||
log.Errorf("search es failed, err: %v", err)
|
||||
return taskState, err
|
||||
return taskState, nil
|
||||
}
|
||||
if v, ok := res.Aggregations["total_docs"].Value.(float64); ok {
|
||||
taskState.IndexDocs = v
|
||||
|
|
Loading…
Reference in New Issue