Merge pull request '[migration] scroll/bulk support manually retry' (#78) from feature/migration into master

silenceqi 2023-04-24 17:27:41 +08:00
commit 9be1f412de
3 changed files with 62 additions and 49 deletions
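In short, this change lets a completed scroll/bulk pipeline task be started again: a restart bumps the index_migration subtask's RetryTimes up front and copies the counter onto the es_scroll and bulk_indexing children, and pipeline status checks now only consider logs written after the task's last update, so each manual retry is judged by its own run. A minimal runnable sketch of the retry bookkeeping (the Task struct here is a hypothetical stand-in for task2.Task):

package main

import "fmt"

// Task is a hypothetical stand-in for task2.Task; only the fields
// relevant to retry bookkeeping are kept.
type Task struct {
	ID         string
	RetryTimes int
}

// scheduleSubTask mirrors handleScheduleSubTask after this change: the
// counter is bumped once, before any scheduling work, and propagated to
// both pipeline children so all three tasks agree on the retry count.
func scheduleSubTask(migration, scroll, bulk *Task) {
	migration.RetryTimes++
	scroll.RetryTimes = migration.RetryTimes
	bulk.RetryTimes = migration.RetryTimes
}

func main() {
	m := &Task{ID: "index_migration-1"}
	s := &Task{ID: "es_scroll-1"}
	b := &Task{ID: "bulk_indexing-1"}
	scheduleSubTask(m, s, b)
	fmt.Println(m.RetryTimes, s.RetryTimes, b.RetryTimes) // 1 1 1
}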

View File

@@ -236,8 +236,8 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
 		h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
 		return
 	}
-	if obj.Status == task2.StatusComplete {
-		h.WriteError(w, fmt.Sprintf("task [%s] completed, can't start anymore", taskID), http.StatusInternalServerError)
+	if obj.Metadata.Type != "pipeline" && obj.Status == task2.StatusComplete {
+		h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError)
 		return
 	}
 	obj.Status = task2.StatusReady
@@ -253,34 +253,23 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
 		Success: true,
 	}, "task status manually set to ready")
-	if obj.Metadata.Labels != nil && obj.Metadata.Type == "index_migration" && len(obj.ParentId) > 0 {
-		//update status of major task to running
-		query := util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"id": util.MapStr{
-								"value": obj.ParentId[0],
-							},
-						},
-					},
-				},
-			},
-		}
-		queryDsl := util.MapStr{
-			"query": query,
-			"script": util.MapStr{
-				"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning),
-			},
-		}
-		err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl))
+	// update status of parent task to running
+	for _, parentTaskID := range obj.ParentId {
+		parentTask := task2.Task{}
+		parentTask.ID = parentTaskID
+		exists, err := orm.Get(&parentTask)
+		if !exists || err != nil {
+			h.WriteError(w, fmt.Sprintf("parent task [%s] not found", parentTaskID), http.StatusInternalServerError)
+			return
+		}
+		parentTask.Status = task2.StatusRunning
+		err = orm.Update(nil, &parentTask)
 		if err != nil {
 			log.Error(err)
 			h.WriteError(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
+		migration_util.WriteLog(&parentTask, nil, fmt.Sprintf("child [%s] task [%s] manually started", obj.Metadata.Type, taskID))
 	}
 	h.WriteJSON(w, util.MapStr{
@@ -788,12 +777,12 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 	for _, pipelineTask := range parentIDPipelineTasks[ptask.ID] {
 		if pipelineTask.Metadata.Labels["pipeline_id"] == "es_scroll" {
 			partitionTaskInfo["scroll_task"] = util.MapStr{
 				"id": pipelineTask.ID,
 				"status": pipelineTask.Status,
 			}
-		}else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
+		} else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
 			partitionTaskInfo["bulk_task"] = util.MapStr{
 				"id": pipelineTask.ID,
 				"status": pipelineTask.Status,
 			}
 		}
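With the API change above, a completed es_scroll/bulk_indexing (pipeline) task can now be restarted for a manual retry, while other task types still refuse once complete. A minimal runnable sketch of that gate, with canRestart as a hypothetical distillation of the inline check:

package main

import "fmt"

// canRestart is a hypothetical distillation of the new guard in
// startDataMigration: only non-pipeline tasks reject a restart once complete.
func canRestart(taskType, status string) bool {
	return taskType == "pipeline" || status != "complete"
}

func main() {
	fmt.Println(canRestart("pipeline", "complete"))        // true: scroll/bulk support manual retry
	fmt.Println(canRestart("index_migration", "complete")) // false: completed, can't start anymore
}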

View File

@@ -391,9 +391,6 @@ func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Ta
 }
 
 func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) {
-	if bulkTask.Status == task2.StatusError {
-		return true, 0, errors.New("bulk pipeline failed")
-	}
 	// NOTE: old-version pipeline tasks has empty status
 	if bulkTask.Status == "" {
 		return true, 0, errors.New("task was started by an old-version console, need to manually restart it")
@@ -409,7 +406,7 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task,
 	}
 
 	// bulk not finished yet
-	if bulkTask.Status != task2.StatusComplete {
+	if bulkTask.Status != task2.StatusComplete && bulkTask.Status != task2.StatusError {
 		return false, 0, nil
 	}
@@ -426,6 +423,11 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task,
 		return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
 	}
+
+	// successDocs matched but has errors
+	if bulkTask.Status == task2.StatusError {
+		return true, successDocs, nil
+	}
 	return true, successDocs, nil
 }
@@ -635,6 +637,8 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 		return nil
 	}
 
+	taskItem.RetryTimes++
+
 	instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
 	totalDocs := cfg.Source.DocCount
 	scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
@@ -645,6 +649,8 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	if scrolled && err == nil {
 		redoScroll = false
 		// reset queue consumer offset
+		// NOTE: we only trigger this flow when restart index_migration
+		// Restart bulk task will not reset queue offset
 		err = p.resetGatewayQueue(taskItem)
 		if err != nil {
 			log.Infof("task [%s] failed to reset gateway queue, redo scroll", taskItem.ID)
@@ -677,6 +683,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	}
 	instanceID = instance.ID
+	scrollTask.RetryTimes = taskItem.RetryTimes
 	// update instance info first
 	scrollTask.Metadata.Labels["execution_instance_id"] = instanceID
 	// try to clear disk queue before running es_scroll
@@ -690,6 +697,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	}
 
 	// update bulk task to init
+	bulkTask.RetryTimes = taskItem.RetryTimes
 	bulkTask.Metadata.Labels["execution_instance_id"] = instanceID
 	bulkTask.Status = task2.StatusInit
 	err = orm.Update(nil, bulkTask)
@@ -698,7 +706,6 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	}
 
 	// update sub migration task status to running and save task log
-	taskItem.RetryTimes++
 	taskItem.Metadata.Labels["execution_instance_id"] = instanceID
 	taskItem.Metadata.Labels["index_docs"] = 0
 	taskItem.Metadata.Labels["scrolled_docs"] = 0

View File

@@ -61,8 +61,6 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
 		return nil
 	}
 
-	taskItem.RetryTimes++
-
 	cfg := migration_model.PipelineTaskConfig{}
 	err = migration_util.GetTaskConfig(taskItem, &cfg)
 	if err != nil {
@@ -144,14 +142,14 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
 }
 
 func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
-	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, err := p.getBulkIndexingTaskState(taskItem)
+	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, errs := p.getBulkIndexingTaskState(taskItem)
 	if !bulked {
 		return nil
 	}
 
 	var errMsg string
-	if err != nil {
-		errMsg = err.Error()
+	if len(errs) > 0 {
+		errMsg = fmt.Sprintf("bulk finished with error(s): %v", errs)
 	}
 	// TODO: handle multiple run bulk_indexing pipeline tasks and total_docs from index_migration
 	now := time.Now()
@@ -184,7 +182,8 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
 		return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"])
 	}
 
-	hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"})
+	// we only check STOPPED log after the last task status update
+	hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"}, taskItem.Updated.UnixMilli())
 	if err != nil {
 		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
 		return nil
@@ -290,12 +289,17 @@ func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.T
 }
 
 func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) {
-	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
+	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
 	if err != nil {
 		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
 		err = nil
 		return
 	}
+	if len(hits) == 0 {
+		log.Debugf("scroll task [%s] not finished yet since last start", taskItem.ID)
+		return
+	}
+	// NOTE: we only check the last run of es_scroll
 	for _, hit := range hits {
 		scrolled = true
 		m := util.MapStr(hit.Source)
@@ -317,11 +321,20 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
 	return
 }
 
-func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, err error) {
-	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
+func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
+	newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
 	if err != nil {
-		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
-		err = nil
+		log.Errorf("failed to get latest pipeline logs for task [%s], err: %v", taskItem.ID, err)
+		return
+	}
+	if len(newHits) == 0 {
+		log.Debugf("bulk task [%s] not finished yet since last start", taskItem.ID)
+		return
+	}
+	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
+	if err != nil {
+		log.Errorf("failed to get all pipeline logs for task [%s], err: %v", taskItem.ID, err)
 		return
 	}
@@ -331,8 +344,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
 		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
 		if errStr != "" {
-			err = errors.New(errStr)
-			return
+			errs = append(errs, errStr)
 		}
 
 		var (
@@ -357,8 +369,9 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
 	return
 }
 
-func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]elastic.IndexDocument, error) {
+func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) {
 	query := util.MapStr{
+		"size": 999,
 		"sort": []util.MapStr{
 			{
 				"timestamp": util.MapStr{
@@ -371,9 +384,6 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela
 				},
 			},
 		},
-		"collapse": util.MapStr{
-			"field": "metadata.labels.task_id",
-		},
 		"query": util.MapStr{
 			"bool": util.MapStr{
 				"must": []util.MapStr{
@@ -394,6 +404,13 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela
 						},
 					},
 				},
+				{
+					"range": util.MapStr{
+						"timestamp": util.MapStr{
+							"gte": timestampGte,
+						},
+					},
+				},
 			},
 		},
 	},
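The net effect of the getPipelineLogs change: callers pass a timestamp lower bound, so "did this run finish" is answered only from logs newer than the task's last status update, while passing 0 still returns logs from every run. A minimal runnable sketch of that two-phase check for the bulk case, with a hypothetical fetchLogs helper standing in for getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, gte):

package main

import "fmt"

// bulkFinishedSinceLastStart mirrors the two-phase check in
// getBulkIndexingTaskState; fetchLogs is a hypothetical stand-in for
// getPipelineLogs with a timestamp lower bound.
func bulkFinishedSinceLastStart(fetchLogs func(gteMillis int64) []string, lastUpdateMillis int64) (finished bool, allRuns []string) {
	// phase 1: only logs newer than the last status update count as "this run"
	newHits := fetchLogs(lastUpdateMillis)
	if len(newHits) == 0 {
		return false, nil // current run not finished yet since last start
	}
	// phase 2: once this run finished, aggregate logs of all runs (gte = 0)
	// so success/failure doc counts accumulate across manual retries
	return true, fetchLogs(0)
}

func main() {
	logs := []string{"run1:FINISHED", "run2:FINISHED"}
	fetch := func(gte int64) []string {
		if gte > 0 {
			return logs[1:] // pretend only run2 is newer than the last update
		}
		return logs
	}
	finished, all := bulkFinishedSinceLastStart(fetch, 1682000000000)
	fmt.Println(finished, all) // true [run1:FINISHED run2:FINISHED]
}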