Merge pull request '[migration] index migration skip retrying scroll if possible' (#70) from feature/migration into master
This commit is contained in:
		
						commit
						aa9a8718d0
					
				| 
						 | 
					@ -7,41 +7,41 @@ package model
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"context"
 | 
						"context"
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"net/http"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"infini.sh/framework/core/agent"
 | 
						"infini.sh/framework/core/agent"
 | 
				
			||||||
	"infini.sh/framework/core/orm"
 | 
						"infini.sh/framework/core/orm"
 | 
				
			||||||
	"infini.sh/framework/core/util"
 | 
						"infini.sh/framework/core/util"
 | 
				
			||||||
	"infini.sh/framework/modules/pipeline"
 | 
						"infini.sh/framework/modules/pipeline"
 | 
				
			||||||
	"net/http"
 | 
					 | 
				
			||||||
	"time"
 | 
					 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
type Instance struct {
 | 
					type Instance struct {
 | 
				
			||||||
	orm.ORMObjectBase
 | 
						orm.ORMObjectBase
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	//InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"`
 | 
						//InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"`
 | 
				
			||||||
	Name        string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"`
 | 
						Name        string                 `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"`
 | 
				
			||||||
	Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"`
 | 
						Endpoint    string                 `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"`
 | 
				
			||||||
	Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"`
 | 
						Version     map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"`
 | 
				
			||||||
	BasicAuth agent.BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"`
 | 
						BasicAuth   agent.BasicAuth        `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"`
 | 
				
			||||||
	Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"`
 | 
						Owner       string                 `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"`
 | 
				
			||||||
	Tags [] string `json:"tags,omitempty"`
 | 
						Tags        []string               `json:"tags,omitempty"`
 | 
				
			||||||
	Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"`
 | 
						Description string                 `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (inst *Instance) CreatePipeline(body []byte) error {
 | 
					func (inst *Instance) CreatePipeline(body []byte) error {
 | 
				
			||||||
	req := &util.Request{
 | 
						req := &util.Request{
 | 
				
			||||||
		Method: http.MethodPost,
 | 
							Method: http.MethodPost,
 | 
				
			||||||
		Body: body,
 | 
							Body:   body,
 | 
				
			||||||
		Url: inst.Endpoint + "/pipeline/tasks/",
 | 
							Url:    inst.Endpoint + "/pipeline/tasks/",
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return inst.doRequest(req, nil)
 | 
						return inst.doRequest(req, nil)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error {
 | 
					func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error {
 | 
				
			||||||
	req := &util.Request{
 | 
						req := &util.Request{
 | 
				
			||||||
		Method: http.MethodPost,
 | 
							Method:  http.MethodPost,
 | 
				
			||||||
		Url: fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID),
 | 
							Url:     fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID),
 | 
				
			||||||
		Context: ctx,
 | 
							Context: ctx,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return inst.doRequest(req, nil)
 | 
						return inst.doRequest(req, nil)
 | 
				
			||||||
| 
						 | 
					@ -56,16 +56,15 @@ func (inst *Instance) StopPipelineWithTimeout(pipelineID string, duration time.D
 | 
				
			||||||
func (inst *Instance) StartPipeline(pipelineID string) error {
 | 
					func (inst *Instance) StartPipeline(pipelineID string) error {
 | 
				
			||||||
	req := &util.Request{
 | 
						req := &util.Request{
 | 
				
			||||||
		Method: http.MethodPost,
 | 
							Method: http.MethodPost,
 | 
				
			||||||
		Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID),
 | 
							Url:    fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return inst.doRequest(req, nil)
 | 
						return inst.doRequest(req, nil)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					 | 
				
			||||||
func (inst *Instance) DeletePipeline(pipelineID string) error {
 | 
					func (inst *Instance) DeletePipeline(pipelineID string) error {
 | 
				
			||||||
	req := &util.Request{
 | 
						req := &util.Request{
 | 
				
			||||||
		Method: http.MethodDelete,
 | 
							Method: http.MethodDelete,
 | 
				
			||||||
		Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID),
 | 
							Url:    fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return inst.doRequest(req, nil)
 | 
						return inst.doRequest(req, nil)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -77,9 +76,9 @@ func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipel
 | 
				
			||||||
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 | 
						ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 | 
				
			||||||
	defer cancel()
 | 
						defer cancel()
 | 
				
			||||||
	req := &util.Request{
 | 
						req := &util.Request{
 | 
				
			||||||
		Method: http.MethodPost,
 | 
							Method:  http.MethodPost,
 | 
				
			||||||
		Url: fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint),
 | 
							Url:     fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint),
 | 
				
			||||||
		Body: body,
 | 
							Body:    body,
 | 
				
			||||||
		Context: ctx,
 | 
							Context: ctx,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	res := pipeline.GetPipelinesResponse{}
 | 
						res := pipeline.GetPipelinesResponse{}
 | 
				
			||||||
| 
						 | 
					@ -90,7 +89,18 @@ func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipel
 | 
				
			||||||
func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error {
 | 
					func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error {
 | 
				
			||||||
	req := &util.Request{
 | 
						req := &util.Request{
 | 
				
			||||||
		Method: http.MethodDelete,
 | 
							Method: http.MethodDelete,
 | 
				
			||||||
		Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint),
 | 
							Url:    fmt.Sprintf("%s/queue/_search", inst.Endpoint),
 | 
				
			||||||
 | 
							Body: util.MustToJSONBytes(util.MapStr{
 | 
				
			||||||
 | 
								"selector": selector,
 | 
				
			||||||
 | 
							}),
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return inst.doRequest(req, nil)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (inst *Instance) DeleteQueueConsumersBySelector(selector util.MapStr) error {
 | 
				
			||||||
 | 
						req := &util.Request{
 | 
				
			||||||
 | 
							Method: http.MethodDelete,
 | 
				
			||||||
 | 
							Url:    fmt.Sprintf("%s/queue/consumer/_search", inst.Endpoint),
 | 
				
			||||||
		Body: util.MustToJSONBytes(util.MapStr{
 | 
							Body: util.MustToJSONBytes(util.MapStr{
 | 
				
			||||||
			"selector": selector,
 | 
								"selector": selector,
 | 
				
			||||||
		}),
 | 
							}),
 | 
				
			||||||
| 
						 | 
					@ -100,8 +110,8 @@ func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (inst *Instance) TryConnect(ctx context.Context) error {
 | 
					func (inst *Instance) TryConnect(ctx context.Context) error {
 | 
				
			||||||
	req := &util.Request{
 | 
						req := &util.Request{
 | 
				
			||||||
		Method: http.MethodGet,
 | 
							Method:  http.MethodGet,
 | 
				
			||||||
		Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint),
 | 
							Url:     fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint),
 | 
				
			||||||
		Context: ctx,
 | 
							Context: ctx,
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return inst.doRequest(req, nil)
 | 
						return inst.doRequest(req, nil)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -105,10 +105,16 @@ type IndexStateInfo struct {
 | 
				
			||||||
	IndexDocs       float64
 | 
						IndexDocs       float64
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						IndexMigrationV0 = 0
 | 
				
			||||||
 | 
						IndexMigrationV1 = 1
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type IndexMigrationTaskConfig struct {
 | 
					type IndexMigrationTaskConfig struct {
 | 
				
			||||||
	Source    IndexMigrationSourceConfig `json:"source"`
 | 
						Source    IndexMigrationSourceConfig `json:"source"`
 | 
				
			||||||
	Target    IndexMigrationTargetConfig `json:"target"`
 | 
						Target    IndexMigrationTargetConfig `json:"target"`
 | 
				
			||||||
	Execution ExecutionConfig            `json:"execution"`
 | 
						Execution ExecutionConfig            `json:"execution"`
 | 
				
			||||||
 | 
						Version   int                        `json:"version"`
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type IndexMigrationSourceConfig struct {
 | 
					type IndexMigrationSourceConfig struct {
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -356,9 +356,9 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
 | 
				
			||||||
		p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
 | 
							p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
 | 
				
			||||||
			Success: true,
 | 
								Success: true,
 | 
				
			||||||
		}, "index migration completed")
 | 
							}, "index migration completed")
 | 
				
			||||||
 | 
							// clean disk queue if bulk completed
 | 
				
			||||||
 | 
							p.cleanGatewayQueue(taskItem)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	// clean disk queue if bulk failed/completed
 | 
					 | 
				
			||||||
	p.cleanGatewayQueue(taskItem)
 | 
					 | 
				
			||||||
	p.decrInstanceJobs(instanceID)
 | 
						p.decrInstanceJobs(instanceID)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -446,8 +446,6 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err
 | 
				
			||||||
	if len(tasks) == 0 {
 | 
						if len(tasks) == 0 {
 | 
				
			||||||
		taskItem.Status = task2.StatusStopped
 | 
							taskItem.Status = task2.StatusStopped
 | 
				
			||||||
		p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID))
 | 
							p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID))
 | 
				
			||||||
		// clean disk queue if manually stopped
 | 
					 | 
				
			||||||
		p.cleanGatewayQueue(taskItem)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -619,6 +617,13 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error {
 | 
					func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error {
 | 
				
			||||||
 | 
						cfg := migration_model.IndexMigrationTaskConfig{}
 | 
				
			||||||
 | 
						err := migration_util.GetTaskConfig(taskItem, &cfg)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.Errorf("failed to get task config, err: %v", err)
 | 
				
			||||||
 | 
							return fmt.Errorf("got wrong config of task [%s]", taskItem.ID)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem)
 | 
						scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		log.Errorf("failed to get pipeline tasks, err: %v", err)
 | 
							log.Errorf("failed to get pipeline tasks, err: %v", err)
 | 
				
			||||||
| 
						 | 
					@ -630,28 +635,62 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	instance, err := p.getPreferenceInstance(taskItem.ParentId[0])
 | 
						instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
 | 
				
			||||||
	if err != nil {
 | 
						totalDocs := cfg.Source.DocCount
 | 
				
			||||||
		return fmt.Errorf("get preference intance error: %w", err)
 | 
						scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
 | 
				
			||||||
	}
 | 
					
 | 
				
			||||||
	if p.getInstanceState(instance.ID).Total >= p.config.MaxTasksPerInstance {
 | 
						redoScroll := true
 | 
				
			||||||
		log.Debugf("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance)
 | 
						if cfg.Version >= migration_model.IndexMigrationV1 {
 | 
				
			||||||
		return nil
 | 
							// skip scroll if possible
 | 
				
			||||||
 | 
							if scrolled && err == nil {
 | 
				
			||||||
 | 
								redoScroll = false
 | 
				
			||||||
 | 
								// reset queue consumer offset
 | 
				
			||||||
 | 
								err = p.resetGatewayQueue(taskItem)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									log.Infof("task [%s] failed to reset gateway queue, redo scroll", taskItem.ID)
 | 
				
			||||||
 | 
									redoScroll = true
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// try to clear disk queue before running es_scroll
 | 
						if !redoScroll {
 | 
				
			||||||
	p.cleanGatewayQueue(taskItem)
 | 
							migration_util.WriteLog(taskItem, nil, fmt.Sprintf("task [%s] skiping scroll", taskItem.ID))
 | 
				
			||||||
 | 
						} else {
 | 
				
			||||||
 | 
							var executionConfig migration_model.ExecutionConfig
 | 
				
			||||||
 | 
							if cfg.Version >= migration_model.IndexMigrationV1 {
 | 
				
			||||||
 | 
								executionConfig = cfg.Execution
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								executionConfig, err = p.getExecutionConfigFromMajorTask(taskItem)
 | 
				
			||||||
 | 
								if err != nil {
 | 
				
			||||||
 | 
									return fmt.Errorf("get execution config from parent task failed, err: %v", err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// update scroll task to ready
 | 
							// get a new instanceID
 | 
				
			||||||
	scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID
 | 
							instance, err := p.getPreferenceInstance(executionConfig)
 | 
				
			||||||
	scrollTask.Status = task2.StatusReady
 | 
							if err != nil {
 | 
				
			||||||
	err = orm.Update(nil, scrollTask)
 | 
								return fmt.Errorf("get preference intance error: %w", err)
 | 
				
			||||||
	if err != nil {
 | 
							}
 | 
				
			||||||
		return fmt.Errorf("update scroll pipeline task error: %w", err)
 | 
							if p.getInstanceState(instance.ID).Total >= p.config.MaxTasksPerInstance {
 | 
				
			||||||
 | 
								log.Debugf("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance)
 | 
				
			||||||
 | 
								return nil
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							instanceID = instance.ID
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							// update instance info first
 | 
				
			||||||
 | 
							scrollTask.Metadata.Labels["execution_instance_id"] = instanceID
 | 
				
			||||||
 | 
							// try to clear disk queue before running es_scroll
 | 
				
			||||||
 | 
							p.cleanGatewayQueue(taskItem)
 | 
				
			||||||
 | 
							// update scroll task to ready
 | 
				
			||||||
 | 
							scrollTask.Status = task2.StatusReady
 | 
				
			||||||
 | 
							err = orm.Update(nil, scrollTask)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								return fmt.Errorf("update scroll pipeline task error: %w", err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// update bulk task to init
 | 
						// update bulk task to init
 | 
				
			||||||
	bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID
 | 
						bulkTask.Metadata.Labels["execution_instance_id"] = instanceID
 | 
				
			||||||
	bulkTask.Status = task2.StatusInit
 | 
						bulkTask.Status = task2.StatusInit
 | 
				
			||||||
	err = orm.Update(nil, bulkTask)
 | 
						err = orm.Update(nil, bulkTask)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
| 
						 | 
					@ -660,7 +699,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// update sub migration task status to running and save task log
 | 
						// update sub migration task status to running and save task log
 | 
				
			||||||
	taskItem.RetryTimes++
 | 
						taskItem.RetryTimes++
 | 
				
			||||||
	taskItem.Metadata.Labels["execution_instance_id"] = instance.ID
 | 
						taskItem.Metadata.Labels["execution_instance_id"] = instanceID
 | 
				
			||||||
	taskItem.Metadata.Labels["index_docs"] = 0
 | 
						taskItem.Metadata.Labels["index_docs"] = 0
 | 
				
			||||||
	taskItem.Metadata.Labels["scrolled_docs"] = 0
 | 
						taskItem.Metadata.Labels["scrolled_docs"] = 0
 | 
				
			||||||
	taskItem.Status = task2.StatusRunning
 | 
						taskItem.Status = task2.StatusRunning
 | 
				
			||||||
| 
						 | 
					@ -670,11 +709,12 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error
 | 
				
			||||||
		Success: true,
 | 
							Success: true,
 | 
				
			||||||
	}, fmt.Sprintf("task [%s] started", taskItem.ID))
 | 
						}, fmt.Sprintf("task [%s] started", taskItem.ID))
 | 
				
			||||||
	// update dispatcher state
 | 
						// update dispatcher state
 | 
				
			||||||
	p.incrInstanceJobs(instance.ID)
 | 
						p.incrInstanceJobs(instanceID)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instance model.Instance, err error) {
 | 
					func (p *DispatcherProcessor) getExecutionConfigFromMajorTask(taskItem *task2.Task) (config migration_model.ExecutionConfig, err error) {
 | 
				
			||||||
 | 
						majorTaskID := taskItem.ParentId[0]
 | 
				
			||||||
	majorTask := task2.Task{}
 | 
						majorTask := task2.Task{}
 | 
				
			||||||
	majorTask.ID = majorTaskID
 | 
						majorTask.ID = majorTaskID
 | 
				
			||||||
	_, err = orm.Get(&majorTask)
 | 
						_, err = orm.Get(&majorTask)
 | 
				
			||||||
| 
						 | 
					@ -688,11 +728,16 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc
 | 
				
			||||||
		log.Errorf("failed to get task config, err: %v", err)
 | 
							log.Errorf("failed to get task config, err: %v", err)
 | 
				
			||||||
		return
 | 
							return
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						config = cfg.Settings.Execution
 | 
				
			||||||
 | 
						return
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (p *DispatcherProcessor) getPreferenceInstance(config migration_model.ExecutionConfig) (instance model.Instance, err error) {
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		total    = math.MaxInt
 | 
							total    = math.MaxInt
 | 
				
			||||||
		tempInst = model.Instance{}
 | 
							tempInst = model.Instance{}
 | 
				
			||||||
	)
 | 
						)
 | 
				
			||||||
	for _, node := range cfg.Settings.Execution.Nodes.Permit {
 | 
						for _, node := range config.Nodes.Permit {
 | 
				
			||||||
		instanceTotal := p.getInstanceState(node.ID).Total
 | 
							instanceTotal := p.getInstanceState(node.ID).Total
 | 
				
			||||||
		if instanceTotal < total {
 | 
							if instanceTotal < total {
 | 
				
			||||||
			if p.config.CheckInstanceAvailable {
 | 
								if p.config.CheckInstanceAvailable {
 | 
				
			||||||
| 
						 | 
					@ -917,6 +962,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
 | 
				
			||||||
						Source:    partitionSource,
 | 
											Source:    partitionSource,
 | 
				
			||||||
						Target:    target,
 | 
											Target:    target,
 | 
				
			||||||
						Execution: clusterMigrationTask.Settings.Execution,
 | 
											Execution: clusterMigrationTask.Settings.Execution,
 | 
				
			||||||
 | 
											Version:   migration_model.IndexMigrationV1,
 | 
				
			||||||
					}),
 | 
										}),
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
				partitionMigrationTask.ID = util.GetUUID()
 | 
									partitionMigrationTask.ID = util.GetUUID()
 | 
				
			||||||
| 
						 | 
					@ -930,8 +976,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
 | 
				
			||||||
			source.DocCount = index.Source.Docs
 | 
								source.DocCount = index.Source.Docs
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			indexParameters := migration_model.IndexMigrationTaskConfig{
 | 
								indexParameters := migration_model.IndexMigrationTaskConfig{
 | 
				
			||||||
				Source: source,
 | 
									Source:    source,
 | 
				
			||||||
				Target: target,
 | 
									Target:    target,
 | 
				
			||||||
 | 
									Execution: clusterMigrationTask.Settings.Execution,
 | 
				
			||||||
 | 
									Version:   migration_model.IndexMigrationV1,
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			indexMigrationTask := task2.Task{
 | 
								indexMigrationTask := task2.Task{
 | 
				
			||||||
				ParentId:          []string{taskItem.ID},
 | 
									ParentId:          []string{taskItem.ID},
 | 
				
			||||||
| 
						 | 
					@ -1315,6 +1363,8 @@ func (p *DispatcherProcessor) refreshInstanceJobsFromES() error {
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {
 | 
					func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {
 | 
				
			||||||
 | 
						log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	var err error
 | 
						var err error
 | 
				
			||||||
	instance := model.Instance{}
 | 
						instance := model.Instance{}
 | 
				
			||||||
	instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
 | 
						instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
 | 
				
			||||||
| 
						 | 
					@ -1338,3 +1388,29 @@ func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {
 | 
				
			||||||
		log.Errorf("failed to delete queue, err: %v", err)
 | 
							log.Errorf("failed to delete queue, err: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (p *DispatcherProcessor) resetGatewayQueue(taskItem *task2.Task) error {
 | 
				
			||||||
 | 
						log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						var err error
 | 
				
			||||||
 | 
						instance := model.Instance{}
 | 
				
			||||||
 | 
						instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
 | 
				
			||||||
 | 
						_, err = orm.Get(&instance)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.Errorf("failed to get instance, err: %v", err)
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						selector := util.MapStr{
 | 
				
			||||||
 | 
							"labels": util.MapStr{
 | 
				
			||||||
 | 
								"migration_task_id": taskItem.ID,
 | 
				
			||||||
 | 
							},
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						err = instance.DeleteQueueConsumersBySelector(selector)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.Errorf("failed to delete queue consumers, err: %v", err)
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -92,7 +92,7 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
 | 
				
			||||||
	taskItem.StartTimeInMillis = time.Now().UnixMilli()
 | 
						taskItem.StartTimeInMillis = time.Now().UnixMilli()
 | 
				
			||||||
	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 | 
						p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 | 
				
			||||||
		Success: true,
 | 
							Success: true,
 | 
				
			||||||
	}, fmt.Sprintf("pipeline task [%s] started", taskItem.ID))
 | 
						}, fmt.Sprintf("[%v] pipeline task [%s] started", taskItem.Metadata.Labels["pipeline_id"], taskItem.ID))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -138,7 +138,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error
 | 
				
			||||||
	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 | 
						p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 | 
				
			||||||
		Success: errMsg == "",
 | 
							Success: errMsg == "",
 | 
				
			||||||
		Error:   errMsg,
 | 
							Error:   errMsg,
 | 
				
			||||||
	}, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
 | 
						}, fmt.Sprintf("[es_scroll] pipeline task [%s] completed", taskItem.ID))
 | 
				
			||||||
	p.cleanGatewayPipeline(taskItem)
 | 
						p.cleanGatewayPipeline(taskItem)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -171,7 +171,7 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e
 | 
				
			||||||
	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 | 
						p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
 | 
				
			||||||
		Success: errMsg == "",
 | 
							Success: errMsg == "",
 | 
				
			||||||
		Error:   errMsg,
 | 
							Error:   errMsg,
 | 
				
			||||||
	}, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
 | 
						}, fmt.Sprintf("[bulk_indexing] pipeline task [%s] completed", taskItem.ID))
 | 
				
			||||||
	p.cleanGatewayPipeline(taskItem)
 | 
						p.cleanGatewayPipeline(taskItem)
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
| 
						 | 
					@ -194,7 +194,7 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if stopped {
 | 
						if stopped {
 | 
				
			||||||
		taskItem.Status = task.StatusStopped
 | 
							taskItem.Status = task.StatusStopped
 | 
				
			||||||
		p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
 | 
							p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("[%v] task [%s] stopped", taskItem.Metadata.Labels["pipeline_id"], taskItem.ID))
 | 
				
			||||||
		p.cleanGatewayPipeline(taskItem)
 | 
							p.cleanGatewayPipeline(taskItem)
 | 
				
			||||||
		return nil
 | 
							return nil
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue