diff --git a/plugin/migration/model.go b/plugin/migration/model.go index 695bb798..26330703 100644 --- a/plugin/migration/model.go +++ b/plugin/migration/model.go @@ -89,14 +89,13 @@ type ClusterInfo struct { } type TaskCompleteState struct { - IsComplete bool - Error string - ClearPipeline bool - PipelineIds []string - RunningPhase int - TotalDocs int64 - SuccessDocs int64 - ScrolledDocs int64 + IsComplete bool + Error string + PipelineIds []string + RunningPhase int + TotalDocs int64 + SuccessDocs int64 + ScrolledDocs int64 } type MajorTaskState struct { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index a4503d83..31c0bce0 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -319,7 +319,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error tn := time.Now() taskItem.CompletedTime = &tn p.sendMajorTaskNotification(taskItem) - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] completed", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] finished with status [%s]", taskItem.ID, ts.Status)) } return nil } @@ -333,35 +333,6 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { if taskItem.Metadata.Labels != nil { taskItem.Metadata.Labels["index_docs"] = state.SuccessDocs taskItem.Metadata.Labels["scrolled_docs"] = state.ScrolledDocs - if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instanceID - _, err = orm.Get(&inst) - if err == nil { - for _, pipelineID := range state.PipelineIds { - err = inst.DeletePipeline(pipelineID) - if err != nil { - log.Errorf("delete pipeline failed, err: %v", err) - continue - } - selector := util.MapStr{ - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - } - //clear queue - err = inst.DeleteQueueBySelector(selector) - if err != nil { - log.Errorf("delete queue failed, err: %v", err) - } - } - } - if st, ok := p.state[instanceID]; ok { - st.Total -= 1 - p.state[instanceID] = st - } - } - } if state.Error != "" && state.TotalDocs != state.SuccessDocs { taskItem.Status = task2.StatusError @@ -375,6 +346,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { Success: state.Error == "", Error: state.Error, }, fmt.Sprintf("task [%s] completed", taskItem.ID)) + p.cleanGatewayPipelines(taskItem, state.PipelineIds) } else { if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) { ptasks, err := p.getPipelineTasks(taskItem.ID) @@ -484,38 +456,48 @@ MainLoop: } taskItem.Status = task2.StatusStopped + p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + p.cleanGatewayPipelines(taskItem, taskIDs) + return nil +} + +func (p *DispatcherProcessor) cleanGatewayPipelines(taskItem *task2.Task, pipelineIDs []string) { + var err error //delete pipeline and clear queue - if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok { - inst := model.Instance{} - inst.ID = instanceID - _, err = orm.Get(&inst) + instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string) + if !ok { + log.Debugf("task %s not scheduled, skip cleaning gateway stuffs", taskItem.ID) + return + } + + inst := model.Instance{} + inst.ID = instanceID + _, err = orm.Get(&inst) + if err != nil { + log.Errorf("failed to get instance, err: %v", err) + return + } + + for _, pipelineID := range pipelineIDs { + err = inst.DeletePipeline(pipelineID) if err != nil { - return err + log.Errorf("delete pipeline failed, err: %v", err) } - for _, pipelineID := range taskIDs { - err = inst.DeletePipeline(pipelineID) - if err != nil { - log.Errorf("failed to delete pipeline, err: %v", err) - continue - } - selector := util.MapStr{ - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - } - //clear queue - err = inst.DeleteQueueBySelector(selector) - if err != nil { - log.Errorf("failed to delete queue, err: %v", err) - } + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, } - if st, ok := p.state[instanceID]; ok { - st.Total -= 1 - p.state[instanceID] = st + //clear queue + err = inst.DeleteQueueBySelector(selector) + if err != nil { + log.Errorf("failed to delete queue, err: %v", err) } } - p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) - return nil + if st, ok := p.state[instanceID]; ok { + st.Total -= 1 + p.state[instanceID] = st + } } func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { @@ -1290,7 +1272,6 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo if errStr, ok := resultErr.(string); ok && errStr != "" { state.Error = errStr state.IsComplete = true - state.ClearPipeline = true } if !bulked { for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} {