diff --git a/model/notification.go b/model/notification.go
index 74c759d3..bf0c9f97 100644
--- a/model/notification.go
+++ b/model/notification.go
@@ -14,9 +14,10 @@ const (
 type MessageType string
 
 const (
-	MessageTypeNews      MessageType = "news"
-	MessageTypeAlerting  MessageType = "alerting"
-	MessageTypeMigration MessageType = "migration"
+	MessageTypeNews       MessageType = "news"
+	MessageTypeAlerting   MessageType = "alerting"
+	MessageTypeMigration  MessageType = "migration"
+	MessageTypeComparison MessageType = "comparison"
 )
 
 type NotificationStatus string
diff --git a/plugin/migration/api.go b/plugin/migration/api.go
index 70202bc0..ae3a5c9f 100644
--- a/plugin/migration/api.go
+++ b/plugin/migration/api.go
@@ -6,10 +6,8 @@ package migration
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net/http"
-	"strconv"
 	"strings"
 	"time"
@@ -28,32 +26,35 @@ import (
 	"infini.sh/framework/core/orm"
 	task2 "infini.sh/framework/core/task"
 	"infini.sh/framework/core/util"
-	elastic2 "infini.sh/framework/modules/elastic"
 )
 
 func InitAPI() {
 	handler := APIHandler{}
-	api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchDataMigrationTask, enum.PermissionTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionTaskRead))
 	api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionTaskWrite))
+	api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite))
 	api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration))
+	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite))
+	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite))
+	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionTaskRead))
+	api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionTaskRead))
+	api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionTaskRead))
+
+	api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionTaskRead))
+	api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionTaskWrite))
+	api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite))
+	api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite))
+	api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite))
 	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo)
 	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments)
 	api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex)
-	api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", 
handler.RequirePermission(handler.startDataMigration, enum.PermissionTaskWrite)) - api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopDataMigrationTask, enum.PermissionTaskWrite)) - //api.HandleAPIMethod(api.GET, "/migration/data/:task_id", handler.getMigrationTask) - api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionTaskRead)) - api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionTaskRead)) - api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionTaskRead)) api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_init", handler.initIndex) - api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.deleteDataMigrationTask) } type APIHandler struct { api.Handler - bulkResultIndexName string } func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { @@ -113,96 +114,6 @@ func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Re h.WriteCreatedOKJSON(w, t.ID) } -func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - var ( - keyword = h.GetParameterOrDefault(req, "keyword", "") - strSize = h.GetParameterOrDefault(req, "size", "20") - strFrom = h.GetParameterOrDefault(req, "from", "0") - mustQ []interface{} - ) - mustQ = append(mustQ, util.MapStr{ - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "cluster_migration", - }, - }, - }) - - if keyword != "" { - mustQ = append(mustQ, util.MapStr{ - "query_string": util.MapStr{ - "default_field": "*", - "query": keyword, - }, - }) - } - size, _ := strconv.Atoi(strSize) - if size <= 0 { - size = 20 - } - from, _ := strconv.Atoi(strFrom) - if from < 0 { - from = 0 - } - - queryDSL := util.MapStr{ - "sort": []util.MapStr{ - { - "created": util.MapStr{ - "order": "desc", - }, - }, - }, - "size": size, - "from": from, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": mustQ, - }, - }, - } - - q := orm.Query{} - q.RawQuery = util.MustToJSONBytes(queryDSL) - - err, res := orm.Search(&task2.Task{}, &q) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - searchRes := &elastic.SearchResponse{} - err = util.FromJSONBytes(res.Raw, searchRes) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - for _, hit := range searchRes.Hits.Hits { - sourceM := util.MapStr(hit.Source) - buf := util.MustToJSONBytes(sourceM["config"]) - dataConfig := migration_model.ClusterMigrationTaskConfig{} - err = util.FromJSONBytes(buf, &dataConfig) - if err != nil { - log.Error(err) - continue - } - //var targetTotalDocs int64 - if hit.Source["status"] == task2.StatusRunning { - ts, err := getMajorTaskStatsFromInstances(hit.ID) - if err != nil { - log.Warnf("fetch progress info of task error: %v", err) - continue - } - sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) - } - - } - - h.WriteJSON(w, searchRes, http.StatusOK) -} - func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( index = ps.MustGetParameter("index") @@ -227,92 +138,6 @@ func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req 
*http.Requ h.WriteJSON(w, partitions, http.StatusOK) } -func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - taskID := ps.MustGetParameter("task_id") - obj := task2.Task{} - - obj.ID = taskID - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) - return - } - if obj.Metadata.Type != "pipeline" && obj.Status == task2.StatusComplete { - h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError) - return - } - obj.Status = task2.StatusReady - - err = orm.Update(nil, &obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - - migration_util.WriteLog(&obj, &task2.TaskResult{ - Success: true, - }, "task status manually set to ready") - - // update status of parent task to running - for _, parentTaskID := range obj.ParentId { - parentTask := task2.Task{} - parentTask.ID = parentTaskID - exists, err := orm.Get(&parentTask) - if !exists || err != nil { - h.WriteError(w, fmt.Sprintf("parent task [%s] not found", parentTaskID), http.StatusInternalServerError) - return - } - parentTask.Status = task2.StatusRunning - err = orm.Update(nil, &parentTask) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - migration_util.WriteLog(&parentTask, nil, fmt.Sprintf("child [%s] task [%s] manually started", obj.Metadata.Type, taskID)) - } - - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) -} - -func (h *APIHandler) stopDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - obj := task2.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - if task2.IsEnded(obj.Status) { - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) - return - } - obj.Status = task2.StatusPendingStop - err = orm.Update(nil, &obj) - if err != nil { - log.Error(err) - h.WriteError(w, err.Error(), http.StatusInternalServerError) - return - } - migration_util.WriteLog(&obj, &task2.TaskResult{ - Success: true, - }, "task status manually set to pending stop") - - h.WriteJSON(w, util.MapStr{ - "success": true, - }, 200) -} - func getIndexRefreshInterval(indexNames []string, targetESClient elastic.API) (map[string]string, error) { const step = 50 var ( @@ -806,28 +631,6 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt h.WriteJSON(w, taskInfo, http.StatusOK) } -func (h *APIHandler) getMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { - id := ps.MustGetParameter("task_id") - - obj := task2.Task{} - obj.ID = id - - exists, err := orm.Get(&obj) - if !exists || err != nil { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "found": false, - }, http.StatusNotFound) - return - } - - h.WriteJSON(w, util.MapStr{ - "found": true, - "_id": id, - "_source": obj, - }, 200) -} - func (h *APIHandler) countDocuments(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( index = ps.MustGetParameter("index") @@ -1008,65 +811,6 @@ func (h *APIHandler) initIndex(w http.ResponseWriter, req *http.Request, ps http }, http.StatusOK) } -func (h *APIHandler) deleteDataMigrationTask(w http.ResponseWriter, req *http.Request, ps 
httprouter.Params) { - id := ps.MustGetParameter("task_id") - obj := task2.Task{} - obj.ID = id - - _, err := orm.Get(&obj) - if err != nil { - if errors.Is(err, elastic2.ErrNotFound) { - h.WriteJSON(w, util.MapStr{ - "_id": id, - "result": "not_found", - }, http.StatusNotFound) - return - } - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - if util.StringInArray([]string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, obj.Status) { - h.WriteError(w, fmt.Sprintf("can not delete task [%s] with status [%s]", obj.ID, obj.Status), http.StatusInternalServerError) - return - } - - q := util.MapStr{ - "query": util.MapStr{ - "bool": util.MapStr{ - "minimum_should_match": 1, - "should": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": id, - }, - }, - }, - { - "term": util.MapStr{ - "id": util.MapStr{ - "value": id, - }, - }, - }, - }, - }, - }, - } - err = orm.DeleteBy(&obj, util.MustToJSONBytes(q)) - if err != nil { - h.WriteError(w, err.Error(), http.StatusInternalServerError) - log.Error(err) - return - } - - h.WriteJSON(w, util.MapStr{ - "_id": obj.ID, - "result": "deleted", - }, 200) -} - func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { var ( index = ps.MustGetParameter("index") diff --git a/plugin/migration/cluster_comparison/cluster_comparison.go b/plugin/migration/cluster_comparison/cluster_comparison.go new file mode 100644 index 00000000..908622ef --- /dev/null +++ b/plugin/migration/cluster_comparison/cluster_comparison.go @@ -0,0 +1,410 @@ +package cluster_comparison + +import ( + "fmt" + "time" + + log "github.com/cihub/seelog" + "infini.sh/console/model" + migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" + + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +type processor struct { + Elasticsearch string + IndexName string + scheduler migration_model.Scheduler +} + +func NewProcessor(elasticsearch, indexName string, scheduler migration_model.Scheduler) migration_model.Processor { + return &processor{ + Elasticsearch: elasticsearch, + IndexName: indexName, + scheduler: scheduler, + } +} + +func (p *processor) Process(t *task.Task) (err error) { + switch t.Status { + case task.StatusReady: + // split & schedule index_comparison tasks + err = p.handleReadyMajorTask(t) + case task.StatusRunning: + // check index_comparison tasks status + err = p.handleRunningMajorTask(t) + case task.StatusPendingStop: + // mark index_comparison as pending_stop + err = p.handlePendingStopMajorTask(t) + } + return err +} + +func (p *processor) handleReadyMajorTask(taskItem *task.Task) error { + if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok { + return p.splitMajorTask(taskItem) + } + //update status of subtask to ready + query := util.MapStr{ + "bool": util.MapStr{ + "must": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": taskItem.ID, + }, + }, + }, + { + "terms": util.MapStr{ + "status": []string{task.StatusError, task.StatusStopped}, + }, + }, + { + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": "index_comparison", + }, + }, + }, + }, + }, + } + queryDsl := util.MapStr{ + "query": query, + "script": util.MapStr{ + "source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady), + }, + } + + esClient := 
elastic.GetClient(p.Elasticsearch) + _, err := esClient.UpdateByQuery(p.IndexName, util.MustToJSONBytes(queryDsl)) + if err != nil { + log.Errorf("failed to update sub task status, err: %v", err) + return nil + } + taskItem.RetryTimes++ + taskItem.Status = task.StatusRunning + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: true, + }, fmt.Sprintf("cluster comparison task [%s] started", taskItem.ID)) + p.sendMajorTaskNotification(taskItem) + return nil +} + +func (p *processor) splitMajorTask(taskItem *task.Task) error { + clusterComparisonTask := migration_model.ClusterComparisonTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &clusterComparisonTask) + if err != nil { + log.Errorf("failed to get task config, err: %v", err) + return err + } + esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id) + + for _, index := range clusterComparisonTask.Indices { + sourceDump := migration_model.IndexComparisonDumpConfig{ + ClusterId: clusterComparisonTask.Cluster.Source.Id, + Indices: index.Source.Name, + SliceSize: clusterComparisonTask.Settings.Dump.SliceSize, + BatchSize: clusterComparisonTask.Settings.Dump.Docs, + PartitionSize: clusterComparisonTask.Settings.Dump.PartitionSize, + ScrollTime: clusterComparisonTask.Settings.Dump.Timeout, + } + // TODO: dump_hash can only handle 1G file + if sourceDump.PartitionSize <= 0 { + sourceDump.PartitionSize = 1 + } + + if v, ok := index.RawFilter.(string); ok { + sourceDump.QueryString = v + } else { + var must []interface{} + if index.RawFilter != nil { + must = append(must, index.RawFilter) + } + if len(must) > 0 { + sourceDump.QueryDSL = util.MapStr{ + "bool": util.MapStr{ + "must": must, + }, + } + } + } + + if index.Partition != nil { + partitionQ := &elastic.PartitionQuery{ + IndexName: index.Source.Name, + FieldName: index.Partition.FieldName, + FieldType: index.Partition.FieldType, + Step: index.Partition.Step, + } + if sourceDump.QueryDSL != nil { + partitionQ.Filter = sourceDump.QueryDSL + } + partitions, err := elastic.GetPartitions(partitionQ, esSourceClient) + if err != nil { + return err + } + if partitions == nil || len(partitions) == 0 { + return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter)) + } + for _, partition := range partitions { + //skip empty partition + if partition.Docs <= 0 { + continue + } + partitionSourceDump := sourceDump + partitionSourceDump.QueryDSL = partition.Filter + partitionSourceDump.DocCount = partition.Docs + partitionSourceDump.QueryString = "" + + // TODO: if there's a partition missing from source but present in target + // ideally we can capture it in docs count, but this won't always work + partitionTargetDump := partitionSourceDump + partitionTargetDump.Indices = index.Target.Name + + partitionComparisonTask := task.Task{ + ParentId: []string{taskItem.ID}, + Cancellable: false, + Runnable: true, + Status: task.StatusReady, + Metadata: task.Metadata{ + Type: "index_comparison", + Labels: util.MapStr{ + "business_id": "index_comparison", + "source_cluster_id": clusterComparisonTask.Cluster.Source.Id, + "target_cluster_id": clusterComparisonTask.Cluster.Target.Id, + "index_name": index.Source.Name, + "unique_index_name": index.Source.GetUniqueIndexName(), + }, + }, + ConfigString: util.MustToJSON(migration_model.IndexComparisonTaskConfig{ + Source: partitionSourceDump, + Target: partitionTargetDump, + Execution: clusterComparisonTask.Settings.Execution, + }), + } + partitionComparisonTask.ID = util.GetUUID() + err = orm.Create(nil, 
&partitionComparisonTask)
+				if err != nil {
+					return fmt.Errorf("store index comparison task (partition) error: %w", err)
+				}
+
+			}
+		} else {
+			sourceDump.DocCount = index.Source.Docs
+			targetDump := sourceDump
+			targetDump.Indices = index.Target.Name
+
+			indexComparisonTask := task.Task{
+				ParentId:    []string{taskItem.ID},
+				Cancellable: true,
+				Runnable:    false,
+				Status:      task.StatusReady,
+				Metadata: task.Metadata{
+					Type: "index_comparison",
+					Labels: util.MapStr{
+						"business_id":       "index_comparison",
+						"source_cluster_id": clusterComparisonTask.Cluster.Source.Id,
+						"target_cluster_id": clusterComparisonTask.Cluster.Target.Id,
+						"partition_count":   1,
+						"index_name":        index.Source.Name,
+						"unique_index_name": index.Source.GetUniqueIndexName(),
+					},
+				},
+				ConfigString: util.MustToJSON(migration_model.IndexComparisonTaskConfig{
+					Source:    sourceDump,
+					Target:    targetDump,
+					Execution: clusterComparisonTask.Settings.Execution,
+				}),
+			}
+			indexComparisonTask.ID = util.GetUUID()
+
+			err = orm.Create(nil, &indexComparisonTask)
+			if err != nil {
+				return fmt.Errorf("store index comparison task error: %w", err)
+			}
+		}
+	}
+
+	taskItem.Metadata.Labels["is_split"] = true
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: true,
+	}, fmt.Sprintf("major task [%s] split", taskItem.ID))
+	return nil
+}
+
+func (p *processor) handleRunningMajorTask(taskItem *task.Task) error {
+	taskStatus, err := p.getMajorTaskState(taskItem)
+	if err != nil {
+		return err
+	}
+	if !(taskStatus == task.StatusComplete || taskStatus == task.StatusError) {
+		return nil
+	}
+
+	var errMsg string
+
+	if taskStatus == task.StatusError {
+		errMsg = "index comparison(s) failed"
+	}
+
+	if errMsg == "" {
+		taskItem.Status = task.StatusComplete
+	} else {
+		taskItem.Status = task.StatusError
+	}
+	tn := time.Now()
+	taskItem.CompletedTime = &tn
+	p.sendMajorTaskNotification(taskItem)
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: errMsg == "",
+		Error:   errMsg,
+	}, fmt.Sprintf("cluster comparison task [%s] finished with status [%s]", taskItem.ID, taskItem.Status))
+	return nil
+}
+
+func (p *processor) getMajorTaskState(majorTask *task.Task) (string, error) {
+	query := util.MapStr{
+		"size": 0,
+		"aggs": util.MapStr{
+			"count": util.MapStr{
+				"terms": util.MapStr{
+					"field": "*",
+				},
+			},
+			"grp": util.MapStr{
+				"terms": util.MapStr{
+					"field": "status",
+				},
+			},
+		},
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"term": util.MapStr{
+							"parent_id": util.MapStr{
+								"value": majorTask.ID,
+							},
+						},
+					},
+					{
+						"term": util.MapStr{
+							"metadata.type": util.MapStr{
+								"value": "index_comparison",
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	esClient := elastic.GetClient(p.Elasticsearch)
+	res, err := esClient.SearchWithRawQueryDSL(p.IndexName, util.MustToJSONBytes(query))
+	if err != nil {
+		log.Errorf("search es failed, err: %v", err)
+		return "", nil
+	}
+	var (
+		hasError bool
+	)
+	for _, bk := range res.Aggregations["grp"].Buckets {
+		statusKey, _ := util.ExtractString(bk["key"])
+		if migration_util.IsRunningState(statusKey) {
+			return task.StatusRunning, nil
+		}
+		if statusKey == task.StatusError {
+			hasError = true
+		}
+	}
+	if hasError {
+		return task.StatusError, nil
+	}
+	return task.StatusComplete, nil
+}
+
+func (p *processor) handlePendingStopMajorTask(taskItem *task.Task) error {
+	err := migration_util.UpdatePendingChildTasksToPendingStop(taskItem, "index_comparison")
+	if err != nil {
+		log.Errorf("failed to update sub task status, err: %v", err)
+		return nil
+	}
+
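+	// Children are only flagged pending_stop above; the index_comparison
+	// processor performs the actual stop. We keep polling below until no
+	// child remains in a pending state, then mark this major task stopped.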
+ tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "index_comparison") + if err != nil { + log.Errorf("failed to get sub tasks, err: %v", err) + return nil + } + + // all subtask stopped or error or complete + if len(tasks) == 0 { + taskItem.Status = task.StatusStopped + p.sendMajorTaskNotification(taskItem) + p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("cluster comparison task [%s] stopped", taskItem.ID)) + // NOTE: we don't know how many running index_comparison's stopped, so do a refresh from ES + p.scheduler.RefreshInstanceJobsFromES() + } + return nil +} + +func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { + esClient := elastic.GetClient(p.Elasticsearch) + _, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, "") + if err != nil { + log.Errorf("failed to update task, err: %v", err) + } + if message != "" { + migration_util.WriteLog(taskItem, taskResult, message) + } +} + +func (p *processor) sendMajorTaskNotification(taskItem *task.Task) { + config := migration_model.ClusterComparisonTaskConfig{} + err := migration_util.GetTaskConfig(taskItem, &config) + if err != nil { + log.Errorf("failed to parse config info from major task, id: %s, err: %v", taskItem.ID, err) + return + } + + creatorID := config.Creator.Id + + var title, body string + body = fmt.Sprintf("From Cluster: [%s (%s)], To Cluster: [%s (%s)]", config.Cluster.Source.Id, config.Cluster.Source.Name, config.Cluster.Target.Id, config.Cluster.Target.Name) + link := fmt.Sprintf("/#/data_tools/comparison/%s/detail", taskItem.ID) + switch taskItem.Status { + case task.StatusReady: + log.Debugf("skip sending notification for ready task, id: %s", taskItem.ID) + return + case task.StatusStopped: + title = fmt.Sprintf("Data Comparison Stopped") + case task.StatusComplete: + title = fmt.Sprintf("Data Comparison Completed") + case task.StatusError: + title = fmt.Sprintf("Data Comparison Failed") + case task.StatusRunning: + title = fmt.Sprintf("Data Comparison Started") + default: + log.Warnf("skip sending notification for invalid task status, id: %s", taskItem.ID) + return + } + notification := &model.Notification{ + UserId: util.ToString(creatorID), + Type: model.NotificationTypeNotification, + MessageType: model.MessageTypeComparison, + Status: model.NotificationStatusNew, + Title: title, + Body: body, + Link: link, + } + err = orm.Create(nil, notification) + if err != nil { + log.Errorf("failed to create notification, err: %v", err) + return + } + return +} diff --git a/plugin/migration/cluster_migration/cluster_migration.go b/plugin/migration/cluster_migration/cluster_migration.go index 861c7c0b..43ecf5d9 100644 --- a/plugin/migration/cluster_migration/cluster_migration.go +++ b/plugin/migration/cluster_migration/cluster_migration.go @@ -34,13 +34,13 @@ func NewProcessor(elasticsearch, indexName string, scheduler migration_model.Sch func (p *processor) Process(t *task.Task) (err error) { switch t.Status { case task.StatusReady: - // mark index_migrations as pending_stop + // split & schedule index_migration tasks err = p.handleReadyMajorTask(t) case task.StatusRunning: // check index_migration tasks status err = p.handleRunningMajorTask(t) case task.StatusPendingStop: - // split & schedule index_migration tasks + // mark index_migrations as pending_stop err = p.handlePendingStopMajorTask(t) } return err @@ -105,9 +105,6 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { log.Errorf("failed to 
get task config, err: %v", err) return err } - defer func() { - taskItem.ConfigString = util.MustToJSON(clusterMigrationTask) - }() esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id) targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id) @@ -164,17 +161,6 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } } } - var targetMust []interface{} - if index.RawFilter != nil { - targetMust = append(targetMust, index.RawFilter) - } - if index.Target.DocType != "" && targetType != "" { - targetMust = append(targetMust, util.MapStr{ - "terms": util.MapStr{ - "_type": []string{index.Target.DocType}, - }, - }) - } target := migration_model.IndexMigrationTargetConfig{ ClusterId: clusterMigrationTask.Cluster.Target.Id, @@ -255,18 +241,11 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { } else { source.DocCount = index.Source.Docs - indexParameters := migration_model.IndexMigrationTaskConfig{ - Source: source, - Target: target, - Execution: clusterMigrationTask.Settings.Execution, - Version: migration_model.IndexMigrationV1, - } indexMigrationTask := task.Task{ - ParentId: []string{taskItem.ID}, - Cancellable: true, - Runnable: false, - Status: task.StatusReady, - StartTimeInMillis: time.Now().UnixMilli(), + ParentId: []string{taskItem.ID}, + Cancellable: true, + Runnable: false, + Status: task.StatusReady, Metadata: task.Metadata{ Type: "index_migration", Labels: util.MapStr{ @@ -278,7 +257,12 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error { "unique_index_name": index.Source.GetUniqueIndexName(), }, }, - ConfigString: util.MustToJSON(indexParameters), + ConfigString: util.MustToJSON(migration_model.IndexMigrationTaskConfig{ + Source: source, + Target: target, + Execution: clusterMigrationTask.Settings.Execution, + Version: migration_model.IndexMigrationV1, + }), } indexMigrationTask.ID = util.GetUUID() diff --git a/plugin/migration/common_api.go b/plugin/migration/common_api.go new file mode 100644 index 00000000..e99d3904 --- /dev/null +++ b/plugin/migration/common_api.go @@ -0,0 +1,258 @@ +package migration + +import ( + "errors" + "fmt" + "net/http" + "strconv" + + log "github.com/cihub/seelog" + + migration_model "infini.sh/console/plugin/migration/model" + migration_util "infini.sh/console/plugin/migration/util" + + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" + elastic2 "infini.sh/framework/modules/elastic" +) + +func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + var ( + keyword = h.GetParameterOrDefault(req, "keyword", "") + strSize = h.GetParameterOrDefault(req, "size", "20") + strFrom = h.GetParameterOrDefault(req, "from", "0") + mustQ []interface{} + ) + mustQ = append(mustQ, util.MapStr{ + "term": util.MapStr{ + "metadata.type": util.MapStr{ + "value": taskType, + }, + }, + }) + + if keyword != "" { + mustQ = append(mustQ, util.MapStr{ + "query_string": util.MapStr{ + "default_field": "*", + "query": keyword, + }, + }) + } + size, _ := strconv.Atoi(strSize) + if size <= 0 { + size = 20 + } + from, _ := strconv.Atoi(strFrom) + if from < 0 { + from = 0 + } + + queryDSL := util.MapStr{ + "sort": []util.MapStr{ + { + "created": util.MapStr{ + "order": "desc", + }, + }, + }, + 
"size": size, + "from": from, + "query": util.MapStr{ + "bool": util.MapStr{ + "must": mustQ, + }, + }, + } + + q := orm.Query{} + q.RawQuery = util.MustToJSONBytes(queryDSL) + + err, res := orm.Search(&task.Task{}, &q) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + searchRes := &elastic.SearchResponse{} + err = util.FromJSONBytes(res.Raw, searchRes) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + for _, hit := range searchRes.Hits.Hits { + sourceM := util.MapStr(hit.Source) + buf := util.MustToJSONBytes(sourceM["config"]) + dataConfig := migration_model.ClusterMigrationTaskConfig{} + err = util.FromJSONBytes(buf, &dataConfig) + if err != nil { + log.Error(err) + continue + } + //var targetTotalDocs int64 + if hit.Source["status"] == task.StatusRunning { + ts, err := getMajorTaskStatsFromInstances(hit.ID) + if err != nil { + log.Warnf("fetch progress info of task error: %v", err) + continue + } + sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs) + } + + } + + h.WriteJSON(w, searchRes, http.StatusOK) + } +} + +func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + taskID := ps.MustGetParameter("task_id") + obj := task.Task{} + + obj.ID = taskID + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError) + return + } + if obj.Metadata.Type != "pipeline" && obj.Status == task.StatusComplete { + h.WriteError(w, fmt.Sprintf("[%s] task [%s] completed, can't start anymore", obj.Metadata.Type, taskID), http.StatusInternalServerError) + return + } + obj.Status = task.StatusReady + + err = orm.Update(nil, &obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + migration_util.WriteLog(&obj, &task.TaskResult{ + Success: true, + }, "task status manually set to ready") + + // update status of parent task to running + for _, parentTaskID := range obj.ParentId { + parentTask := task.Task{} + parentTask.ID = parentTaskID + exists, err := orm.Get(&parentTask) + if !exists || err != nil { + h.WriteError(w, fmt.Sprintf("parent task [%s] not found", parentTaskID), http.StatusInternalServerError) + return + } + parentTask.Status = task.StatusRunning + err = orm.Update(nil, &parentTask) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + migration_util.WriteLog(&parentTask, nil, fmt.Sprintf("child [%s] task [%s] manually started", obj.Metadata.Type, taskID)) + } + + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) +} + +func (h *APIHandler) stopTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + obj := task.Task{} + obj.ID = id + + exists, err := orm.Get(&obj) + if !exists || err != nil { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "found": false, + }, http.StatusNotFound) + return + } + if task.IsEnded(obj.Status) { + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) + return + } + obj.Status = task.StatusPendingStop + err = orm.Update(nil, &obj) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + migration_util.WriteLog(&obj, &task.TaskResult{ + Success: true, + }, "task status manually set to pending stop") + + h.WriteJSON(w, util.MapStr{ + "success": true, + }, 200) +} + 
+// delete task and all sub tasks +func (h *APIHandler) deleteTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + id := ps.MustGetParameter("task_id") + obj := task.Task{} + obj.ID = id + + _, err := orm.Get(&obj) + if err != nil { + if errors.Is(err, elastic2.ErrNotFound) { + h.WriteJSON(w, util.MapStr{ + "_id": id, + "result": "not_found", + }, http.StatusNotFound) + return + } + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + if util.StringInArray([]string{task.StatusReady, task.StatusRunning, task.StatusPendingStop}, obj.Status) { + h.WriteError(w, fmt.Sprintf("can not delete task [%s] with status [%s]", obj.ID, obj.Status), http.StatusInternalServerError) + return + } + + q := util.MapStr{ + "query": util.MapStr{ + "bool": util.MapStr{ + "minimum_should_match": 1, + "should": []util.MapStr{ + { + "term": util.MapStr{ + "parent_id": util.MapStr{ + "value": id, + }, + }, + }, + { + "term": util.MapStr{ + "id": util.MapStr{ + "value": id, + }, + }, + }, + }, + }, + }, + } + err = orm.DeleteBy(&obj, util.MustToJSONBytes(q)) + if err != nil { + h.WriteError(w, err.Error(), http.StatusInternalServerError) + log.Error(err) + return + } + + h.WriteJSON(w, util.MapStr{ + "_id": obj.ID, + "result": "deleted", + }, 200) +} diff --git a/plugin/migration/comparison_api.go b/plugin/migration/comparison_api.go new file mode 100644 index 00000000..6eb56152 --- /dev/null +++ b/plugin/migration/comparison_api.go @@ -0,0 +1,73 @@ +package migration + +import ( + "net/http" + + log "github.com/cihub/seelog" + + migration_model "infini.sh/console/plugin/migration/model" + + "infini.sh/framework/core/api/rbac" + httprouter "infini.sh/framework/core/api/router" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/orm" + "infini.sh/framework/core/task" + "infini.sh/framework/core/util" +) + +func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { + clusterTaskConfig := &migration_model.ClusterComparisonTaskConfig{} + err := h.DecodeJSON(req, clusterTaskConfig) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if len(clusterTaskConfig.Indices) == 0 { + h.WriteError(w, "indices must not be empty", http.StatusInternalServerError) + return + } + user, err := rbac.FromUserContext(req.Context()) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + if user != nil { + clusterTaskConfig.Creator.Name = user.Username + clusterTaskConfig.Creator.Id = user.UserId + } + + var totalDocs int64 + for _, index := range clusterTaskConfig.Indices { + totalDocs += index.Source.Docs + } + + srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id) + clusterTaskConfig.Cluster.Source.Distribution = srcClusterCfg.Distribution + dstClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Target.Id) + clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution + t := task.Task{ + Metadata: task.Metadata{ + Type: "cluster_comparison", + Labels: util.MapStr{ + "business_id": "cluster_comparison", + "source_cluster_id": clusterTaskConfig.Cluster.Source.Id, + "target_cluster_id": clusterTaskConfig.Cluster.Target.Id, + "source_total_docs": totalDocs, + }, + }, + Cancellable: true, + Runnable: false, + Status: task.StatusInit, + ConfigString: util.MustToJSON(clusterTaskConfig), + } + t.ID = util.GetUUID() + err = orm.Create(nil, &t) + if err != nil { + 
h.WriteError(w, err.Error(), http.StatusInternalServerError)
+		log.Error(err)
+		return
+	}
+	h.WriteCreatedOKJSON(w, t.ID)
+}
diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go
new file mode 100644
index 00000000..73aabd7f
--- /dev/null
+++ b/plugin/migration/index_comparison/index_comparison.go
@@ -0,0 +1,480 @@
+package index_comparison
+
+import (
+	"fmt"
+	"time"
+
+	log "github.com/cihub/seelog"
+	"infini.sh/console/model"
+	migration_model "infini.sh/console/plugin/migration/model"
+	migration_util "infini.sh/console/plugin/migration/util"
+
+	"infini.sh/framework/core/elastic"
+	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/task"
+	"infini.sh/framework/core/util"
+)
+
+type processor struct {
+	Elasticsearch string
+	IndexName     string
+	scheduler     migration_model.Scheduler
+}
+
+func NewProcessor(elasticsearch, indexName string, scheduler migration_model.Scheduler) migration_model.Processor {
+	return &processor{
+		Elasticsearch: elasticsearch,
+		IndexName:     indexName,
+		scheduler:     scheduler,
+	}
+}
+
+func (p *processor) Process(t *task.Task) (err error) {
+	switch t.Status {
+	case task.StatusReady:
+		// split & schedule pipeline tasks
+		err = p.handleReadySubTask(t)
+	case task.StatusRunning:
+		// check pipeline tasks status
+		err = p.handleRunningSubTask(t)
+	case task.StatusPendingStop:
+		// mark pipeline tasks as pending_stop
+		err = p.handlePendingStopSubTask(t)
+	}
+	return err
+}
+
+func (p *processor) handleReadySubTask(taskItem *task.Task) error {
+	if ok, _ := util.ExtractBool(taskItem.Metadata.Labels["is_split"]); !ok {
+		return p.handleSplitSubTask(taskItem)
+	}
+
+	return p.handleScheduleSubTask(taskItem)
+}
+
+// split the task into two dump_hash pipeline tasks and one index_diff pipeline task, then persist them
+func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
+	cfg := migration_model.IndexComparisonTaskConfig{}
+	err := migration_util.GetTaskConfig(taskItem, &cfg)
+	if err != nil {
+		return fmt.Errorf("got wrong config [%v] with task [%s], err: %v", taskItem.ConfigString, taskItem.ID, err)
+	}
+
+	if len(taskItem.ParentId) == 0 {
+		return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
+	}
+
+	var pids []string
+	pids = append(pids, taskItem.ParentId...)
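+	// append this task's own id as well, so every pipeline subtask created
+	// below carries the full ancestor chain in its parent_id field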
+ pids = append(pids, taskItem.ID) + sourceClusterID := cfg.Source.ClusterId + targetClusterID := cfg.Target.ClusterId + + sourceDumpID := util.GetUUID() + sourceDumpTask := &task.Task{ + ParentId: pids, + Runnable: true, + Cancellable: true, + Metadata: task.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "cluster_id": sourceClusterID, + "pipeline_id": "dump_hash", + "index_name": cfg.Source.Indices, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + }, + Status: task.StatusInit, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ + Name: sourceDumpID, + Logging: migration_model.PipelineTaskLoggingConfig{ + Enabled: true, + }, + Labels: util.MapStr{ + "parent_task_id": pids, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ + { + "dump_hash": util.MapStr{ + "slice_size": cfg.Source.SliceSize, + "batch_size": cfg.Source.BatchSize, + "indices": cfg.Source.Indices, + "elasticsearch": sourceClusterID, + "output_queue": sourceDumpID, + "clean_old_files": true, + "partition_size": cfg.Source.PartitionSize, + "scroll_time": cfg.Source.ScrollTime, + "query_dsl": util.MustToJSON(util.MapStr{ + "query": cfg.Source.QueryDSL, + }), + }, + }, + }, + }), + } + sourceDumpTask.ID = sourceDumpID + + targetDumpID := util.GetUUID() + targetDumpTask := &task.Task{ + ParentId: pids, + Runnable: true, + Cancellable: true, + Metadata: task.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "cluster_id": targetClusterID, + "pipeline_id": "dump_hash", + "index_name": cfg.Target.Indices, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + }, + Status: task.StatusInit, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ + Name: targetDumpID, + Logging: migration_model.PipelineTaskLoggingConfig{ + Enabled: true, + }, + Labels: util.MapStr{ + "parent_task_id": pids, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ + { + "dump_hash": util.MapStr{ + "slice_size": cfg.Target.SliceSize, + "batch_size": cfg.Target.BatchSize, + "indices": cfg.Target.Indices, + "elasticsearch": targetClusterID, + "output_queue": targetDumpID, + "clean_old_files": true, + "partition_size": cfg.Target.PartitionSize, + "scroll_time": cfg.Target.ScrollTime, + "query_dsl": util.MustToJSON(util.MapStr{ + "query": cfg.Target.QueryDSL, + }), + }, + }, + }, + }), + } + targetDumpTask.ID = targetDumpID + + diffID := util.GetUUID() + diffTask := &task.Task{ + ParentId: pids, + Runnable: true, + Cancellable: true, + Metadata: task.Metadata{ + Type: "pipeline", + Labels: util.MapStr{ + "pipeline_id": "index_diff", + }, + }, + Status: task.StatusInit, + RetryTimes: taskItem.RetryTimes, + ConfigString: util.MustToJSON(migration_model.PipelineTaskConfig{ + Name: diffID, + Logging: migration_model.PipelineTaskLoggingConfig{ + Enabled: true, + }, + Labels: util.MapStr{ + "parent_task_id": pids, + "unique_index_name": taskItem.Metadata.Labels["unique_index_name"], + }, + AutoStart: true, + KeepRunning: false, + Processor: []util.MapStr{ + { + "index_diff": util.MapStr{ + "text_report": false, + "keep_source": false, + "buffer_size": 1, + "clean_old_files": true, + // NOTE: source & target must have same partition_size + "partition_size": cfg.Source.PartitionSize, + "source_queue": sourceDumpID, + "target_queue": 
targetDumpID,
+					"output_queue": util.MapStr{
+						"name": diffID,
+						"labels": util.MapStr{
+							"comparison_task_id": taskItem.ID,
+						},
+					},
+				},
+			},
+		},
+	}),
+	}
+	diffTask.ID = diffID
+
+	err = orm.Create(nil, sourceDumpTask)
+	if err != nil {
+		return fmt.Errorf("create source dump pipeline task error: %w", err)
+	}
+	err = orm.Create(nil, targetDumpTask)
+	if err != nil {
+		return fmt.Errorf("create target dump pipeline task error: %w", err)
+	}
+	err = orm.Create(nil, diffTask)
+	if err != nil {
+		return fmt.Errorf("create diff pipeline task error: %w", err)
+	}
+
+	taskItem.Metadata.Labels["is_split"] = true
+	taskItem.Status = task.StatusReady
+
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: true,
+	}, fmt.Sprintf("comparison task [%s] split", taskItem.ID))
+	return nil
+}
+
+func (p *processor) handleScheduleSubTask(taskItem *task.Task) error {
+	cfg := migration_model.IndexComparisonTaskConfig{}
+	err := migration_util.GetTaskConfig(taskItem, &cfg)
+	if err != nil {
+		log.Errorf("failed to get task config, err: %v", err)
+		return fmt.Errorf("got wrong config of task [%s]", taskItem.ID)
+	}
+
+	sourceDumpTask, targetDumpTask, diffTask, err := p.getPipelineTasks(taskItem, &cfg)
+	if err != nil {
+		log.Errorf("failed to get pipeline tasks, err: %v", err)
+		return nil
+	}
+	if sourceDumpTask == nil || targetDumpTask == nil || diffTask == nil {
+		// ES might not be synced yet
+		log.Warnf("task [%s] pipeline task(s) not found", taskItem.ID)
+		return nil
+	}
+
+	taskItem.RetryTimes++
+
+	// get a new instanceID
+	executionConfig := cfg.Execution
+	instance, err := p.scheduler.GetPreferenceInstance(executionConfig)
+	if err != nil {
+		if err == migration_model.ErrHitMax {
+			log.Debug("hit max tasks per instance, skip dispatch")
+			return nil
+		}
+		return fmt.Errorf("get preference instance error: %w", err)
+	}
+	instanceID := instance.ID
+
+	// try to clear disk queue before running dump_hash
+	p.cleanGatewayQueue(taskItem)
+
+	sourceDumpTask.RetryTimes = taskItem.RetryTimes
+	sourceDumpTask.Status = task.StatusReady
+	err = orm.Update(nil, sourceDumpTask)
+	if err != nil {
+		return fmt.Errorf("update source dump pipeline task error: %w", err)
+	}
+	targetDumpTask.RetryTimes = taskItem.RetryTimes
+	targetDumpTask.Status = task.StatusReady
+	err = orm.Update(nil, targetDumpTask)
+	if err != nil {
+		return fmt.Errorf("update target dump pipeline task error: %w", err)
+	}
+	diffTask.RetryTimes = taskItem.RetryTimes
+	diffTask.Status = task.StatusInit
+	err = orm.Update(nil, diffTask)
+	if err != nil {
+		return fmt.Errorf("update diff pipeline task error: %w", err)
+	}
+
+	// update the index comparison task status to running and save a task log
+	taskItem.Metadata.Labels["execution_instance_id"] = instanceID
+	taskItem.Status = task.StatusRunning
+	taskItem.StartTimeInMillis = time.Now().UnixMilli()
+
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: true,
+	}, fmt.Sprintf("index comparison task [%s] started", taskItem.ID))
+	// update dispatcher state
+	p.scheduler.IncrInstanceJobs(instanceID)
+	return nil
+}
+
+func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
+	cfg := migration_model.IndexComparisonTaskConfig{}
+	err := migration_util.GetTaskConfig(taskItem, &cfg)
+	if err != nil {
+		log.Errorf("failed to get task config, err: %v", err)
+		return fmt.Errorf("got wrong config of task [%s]", taskItem.ID)
+	}
+	instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
+
+	sourceDumpTask, targetDumpTask, diffTask, err := p.getPipelineTasks(taskItem, &cfg)
+	if err != nil {
+		log.Errorf("failed to get pipeline tasks, err: %v", err)
+		return nil
+	}
+	if sourceDumpTask == nil || targetDumpTask == nil || diffTask == nil {
+		// ES might not be synced yet
+		log.Warnf("task [%s] pipeline task(s) not found", taskItem.ID)
+		return nil
+	}
+
+	if migration_util.IsRunningState(sourceDumpTask.Status) || migration_util.IsRunningState(targetDumpTask.Status) || migration_util.IsRunningState(diffTask.Status) {
+		return nil
+	}
+	if sourceDumpTask.Status == task.StatusComplete && targetDumpTask.Status == task.StatusComplete {
+		sourceDocs := migration_util.GetMapIntValue(util.MapStr(sourceDumpTask.Metadata.Labels), "scrolled_docs")
+		targetDocs := migration_util.GetMapIntValue(util.MapStr(targetDumpTask.Metadata.Labels), "scrolled_docs")
+		if sourceDocs != targetDocs {
+			now := time.Now()
+			taskItem.CompletedTime = &now
+			taskItem.Status = task.StatusError
+			p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+				Success: false,
+			}, fmt.Sprintf("index comparison failed, source/target doc count mismatch: %d / %d", sourceDocs, targetDocs))
+			p.scheduler.DecrInstanceJobs(instanceID)
+			return nil
+		}
+		if diffTask.Status == task.StatusInit {
+			diffTask.Status = task.StatusReady
+			p.saveTaskAndWriteLog(diffTask, &task.TaskResult{
+				Success: true,
+			}, "source/target dump completed, diff pipeline started")
+			return nil
+		}
+		if diffTask.Status == task.StatusComplete {
+			m := util.MapStr(diffTask.Metadata.Labels)
+			var (
+				onlyInSource     = migration_util.GetMapIntValue(m, "only_in_source_count")
+				onlyInSourceKeys = migration_util.GetMapStringValue(m, "only_in_source_keys")
+				onlyInTarget     = migration_util.GetMapIntValue(m, "only_in_target_count")
+				onlyInTargetKeys = migration_util.GetMapStringValue(m, "only_in_target_keys")
+				diffBoth         = migration_util.GetMapIntValue(m, "diff_both_count")
+				diffBothKeys     = migration_util.GetMapStringValue(m, "diff_both_keys")
+			)
+			if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 {
+				now := time.Now()
+				taskItem.CompletedTime = &now
+				taskItem.Status = task.StatusError
+				p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+					Success: false,
+					Error:   "data mismatch",
+				}, fmt.Sprintf("index comparison failed, only in source: %d, only in target: %d, diff in both: %d (sample doc ids: [%s], [%s], [%s])", onlyInSource, onlyInTarget, diffBoth, onlyInSourceKeys, onlyInTargetKeys, diffBothKeys))
+				p.scheduler.DecrInstanceJobs(instanceID)
+				return nil
+			}
+		}
+	}
+	if sourceDumpTask.Status == task.StatusError || targetDumpTask.Status == task.StatusError || diffTask.Status == task.StatusError {
+		now := time.Now()
+		taskItem.CompletedTime = &now
+		taskItem.Status = task.StatusError
+		p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+			Success: false,
+			Error:   "pipeline task failed",
+		}, "index comparison failed")
+		p.scheduler.DecrInstanceJobs(instanceID)
+		return nil
+	}
+
+	now := time.Now()
+	taskItem.CompletedTime = &now
+	taskItem.Status = task.StatusComplete
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: true,
+	}, "index comparison completed")
+	p.scheduler.DecrInstanceJobs(instanceID)
+
+	return nil
+}
+
+func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error {
+	err := migration_util.UpdatePendingChildTasksToPendingStop(taskItem, "pipeline")
+	if err != nil {
+		log.Errorf("failed to update sub task status, err: %v", err)
+		return nil
+	}
+
+	tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline")
+	if err != 
nil { + log.Errorf("failed to get sub tasks, err: %v", err) + return nil + } + + // all subtask stopped or error or complete + if len(tasks) == 0 { + taskItem.Status = task.StatusStopped + p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index comparison task [%s] stopped", taskItem.ID)) + } + return nil +} + +func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.IndexComparisonTaskConfig) (sourceDumpTask *task.Task, targetDumpTask *task.Task, diffTask *task.Task, err error) { + ptasks, err := migration_util.GetChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline", nil) + if err != nil { + log.Errorf("failed to get pipeline tasks, err: %v", err) + return + } + if len(ptasks) != 3 { + err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks)) + return + } + for i, ptask := range ptasks { + if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" { + // TODO: we can't handle when compare the same cluster & same index + // catch it earlier when creating the task + if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices { + sourceDumpTask = &ptasks[i] + } else { + targetDumpTask = &ptasks[i] + } + } else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" { + diffTask = &ptasks[i] + } + } + return +} + +// NOTE: only index_diff have an output queue, others are local files +func (p *processor) cleanGatewayQueue(taskItem *task.Task) { + log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID) + + var err error + instance := model.Instance{} + instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + if instance.ID == "" { + log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID) + return + } + _, err = orm.Get(&instance) + if err != nil { + log.Errorf("failed to get instance, err: %v", err) + return + } + + selector := util.MapStr{ + "labels": util.MapStr{ + "comparison_task_id": taskItem.ID, + }, + } + err = instance.DeleteQueueBySelector(selector) + if err != nil { + log.Errorf("failed to delete queue, err: %v", err) + } +} + +func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.TaskResult, message string) { + esClient := elastic.GetClient(p.Elasticsearch) + _, err := esClient.Index(p.IndexName, "", taskItem.ID, taskItem, "") + if err != nil { + log.Errorf("failed to update task, err: %v", err) + } + if message != "" { + migration_util.WriteLog(taskItem, taskResult, message) + } +} diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go index 9a3aaeda..60cec9a0 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/migration/index_migration/index_migration.go @@ -93,10 +93,6 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { if len(taskItem.ParentId) == 0 { return fmt.Errorf("got wrong parent id of task [%v]", *taskItem) } - queryDsl := cfg.Source.QueryDSL - scrollQueryDsl := util.MustToJSON(util.MapStr{ - "query": queryDsl, - }) indexName := cfg.Source.Indices scrollTask := &task.Task{ ParentId: pids, @@ -140,9 +136,11 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error { }, "partition_size": 1, "scroll_time": cfg.Source.ScrollTime, - "query_dsl": scrollQueryDsl, - "index_rename": cfg.Source.IndexRename, - "type_rename": cfg.Source.TypeRename, + "query_dsl": util.MustToJSON(util.MapStr{ + "query": cfg.Source.QueryDSL, + }), + "index_rename": cfg.Source.IndexRename, + 
"type_rename": cfg.Source.TypeRename, }, }, }, @@ -279,7 +277,6 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error { if err == migration_model.ErrHitMax { log.Debug("hit max tasks per instance, skip dispatch") return nil - } return fmt.Errorf("get preference intance error: %w", err) } @@ -492,11 +489,15 @@ func (p *processor) getExecutionConfigFromMajorTask(taskItem *task.Task) (config } func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask *task.Task, bulkTask *task.Task, err error) { - ptasks, err := p.getPipelineTasks(taskItem.ID) + ptasks, err := migration_util.GetChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline", nil) if err != nil { log.Errorf("failed to get pipeline tasks, err: %v", err) return } + if len(ptasks) != 2 { + err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks)) + return + } for i, ptask := range ptasks { if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" { bulkTask = &ptasks[i] @@ -507,33 +508,6 @@ func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask return } -func (p *processor) getPipelineTasks(subTaskID string) ([]task.Task, error) { - queryDsl := util.MapStr{ - "size": 2, - "query": util.MapStr{ - "bool": util.MapStr{ - "must": []util.MapStr{ - { - "term": util.MapStr{ - "parent_id": util.MapStr{ - "value": subTaskID, - }, - }, - }, - { - "term": util.MapStr{ - "metadata.type": util.MapStr{ - "value": "pipeline", - }, - }, - }, - }, - }, - }, - } - return migration_util.GetTasks(p.Elasticsearch, p.IndexName, queryDsl) -} - func (p *processor) cleanGatewayQueue(taskItem *task.Task) { log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID) diff --git a/plugin/migration/model/comparison.go b/plugin/migration/model/comparison.go new file mode 100644 index 00000000..18b868b7 --- /dev/null +++ b/plugin/migration/model/comparison.go @@ -0,0 +1,55 @@ +package model + +import "infini.sh/framework/core/util" + +type ClusterComparisonTaskConfig struct { + Cluster struct { + Source ClusterInfo `json:"source"` + Target ClusterInfo `json:"target"` + } `json:"cluster"` + Indices []ClusterComparisonIndexConfig `json:"indices"` + Settings struct { + Dump DumpHashConfig `json:"dump"` + Diff IndexDiffConfig `json:"diff"` + Execution ExecutionConfig `json:"execution"` + } `json:"settings"` + Creator struct { + Name string `json:"name"` + Id string `json:"id"` + } `json:"creator"` +} + +type ClusterComparisonIndexConfig struct { + Source IndexInfo `json:"source"` + Target IndexInfo `json:"target"` + RawFilter interface{} `json:"raw_filter"` + Partition *IndexPartition `json:"partition,omitempty"` + + // only used in API + Percent float64 `json:"percent,omitempty"` + ErrorPartitions int `json:"error_partitions,omitempty"` +} + +type IndexComparisonTaskConfig struct { + Source IndexComparisonDumpConfig `json:"source"` + Target IndexComparisonDumpConfig `json:"target"` + Diff IndexComparisonDiffConfig `json:"diff"` + Execution ExecutionConfig `json:"execution"` +} + +type IndexComparisonDumpConfig struct { + ClusterId string `json:"cluster_id"` + Indices string `json:"indices"` + + SliceSize int `json:"slice_size"` + BatchSize int `json:"batch_size"` + PartitionSize int `json:"partition_size"` + ScrollTime string `json:"scroll_time"` + QueryString string `json:"query_string,omitempty"` + QueryDSL util.MapStr `json:"query_dsl,omitempty"` + + DocCount int64 `json:"doc_count"` +} + +type IndexComparisonDiffConfig struct { +} diff --git a/plugin/migration/model/model.go 
b/plugin/migration/model/model.go index 53010327..93e4ba8b 100644 --- a/plugin/migration/model/model.go +++ b/plugin/migration/model/model.go @@ -15,15 +15,9 @@ type ClusterMigrationTaskConfig struct { } `json:"cluster"` Indices []ClusterMigrationIndexConfig `json:"indices"` Settings struct { - ParallelIndices int `json:"parallel_indices"` - ParallelTaskPerIndex int `json:"parallel_task_per_index"` - Scroll struct { - SliceSize int `json:"slice_size"` - Docs int `json:"docs"` - Timeout string `json:"timeout"` - } `json:"scroll"` - Bulk ClusterMigrationBulkConfig `json:"bulk"` - Execution ExecutionConfig `json:"execution"` + Scroll EsScrollConfig `json:"scroll"` + Bulk BulkIndexingConfig `json:"bulk"` + Execution ExecutionConfig `json:"execution"` } `json:"settings"` Creator struct { Name string `json:"name"` @@ -31,24 +25,17 @@ type ClusterMigrationTaskConfig struct { } `json:"creator"` } -type ClusterMigrationBulkConfig struct { - Docs int `json:"docs"` - StoreSizeInMB int `json:"store_size_in_mb"` - MaxWorkerSize int `json:"max_worker_size"` - IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` - SliceSize int `json:"slice_size"` - Compress bool `json:"compress"` -} - type ClusterMigrationIndexConfig struct { - Source IndexInfo `json:"source"` - Target IndexInfo `json:"target"` - RawFilter interface{} `json:"raw_filter"` - IndexRename map[string]interface{} `json:"index_rename"` - TypeRename map[string]interface{} `json:"type_rename"` - Partition *IndexPartition `json:"partition,omitempty"` - Percent float64 `json:"percent,omitempty"` - ErrorPartitions int `json:"error_partitions,omitempty"` + Source IndexInfo `json:"source"` + Target IndexInfo `json:"target"` + RawFilter interface{} `json:"raw_filter"` + IndexRename map[string]interface{} `json:"index_rename"` + TypeRename map[string]interface{} `json:"type_rename"` + Partition *IndexPartition `json:"partition,omitempty"` + + // only used in API + Percent float64 `json:"percent,omitempty"` + ErrorPartitions int `json:"error_partitions,omitempty"` } type MajorTaskState struct { diff --git a/plugin/migration/model/pipeline.go b/plugin/migration/model/pipeline.go index 99660262..6531a06c 100644 --- a/plugin/migration/model/pipeline.go +++ b/plugin/migration/model/pipeline.go @@ -2,6 +2,35 @@ package model import "infini.sh/framework/core/util" +// tunable `es_scroll` configurations +type EsScrollConfig struct { + SliceSize int `json:"slice_size"` + Docs int `json:"docs"` + Timeout string `json:"timeout"` +} + +// tunable `dump_hash` configurations +type DumpHashConfig struct { + SliceSize int `json:"slice_size"` + PartitionSize int `json:"partition_size"` + Docs int `json:"docs"` + Timeout string `json:"timeout"` +} + +// tunable `index_diff` configurations +type IndexDiffConfig struct { +} + +// tunable `bulk_indexing` configurations +type BulkIndexingConfig struct { + Docs int `json:"docs"` + StoreSizeInMB int `json:"store_size_in_mb"` + MaxWorkerSize int `json:"max_worker_size"` + IdleTimeoutInSeconds int `json:"idle_timeout_in_seconds"` + SliceSize int `json:"slice_size"` + Compress bool `json:"compress"` +} + type PipelineTaskLoggingConfig struct { Enabled bool `json:"enabled"` } diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index cc7fe756..392f8fe9 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -11,7 +11,9 @@ import ( log "github.com/cihub/seelog" + "infini.sh/console/plugin/migration/cluster_comparison" "infini.sh/console/plugin/migration/cluster_migration" + 
"infini.sh/console/plugin/migration/index_comparison" "infini.sh/console/plugin/migration/index_migration" migration_model "infini.sh/console/plugin/migration/model" "infini.sh/console/plugin/migration/pipeline_task" @@ -32,10 +34,12 @@ type DispatcherProcessor struct { id string config *DispatcherConfig - scheduler migration_model.Scheduler - pipelineTaskProcessor migration_model.Processor - clusterMigrationTaskProcessor migration_model.Processor - indexMigrationTaskProcessor migration_model.Processor + scheduler migration_model.Scheduler + pipelineTaskProcessor migration_model.Processor + clusterMigrationTaskProcessor migration_model.Processor + indexMigrationTaskProcessor migration_model.Processor + clusterComparisonTaskProcessor migration_model.Processor + indexComparisonTaskProcessor migration_model.Processor } type DispatcherConfig struct { @@ -95,6 +99,8 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName) processor.indexMigrationTaskProcessor = index_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) + processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) + processor.clusterComparisonTaskProcessor = cluster_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) return &processor, nil } @@ -106,17 +112,21 @@ func (p *DispatcherProcessor) Name() string { func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error { // handle pipeline task p.handleTasks(ctx, "pipeline", []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, p.pipelineTaskProcessor.Process) - // mark index_migrations as pending_stop + + // handle comparison tasks + p.handleTasks(ctx, "cluster_comparison", []string{task2.StatusPendingStop}, p.clusterComparisonTaskProcessor.Process) + p.handleTasks(ctx, "index_comparison", []string{task2.StatusPendingStop}, p.indexComparisonTaskProcessor.Process) + p.handleTasks(ctx, "index_comparison", []string{task2.StatusRunning}, p.indexComparisonTaskProcessor.Process) + p.handleTasks(ctx, "index_comparison", []string{task2.StatusReady}, p.indexComparisonTaskProcessor.Process) + p.handleTasks(ctx, "cluster_comparison", []string{task2.StatusRunning}, p.clusterComparisonTaskProcessor.Process) + p.handleTasks(ctx, "cluster_comparison", []string{task2.StatusReady}, p.clusterComparisonTaskProcessor.Process) + + // handle migration tasks p.handleTasks(ctx, "cluster_migration", []string{task2.StatusPendingStop}, p.clusterMigrationTaskProcessor.Process) - // mark pipeline tasks as pending_stop p.handleTasks(ctx, "index_migration", []string{task2.StatusPendingStop}, p.indexMigrationTaskProcessor.Process) - // check pipeline tasks status p.handleTasks(ctx, "index_migration", []string{task2.StatusRunning}, p.indexMigrationTaskProcessor.Process) - // split & schedule pipline tasks p.handleTasks(ctx, "index_migration", []string{task2.StatusReady}, p.indexMigrationTaskProcessor.Process) - // check index_migration tasks status p.handleTasks(ctx, "cluster_migration", []string{task2.StatusRunning}, p.clusterMigrationTaskProcessor.Process) - // split & schedule index_migration tasks p.handleTasks(ctx, "cluster_migration", []string{task2.StatusReady}, 
diff --git a/plugin/migration/pipeline_task/dump_hash.go b/plugin/migration/pipeline_task/dump_hash.go
new file mode 100644
index 00000000..9cd93870
--- /dev/null
+++ b/plugin/migration/pipeline_task/dump_hash.go
@@ -0,0 +1,76 @@
+package pipeline_task
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	log "github.com/cihub/seelog"
+	migration_util "infini.sh/console/plugin/migration/util"
+	"infini.sh/framework/core/task"
+)
+
+func (p *processor) handleRunningDumpHashPipelineTask(taskItem *task.Task) error {
+	scrolledDocs, totalHits, scrolled, err := p.getDumpHashTaskState(taskItem)
+
+	if !scrolled {
+		return nil
+	}
+
+	var errMsg string
+	if err != nil {
+		errMsg = err.Error()
+	}
+	if errMsg == "" {
+		if scrolledDocs < totalHits {
+			errMsg = fmt.Sprintf("scroll finished but doc count mismatch: %d / %d", scrolledDocs, totalHits)
+		}
+	}
+
+	now := time.Now()
+	taskItem.CompletedTime = &now
+	taskItem.Metadata.Labels["scrolled_docs"] = scrolledDocs
+	if errMsg != "" {
+		taskItem.Status = task.StatusError
+	} else {
+		taskItem.Status = task.StatusComplete
+	}
+
+	p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+		Success: errMsg == "",
+		Error:   errMsg,
+	}, fmt.Sprintf("[dump_hash] pipeline task [%s] completed", taskItem.ID))
+	p.cleanGatewayPipeline(taskItem)
+	return nil
+}
+
+func (p *processor) getDumpHashTaskState(taskItem *task.Task) (scrolledDocs int64, totalHits int64, scrolled bool, err error) {
+	hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli())
+	if err != nil {
+		log.Errorf("failed to get pipeline logs for task [%s], err: %v", taskItem.ID, err)
+		err = nil
+		return
+	}
+	if len(hits) == 0 {
+		log.Debugf("scroll task [%s] not finished yet since last start", taskItem.ID)
+		return
+	}
+	for _, m := range hits {
+		scrolled = true
+
+		errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error")
+		if errStr != "" {
+			err = errors.New(errStr)
+			return
+		}
+
+		var (
+			scroll = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.dump_hash.scrolled_docs")
+			total  = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.dump_hash.total_hits")
+		)
+
+		scrolledDocs += scroll
+		totalHits += total
+	}
+	return
+}
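The dump_hash handler recovers its counters by digging `scrolled_docs`/`total_hits` out of nested pipeline log payloads. A rough, self-contained take on what a `GetMapIntValue`-style lookup does; the real helper lives in `plugin/migration/util` and may differ in signature and edge-case handling:

```go
package main

import (
	"fmt"
	"strings"
)

// getMapIntValue walks a dotted path through nested
// map[string]interface{} values and coerces the leaf to int64.
// Illustrative only; not the helper from the patch.
func getMapIntValue(m map[string]interface{}, path string) int64 {
	cur := interface{}(m)
	for _, key := range strings.Split(path, ".") {
		mm, ok := cur.(map[string]interface{})
		if !ok {
			return 0
		}
		cur = mm[key]
	}
	switch v := cur.(type) {
	case int64:
		return v
	case float64: // JSON numbers decode as float64
		return int64(v)
	}
	return 0
}

func main() {
	// Shape assumed from the paths used in getDumpHashTaskState.
	hit := map[string]interface{}{
		"payload": map[string]interface{}{
			"pipeline": map[string]interface{}{
				"logging": map[string]interface{}{
					"context": map[string]interface{}{
						"dump_hash": map[string]interface{}{
							"scrolled_docs": float64(1000),
							"total_hits":    float64(1000),
						},
					},
				},
			},
		},
	}
	fmt.Println(getMapIntValue(hit, "payload.pipeline.logging.context.dump_hash.scrolled_docs"))
}
```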
taskItem.Metadata.Labels["only_in_target_keys"] = strings.Join(totalOnlyInTargetDocs, ",") + taskItem.Metadata.Labels["diff_both_count"] = diffBothCount + taskItem.Metadata.Labels["diff_both_keys"] = strings.Join(totalDiffBothDocs, ",") + + if errMsg != "" { + taskItem.Status = task.StatusError + } else { + taskItem.Status = task.StatusComplete + } + + p.saveTaskAndWriteLog(taskItem, &task.TaskResult{ + Success: errMsg == "", + Error: errMsg, + }, fmt.Sprintf("[index_diff] pipeline task [%s] finished with status [%s]", taskItem.ID, taskItem.Status)) + p.cleanGatewayPipeline(taskItem) + return nil +} + +func (p *processor) getIndexDiffTaskState(taskItem *task.Task) (diffed bool, onlyInSourceCount int64, totalOnlyInSourceDocs []string, onlyInTargetCount int64, totalOnlyInTargetDocs []string, diffBothCount int64, totalDiffBothDocs []string, errs []string) { + newHits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, taskItem.Updated.UnixMilli()) + if err != nil { + log.Errorf("failed to get latest pipeline logs for task [%s], err: %v", taskItem.ID, err) + return + } + if len(newHits) == 0 { + log.Debugf("bulk task [%s] not finished yet since last start", taskItem.ID) + return + } + + hits, err := p.getPipelineLogs(taskItem, []string{"FINISHED", "FAILED"}, 0) + if err != nil { + log.Errorf("failed to get all pipeline logs for task [%s], err: %v", taskItem.ID, err) + return + } + + for _, m := range hits { + diffed = true + + errStr := migration_util.GetMapStringValue(m, "payload.pipeline.logging.result.error") + if errStr != "" { + errs = append(errs, errStr) + } + + var ( + onlyInSource = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.index_diff.only_in_source.count") + onlyInTarget = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.index_diff.only_in_target.count") + diffBoth = migration_util.GetMapIntValue(m, "payload.pipeline.logging.context.index_diff.diff_both.count") + ) + onlyInSourceCount += onlyInSource + onlyInTargetCount += onlyInTarget + diffBothCount += diffBoth + + var ( + onlyInSourceDocs = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.index_diff.only_in_source.keys") + onlyInTargetDocs = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.index_diff.only_in_target.keys") + diffBothDocs = migration_util.GetMapStringSliceValue(m, "payload.pipeline.logging.context.index_diff.diff_both.keys") + ) + totalOnlyInSourceDocs = append(totalOnlyInSourceDocs, onlyInSourceDocs...) + totalOnlyInTargetDocs = append(totalOnlyInTargetDocs, onlyInTargetDocs...) + totalDiffBothDocs = append(totalDiffBothDocs, diffBothDocs...) 
diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go
index f3b0b8e0..5809197f 100644
--- a/plugin/migration/pipeline_task/pipeline_task.go
+++ b/plugin/migration/pipeline_task/pipeline_task.go
@@ -25,6 +25,11 @@ type processor struct {
 	LogIndexName string
 }
 
+var (
+	// task types that have pipeline tasks as children
+	parentTaskTypes = []string{"index_migration", "index_comparison"}
+)
+
 func NewProcessor(elasticsearch, indexName, logIndexName string) migration_model.Processor {
 	return &processor{
 		Elasticsearch: elasticsearch,
@@ -52,6 +57,8 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
 	switch taskItem.Metadata.Labels["pipeline_id"] {
 	case "es_scroll":
 	case "bulk_indexing":
+	case "dump_hash":
+	case "index_diff":
 	default:
 		return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"])
 	}
@@ -108,6 +115,10 @@ func (p *processor) handleRunningPipelineTask(taskItem *task.Task) error {
 		return p.handleRunningEsScrollPipelineTask(taskItem)
 	case "bulk_indexing":
 		return p.handleRunningBulkIndexingPipelineTask(taskItem)
+	case "dump_hash":
+		return p.handleRunningDumpHashPipelineTask(taskItem)
+	case "index_diff":
+		return p.handleRunningIndexDiffPipelineTask(taskItem)
 	default:
 		return fmt.Errorf("task [%s] has unknown pipeline_id [%s]", taskItem.ID, taskItem.Metadata.Labels["pipeline_id"])
 	}
@@ -158,7 +169,6 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) e
 	if len(errs) > 0 {
 		errMsg = fmt.Sprintf("bulk finished with error(s): %v", errs)
 	}
-	// TODO: handle multiple run bulk_indexing pipeline tasks and total_docs from index_migration
 	now := time.Now()
 	taskItem.CompletedTime = &now
 	taskItem.Metadata.Labels["index_docs"] = indexDocs
@@ -239,7 +249,7 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.In
 }
 
 func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) {
-	parentTask, err := p.getParentTask(taskItem, "index_migration")
+	parentTask, err := p.getParentTask(taskItem)
 	if err != nil {
 		return
 	}
@@ -254,7 +264,7 @@ func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance
 	return
 }
 
-func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.Task, error) {
+func (p *processor) getParentTask(taskItem *task.Task) (*task.Task, error) {
 	queryDsl := util.MapStr{
 		"size": 1,
 		"query": util.MapStr{
@@ -266,8 +276,8 @@ func (p *processor) getParentTask(taskItem *task.Task, taskType string) (*task.T
 				},
 			},
 			{
-				"term": util.MapStr{
-					"metadata.type": taskType,
+				"terms": util.MapStr{
+					"metadata.type": parentTaskTypes,
 				},
 			},
 		},
@@ -471,7 +481,7 @@ func (p *processor) fillDynamicESConfig(taskItem *task.Task, pipelineTaskConfig
 			return errors.New("invalid processor config")
 		}
 		processorConfig := util.MapStr(v)
-		if k == "bulk_indexing" || k == "es_scroll" {
+		if k == "bulk_indexing" || k == "es_scroll" || k == "dump_hash" {
 			elasticsearchID := migration_util.GetMapStringValue(processorConfig, "elasticsearch")
 			if elasticsearchID == "" {
 				return fmt.Errorf("invalid task config found for task [%s]", taskItem.ID)
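With the `term` filter on a single type replaced by a `terms` filter over `parentTaskTypes`, one query resolves a pipeline task's parent whether it belongs to a migration or a comparison. Roughly the DSL this builds, rendered with plain maps instead of the framework's `util.MapStr`; the first must clause is elided in the hunk above, so its field and value here are assumed (a match on the parent task id):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	parentTaskTypes := []string{"index_migration", "index_comparison"}
	query := map[string]interface{}{
		"size": 1,
		"query": map[string]interface{}{
			"bool": map[string]interface{}{
				"must": []map[string]interface{}{
					// Assumed shape of the elided clause: match the
					// parent task document by id.
					{"term": map[string]interface{}{
						"id": map[string]interface{}{"value": "parent-task-id"},
					}},
					{"terms": map[string]interface{}{
						"metadata.type": parentTaskTypes,
					}},
				},
			},
		},
	}
	out, _ := json.MarshalIndent(query, "", "  ")
	fmt.Println(string(out))
}
```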
diff --git a/plugin/migration/util/orm.go b/plugin/migration/util/orm.go
index 701a80aa..b1eb7e44 100644
--- a/plugin/migration/util/orm.go
+++ b/plugin/migration/util/orm.go
@@ -11,35 +11,40 @@ import (
 )
 
 func GetPendingChildTasks(elasticsearch, indexName string, taskID string, taskType string) ([]task.Task, error) {
+	return GetChildTasks(elasticsearch, indexName, taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady})
+}
 
-	//check whether all pipeline task is stopped or not, then update task status
-	q := util.MapStr{
-		"size": 200,
-		"query": util.MapStr{
-			"bool": util.MapStr{
-				"must": []util.MapStr{
-					{
-						"term": util.MapStr{
-							"parent_id": util.MapStr{
-								"value": taskID,
-							},
-						},
-					},
-					{
-						"term": util.MapStr{
-							"metadata.type": taskType,
-						},
-					},
-					{
-						"terms": util.MapStr{
-							"status": []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady},
-						},
-					},
+func GetChildTasks(elasticsearch, indexName string, taskID string, taskType string, status []string) ([]task.Task, error) {
+	musts := []util.MapStr{
+		{
+			"term": util.MapStr{
+				"parent_id": util.MapStr{
+					"value": taskID,
 				},
 			},
 		},
+		{
+			"term": util.MapStr{
+				"metadata.type": taskType,
+			},
+		},
 	}
-	return GetTasks(elasticsearch, indexName, q)
+	if len(status) > 0 {
+		musts = append(musts, util.MapStr{
+			"terms": util.MapStr{
+				"status": status,
+			},
+		})
+	}
+	queryDsl := util.MapStr{
+		"size": 999,
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": musts,
+			},
+		},
+	}
+	return GetTasks(elasticsearch, indexName, queryDsl)
 }
 
 func GetTasks(elasticsearch, indexName string, query interface{}) ([]task.Task, error) {
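`GetChildTasks` generalizes the old lookup: a nil `status` fetches all children of a type, while a status slice narrows the search, and `GetPendingChildTasks` becomes a thin wrapper over it. A standalone rendering of the bool query it builds, again with plain maps standing in for `util.MapStr`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// childTasksQuery mirrors the query construction in GetChildTasks:
// always filter by parent_id and metadata.type, and add the status
// terms clause only when a status list is supplied.
func childTasksQuery(taskID, taskType string, status []string) map[string]interface{} {
	musts := []map[string]interface{}{
		{"term": map[string]interface{}{
			"parent_id": map[string]interface{}{"value": taskID},
		}},
		{"term": map[string]interface{}{
			"metadata.type": taskType,
		}},
	}
	if len(status) > 0 {
		musts = append(musts, map[string]interface{}{
			"terms": map[string]interface{}{"status": status},
		})
	}
	return map[string]interface{}{
		"size":  999,
		"query": map[string]interface{}{"bool": map[string]interface{}{"must": musts}},
	}
}

func main() {
	// Hypothetical task id; statuses match GetPendingChildTasks.
	q := childTasksQuery("task-1", "pipeline", []string{"running", "pending_stop", "ready"})
	out, _ := json.MarshalIndent(q, "", "  ")
	fmt.Println(string(out))
}
```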