[migration] always try to clean disk queues

Kassian Sun 2023-04-11 17:25:28 +08:00
parent dc606e457d
commit db6d33f22b
2 changed files with 45 additions and 65 deletions
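
In short: the commit drops the ClearPipeline flag from TaskCompleteState and routes every terminal path (completed, failed, stopped) through a single cleanGatewayPipelines helper, so pipeline deletion and disk-queue cleanup are always attempted instead of only when the flag happened to be set. Below is a minimal, self-contained Go sketch of the best-effort cleanup pattern the new helper follows; the type and method names here are illustrative stand-ins, not the project's API (the real helper appears in the second file's diff).

package main

import "log"

// gateway is a hypothetical stand-in for model.Instance in the diff below.
type gateway struct{ ID string }

func (g *gateway) DeletePipeline(id string) error        { return nil } // stub
func (g *gateway) DeleteQueueByTask(taskID string) error { return nil } // stub

// cleanPipelines mirrors the shape of cleanGatewayPipelines: every step is
// best-effort -- failures are logged and the next step still runs, so the
// disk queues are always attempted even if a pipeline deletion fails.
func cleanPipelines(g *gateway, taskID string, pipelineIDs []string) {
	for _, id := range pipelineIDs {
		if err := g.DeletePipeline(id); err != nil {
			log.Printf("delete pipeline failed, err: %v", err) // keep going
		}
	}
	if err := g.DeleteQueueByTask(taskID); err != nil {
		log.Printf("delete queue failed, err: %v", err)
	}
}

func main() {
	cleanPipelines(&gateway{ID: "gw-1"}, "task-1", []string{"p-scroll", "p-bulk"})
}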

View File

@@ -89,14 +89,13 @@ type ClusterInfo struct {
 }
 
 type TaskCompleteState struct {
-	IsComplete    bool
-	Error         string
-	ClearPipeline bool
-	PipelineIds   []string
-	RunningPhase  int
-	TotalDocs     int64
-	SuccessDocs   int64
-	ScrolledDocs  int64
+	IsComplete   bool
+	Error        string
+	PipelineIds  []string
+	RunningPhase int
+	TotalDocs    int64
+	SuccessDocs  int64
+	ScrolledDocs int64
 }
 
 type MajorTaskState struct {

View File

@@ -319,7 +319,7 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error
 		tn := time.Now()
 		taskItem.CompletedTime = &tn
 		p.sendMajorTaskNotification(taskItem)
-		p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] completed", taskItem.ID))
+		p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] finished with status [%s]", taskItem.ID, ts.Status))
 	}
 	return nil
 }
@@ -333,35 +333,6 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
 	if taskItem.Metadata.Labels != nil {
 		taskItem.Metadata.Labels["index_docs"] = state.SuccessDocs
 		taskItem.Metadata.Labels["scrolled_docs"] = state.ScrolledDocs
-
-		if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
-			inst := model.Instance{}
-			inst.ID = instanceID
-			_, err = orm.Get(&inst)
-			if err == nil {
-				for _, pipelineID := range state.PipelineIds {
-					err = inst.DeletePipeline(pipelineID)
-					if err != nil {
-						log.Errorf("delete pipeline failed, err: %v", err)
-						continue
-					}
-					selector := util.MapStr{
-						"labels": util.MapStr{
-							"migration_task_id": taskItem.ID,
-						},
-					}
-					//clear queue
-					err = inst.DeleteQueueBySelector(selector)
-					if err != nil {
-						log.Errorf("delete queue failed, err: %v", err)
-					}
-				}
-			}
-			if st, ok := p.state[instanceID]; ok {
-				st.Total -= 1
-				p.state[instanceID] = st
-			}
-		}
 	}
 	if state.Error != "" && state.TotalDocs != state.SuccessDocs {
 		taskItem.Status = task2.StatusError
@@ -375,6 +346,7 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
 			Success: state.Error == "",
 			Error:   state.Error,
 		}, fmt.Sprintf("task [%s] completed", taskItem.ID))
+		p.cleanGatewayPipelines(taskItem, state.PipelineIds)
 	} else {
 		if state.RunningPhase == 1 && taskItem.Metadata.Labels["running_phase"] == float64(1) {
 			ptasks, err := p.getPipelineTasks(taskItem.ID)
@@ -484,38 +456,48 @@ MainLoop:
 	}
 	taskItem.Status = task2.StatusStopped
+	p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
+	p.cleanGatewayPipelines(taskItem, taskIDs)
+	return nil
+}
+
+func (p *DispatcherProcessor) cleanGatewayPipelines(taskItem *task2.Task, pipelineIDs []string) {
+	var err error
 	//delete pipeline and clear queue
-	if instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string); ok {
-		inst := model.Instance{}
-		inst.ID = instanceID
-		_, err = orm.Get(&inst)
-		if err != nil {
-			return err
-		}
-		for _, pipelineID := range taskIDs {
-			err = inst.DeletePipeline(pipelineID)
-			if err != nil {
-				log.Errorf("failed to delete pipeline, err: %v", err)
-				continue
-			}
-			selector := util.MapStr{
-				"labels": util.MapStr{
-					"migration_task_id": taskItem.ID,
-				},
-			}
-			//clear queue
-			err = inst.DeleteQueueBySelector(selector)
-			if err != nil {
-				log.Errorf("failed to delete queue, err: %v", err)
-			}
-		}
-		if st, ok := p.state[instanceID]; ok {
-			st.Total -= 1
-			p.state[instanceID] = st
-		}
-	}
-	p.saveTaskAndWriteLog(taskItem, "", nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
-	return nil
+	instanceID, ok := taskItem.Metadata.Labels["execution_instance_id"].(string)
+	if !ok {
+		log.Debugf("task %s not scheduled, skip cleaning gateway stuffs", taskItem.ID)
+		return
+	}
+	inst := model.Instance{}
+	inst.ID = instanceID
+	_, err = orm.Get(&inst)
+	if err != nil {
+		log.Errorf("failed to get instance, err: %v", err)
+		return
+	}
+	for _, pipelineID := range pipelineIDs {
+		err = inst.DeletePipeline(pipelineID)
+		if err != nil {
+			log.Errorf("delete pipeline failed, err: %v", err)
+		}
+	}
+	selector := util.MapStr{
+		"labels": util.MapStr{
+			"migration_task_id": taskItem.ID,
+		},
+	}
+	//clear queue
+	err = inst.DeleteQueueBySelector(selector)
+	if err != nil {
+		log.Errorf("failed to delete queue, err: %v", err)
+	}
+	if st, ok := p.state[instanceID]; ok {
+		st.Total -= 1
+		p.state[instanceID] = st
+	}
 }
 
 func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error {
@@ -1290,7 +1272,6 @@ func (p *DispatcherProcessor) getTaskCompleteState(subTask *task2.Task) (*TaskCo
 		if errStr, ok := resultErr.(string); ok && errStr != "" {
 			state.Error = errStr
 			state.IsComplete = true
-			state.ClearPipeline = true
 		}
 		if !bulked {
 			for _, key := range []string{"payload.pipeline.logging.context.bulk_indexing.success.count", "payload.pipeline.logging.context.bulk_indexing.failure.count", "payload.pipeline.logging.context.bulk_indexing.invalid.count"} {