[migration] scroll/bulk support manually retry
parent 00c48bf9d5
commit cbfe890982
@@ -236,8 +236,8 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
 		h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
 		return
 	}
-	if obj.Status == task2.StatusComplete {
-		h.WriteError(w, fmt.Sprintf("task [%s] completed, can't start anymore", taskID), http.StatusInternalServerError)
+	if obj.Metadata.Type != "pipeline" && obj.Status == task2.StatusComplete {
+		h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError)
 		return
 	}
 	obj.Status = task2.StatusReady
@@ -253,34 +253,23 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
 		Success: true,
 	}, "task status manually set to ready")

-	if obj.Metadata.Labels != nil && obj.Metadata.Type == "index_migration" && len(obj.ParentId) > 0 {
-		//update status of major task to running
-		query := util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"id": util.MapStr{
-								"value": obj.ParentId[0],
-							},
-						},
-					},
-				},
-			},
-		}
-		queryDsl := util.MapStr{
-			"query": query,
-			"script": util.MapStr{
-				"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning),
-			},
-		}
-
-		err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl))
+	// update status of parent task to running
+	for _, parentTaskID := range obj.ParentId {
+		parentTask := task2.Task{}
+		parentTask.ID = parentTaskID
+		exists, err := orm.Get(&parentTask)
+		if !exists || err != nil {
+			h.WriteError(w, fmt.Sprintf("parent task [%s] not found", parentTaskID), http.StatusInternalServerError)
+			return
+		}
+		parentTask.Status = task2.StatusRunning
+		err = orm.Update(nil, &parentTask)
 		if err != nil {
 			log.Error(err)
 			h.WriteError(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
+		migration_util.WriteLog(&parentTask, nil, fmt.Sprintf("child [%s] task [%s] manually started", obj.Metadata.Type, taskID))
 	}

 	h.WriteJSON(w, util.MapStr{
@@ -391,9 +391,6 @@ func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Ta
 }

 func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) {
-	if bulkTask.Status == task2.StatusError {
-		return true, 0, errors.New("bulk pipeline failed")
-	}
 	// NOTE: old-version pipeline tasks has empty status
 	if bulkTask.Status == "" {
 		return true, 0, errors.New("task was started by an old-version console, need to manually restart it")
@@ -409,7 +406,7 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task,
 	}

 	// bulk not finished yet
-	if bulkTask.Status != task2.StatusComplete {
+	if bulkTask.Status != task2.StatusComplete && bulkTask.Status != task2.StatusError {
 		return false, 0, nil
 	}

@@ -426,6 +423,11 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task,
 		return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
 	}

+	// successDocs matched but has errors
+	if bulkTask.Status == task2.StatusError {
+		return true, successDocs, nil
+	}
+
 	return true, successDocs, nil
 }

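To make the combined effect of the three checkBulkPipelineTaskStatus hunks above easier to follow, here is a minimal, runnable Go sketch of the status decision after this commit. The status strings and the bulkDone helper below are stand-ins for illustration only; they are not the real task2 constants or the actual function, and the docs-count comparison is elided.

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the task2 status constants referenced in the diff.
const (
	statusComplete = "complete"
	statusError    = "error"
)

// bulkDone models the new control flow: an empty status is a legacy task,
// anything that is neither complete nor error is "still running", and an
// errored bulk task now falls through to the later count/success checks
// instead of failing fast with "bulk pipeline failed".
func bulkDone(status string) (bulked bool, err error) {
	if status == "" {
		return true, errors.New("task was started by an old-version console, need to manually restart it")
	}
	if status != statusComplete && status != statusError {
		return false, nil // bulk not finished yet
	}
	return true, nil
}

func main() {
	for _, s := range []string{"", "running", statusError, statusComplete} {
		done, err := bulkDone(s)
		fmt.Printf("status=%q done=%v err=%v\n", s, done, err)
	}
}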
@@ -635,6 +637,8 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 		return nil
 	}

+	taskItem.RetryTimes++
+
 	instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
 	totalDocs := cfg.Source.DocCount
 	scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
@@ -645,6 +649,8 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	if scrolled && err == nil {
 		redoScroll = false
 		// reset queue consumer offset
+		// NOTE: we only trigger this flow when restart index_migration
+		// Restart bulk task will not reset queue offset
 		err = p.resetGatewayQueue(taskItem)
 		if err != nil {
 			log.Infof("task [%s] failed to reset gateway queue, redo scroll", taskItem.ID)
@@ -677,6 +683,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 		}
 		instanceID = instance.ID

+		scrollTask.RetryTimes = taskItem.RetryTimes
 		// update instance info first
 		scrollTask.Metadata.Labels["execution_instance_id"] = instanceID
 		// try to clear disk queue before running es_scroll
@@ -690,6 +697,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	}

 	// update bulk task to init
+	bulkTask.RetryTimes = taskItem.RetryTimes
 	bulkTask.Metadata.Labels["execution_instance_id"] = instanceID
 	bulkTask.Status = task2.StatusInit
 	err = orm.Update(nil, bulkTask)
@@ -698,7 +706,6 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	}

 	// update sub migration task status to running and save task log
-	taskItem.RetryTimes++
 	taskItem.Metadata.Labels["execution_instance_id"] = instanceID
 	taskItem.Metadata.Labels["index_docs"] = 0
 	taskItem.Metadata.Labels["scrolled_docs"] = 0
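Taken together, the RetryTimes hunks above move the increment to the top of handleScheduleSubTask and let the es_scroll and bulk_indexing subtasks inherit the parent's counter before they are re-run. A minimal, self-contained sketch of that bookkeeping follows; the Task struct and scheduleRetry function are illustrative stand-ins, not the real task2 types or dispatcher code.

package main

import "fmt"

// Stand-in for task2.Task with only the fields this sketch touches.
type Task struct {
	ID         string
	RetryTimes int
	Status     string
}

// scheduleRetry models the retry bookkeeping after this commit: the
// index_migration task is bumped once up front, and both subtasks
// inherit the same counter before being reset to init and re-run.
func scheduleRetry(parent, scrollTask, bulkTask *Task) {
	parent.RetryTimes++
	scrollTask.RetryTimes = parent.RetryTimes
	bulkTask.RetryTimes = parent.RetryTimes
	bulkTask.Status = "init"
}

func main() {
	parent := &Task{ID: "index_migration-1", RetryTimes: 2}
	scroll := &Task{ID: "es_scroll-1"}
	bulk := &Task{ID: "bulk_indexing-1"}
	scheduleRetry(parent, scroll, bulk)
	fmt.Println(parent.RetryTimes, scroll.RetryTimes, bulk.RetryTimes) // 3 3 3
}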
@@ -61,8 +61,6 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
 		return nil
 	}

-	taskItem.RetryTimes++
-
 	cfg := migration_model.PipelineTaskConfig{}
 	err = migration_util.GetTaskConfig(taskItem, &cfg)
 	if err != nil {
@@ -144,14 +142,14 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
 }

 func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
-	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, err := p.getBulkIndexingTaskState(taskItem)
+	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, errs := p.getBulkIndexingTaskState(taskItem)
 	if !bulked {
 		return nil
 	}

 	var errMsg string
-	if err != nil {
-		errMsg = err.Error()
+	if len(errs) > 0 {
+		errMsg = fmt.Sprintf("bulk finished with error(s): %v", errs)
 	}
 	// TODO: handle multiple run bulk_indexing pipeline tasks and total_docs from index_migration
 	now := time.Now()
@@ -317,11 +315,10 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
 	return
 }

-func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, err error) {
+func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
 	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
 	if err != nil {
 		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
-		err = nil
 		return
 	}

@@ -331,8 +328,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i

 		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
 		if errStr != "" {
-			err = errors.New(errStr)
-			return
+			errs = append(errs, errStr)
 		}

 		var (
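The last two hunks change getBulkIndexingTaskState from returning on the first pipeline error to collecting every error string, which handleRunningBulkIndexingPipelineTask then joins into a single message. A small runnable sketch of that aggregation pattern, assuming a hypothetical logHit stand-in for the pipeline log hits:

package main

import "fmt"

// logHit is a hypothetical stand-in for one pipeline log hit; the real
// code reads "payload.pipeline.logging.result.error" from each hit.
type logHit struct {
	errStr string
}

// collectErrs mirrors the new behaviour: gather every non-empty error
// string instead of returning on the first one encountered.
func collectErrs(hits []logHit) (errs []string) {
	for _, h := range hits {
		if h.errStr != "" {
			errs = append(errs, h.errStr)
		}
	}
	return errs
}

func main() {
	hits := []logHit{{"bulk rejected"}, {""}, {"mapping conflict"}}
	if errs := collectErrs(hits); len(errs) > 0 {
		// same shape as the errMsg built in handleRunningBulkIndexingPipelineTask
		fmt.Printf("bulk finished with error(s): %v\n", errs)
	}
}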