diff --git a/plugin/migration/api.go b/plugin/migration/api.go
index ac4ef5ab..34632ca9 100644
--- a/plugin/migration/api.go
+++ b/plugin/migration/api.go
@@ -236,8 +236,8 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
 		h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
 		return
 	}
-	if obj.Status == task2.StatusComplete {
-		h.WriteError(w, fmt.Sprintf("task [%s] completed, can't start anymore", taskID), http.StatusInternalServerError)
+	if obj.Metadata.Type != "pipeline" && obj.Status == task2.StatusComplete {
+		h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError)
 		return
 	}
 	obj.Status = task2.StatusReady
@@ -253,34 +253,23 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
 		Success: true,
 	}, "task status manually set to ready")
 
-	if obj.Metadata.Labels != nil && obj.Metadata.Type == "index_migration" && len(obj.ParentId) > 0 {
-		//update status of major task to running
-		query := util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"id": util.MapStr{
-								"value": obj.ParentId[0],
-							},
-						},
-					},
-				},
-			},
+	// update status of parent task to running
+	for _, parentTaskID := range obj.ParentId {
+		parentTask := task2.Task{}
+		parentTask.ID = parentTaskID
+		exists, err := orm.Get(&parentTask)
+		if !exists || err != nil {
+			h.WriteError(w, fmt.Sprintf("parent task [%s] not found", parentTaskID), http.StatusInternalServerError)
+			return
 		}
-		queryDsl := util.MapStr{
-			"query": query,
-			"script": util.MapStr{
-				"source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusRunning),
-			},
-		}
-
-		err = orm.UpdateBy(obj, util.MustToJSONBytes(queryDsl))
+		parentTask.Status = task2.StatusRunning
+		err = orm.Update(nil, &parentTask)
 		if err != nil {
 			log.Error(err)
 			h.WriteError(w, err.Error(), http.StatusInternalServerError)
 			return
 		}
+		migration_util.WriteLog(&parentTask, nil, fmt.Sprintf("child [%s] task [%s] manually started", obj.Metadata.Type, taskID))
 	}
 
 	h.WriteJSON(w, util.MapStr{
@@ -788,12 +777,12 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
 			for _, pipelineTask := range parentIDPipelineTasks[ptask.ID] {
 				if pipelineTask.Metadata.Labels["pipeline_id"] == "es_scroll" {
 					partitionTaskInfo["scroll_task"] = util.MapStr{
-						"id": pipelineTask.ID,
+						"id":     pipelineTask.ID,
 						"status": pipelineTask.Status,
 					}
-				}else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
+				} else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
 					partitionTaskInfo["bulk_task"] = util.MapStr{
-						"id": pipelineTask.ID,
+						"id":     pipelineTask.ID,
 						"status": pipelineTask.Status,
 					}
 				}
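The api.go hunks do two things: the handler now walks every entry of `obj.ParentId` (instead of a painless-script `orm.UpdateBy` against `ParentId[0]` only), loading each parent, flipping it to running with `orm.Update`, and writing a task log; and the completed-task guard no longer applies to pipeline tasks. A minimal sketch of the loosened restart rule follows; the helper name `canRestart` is hypothetical, and `"complete"` stands in for `task2.StatusComplete` (the real handler inlines this check):

```go
// canRestart restates the new guard in startDataMigration: a pipeline
// task (es_scroll / bulk_indexing) may always be re-run, because the
// dispatcher resets it when the parent index_migration restarts; any
// other task type is frozen once it reaches the complete status.
// NOTE: hypothetical helper for illustration, not part of the patch.
func canRestart(taskType, status string) bool {
	if taskType == "pipeline" {
		return true
	}
	return status != "complete"
}
```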
diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go
index 6348360d..0ad15909 100644
--- a/plugin/migration/pipeline.go
+++ b/plugin/migration/pipeline.go
@@ -391,9 +391,6 @@ func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Ta
 }
 
 func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) {
-	if bulkTask.Status == task2.StatusError {
-		return true, 0, errors.New("bulk pipeline failed")
-	}
 	// NOTE: old-version pipeline tasks has empty status
 	if bulkTask.Status == "" {
 		return true, 0, errors.New("task was started by an old-version console, need to manually restart it")
@@ -409,7 +406,7 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task,
 	}
 
 	// bulk not finished yet
-	if bulkTask.Status != task2.StatusComplete {
+	if bulkTask.Status != task2.StatusComplete && bulkTask.Status != task2.StatusError {
 		return false, 0, nil
 	}
 
@@ -426,6 +423,11 @@ func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task,
 		return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons)
 	}
 
+	// successDocs matched but has errors
+	if bulkTask.Status == task2.StatusError {
+		return true, successDocs, nil
+	}
+
 	return true, successDocs, nil
 }
 
@@ -635,6 +637,8 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 		return nil
 	}
 
+	taskItem.RetryTimes++
+
 	instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
 	totalDocs := cfg.Source.DocCount
 	scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
@@ -645,6 +649,8 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	if scrolled && err == nil {
 		redoScroll = false
 		// reset queue consumer offset
+		// NOTE: we only trigger this flow when restart index_migration
+		// Restart bulk task will not reset queue offset
 		err = p.resetGatewayQueue(taskItem)
 		if err != nil {
 			log.Infof("task [%s] failed to reset gateway queue, redo scroll", taskItem.ID)
@@ -677,6 +683,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 		}
 		instanceID = instance.ID
 
+		scrollTask.RetryTimes = taskItem.RetryTimes
 		// update instance info first
 		scrollTask.Metadata.Labels["execution_instance_id"] = instanceID
 		// try to clear disk queue before running es_scroll
@@ -690,6 +697,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	}
 
 	// update bulk task to init
+	bulkTask.RetryTimes = taskItem.RetryTimes
 	bulkTask.Metadata.Labels["execution_instance_id"] = instanceID
 	bulkTask.Status = task2.StatusInit
 	err = orm.Update(nil, bulkTask)
@@ -698,7 +706,6 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 	}
 
 	// update sub migration task status to running and save task log
-	taskItem.RetryTimes++
 	taskItem.Metadata.Labels["execution_instance_id"] = instanceID
 	taskItem.Metadata.Labels["index_docs"] = 0
 	taskItem.Metadata.Labels["scrolled_docs"] = 0
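Two themes in the pipeline.go hunks: `taskItem.RetryTimes++` moves to the top of `handleScheduleSubTask` and the counter is mirrored onto the scroll and bulk subtasks, so all three records report the same retry count; and `checkBulkPipelineTaskStatus` no longer fails an errored bulk pipeline outright, instead inspecting it like a completed one and only reporting an error when the doc counts don't reconcile. A condensed restatement of the new completion rule, as a hypothetical helper (`"complete"`/`"error"` stand in for `task2.StatusComplete`/`task2.StatusError`, and `fmt` is assumed imported):

```go
// bulkOutcome sketches the decision flow of checkBulkPipelineTaskStatus
// after this patch: a run is "not bulked" only while it is neither
// complete nor errored, and an errored run whose success count still
// matches totalDocs is treated as a success.
func bulkOutcome(status string, successDocs, totalDocs int64) (bulked bool, err error) {
	if status != "complete" && status != "error" {
		return false, nil // still running, poll again later
	}
	if successDocs != totalDocs {
		return true, fmt.Errorf("bulk complete but docs count unmatch: %d / %d", successDocs, totalDocs)
	}
	// the run may have ended in "error", but every doc was accounted for
	return true, nil
}
```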
diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go
index 03a4cfa3..3925473b 100644
--- a/plugin/migration/pipeline_task/pipeline_task.go
+++ b/plugin/migration/pipeline_task/pipeline_task.go
@@ -61,8 +61,6 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
 		return nil
 	}
 
-	taskItem.RetryTimes++
-
 	cfg := migration_model.PipelineTaskConfig{}
 	err = migration_util.GetTaskConfig(taskItem, &cfg)
 	if err != nil {
@@ -144,14 +142,14 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
 }
 
 func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
-	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, err := p.getBulkIndexingTaskState(taskItem)
+	successDocs, indexDocs, bulked, totalInvalidDocs, totalInvalidReasons, totalFailureDocs, totalFailureReasons, errs := p.getBulkIndexingTaskState(taskItem)
 	if !bulked {
 		return nil
 	}
 
 	var errMsg string
-	if err != nil {
-		errMsg = err.Error()
+	if len(errs) > 0 {
+		errMsg = fmt.Sprintf("bulk finished with error(s): %v", errs)
 	}
 	// TODO: handle multiple run bulk_indexing pipeline tasks and total_docs from index_migration
 	now := time.Now()
@@ -184,7 +182,8 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
 		return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"])
 	}
 
-	hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"})
+	// we only check STOPPED log after the last task status update
+	hits, err := p.getPipelineLogs(taskItem, []string{"STOPPED"}, taskItem.Updated.UnixMilli())
 	if err != nil {
 		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
 		return nil
@@ -290,12 +289,17 @@ func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.T
 }
 
 func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) {
-	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
+	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
 	if err != nil {
 		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
 		err = nil
 		return
 	}
+	if len(hits) == 0 {
+		log.Debugf("scroll task [%s] not finished yet since last start", taskItem.ID)
+		return
+	}
+	// NOTE: we only check the last run of es_scroll
 	for _, hit := range hits {
 		scrolled = true
 		m := util.MapStr(hit.Source)
@@ -317,11 +321,20 @@ func (p *processor) getEsScrollTaskState(taskItem *task.Task) (scrolledDocs int6
 	return
 }
 
-func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, err error) {
-	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"})
+func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs int64, indexDocs int64, bulked bool, totalInvalidDocs []string, totalInvalidReasons []string, totalFailureDocs []string, totalFailureReasons []string, errs []string) {
+	newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
 	if err != nil {
-		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
-		err = nil
+		log.Errorf("failed to get latest pipeline logs for task [%s], err: %v", taskItem.ID, err)
+		return
+	}
+	if len(newHits) == 0 {
+		log.Debugf("bulk task [%s] not finished yet since last start", taskItem.ID)
+		return
+	}
+
+	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
+	if err != nil {
+		log.Errorf("failed to get all pipeline logs for task [%s], err: %v", taskItem.ID, err)
 		return
 	}
 
@@ -331,8 +344,7 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
 
 		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
 		if errStr != "" {
-			err = errors.New(errStr)
-			return
+			errs = append(errs, errStr)
 		}
 
 		var (
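`getBulkIndexingTaskState` now returns `errs []string` instead of a single `err`, and reads the pipeline logs in two phases. Condensed from the hunks above (a sketch of the flow, not the literal patch):

```go
// Phase one asks "did the *latest* run finish?" by bounding the log
// query to records newer than the task's last status update; phase two
// re-reads the full history (lower bound 0) so doc counters and error
// messages accumulate across every run, not just the most recent one.
newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
if err != nil || len(newHits) == 0 {
	return // logs unavailable, or nothing finished since the last restart
}
hits, _ := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
for _, hit := range hits {
	m := util.MapStr(hit.Source)
	if errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error"); errStr != "" {
		errs = append(errs, errStr) // collect every failure instead of bailing on the first
	}
	// ... success/invalid/failure counters keep accumulating below
}
```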
@@ -357,8 +369,9 @@ func (p *processor) getBulkIndexingTaskState(taskItem *task.Task) (successDocs i
 	return
 }
 
-func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]elastic.IndexDocument, error) {
+func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]elastic.IndexDocument, error) {
 	query := util.MapStr{
+		"size": 999,
 		"sort": []util.MapStr{
 			{
 				"timestamp": util.MapStr{
@@ -371,9 +384,6 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela
 				},
 			},
 		},
-		"collapse": util.MapStr{
-			"field": "metadata.labels.task_id",
-		},
 		"query": util.MapStr{
 			"bool": util.MapStr{
 				"must": []util.MapStr{
@@ -394,6 +404,13 @@ func (p *processor) getPipelineLogs(taskItem *task.Task, status []string) ([]ela
 						},
 					},
 				},
+				{
+					"range": util.MapStr{
+						"timestamp": util.MapStr{
+							"gte": timestampGte,
+						},
+					},
+				},
 			},
 		},
 	},
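With the `timestampGte` parameter, the `size: 999` cap, and the removal of the `collapse` on `metadata.labels.task_id`, `getPipelineLogs` now returns every matching log record in the window rather than one collapsed hit per task. The call sites touched by this patch use it as follows (a usage sketch; `timestampGte` is epoch milliseconds, and 0 disables the lower bound):

```go
// bounded to records after the last status update: detect the latest run only
stopped, err := p.getPipelineLogs(taskItem, []string{"STOPPED"}, taskItem.Updated.UnixMilli())
finished, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
// unbounded: full history, used for cumulative doc counts and error aggregation
history, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0)
```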