diff --git a/plugin/migration/api.go b/plugin/migration/api.go index 7a69b69b..e06dbba5 100644 --- a/plugin/migration/api.go +++ b/plugin/migration/api.go @@ -19,14 +19,17 @@ import ( task2 "infini.sh/framework/core/task" "infini.sh/framework/core/util" "net/http" + "src/github.com/buger/jsonparser" "strconv" "strings" "time" ) -func InitAPI() { - handler := APIHandler{} +func InitAPI(bulkResultIndexName string) { + handler := APIHandler{ + bulkResultIndexName: bulkResultIndexName, + } api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequireLogin(handler.searchDataMigrationTask)) api.HandleAPIMethod(api.POST, "/migration/data", handler.RequireLogin(handler.createDataMigrationTask)) api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration)) @@ -44,6 +47,7 @@ func InitAPI() { type APIHandler struct { api.Handler + bulkResultIndexName string } func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params){ @@ -342,7 +346,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request } func getNodeEndpoint(nodeID string) (string, error){ - indexName := ".infini_agent,.infini_gateway-instance" + indexName := ".infini_agent,.infini_instance" query := util.MapStr{ "size": 1, "query": util.MapStr{ @@ -847,7 +851,13 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt return } - var durationInMS = indexTask.GetDurationInMS() + var durationInMS int64 + if indexTask.StartTimeInMillis > 0 { + durationInMS = time.Now().UnixMilli() - indexTask.StartTimeInMillis + if indexTask.CompletedTime != nil && indexTask.Status == task2.StatusComplete { + durationInMS = indexTask.CompletedTime.UnixMilli() - indexTask.StartTimeInMillis + } + } var completedTime int64 if indexTask.CompletedTime != nil { completedTime = indexTask.CompletedTime.UnixMilli() @@ -917,12 +927,9 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt h.WriteError(w, err.Error(), http.StatusInternalServerError) return } - - var ( - partitionTaskInfos []util.MapStr - completedPartitions int - ) - for i, row := range result.Result { + var ptasks = make([]task2.Task, 0, len(result.Result)) + var ptaskIds = make([]string, 0, len(result.Result)) + for _, row := range result.Result { buf := util.MustToJSONBytes(row) ptask := task2.Task{} err = util.FromJSONBytes(buf, &ptask) @@ -930,30 +937,49 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt log.Error(err) continue } + ptasks = append(ptasks, ptask) + ptaskIds = append(ptaskIds, ptask.ID) + } + indexingStats, err := getIndexingStats(ptaskIds, h.bulkResultIndexName) + if err != nil { + log.Error(err) + h.WriteError(w, err.Error(), http.StatusInternalServerError) + return + } + + var ( + partitionTaskInfos []util.MapStr + completedPartitions int + ) + for i, ptask := range ptasks { start, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.start") end, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.end") if i == 0 { step, _ := util.MapStr(ptask.Parameters).GetValue("pipeline.config.source.step") taskInfo["step"] = step } - durationInMS = ptask.GetDurationInMS() + durationInMS = 0 + if ptask.StartTimeInMillis > 0 { + durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis + if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) { + durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis + } + } var ( scrollDocs float64 indexDocs float64 ) - if pt, ok := stats[ptask.ID]; ok { + var stKey = fmt.Sprintf("scrolling_processing.%s", ptask.ID) + if pt, ok := stats[stKey]; ok { if ptv, ok := pt.(map[string]interface{}); ok { - if v, ok := ptv["scroll_docs"].(float64); ok { + if v, ok := ptv["docs"].(float64); ok { scrollDocs = v } - if v, ok := ptv["bulk_docs.200"].(float64); ok { - indexDocs = v - } - if v, ok := ptv["bulk_docs.201"].(float64); ok { - indexDocs += v - } } } + if v, ok := indexingStats[ptask.ID]; ok { + indexDocs = v + } var subCompletedTime int64 if ptask.CompletedTime != nil { subCompletedTime = ptask.CompletedTime.UnixMilli() @@ -1151,4 +1177,51 @@ func (h *APIHandler) validateMultiType(w http.ResponseWriter, req *http.Request, h.WriteJSON(w, util.MapStr{ "result": typeInfo, } , http.StatusOK) +} + +func getIndexingStats(taskIDs []string, indexName string) (map[string]float64, error){ + if len(taskIDs) == 0 { + return nil, fmt.Errorf("taskIDs should not be empty") + } + q := util.MapStr{ + "size": 0, + "query": util.MapStr{ + "terms": util.MapStr{ + "labels.queue": taskIDs, + }, + }, + "aggs": util.MapStr{ + "gp_task": util.MapStr{ + "terms": util.MapStr{ + "field": "labels.queue", + "size": len(taskIDs), + }, + "aggs": util.MapStr{ + "success_count": util.MapStr{ + "sum": util.MapStr{ + "field": "bulk_results.summary.success.count", + }, + }, + }, + }, + }, + } + query := orm.Query{ + RawQuery: util.MustToJSONBytes(q), + IndexName: indexName, + } + err, result := orm.Search(nil, &query) + if err != nil { + return nil, fmt.Errorf("query indexing stats error: %w", err) + } + statsM := map[string]float64{} + jsonparser.ArrayEach(result.Raw, func(value []byte, dataType jsonparser.ValueType, offset int, err error) { + key, _ := jsonparser.GetString(value, "key") + successCount, err := jsonparser.GetFloat(value, "success_count", "value") + if err != nil { + log.Error(err) + } + statsM[key] = successCount + }, "aggregations", "gp_task", "buckets") + return statsM, nil } \ No newline at end of file diff --git a/plugin/migration/module.go b/plugin/migration/module.go index ce63e063..85a7baa8 100644 --- a/plugin/migration/module.go +++ b/plugin/migration/module.go @@ -5,7 +5,9 @@ package migration import ( + "infini.sh/framework/core/env" "infini.sh/framework/core/module" + log "src/github.com/cihub/seelog" ) func (module *Module) Name() string { @@ -13,7 +15,12 @@ func (module *Module) Name() string { } func (module *Module) Setup() { - InitAPI() + module.BulkResultIndexName = ".infini_async_bulk_results" + _, err := env.ParseConfig("migration", module) + if err != nil { + log.Error(err) + } + InitAPI(module.BulkResultIndexName) } func (module *Module) Start() error { return nil @@ -24,6 +31,7 @@ func (module *Module) Stop() error { } type Module struct { + BulkResultIndexName string `config:"bulk_result_index_name"` } func init() { diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 7730ea55..ff74db7f 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -122,7 +122,7 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { Action: task2.LogAction{ Parameters: t.Parameters, }, - Message: fmt.Sprintf("starting to execute task [%s]", t.ID), + Content: fmt.Sprintf("starting to execute task [%s]", t.ID), Timestamp: time.Now().UTC(), }) err = p.SplitMigrationTask(&t) @@ -137,12 +137,12 @@ func (p *ClusterMigrationProcessor) Process(ctx *pipeline.Context) error { Success: true, }, }, - Message: fmt.Sprintf("success to split task [%s]", t.ID), + Content: fmt.Sprintf("success to split task [%s]", t.ID), Timestamp: time.Now().UTC(), } if err != nil { taskLog.Status = task2.StatusError - taskLog.Message = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err) + taskLog.Content = fmt.Sprintf("failed to split task [%s]: %v", t.ID, err) taskLog.Action.Result = &task2.LogResult{ Success: false, Error: err.Error(),