39 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			39 lines
		
	
	
		
			1.2 KiB
		
	
	
	
		
			Go
		
	
	
	
| package util
 | |
| 
 | |
| import (
 | |
| 	migration_model "infini.sh/console/plugin/task_manager/model"
 | |
| 	"infini.sh/framework/core/task"
 | |
| )
 | |
| 
 | |
| /*
 | |
| These functions could return nil tasks
 | |
| */
 | |
| 
 | |
| func SplitIndexMigrationTasks(ptasks []task.Task) (scrollTask *task.Task, bulkTask *task.Task) {
 | |
| 	for i, ptask := range ptasks {
 | |
| 		if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
 | |
| 			bulkTask = &ptasks[i]
 | |
| 		} else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" {
 | |
| 			scrollTask = &ptasks[i]
 | |
| 		}
 | |
| 	}
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func SplitIndexComparisonTasks(ptasks []task.Task, cfg *migration_model.IndexComparisonTaskConfig) (sourceDumpTask *task.Task, targetDumpTask *task.Task, diffTask *task.Task) {
 | |
| 	for i, ptask := range ptasks {
 | |
| 		if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" {
 | |
| 			// TODO: we can't handle when compare the same cluster & same index
 | |
| 			// catch it earlier when creating the task
 | |
| 			if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices {
 | |
| 				sourceDumpTask = &ptasks[i]
 | |
| 			} else {
 | |
| 				targetDumpTask = &ptasks[i]
 | |
| 			}
 | |
| 		} else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" {
 | |
| 			diffTask = &ptasks[i]
 | |
| 		}
 | |
| 	}
 | |
| 	return
 | |
| }
 |