diff --git a/model/instance.go b/model/instance.go index 92ac9d79..2144545b 100644 --- a/model/instance.go +++ b/model/instance.go @@ -7,41 +7,41 @@ package model import ( "context" "fmt" + "net/http" + "time" + "infini.sh/framework/core/agent" "infini.sh/framework/core/orm" "infini.sh/framework/core/util" "infini.sh/framework/modules/pipeline" - "net/http" - "time" ) - type Instance struct { orm.ORMObjectBase //InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"` - Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` - Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` - Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` - BasicAuth agent.BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"` - Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"` - Tags [] string `json:"tags,omitempty"` - Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"` + Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"` + Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"` + Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"` + BasicAuth agent.BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"` + Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"` + Tags []string `json:"tags,omitempty"` + Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"` } func (inst *Instance) CreatePipeline(body []byte) error { req := &util.Request{ Method: http.MethodPost, - Body: body, - Url: 
inst.Endpoint + "/pipeline/tasks/", + Body: body, + Url: inst.Endpoint + "/pipeline/tasks/", } return inst.doRequest(req, nil) } func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error { req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID), + Method: http.MethodPost, + Url: fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID), Context: ctx, } return inst.doRequest(req, nil) @@ -56,16 +56,15 @@ func (inst *Instance) StopPipelineWithTimeout(pipelineID string, duration time.D func (inst *Instance) StartPipeline(pipelineID string) error { req := &util.Request{ Method: http.MethodPost, - Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID), + Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID), } return inst.doRequest(req, nil) } - func (inst *Instance) DeletePipeline(pipelineID string) error { req := &util.Request{ Method: http.MethodDelete, - Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID), + Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID), } return inst.doRequest(req, nil) } @@ -77,9 +76,9 @@ func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipel ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() req := &util.Request{ - Method: http.MethodPost, - Url: fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint), - Body: body, + Method: http.MethodPost, + Url: fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint), + Body: body, Context: ctx, } res := pipeline.GetPipelinesResponse{} @@ -90,7 +89,18 @@ func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipel func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error { req := &util.Request{ Method: http.MethodDelete, - Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint), + Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint), + Body: 
util.MustToJSONBytes(util.MapStr{ + "selector": selector, + }), + } + return inst.doRequest(req, nil) +} + +func (inst *Instance) DeleteQueueConsumersBySelector(selector util.MapStr) error { + req := &util.Request{ + Method: http.MethodDelete, + Url: fmt.Sprintf("%s/queue/consumer/_search", inst.Endpoint), Body: util.MustToJSONBytes(util.MapStr{ "selector": selector, }), @@ -100,8 +110,8 @@ func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error { func (inst *Instance) TryConnect(ctx context.Context) error { req := &util.Request{ - Method: http.MethodGet, - Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint), + Method: http.MethodGet, + Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint), Context: ctx, } return inst.doRequest(req, nil) @@ -125,4 +135,4 @@ func (inst *Instance) doRequest(req *util.Request, resBody interface{}) error { return util.FromJSONBytes(result.Body, resBody) } return nil -} \ No newline at end of file +} diff --git a/plugin/migration/model/model.go b/plugin/migration/model/model.go index d5d4d9e2..6d9b9085 100644 --- a/plugin/migration/model/model.go +++ b/plugin/migration/model/model.go @@ -105,10 +105,16 @@ type IndexStateInfo struct { IndexDocs float64 } +const ( + IndexMigrationV0 = 0 + IndexMigrationV1 = 1 +) + type IndexMigrationTaskConfig struct { Source IndexMigrationSourceConfig `json:"source"` Target IndexMigrationTargetConfig `json:"target"` Execution ExecutionConfig `json:"execution"` + Version int `json:"version"` } type IndexMigrationSourceConfig struct { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index ce765b80..6348360d 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -356,9 +356,9 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ Success: true, }, "index migration completed") + // clean disk queue if bulk completed + p.cleanGatewayQueue(taskItem) } - // 
clean disk queue if bulk failed/completed - p.cleanGatewayQueue(taskItem) p.decrInstanceJobs(instanceID) return nil @@ -446,8 +446,6 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) err if len(tasks) == 0 { taskItem.Status = task2.StatusStopped p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID)) - // clean disk queue if manually stopped - p.cleanGatewayQueue(taskItem) } return nil } @@ -619,6 +617,13 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { } func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error { + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + return fmt.Errorf("got wrong config of task [%s]", taskItem.ID) + } + scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) if err != nil { log.Errorf("failed to get pipeline tasks, err: %v", err) @@ -630,28 +635,62 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error return nil } - instance, err := p.getPreferenceInstance(taskItem.ParentId[0]) - if err != nil { - return fmt.Errorf("get preference intance error: %w", err) - } - if p.getInstanceState(instance.ID).Total >= p.config.MaxTasksPerInstance { - log.Debugf("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance) - return nil + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + totalDocs := cfg.Source.DocCount + scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) + + redoScroll := true + if cfg.Version >= migration_model.IndexMigrationV1 { + // skip scroll if possible + if scrolled && err == nil { + redoScroll = false + // reset queue consumer offset + err = p.resetGatewayQueue(taskItem) + if err != nil { + log.Infof("task [%s] failed to reset gateway queue, redo 
scroll", taskItem.ID) + redoScroll = true + } + } } - // try to clear disk queue before running es_scroll - p.cleanGatewayQueue(taskItem) + if !redoScroll { + migration_util.WriteLog(taskItem, nil, fmt.Sprintf("task [%s] skipping scroll", taskItem.ID)) + } else { + var executionConfig migration_model.ExecutionConfig + if cfg.Version >= migration_model.IndexMigrationV1 { + executionConfig = cfg.Execution + } else { + executionConfig, err = p.getExecutionConfigFromMajorTask(taskItem) + if err != nil { + return fmt.Errorf("get execution config from parent task failed, err: %v", err) + } + } - // update scroll task to ready - scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID - scrollTask.Status = task2.StatusReady - err = orm.Update(nil, scrollTask) - if err != nil { - return fmt.Errorf("update scroll pipeline task error: %w", err) + // get a new instanceID + instance, err := p.getPreferenceInstance(executionConfig) + if err != nil { + return fmt.Errorf("get preference instance error: %w", err) + } + if p.getInstanceState(instance.ID).Total >= p.config.MaxTasksPerInstance { + log.Debugf("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance) + return nil + } + instanceID = instance.ID + + // update instance info first + scrollTask.Metadata.Labels["execution_instance_id"] = instanceID + // try to clear disk queue before running es_scroll + p.cleanGatewayQueue(taskItem) + // update scroll task to ready + scrollTask.Status = task2.StatusReady + err = orm.Update(nil, scrollTask) + if err != nil { + return fmt.Errorf("update scroll pipeline task error: %w", err) + } } // update bulk task to init - bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID + bulkTask.Metadata.Labels["execution_instance_id"] = instanceID bulkTask.Status = task2.StatusInit err = orm.Update(nil, bulkTask) if err != nil { @@ -660,7 +699,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error // update sub migration task 
status to running and save task log taskItem.RetryTimes++ - taskItem.Metadata.Labels["execution_instance_id"] = instance.ID + taskItem.Metadata.Labels["execution_instance_id"] = instanceID taskItem.Metadata.Labels["index_docs"] = 0 taskItem.Metadata.Labels["scrolled_docs"] = 0 taskItem.Status = task2.StatusRunning @@ -670,11 +709,12 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error Success: true, }, fmt.Sprintf("task [%s] started", taskItem.ID)) // update dispatcher state - p.incrInstanceJobs(instance.ID) + p.incrInstanceJobs(instanceID) return nil } -func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instance model.Instance, err error) { +func (p *DispatcherProcessor) getExecutionConfigFromMajorTask(taskItem *task2.Task) (config migration_model.ExecutionConfig, err error) { + majorTaskID := taskItem.ParentId[0] majorTask := task2.Task{} majorTask.ID = majorTaskID _, err = orm.Get(&majorTask) @@ -688,11 +728,16 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instanc log.Errorf("failed to get task config, err: %v", err) return } + config = cfg.Settings.Execution + return +} + +func (p *DispatcherProcessor) getPreferenceInstance(config migration_model.ExecutionConfig) (instance model.Instance, err error) { var ( total = math.MaxInt tempInst = model.Instance{} ) - for _, node := range cfg.Settings.Execution.Nodes.Permit { + for _, node := range config.Nodes.Permit { instanceTotal := p.getInstanceState(node.ID).Total if instanceTotal < total { if p.config.CheckInstanceAvailable { @@ -917,6 +962,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro Source: partitionSource, Target: target, Execution: clusterMigrationTask.Settings.Execution, + Version: migration_model.IndexMigrationV1, }), } partitionMigrationTask.ID = util.GetUUID() @@ -930,8 +976,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro source.DocCount = 
index.Source.Docs indexParameters := migration_model.IndexMigrationTaskConfig{ - Source: source, - Target: target, + Source: source, + Target: target, + Execution: clusterMigrationTask.Settings.Execution, + Version: migration_model.IndexMigrationV1, } indexMigrationTask := task2.Task{ ParentId: []string{taskItem.ID}, @@ -1315,6 +1363,8 @@ func (p *DispatcherProcessor) refreshInstanceJobsFromES() error { } func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) { + log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID) + var err error instance := model.Instance{} instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) @@ -1338,3 +1388,29 @@ func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) { log.Errorf("failed to delete queue, err: %v", err) } } + +func (p *DispatcherProcessor) resetGatewayQueue(taskItem *task2.Task) error { + log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID) + + var err error + instance := model.Instance{} + instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + _, err = orm.Get(&instance) + if err != nil { + log.Errorf("failed to get instance, err: %v", err) + return err + } + + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + } + err = instance.DeleteQueueConsumersBySelector(selector) + if err != nil { + log.Errorf("failed to delete queue consumers, err: %v", err) + return err + } + + return nil +} diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index ffa1c3ea..03a4cfa3 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -92,7 +92,7 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error { taskItem.StartTimeInMillis = time.Now().UnixMilli() p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: true, - }, fmt.Sprintf("pipeline 
task [%s] started", taskItem.ID)) + }, fmt.Sprintf("[%v] pipeline task [%s] started", taskItem.Metadata.Labels["pipeline_id"], taskItem.ID)) return nil } @@ -138,7 +138,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: errMsg == "", Error: errMsg, - }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) + }, fmt.Sprintf("[es_scroll] pipeline task [%s] completed", taskItem.ID)) p.cleanGatewayPipeline(taskItem) return nil } @@ -171,7 +171,7 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ Success: errMsg == "", Error: errMsg, - }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID)) + }, fmt.Sprintf("[bulk_indexing] pipeline task [%s] completed", taskItem.ID)) p.cleanGatewayPipeline(taskItem) return nil } @@ -194,7 +194,7 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { if stopped { taskItem.Status = task.StatusStopped - p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) + p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("[%v] task [%s] stopped", taskItem.Metadata.Labels["pipeline_id"], taskItem.ID)) p.cleanGatewayPipeline(taskItem) return nil }