From ba409072c892a80541e8c5254a55dcfb1cd9531d Mon Sep 17 00:00:00 2001
From: Kassian Sun 
Date: Tue, 16 May 2023 10:15:37 +0800
Subject: [PATCH] [migration] split scheduler/index_migration

---
 .../index_migration/index_migration.go        | 601 +++++++++++++
 plugin/migration/model/scheduler.go           |  16 +
 plugin/migration/pipeline.go                  | 801 +-----------------
 plugin/migration/scheduler/scheduler.go       | 182 ++++
 plugin/migration/util/orm.go                  | 112 +++
 5 files changed, 926 insertions(+), 786 deletions(-)
 create mode 100644 plugin/migration/index_migration/index_migration.go
 create mode 100644 plugin/migration/model/scheduler.go
 create mode 100644 plugin/migration/scheduler/scheduler.go
 create mode 100644 plugin/migration/util/orm.go

diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go
new file mode 100644
index 00000000..ac8a9010
--- /dev/null
+++ b/plugin/migration/index_migration/index_migration.go
@@ -0,0 +1,601 @@
+package index_migration
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	log "github.com/cihub/seelog"
+
+	"infini.sh/console/model"
+	migration_model "infini.sh/console/plugin/migration/model"
+	migration_util "infini.sh/console/plugin/migration/util"
+
+	"infini.sh/framework/core/elastic"
+	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/task"
+	"infini.sh/framework/core/util"
+	"infini.sh/framework/modules/elastic/common"
+)
+
+type processor struct {
+	Elasticsearch string
+	IndexName     string
+	LogIndexName  string
+	scheduler     migration_model.Scheduler
+}
+
+func NewProcessor(elasticsearch, indexName, logIndexName string, scheduler migration_model.Scheduler) migration_model.Processor {
+	return &processor{
+		Elasticsearch: elasticsearch,
+		IndexName:     indexName,
+		LogIndexName:  logIndexName,
+		scheduler:     scheduler,
+	}
+}
+
+func (p *processor) Process(t *task.Task) (err error) {
+	switch t.Status {
+	case task.StatusReady:
+		// split & schedule pipeline tasks
+		err = p.handleReadySubTask(t)
+	case task.StatusRunning:
+		// check pipeline tasks status
+		err = p.handleRunningSubTask(t)
+	case task.StatusPendingStop:
+		// mark pipeline tasks as pending_stop
+		err = p.handlePendingStopSubTask(t)
+	}
+	return err
+}
+
+func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error {
+	err := migration_util.UpdatePendingChildTasksToPendingStop(taskItem, "pipeline")
+	if err != nil {
+		log.Errorf("failed to update sub task status, err: %v", err)
+		return nil
+	}
+
+	tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline")
+	if err != nil {
+		log.Errorf("failed to get sub tasks, err: %v", err)
+		return nil
+	}
+
+	// all subtasks stopped, errored, or completed
+	if len(tasks) == 0 {
+		taskItem.Status = task.StatusStopped
+		p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID))
+	}
+	return nil
+}
+
+func (p *processor) handleReadySubTask(taskItem *task.Task) error {
+	if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok {
+		return p.handleSplitSubTask(taskItem)
+	}
+
+	return p.handleScheduleSubTask(taskItem)
+}
+
+func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
+	// split the task into scroll/bulk_indexing pipeline tasks, then persist them
+	var pids []string
+	pids = append(pids, taskItem.ParentId...)
+ pids = append(pids, taskItem.ID) + scrollID := util.GetUUID() + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &cfg) + if err != nil { + return fmt.Errorf("got wrong config [%v] with task [%s], err: %v", taskItem.ConfigString, taskItem.ID, err) + } + sourceClusterID := cfg.Source.ClusterId + targetClusterID := cfg.Target.ClusterId + docType := common.GetClusterDocType(targetClusterID) + if len(taskItem.ParentId) == 0 { + return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) + } + queryDsl := cfg.Source.QueryDSL + scrollQueryDsl := util.MustToJSON(util.MapStr{ + "query": queryDsl, + }) + indexName := cfg.Source.Indices + scrollTask := &task.Task{ + ParentId: pids, + Runnable: true, + Cancellable: true, + Metadata: task.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "cluster_id": sourceClusterID, + "pipeline_id": "es_scroll", + "index_name": indexName, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + }, + Status: task.StatusInit, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ + Name: scrollID, + Logging: migration_model.PipelineTaskLoggingConfig{ + Enabled: true, + }, + Labels: util.MapStr{ + "parent_task_id": pids, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ + { + "es_scroll": util.MapStr{ + "remove_type": docType == "", + "slice_size": cfg.Source.SliceSize, + "batch_size": cfg.Source.BatchSize, + "indices": indexName, + "elasticsearch": sourceClusterID, + "queue": util.MapStr{ + "name": scrollID, + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + }, + "partition_size": 1, + "scroll_time": cfg.Source.ScrollTime, + "query_dsl": scrollQueryDsl, + "index_rename": cfg.Source.IndexRename, + "type_rename": cfg.Source.TypeRename, + }, + }, + }, + }), + } + scrollTask.ID = scrollID + + bulkID := util.GetUUID() + bulkTask := &task.Task{ + ParentId: pids, + Runnable: true, + Cancellable: true, + Metadata: task.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "cluster_id": targetClusterID, + "pipeline_id": "bulk_indexing", + "index_name": indexName, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + }, + Status: task.StatusInit, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ + Name: bulkID, + Logging: migration_model.PipelineTaskLoggingConfig{ + Enabled: true, + }, + Labels: util.MapStr{ + "parent_task_id": pids, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ + { + "bulk_indexing": util.MapStr{ + "detect_active_queue": false, + "bulk": util.MapStr{ + "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, + "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, + "invalid_queue": "bulk_indexing_400", + "compress": cfg.Target.Bulk.Compress, + }, + "max_worker_size": cfg.Target.Bulk.MaxWorkerSize, + "num_of_slices": cfg.Target.Bulk.SliceSize, + "idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds, + "elasticsearch": targetClusterID, + "queues": util.MapStr{ + "type": "scroll_docs", + "migration_task_id": taskItem.ID, + }, + }, + }, + }, + }), + } + bulkTask.ID = bulkID + + err = orm.Create(nil, scrollTask) + if err != nil { + return fmt.Errorf("create scroll pipeline task error: %w", err) + } + err = orm.Create(nil, bulkTask) + if err != nil { + 
return fmt.Errorf("create bulk_indexing pipeline task error: %w", err) + } + + taskItem.Metadata.Labels["is_split"] = true + taskItem.Status = task.StatusReady + + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, fmt.Sprintf("task [%s] splitted", taskItem.ID)) + return nil +} + +func (p *processor) handleScheduleSubTask(taskItem *task.Task) error { + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + return fmt.Errorf("got wrong config of task [%s]", taskItem.ID) + } + + scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) + if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) + return nil + } + if scrollTask == nil || bulkTask == nil { + // ES might not synced yet + log.Warnf("task [%s] es_scroll or bulk_indexing pipeline task not found", taskItem.ID) + return nil + } + + taskItem.RetryTimes++ + + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + totalDocs := cfg.Source.DocCount + scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) + + redoScroll := true + if cfg.Version >= migration_model.IndexMigrationV1 { + // skip scroll if possible + if scrolled && err == nil { + redoScroll = false + // reset queue consumer offset + // NOTE: we only trigger this flow when restart index_migration + // Restart bulk task will not reset queue offset + err = p.resetGatewayQueue(taskItem) + if err != nil { + log.Infof("task [%s] failed to reset gateway queue, redo scroll", taskItem.ID) + redoScroll = true + } + } + } + if !redoScroll { + migration_util.WriteLog(taskItem, nil, fmt.Sprintf("task [%s] skiping scroll", taskItem.ID)) + } else { + var executionConfig migration_model.ExecutionConfig + if cfg.Version >= migration_model.IndexMigrationV1 { + executionConfig = cfg.Execution + } else { + executionConfig, err = p.getExecutionConfigFromMajorTask(taskItem) + if err != nil { + return fmt.Errorf("get execution config from parent task failed, err: %v", err) + } + } + + // get a new instanceID + instance, err := p.scheduler.GetPreferenceInstance(executionConfig) + if err != nil { + if err == migration_model.ErrHitMax { + log.Debug("hit max tasks per instance, skip dispatch") + return nil + + } + return fmt.Errorf("get preference intance error: %w", err) + } + instanceID = instance.ID + + scrollTask.RetryTimes = taskItem.RetryTimes + // update instance info first + scrollTask.Metadata.Labels["execution_instance_id"] = instanceID + // try to clear disk queue before running es_scroll + p.cleanGatewayQueue(taskItem) + // update scroll task to ready + scrollTask.Status = task.StatusReady + err = orm.Update(nil, scrollTask) + if err != nil { + return fmt.Errorf("update scroll pipeline task error: %w", err) + } + } + + // update bulk task to init + bulkTask.RetryTimes = taskItem.RetryTimes + bulkTask.Metadata.Labels["execution_instance_id"] = instanceID + bulkTask.Status = task.StatusInit + err = orm.Update(nil, bulkTask) + if err != nil { + return fmt.Errorf("update bulk_indexing pipeline task error: %w", err) + } + + // update sub migration task status to running and save task log + taskItem.Metadata.Labels["execution_instance_id"] = instanceID + taskItem.Metadata.Labels["index_docs"] = 0 + taskItem.Metadata.Labels["scrolled_docs"] = 0 + taskItem.Status = task.StatusRunning + taskItem.StartTimeInMillis = time.Now().UnixMilli() + + p.saveTaskAndWriteLog(taskItem, 
&task.TaskResult{ + Success: true, + }, fmt.Sprintf("task [%s] started", taskItem.ID)) + // update dispatcher state + p.scheduler.IncrInstanceJobs(instanceID) + return nil +} + +func (p *processor) handleRunningSubTask(taskItem *task.Task) error { + cfg := migration_model.IndexMigrationTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + return fmt.Errorf("got wrong config of task [%s]", taskItem.ID) + } + totalDocs := cfg.Source.DocCount + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + + if totalDocs == 0 { + taskItem.Status = task.StatusComplete + taskItem.Metadata.Labels["scrolled_docs"] = 0 + taskItem.Metadata.Labels["index_docs"] = 0 + now := time.Now() + taskItem.CompletedTime = &now + + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, "empty index migration completed") + p.cleanGatewayQueue(taskItem) + p.scheduler.DecrInstanceJobs(instanceID) + return nil + } + + scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) + if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) + return nil + } + if scrollTask == nil || bulkTask == nil { + return errors.New("scroll/bulk pipeline task missing") + } + + scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) + if !scrolled { + return nil + } + if err != nil { + now := time.Now() + taskItem.CompletedTime = &now + taskItem.Status = task.StatusError + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: false, + Error: err.Error(), + }, "index scroll failed") + p.scheduler.DecrInstanceJobs(instanceID) + // clean disk queue if scroll failed + p.cleanGatewayQueue(taskItem) + return nil + } + + if migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "scrolled_docs") == 0 { + taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs + p.saveTaskAndWriteLog(taskItem, nil, "") + } + + bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs) + if !bulked { + return nil + } + now := time.Now() + taskItem.CompletedTime = &now + taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs + taskItem.Metadata.Labels["index_docs"] = successDocs + if err != nil { + taskItem.Status = task.StatusError + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: false, + Error: err.Error(), + }, "index bulk failed") + } else { + taskItem.Status = task.StatusComplete + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, "index migration completed") + // clean disk queue if bulk completed + p.cleanGatewayQueue(taskItem) + } + p.scheduler.DecrInstanceJobs(instanceID) + return nil +} + +func (p *processor) checkScrollPipelineTaskStatus(scrollTask *task.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) { + if scrollTask.Status == task.StatusError { + return true, 0, errors.New("scroll pipeline failed") + } + // NOTE: old-version pipeline tasks has empty status + if scrollTask.Status == "" { + return true, 0, errors.New("task was started by an old-version console, need to manually restart it") + } + + // scroll not finished yet + if scrollTask.Status != task.StatusComplete { + return false, 0, nil + } + + var ( + scrollLabels = util.MapStr(scrollTask.Metadata.Labels) + ) + scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs") + + if scrolledDocs != totalDocs { + return true, scrolledDocs, fmt.Errorf("scroll complete but docs count unmatch: %d / 
%d", scrolledDocs, totalDocs) + } + + return true, scrolledDocs, nil +} + +func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, totalDocs int64) (bulked bool, successDocs int64, err error) { + // NOTE: old-version pipeline tasks has empty status + if bulkTask.Status == "" { + return true, 0, errors.New("task was started by an old-version console, need to manually restart it") + } + + // start bulk as needed + if bulkTask.Status == task.StatusInit { + bulkTask.Status = task.StatusReady + p.saveTaskAndWriteLog(bulkTask, &task.TaskResult{ + Success: true, + }, fmt.Sprintf("scroll completed, bulk pipeline started")) + return false, 0, nil + } + + // bulk not finished yet + if bulkTask.Status != task.StatusComplete && bulkTask.Status != task.StatusError { + return false, 0, nil + } + + var ( + bulkLabels = util.MapStr(bulkTask.Metadata.Labels) + invalidDocs = migration_util.GetMapStringValue(bulkLabels, "invalid_docs") + invalidReasons = migration_util.GetMapStringValue(bulkLabels, "invalid_reasons") + failureDocs = migration_util.GetMapStringValue(bulkLabels, "failure_docs") + failureReasons = migration_util.GetMapStringValue(bulkLabels, "failure_reasons") + ) + successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs") + + if successDocs != totalDocs { + return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons) + } + + // successDocs matched but has errors + if bulkTask.Status == task.StatusError { + return true, successDocs, nil + } + + return true, successDocs, nil +} + +func (p *processor) getExecutionConfigFromMajorTask(taskItem *task.Task) (config migration_model.ExecutionConfig, err error) { + majorTaskID := taskItem.ParentId[0] + majorTask := task.Task{} + majorTask.ID = majorTaskID + _, err = orm.Get(&majorTask) + if err != nil { + log.Errorf("failed to get major task, err: %v", err) + return + } + cfg := migration_model.ClusterMigrationTaskConfig{} + err = migration_util.GetTaskConfig(&majorTask, &cfg) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + return + } + config = cfg.Settings.Execution + return +} + +func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask *task.Task, bulkTask *task.Task, err error) { + ptasks, err := p.getPipelineTasks(taskItem.ID) + if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) + return + } + for i, ptask := range ptasks { + if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { + bulkTask = &ptasks[i] + } else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" { + scrollTask = &ptasks[i] + } + } + return +} + +func (p *processor) getPipelineTasks(subTaskID string) ([]task.Task, error) { + queryDsl := util.MapStr{ + "size": 2, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": subTaskID, + }, + }, + }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "pipeline", + }, + }, + }, + }, + }, + }, + } + return migration_util.GetTasks(p.Elasticsearch, p.IndexName, queryDsl) +} + +func (p *processor) cleanGatewayQueue(taskItem *task.Task) { + log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID) + + var err error + instance := model.Instance{} + instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + if instance.ID == "" 
{ + log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID) + return + } + _, err = orm.Get(&instance) + if err != nil { + log.Errorf("failed to get instance, err: %v", err) + return + } + + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + } + err = instance.DeleteQueueBySelector(selector) + if err != nil { + log.Errorf("failed to delete queue, err: %v", err) + } +} + +func (p *processor) resetGatewayQueue(taskItem *task.Task) error { + log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID) + + var err error + instance := model.Instance{} + instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + _, err = orm.Get(&instance) + if err != nil { + log.Errorf("failed to get instance, err: %v", err) + return err + } + + selector := util.MapStr{ + "labels": util.MapStr{ + "migration_task_id": taskItem.ID, + }, + } + err = instance.DeleteQueueConsumersBySelector(selector) + if err != nil { + log.Errorf("failed to delete queue consumers, err: %v", err) + return err + } + + return nil +} + +func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { + esClient := elastic.GetClient(p.Elasticsearch) + _, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, "") + if err != nil { + log.Errorf("failed to update task, err: %v", err) + } + if message != "" { + migration_util.WriteLog(taskItem, taskResult, message) + } +} diff --git a/plugin/migration/model/scheduler.go b/plugin/migration/model/scheduler.go new file mode 100644 index 00000000..f7f62979 --- /dev/null +++ b/plugin/migration/model/scheduler.go @@ -0,0 +1,16 @@ +package model + +import ( + "errors" + + "infini.sh/console/model" +) + +type Scheduler interface { + GetPreferenceInstance(config ExecutionConfig) (instance *model.Instance, err error) + IncrInstanceJobs(instanceID string) + DecrInstanceJobs(instanceID string) + RefreshInstanceJobsFromES() error +} + +var ErrHitMax = errors.New("instance hit max job limit") diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index a65c7e7c..5b132293 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -7,15 +7,15 @@ package migration import ( "errors" "fmt" - "math" - "sync" "time" log "github.com/cihub/seelog" "infini.sh/console/model" + "infini.sh/console/plugin/migration/index_migration" migration_model "infini.sh/console/plugin/migration/model" "infini.sh/console/plugin/migration/pipeline_task" + "infini.sh/console/plugin/migration/scheduler" migration_util "infini.sh/console/plugin/migration/util" "infini.sh/framework/core/config" @@ -33,10 +33,9 @@ type DispatcherProcessor struct { id string config *DispatcherConfig - state map[string]DispatcherState - stateLock sync.Mutex - - pipelineTaskProcessor migration_model.Processor + scheduler migration_model.Scheduler + pipelineTaskProcessor migration_model.Processor + indexMigrationTaskProcessor migration_model.Processor } type DispatcherConfig struct { @@ -48,10 +47,6 @@ type DispatcherConfig struct { TaskBatchSize int `config:"task_batch_size"` } -type DispatcherState struct { - Total int -} - func init() { pipeline.RegisterProcessorPlugin("migration_dispatcher", newMigrationDispatcherProcessor) } @@ -91,13 +86,14 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro processor := DispatcherProcessor{ id: util.GetUUID(), config: &cfg, - state: map[string]DispatcherState{}, } - err := 
processor.refreshInstanceJobsFromES() + var err error + processor.scheduler, err = scheduler.NewScheduler(cfg.Elasticsearch, cfg.IndexName, cfg.CheckInstanceAvailable, cfg.MaxTasksPerInstance) if err != nil { return nil, err } processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName) + processor.indexMigrationTaskProcessor = index_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName, processor.scheduler) return &processor, nil } @@ -112,11 +108,11 @@ func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { // mark index_migrations as pending_stop p.handleTasks(ctx, "cluster_migration", []string{task2.StatusPendingStop}, p.handlePendingStopMajorTask) // mark pipeline tasks as pending_stop - p.handleTasks(ctx, "index_migration", []string{task2.StatusPendingStop}, p.handlePendingStopSubTask) + p.handleTasks(ctx, "index_migration", []string{task2.StatusPendingStop}, p.indexMigrationTaskProcessor.Process) // check pipeline tasks status - p.handleTasks(ctx, "index_migration", []string{task2.StatusRunning}, p.handleRunningSubTask) + p.handleTasks(ctx, "index_migration", []string{task2.StatusRunning}, p.indexMigrationTaskProcessor.Process) // split & schedule pipline tasks - p.handleTasks(ctx, "index_migration", []string{task2.StatusReady}, p.handleReadySubTask) + p.handleTasks(ctx, "index_migration", []string{task2.StatusReady}, p.indexMigrationTaskProcessor.Process) // check index_migration tasks status p.handleTasks(ctx, "cluster_migration", []string{task2.StatusRunning}, p.handleRunningMajorTask) // split & schedule index_migration tasks @@ -220,13 +216,13 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error { } func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) error { - err := p.updatePendingChildTasksToPendingStop(taskItem, "index_migration") + err := migration_util.UpdatePendingChildTasksToPendingStop(taskItem, "index_migration") if err != nil { log.Errorf("failed to update sub task status, err: %v", err) return nil } - tasks, err := p.getPendingChildTasks(taskItem, "index_migration") + tasks, err := migration_util.GetPendingChildTasks(p.config.Elasticsearch, p.config.IndexName, taskItem.ID, "index_migration") if err != nil { log.Errorf("failed to get sub tasks, err: %v", err) return nil @@ -238,7 +234,7 @@ func (p *DispatcherProcessor) handlePendingStopMajorTask(taskItem *task2.Task) e p.sendMajorTaskNotification(taskItem) p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID)) // NOTE: we don't know how many running index_migration's stopped, so do a refresh from ES - p.refreshInstanceJobsFromES() + p.scheduler.RefreshInstanceJobsFromES() } return nil } @@ -280,486 +276,6 @@ func (p *DispatcherProcessor) handleRunningMajorTask(taskItem *task2.Task) error return nil } -func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error { - cfg := migration_model.IndexMigrationTaskConfig{} - err := migration_util.GetTaskConfig(taskItem, &cfg) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - return fmt.Errorf("got wrong config of task [%s]", taskItem.ID) - } - totalDocs := cfg.Source.DocCount - instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - - if totalDocs == 0 { - taskItem.Status = task2.StatusComplete - taskItem.Metadata.Labels["scrolled_docs"] = 0 - taskItem.Metadata.Labels["index_docs"] = 0 - now := time.Now() - taskItem.CompletedTime = 
&now - - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: true, - }, "empty index migration completed") - p.cleanGatewayQueue(taskItem) - p.decrInstanceJobs(instanceID) - return nil - } - - scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) - if err != nil { - log.Errorf("failed to get pipeline tasks, err: %v", err) - return nil - } - if scrollTask == nil || bulkTask == nil { - return errors.New("scroll/bulk pipeline task missing") - } - - scrolled, scrolledDocs, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) - if !scrolled { - return nil - } - if err != nil { - now := time.Now() - taskItem.CompletedTime = &now - taskItem.Status = task2.StatusError - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: false, - Error: err.Error(), - }, "index scroll failed") - p.decrInstanceJobs(instanceID) - // clean disk queue if scroll failed - p.cleanGatewayQueue(taskItem) - return nil - } - - if migration_util.GetMapIntValue(util.MapStr(taskItem.Metadata.Labels), "scrolled_docs") == 0 { - taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs - p.saveTaskAndWriteLog(taskItem, nil, "") - } - - bulked, successDocs, err := p.checkBulkPipelineTaskStatus(bulkTask, totalDocs) - if !bulked { - return nil - } - now := time.Now() - taskItem.CompletedTime = &now - taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs - taskItem.Metadata.Labels["index_docs"] = successDocs - if err != nil { - taskItem.Status = task2.StatusError - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: false, - Error: err.Error(), - }, "index bulk failed") - } else { - taskItem.Status = task2.StatusComplete - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: true, - }, "index migration completed") - // clean disk queue if bulk completed - p.cleanGatewayQueue(taskItem) - } - p.decrInstanceJobs(instanceID) - return nil - -} - -func (p *DispatcherProcessor) checkScrollPipelineTaskStatus(scrollTask *task2.Task, totalDocs int64) (scrolled bool, scrolledDocs int64, err error) { - if scrollTask.Status == task2.StatusError { - return true, 0, errors.New("scroll pipeline failed") - } - // NOTE: old-version pipeline tasks has empty status - if scrollTask.Status == "" { - return true, 0, errors.New("task was started by an old-version console, need to manually restart it") - } - - // scroll not finished yet - if scrollTask.Status != task2.StatusComplete { - return false, 0, nil - } - - var ( - scrollLabels = util.MapStr(scrollTask.Metadata.Labels) - ) - scrolledDocs = migration_util.GetMapIntValue(scrollLabels, "scrolled_docs") - - if scrolledDocs != totalDocs { - return true, scrolledDocs, fmt.Errorf("scroll complete but docs count unmatch: %d / %d", scrolledDocs, totalDocs) - } - - return true, scrolledDocs, nil -} - -func (p *DispatcherProcessor) checkBulkPipelineTaskStatus(bulkTask *task2.Task, totalDocs int64) (bulked bool, successDocs int64, err error) { - // NOTE: old-version pipeline tasks has empty status - if bulkTask.Status == "" { - return true, 0, errors.New("task was started by an old-version console, need to manually restart it") - } - - // start bulk as needed - if bulkTask.Status == task2.StatusInit { - bulkTask.Status = task2.StatusReady - p.saveTaskAndWriteLog(bulkTask, &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("scroll completed, bulk pipeline started")) - return false, 0, nil - } - - // bulk not finished yet - if bulkTask.Status != task2.StatusComplete && bulkTask.Status != task2.StatusError { - return false, 0, nil - } - - var ( - 
bulkLabels = util.MapStr(bulkTask.Metadata.Labels) - invalidDocs = migration_util.GetMapStringValue(bulkLabels, "invalid_docs") - invalidReasons = migration_util.GetMapStringValue(bulkLabels, "invalid_reasons") - failureDocs = migration_util.GetMapStringValue(bulkLabels, "failure_docs") - failureReasons = migration_util.GetMapStringValue(bulkLabels, "failure_reasons") - ) - successDocs = migration_util.GetMapIntValue(bulkLabels, "success_docs") - - if successDocs != totalDocs { - return true, successDocs, fmt.Errorf("bulk complete but docs count unmatch: %d / %d, invalid docs: [%s] (reasons: [%s]), failure docs: [%s] (reasons: [%s])", successDocs, totalDocs, invalidDocs, invalidReasons, failureDocs, failureReasons) - } - - // successDocs matched but has errors - if bulkTask.Status == task2.StatusError { - return true, successDocs, nil - } - - return true, successDocs, nil -} - -func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error { - err := p.updatePendingChildTasksToPendingStop(taskItem, "pipeline") - if err != nil { - log.Errorf("failed to update sub task status, err: %v", err) - return nil - } - - tasks, err := p.getPendingChildTasks(taskItem, "pipeline") - if err != nil { - log.Errorf("failed to get sub tasks, err: %v", err) - return nil - } - - // all subtask stopped or error or complete - if len(tasks) == 0 { - taskItem.Status = task2.StatusStopped - p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID)) - } - return nil -} - -func (p *DispatcherProcessor) handleReadySubTask(taskItem *task2.Task) error { - if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok { - return p.handleSplitSubTask(taskItem) - } - - return p.handleScheduleSubTask(taskItem) -} - -func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error { - //split task to scroll/bulk_indexing pipeline and then persistent - var pids []string - pids = append(pids, taskItem.ParentId...) 
- pids = append(pids, taskItem.ID) - scrollID := util.GetUUID() - cfg := migration_model.IndexMigrationTaskConfig{} - err := migration_util.GetTaskConfig(taskItem, &cfg) - if err != nil { - return fmt.Errorf("got wrong config [%v] with task [%s], err: %v", taskItem.ConfigString, taskItem.ID, err) - } - sourceClusterID := cfg.Source.ClusterId - targetClusterID := cfg.Target.ClusterId - docType := common.GetClusterDocType(targetClusterID) - if len(taskItem.ParentId) == 0 { - return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) - } - queryDsl := cfg.Source.QueryDSL - scrollQueryDsl := util.MustToJSON(util.MapStr{ - "query": queryDsl, - }) - indexName := cfg.Source.Indices - scrollTask := &task2.Task{ - ParentId: pids, - Runnable: true, - Cancellable: true, - Metadata: task2.Metadata{ - Type: "pipeline", - Labels: util.MapStr{ - "cluster_id": sourceClusterID, - "pipeline_id": "es_scroll", - "index_name": indexName, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, - }, - Status: task2.StatusInit, - RetryTimes: taskItem.RetryTimes, - ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ - Name: scrollID, - Logging: migration_model.PipelineTaskLoggingConfig{ - Enabled: true, - }, - Labels: util.MapStr{ - "parent_task_id": pids, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, - AutoStart: true, - KeepRunning: false, - Processor: []util.MapStr{ - { - "es_scroll": util.MapStr{ - "remove_type": docType == "", - "slice_size": cfg.Source.SliceSize, - "batch_size": cfg.Source.BatchSize, - "indices": indexName, - "elasticsearch": sourceClusterID, - "queue": util.MapStr{ - "name": scrollID, - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - }, - "partition_size": 1, - "scroll_time": cfg.Source.ScrollTime, - "query_dsl": scrollQueryDsl, - "index_rename": cfg.Source.IndexRename, - "type_rename": cfg.Source.TypeRename, - }, - }, - }, - }), - } - scrollTask.ID = scrollID - - bulkID := util.GetUUID() - bulkTask := &task2.Task{ - ParentId: pids, - Runnable: true, - Cancellable: true, - Metadata: task2.Metadata{ - Type: "pipeline", - Labels: util.MapStr{ - "cluster_id": targetClusterID, - "pipeline_id": "bulk_indexing", - "index_name": indexName, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, - }, - Status: task2.StatusInit, - RetryTimes: taskItem.RetryTimes, - ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ - Name: bulkID, - Logging: migration_model.PipelineTaskLoggingConfig{ - Enabled: true, - }, - Labels: util.MapStr{ - "parent_task_id": pids, - "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], - }, - AutoStart: true, - KeepRunning: false, - Processor: []util.MapStr{ - { - "bulk_indexing": util.MapStr{ - "detect_active_queue": false, - "bulk": util.MapStr{ - "batch_size_in_mb": cfg.Target.Bulk.BatchSizeInMB, - "batch_size_in_docs": cfg.Target.Bulk.BatchSizeInDocs, - "invalid_queue": "bulk_indexing_400", - "compress": cfg.Target.Bulk.Compress, - }, - "max_worker_size": cfg.Target.Bulk.MaxWorkerSize, - "num_of_slices": cfg.Target.Bulk.SliceSize, - "idle_timeout_in_seconds": cfg.Target.Bulk.IdleTimeoutInSeconds, - "elasticsearch": targetClusterID, - "queues": util.MapStr{ - "type": "scroll_docs", - "migration_task_id": taskItem.ID, - }, - }, - }, - }, - }), - } - bulkTask.ID = bulkID - - err = orm.Create(nil, scrollTask) - if err != nil { - return fmt.Errorf("create scroll pipeline task error: %w", err) - } - err = orm.Create(nil, bulkTask) - if err != nil 
{ - return fmt.Errorf("create bulk_indexing pipeline task error: %w", err) - } - - taskItem.Metadata.Labels["is_split"] = true - taskItem.Status = task2.StatusReady - - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("task [%s] splitted", taskItem.ID)) - return nil -} - -func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error { - cfg := migration_model.IndexMigrationTaskConfig{} - err := migration_util.GetTaskConfig(taskItem, &cfg) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - return fmt.Errorf("got wrong config of task [%s]", taskItem.ID) - } - - scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem) - if err != nil { - log.Errorf("failed to get pipeline tasks, err: %v", err) - return nil - } - if scrollTask == nil || bulkTask == nil { - // ES might not synced yet - log.Warnf("task [%s] es_scroll or bulk_indexing pipeline task not found", taskItem.ID) - return nil - } - - taskItem.RetryTimes++ - - instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - totalDocs := cfg.Source.DocCount - scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs) - - redoScroll := true - if cfg.Version >= migration_model.IndexMigrationV1 { - // skip scroll if possible - if scrolled && err == nil { - redoScroll = false - // reset queue consumer offset - // NOTE: we only trigger this flow when restart index_migration - // Restart bulk task will not reset queue offset - err = p.resetGatewayQueue(taskItem) - if err != nil { - log.Infof("task [%s] failed to reset gateway queue, redo scroll", taskItem.ID) - redoScroll = true - } - } - } - - if !redoScroll { - migration_util.WriteLog(taskItem, nil, fmt.Sprintf("task [%s] skiping scroll", taskItem.ID)) - } else { - var executionConfig migration_model.ExecutionConfig - if cfg.Version >= migration_model.IndexMigrationV1 { - executionConfig = cfg.Execution - } else { - executionConfig, err = p.getExecutionConfigFromMajorTask(taskItem) - if err != nil { - return fmt.Errorf("get execution config from parent task failed, err: %v", err) - } - } - - // get a new instanceID - instance, err := p.getPreferenceInstance(executionConfig) - if err != nil { - return fmt.Errorf("get preference intance error: %w", err) - } - if p.getInstanceState(instance.ID).Total >= p.config.MaxTasksPerInstance { - log.Debugf("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance) - return nil - } - instanceID = instance.ID - - scrollTask.RetryTimes = taskItem.RetryTimes - // update instance info first - scrollTask.Metadata.Labels["execution_instance_id"] = instanceID - // try to clear disk queue before running es_scroll - p.cleanGatewayQueue(taskItem) - // update scroll task to ready - scrollTask.Status = task2.StatusReady - err = orm.Update(nil, scrollTask) - if err != nil { - return fmt.Errorf("update scroll pipeline task error: %w", err) - } - } - - // update bulk task to init - bulkTask.RetryTimes = taskItem.RetryTimes - bulkTask.Metadata.Labels["execution_instance_id"] = instanceID - bulkTask.Status = task2.StatusInit - err = orm.Update(nil, bulkTask) - if err != nil { - return fmt.Errorf("update bulk_indexing pipeline task error: %w", err) - } - - // update sub migration task status to running and save task log - taskItem.Metadata.Labels["execution_instance_id"] = instanceID - taskItem.Metadata.Labels["index_docs"] = 0 - taskItem.Metadata.Labels["scrolled_docs"] = 0 - taskItem.Status = task2.StatusRunning - 
taskItem.StartTimeInMillis = time.Now().UnixMilli() - - p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{ - Success: true, - }, fmt.Sprintf("task [%s] started", taskItem.ID)) - // update dispatcher state - p.incrInstanceJobs(instanceID) - return nil -} - -func (p *DispatcherProcessor) getExecutionConfigFromMajorTask(taskItem *task2.Task) (config migration_model.ExecutionConfig, err error) { - majorTaskID := taskItem.ParentId[0] - majorTask := task2.Task{} - majorTask.ID = majorTaskID - _, err = orm.Get(&majorTask) - if err != nil { - log.Errorf("failed to get major task, err: %v", err) - return - } - cfg := migration_model.ClusterMigrationTaskConfig{} - err = migration_util.GetTaskConfig(&majorTask, &cfg) - if err != nil { - log.Errorf("failed to get task config, err: %v", err) - return - } - config = cfg.Settings.Execution - return -} - -func (p *DispatcherProcessor) getPreferenceInstance(config migration_model.ExecutionConfig) (instance model.Instance, err error) { - var ( - total = math.MaxInt - tempInst = model.Instance{} - ) - for _, node := range config.Nodes.Permit { - instanceTotal := p.getInstanceState(node.ID).Total - if instanceTotal < total { - if p.config.CheckInstanceAvailable { - tempInst.ID = node.ID - _, err = orm.Get(&tempInst) - if err != nil { - log.Errorf("failed to get instance, err: %v", err) - continue - } - err = tempInst.TryConnectWithTimeout(time.Second) - if err != nil { - log.Debugf("instance [%s] is not available, caused by: %v", tempInst.ID, err) - continue - } - } - instance.ID = node.ID - total = instanceTotal - } - } - if instance.ID == "" && p.config.CheckInstanceAvailable { - return instance, fmt.Errorf("no available instance") - } - if instance.ID == tempInst.ID { - return tempInst, nil - } - _, err = orm.Get(&instance) - return -} - func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []string, size int) ([]task2.Task, error) { queryDsl := util.MapStr{ "size": size, @@ -789,7 +305,7 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []st }, }, } - return p.getTasks(queryDsl) + return migration_util.GetTasks(p.config.Elasticsearch, p.config.IndexName, queryDsl) } func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) { @@ -1009,61 +525,6 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro return nil } -func (p *DispatcherProcessor) getPipelineTasks(subTaskID string) ([]task2.Task, error) { - queryDsl := util.MapStr{ - "size": 2, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": subTaskID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "pipeline", - }, - }, - }, - }, - }, - }, - } - return p.getTasks(queryDsl) -} - -func (p *DispatcherProcessor) getTasks(query interface{}) ([]task2.Task, error) { - esClient := elastic.GetClient(p.config.Elasticsearch) - res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) - if err != nil { - log.Errorf("query tasks from es failed, err: %v", err) - return nil, err - } - if res.GetTotal() == 0 { - return nil, nil - } - var migrationTasks []task2.Task - for _, hit := range res.Hits.Hits { - buf, err := util.ToJSONBytes(hit.Source) - if err != nil { - log.Errorf("marshal task json failed, err: %v", err) - return nil, err - } - tk := task2.Task{} - err = util.FromJSONBytes(buf, &tk) - if err != nil { - 
log.Errorf("unmarshal task json failed, err: %v", err) - return nil, err - } - migrationTasks = append(migrationTasks, tk) - } - return migrationTasks, nil -} - func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskState migration_model.MajorTaskState, err error) { query := util.MapStr{ "size": 0, @@ -1130,57 +591,6 @@ func (p *DispatcherProcessor) getMajorTaskState(majorTask *task2.Task) (taskStat return taskState, nil } -func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState, error) { - query := util.MapStr{ - "size": 0, - "aggs": util.MapStr{ - "grp": util.MapStr{ - "terms": util.MapStr{ - "field": "metadata.labels.execution_instance_id", - "size": 1000, - }, - }, - }, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "index_migration", - }, - }, - }, - { - "term": util.MapStr{ - "status": util.MapStr{ - "value": task2.StatusRunning, - }, - }, - }, - }, - }, - }, - } - esClient := elastic.GetClient(p.config.Elasticsearch) - res, err := esClient.SearchWithRawQueryDSL(p.config.IndexName, util.MustToJSONBytes(query)) - if err != nil { - log.Errorf("search es failed, err: %v", err) - return nil, err - } - state := map[string]DispatcherState{} - for _, bk := range res.Aggregations["grp"].Buckets { - if key, ok := bk["key"].(string); ok { - if v, ok := bk["doc_count"].(float64); ok { - state[key] = DispatcherState{ - Total: int(v), - } - } - } - } - return state, nil -} - func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { config := migration_model.ClusterMigrationTaskConfig{} err := migration_util.GetTaskConfig(taskItem, &config) @@ -1226,184 +636,3 @@ func (p *DispatcherProcessor) sendMajorTaskNotification(taskItem *task2.Task) { } return } - -// update status of subtask to pending stop -func (p *DispatcherProcessor) updatePendingChildTasksToPendingStop(taskItem *task2.Task, taskType string) error { - query := util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskItem.ID, - }, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusReady}, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": taskType, - }, - }, - }, - }, - }, - } - queryDsl := util.MapStr{ - "query": query, - "script": util.MapStr{ - "source": fmt.Sprintf("ctx._source['status'] = '%s'", task2.StatusPendingStop), - }, - } - - err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl)) - if err != nil { - return err - } - return nil -} - -func (p *DispatcherProcessor) getPendingChildTasks(taskItem *task2.Task, taskType string) ([]task2.Task, error) { - - //check whether all pipeline task is stopped or not, then update task status - q := util.MapStr{ - "size": 200, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": taskItem.ID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.type": taskType, - }, - }, - { - "terms": util.MapStr{ - "status": []string{task2.StatusRunning, task2.StatusPendingStop, task2.StatusReady}, - }, - }, - }, - }, - }, - } - return p.getTasks(q) -} - -func (p *DispatcherProcessor) getScrollBulkPipelineTasks(taskItem *task2.Task) (scrollTask *task2.Task, bulkTask *task2.Task, err error) { - ptasks, err := p.getPipelineTasks(taskItem.ID) - if err != nil { - log.Errorf("failed to get pipeline tasks, err: %v", err) 
- return - } - for i, ptask := range ptasks { - if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { - bulkTask = &ptasks[i] - } else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" { - scrollTask = &ptasks[i] - } - } - return -} - -func (p *DispatcherProcessor) decrInstanceJobs(instanceID string) { - p.stateLock.Lock() - defer p.stateLock.Unlock() - if st, ok := p.state[instanceID]; ok { - st.Total -= 1 - p.state[instanceID] = st - } -} - -func (p *DispatcherProcessor) incrInstanceJobs(instanceID string) { - p.stateLock.Lock() - defer p.stateLock.Unlock() - instanceState := p.state[instanceID] - instanceState.Total = instanceState.Total + 1 - p.state[instanceID] = instanceState -} - -func (p *DispatcherProcessor) getInstanceState(instanceID string) DispatcherState { - p.stateLock.Lock() - defer p.stateLock.Unlock() - - return p.state[instanceID] -} - -func (p *DispatcherProcessor) refreshInstanceJobsFromES() error { - log.Debug("refreshing instance state from ES") - p.stateLock.Lock() - defer p.stateLock.Unlock() - - state, err := p.getInstanceTaskState() - if err != nil { - log.Errorf("failed to get instance task state, err: %v", err) - return err - } - p.state = state - - return nil -} - -func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) { - log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID) - - var err error - instance := model.Instance{} - instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - if instance.ID == "" { - log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID) - return - } - _, err = orm.Get(&instance) - if err != nil { - log.Errorf("failed to get instance, err: %v", err) - return - } - - selector := util.MapStr{ - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - } - err = instance.DeleteQueueBySelector(selector) - if err != nil { - log.Errorf("failed to delete queue, err: %v", err) - } -} - -func (p *DispatcherProcessor) resetGatewayQueue(taskItem *task2.Task) error { - log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID) - - var err error - instance := model.Instance{} - instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - _, err = orm.Get(&instance) - if err != nil { - log.Errorf("failed to get instance, err: %v", err) - return err - } - - selector := util.MapStr{ - "labels": util.MapStr{ - "migration_task_id": taskItem.ID, - }, - } - err = instance.DeleteQueueConsumersBySelector(selector) - if err != nil { - log.Errorf("failed to delete queue consumers, err: %v", err) - return err - } - - return nil -} diff --git a/plugin/migration/scheduler/scheduler.go b/plugin/migration/scheduler/scheduler.go new file mode 100644 index 00000000..1b645c61 --- /dev/null +++ b/plugin/migration/scheduler/scheduler.go @@ -0,0 +1,182 @@ +package scheduler + +import ( + "fmt" + "math" + "sync" + "time" + + log "github.com/cihub/seelog" + + "infini.sh/console/model" + migration_model "infini.sh/console/plugin/migration/model" + + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +type scheduler struct { + Elasticsearch string + IndexName string + CheckInstanceAvailable bool + MaxTasksPerInstance int + + state map[string]DispatcherState + stateLock sync.Mutex +} + +type DispatcherState struct { + Total int +} + +func NewScheduler(elasticsearch, indexName string, checkInstanceAvailable bool, maxTasksPerInstance int) 
(migration_model.Scheduler, error) {
+	scheduler := &scheduler{
+		Elasticsearch:          elasticsearch,
+		IndexName:              indexName,
+		CheckInstanceAvailable: checkInstanceAvailable,
+		MaxTasksPerInstance:    maxTasksPerInstance,
+		state:                  map[string]DispatcherState{},
+	}
+	err := scheduler.RefreshInstanceJobsFromES()
+	if err != nil {
+		return nil, err
+	}
+	return scheduler, nil
+}
+
+func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig) (*model.Instance, error) {
+	var (
+		err      error
+		minID    string
+		minTotal = math.MaxInt
+	)
+
+	for _, node := range config.Nodes.Permit {
+		instanceTotal := p.getInstanceState(node.ID).Total
+		if instanceTotal < minTotal {
+			if p.CheckInstanceAvailable {
+				tempInst := model.Instance{}
+				tempInst.ID = node.ID
+				_, err = orm.Get(&tempInst)
+				if err != nil {
+					log.Errorf("failed to get instance, err: %v", err)
+					continue
+				}
+				err = tempInst.TryConnectWithTimeout(time.Second)
+				if err != nil {
+					log.Debugf("instance [%s] is not available, caused by: %v", tempInst.ID, err)
+					continue
+				}
+			}
+			minID = node.ID
+			minTotal = instanceTotal
+		}
+	}
+	if minID == "" {
+		return nil, fmt.Errorf("no available instance")
+	}
+
+	instance := model.Instance{}
+	instance.ID = minID
+
+	_, err = orm.Get(&instance)
+	if err != nil {
+		return nil, err
+	}
+	if p.getInstanceState(minID).Total >= p.MaxTasksPerInstance {
+		return nil, migration_model.ErrHitMax
+	}
+	return &instance, nil
+}
+
+func (p *scheduler) RefreshInstanceJobsFromES() error {
+	log.Debug("refreshing instance state from ES")
+	p.stateLock.Lock()
+	defer p.stateLock.Unlock()
+
+	state, err := p.getInstanceTaskState()
+	if err != nil {
+		log.Errorf("failed to get instance task state, err: %v", err)
+		return err
+	}
+	p.state = state
+
+	return nil
+}
+
+func (p *scheduler) DecrInstanceJobs(instanceID string) {
+	p.stateLock.Lock()
+	defer p.stateLock.Unlock()
+	if st, ok := p.state[instanceID]; ok {
+		st.Total -= 1
+		p.state[instanceID] = st
+	}
+}
+
+func (p *scheduler) IncrInstanceJobs(instanceID string) {
+	p.stateLock.Lock()
+	defer p.stateLock.Unlock()
+	instanceState := p.state[instanceID]
+	instanceState.Total = instanceState.Total + 1
+	p.state[instanceID] = instanceState
+}
+
+func (p *scheduler) getInstanceState(instanceID string) DispatcherState {
+	p.stateLock.Lock()
+	defer p.stateLock.Unlock()
+
+	return p.state[instanceID]
+}
+
+func (p *scheduler) getInstanceTaskState() (map[string]DispatcherState, error) {
+	query := util.MapStr{
+		"size": 0,
+		"aggs": util.MapStr{
+			"grp": util.MapStr{
+				"terms": util.MapStr{
+					"field": "metadata.labels.execution_instance_id",
+					"size":  1000,
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"term": util.MapStr{
+							"metadata.type": util.MapStr{
+								"value": "index_migration",
+							},
+						},
+					},
+					{
+						"term": util.MapStr{
+							"status": util.MapStr{
+								"value": task.StatusRunning,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	esClient := elastic.GetClient(p.Elasticsearch)
+	res, err := esClient.SearchWithRawQueryDSL(p.IndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		log.Errorf("search es failed, err: %v", err)
+		return nil, err
+	}
+	state := map[string]DispatcherState{}
+	for _, bk := range res.Aggregations["grp"].Buckets {
+		if key, ok := bk["key"].(string); ok {
+			if v, ok := bk["doc_count"].(float64); ok {
+				state[key] = DispatcherState{
+					Total: int(v),
+				}
+			}
+		}
+	}
+	return state, nil
+}
diff --git a/plugin/migration/util/orm.go b/plugin/migration/util/orm.go
new file mode 100644
index 00000000..701a80aa
--- /dev/null
+++ b/plugin/migration/util/orm.go
@@ -0,0 +1,112 @@
+package util
+
+import (
+	"fmt"
+
+	log "github.com/cihub/seelog"
+	"infini.sh/framework/core/elastic"
+	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/task"
+	"infini.sh/framework/core/util"
+)
+
+func GetPendingChildTasks(elasticsearch, indexName string, taskID string, taskType string) ([]task.Task, error) {
+
+	// check whether all child tasks of the given type have stopped; the caller can then update the parent task status
+	q := util.MapStr{
+		"size": 200,
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"term": util.MapStr{
+							"parent_id": util.MapStr{
+								"value": taskID,
+							},
+						},
+					},
+					{
+						"term": util.MapStr{
+							"metadata.type": taskType,
+						},
+					},
+					{
+						"terms": util.MapStr{
+							"status": []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady},
+						},
+					},
+				},
+			},
+		},
+	}
+	return GetTasks(elasticsearch, indexName, q)
+}
+
+func GetTasks(elasticsearch, indexName string, query interface{}) ([]task.Task, error) {
+	esClient := elastic.GetClient(elasticsearch)
+	res, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(query))
+	if err != nil {
+		log.Errorf("query tasks from es failed, err: %v", err)
+		return nil, err
+	}
+	if res.GetTotal() == 0 {
+		return nil, nil
+	}
+	var tasks []task.Task
+	for _, hit := range res.Hits.Hits {
+		buf, err := util.ToJSONBytes(hit.Source)
+		if err != nil {
+			log.Errorf("marshal task json failed, err: %v", err)
+			return nil, err
+		}
+		tk := task.Task{}
+		err = util.FromJSONBytes(buf, &tk)
+		if err != nil {
+			log.Errorf("unmarshal task json failed, err: %v", err)
+			return nil, err
+		}
+		tasks = append(tasks, tk)
+	}
+	return tasks, nil
+}
+
+// UpdatePendingChildTasksToPendingStop updates the status of pending (running/ready) child tasks to pending_stop
+func UpdatePendingChildTasksToPendingStop(taskItem *task.Task, taskType string) error {
+	query := util.MapStr{
+		"bool": util.MapStr{
+			"must": []util.MapStr{
+				{
+					"term": util.MapStr{
+						"parent_id": util.MapStr{
+							"value": taskItem.ID,
+						},
+					},
+				},
+				{
+					"terms": util.MapStr{
+						"status": []string{task.StatusRunning, task.StatusReady},
+					},
+				},
+				{
+					"term": util.MapStr{
+						"metadata.type": util.MapStr{
+							"value": taskType,
+						},
+					},
+				},
+			},
+		},
+	}
+	queryDsl := util.MapStr{
+		"query": query,
+		"script": util.MapStr{
+			"source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusPendingStop),
+		},
+	}
+
+	err := orm.UpdateBy(taskItem, util.MustToJSONBytes(queryDsl))
+	if err != nil {
+		return err
+	}
+	return nil
+}
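-- 
Usage sketch (illustrative only, not part of the patch; the cluster ID and
index names below are placeholder values): after this split, a dispatcher
wires the new packages together the same way newMigrationDispatcherProcessor
does above. The scheduler owns the per-instance running-job counts, and the
index_migration processor reaches it only through the
migration_model.Scheduler interface:

	import (
		"infini.sh/console/plugin/migration/index_migration"
		"infini.sh/console/plugin/migration/scheduler"
	)

	// checkInstanceAvailable=true, maxTasksPerInstance=10 (placeholders)
	sched, err := scheduler.NewScheduler("cluster-id", ".infini_task", true, 10)
	if err != nil {
		panic(err)
	}
	proc := index_migration.NewProcessor("cluster-id", ".infini_task", ".infini_task_log", sched)
	// proc.Process(taskItem) drives one index_migration task through
	// ready (split/schedule), running (status check), and pending_stop;
	// when every permitted instance is at its job cap, GetPreferenceInstance
	// returns migration_model.ErrHitMax and scheduling is skipped.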